-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhandler.go
More file actions
163 lines (148 loc) · 4.11 KB
/
Copy pathhandler.go
File metadata and controls
163 lines (148 loc) · 4.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package rhttp
import (
"github.com/oktalz/reverse-http/internal/h2"
"golang.org/x/net/http2"
)
// serve reads frames from the worker side of the reverse connection and
// dispatches them. Shared frame logic (SETTINGS / PING / WINDOW_UPDATE /
// RST_STREAM / GOAWAY / DATA / CONTINUATION) goes through h2.Conn methods;
// HEADERS is worker-specific (each completed header block spawns a goroutine
// running the configured http.Handler).
func serve(c *connection, readyCh chan struct{}) error {
defer func() {
// Wake any handler goroutines blocked on the send window before the
// connection goes away — they should see a "conn dead" error rather
// than spinning forever on the cond.
c.Send.MarkDead()
c.FailAllStreams(h2.ErrConnClosed)
// Best-effort GOAWAY so the peer learns the last-stream-id it can
// expect to be processed.
_ = c.SendGoAway(c.LastStreamID, http2.ErrCodeNo)
c.ShutdownWriter()
_ = c.Close()
}()
// Wait for the peer's initial SETTINGS before declaring the conn ready;
// otherwise our SETTINGS_INITIAL_WINDOW_SIZE / MAX_FRAME_SIZE could be
// stale when the first stream is accepted.
for {
f, err := c.Framer.ReadFrame()
if err != nil {
return err
}
sf, ok := f.(*http2.SettingsFrame)
if !ok || sf.IsAck() {
continue
}
if err := c.ApplyPeerSettings(sf); err != nil {
return err
}
c.SendSettingsAck()
break
}
close(readyCh)
for {
// Pool.Shutdown signals teardown by closing shutdownCh AND calling
// SetReadDeadline(now). Check it on every iteration so a quiet conn
// still exits promptly.
select {
case <-c.shutdownCh:
return nil
default:
}
c.SetIdleDeadline()
f, err := c.Framer.ReadFrame()
if err != nil {
if h2.IsNetTimeout(err) {
select {
case <-c.shutdownCh:
return nil
default:
}
if err := c.SendOrCheckPing(); err != nil {
return err
}
continue
}
return err
}
// Per RFC 7540 §6.10, once a HEADERS without END_HEADERS has been
// received, only CONTINUATION frames for the same stream are
// allowed until END_HEADERS arrives. Anything else is a
// connection-level PROTOCOL_ERROR.
if c.InContinuation() {
cf, ok := f.(*http2.ContinuationFrame)
if !ok || cf.Header().StreamID != c.ContStreamID() {
return h2.ErrContInterleaved
}
decoded, err := c.ConsumeContinuation(cf)
if err != nil {
return err
}
if decoded != nil {
c.dispatchInboundHeaders(decoded)
}
continue
}
if err := c.handleFrame(f); err != nil {
return err
}
}
}
// handleFrame dispatches a single framer frame outside CONTINUATION state.
func (c *connection) handleFrame(f http2.Frame) error {
switch f := f.(type) {
case *http2.SettingsFrame:
if f.IsAck() {
return nil
}
if err := c.ApplyPeerSettings(f); err != nil {
return err
}
c.SendSettingsAck()
return nil
case *http2.WindowUpdateFrame:
c.HandleWindowUpdate(f.Header().StreamID, int32(f.Increment))
return nil
case *http2.HeadersFrame:
// Existing streams shouldn't see a second HEADERS frame in this
// library (no trailers support); ignore to stay forgiving.
if c.GetStream(f.StreamID) != nil {
return nil
}
decoded, err := c.StartHeaders(f)
if err != nil {
return err
}
if decoded != nil {
c.dispatchInboundHeaders(decoded)
}
return nil
case *http2.DataFrame:
c.HandleInboundData(f.Header().StreamID, f.Data(), f.StreamEnded())
return nil
case *http2.PingFrame:
if f.IsAck() {
c.HandlePingAck(f.Data)
return nil
}
c.SendPing(true, f.Data)
return nil
case *http2.GoAwayFrame:
return h2.ErrGoaway
case *http2.RSTStreamFrame:
c.HandleRSTStream(f.Header().StreamID)
return nil
}
return nil
}
// dispatchInboundHeaders takes a complete decoded HEADERS block and starts a
// worker-side stream goroutine to run the http.Handler. The decoded.Header
// map returns to the pool after the request is served (see serveStream).
func (c *connection) dispatchInboundHeaders(decoded *h2.DecodedHeaders) {
s := h2.Acquire(decoded.StreamID, c.Conn, c.Send.InitialStreamWindow())
c.AddStream(s)
if decoded.EndStream {
s.CloseData()
}
go c.serveStream(s, decoded)
}