blob: 9d1f2fadd3dfff8c3946a668648b3122a83c61af [file] [log] [blame]
Serge Bazanskicc25bdf2018-10-25 14:02:58 +02001// Copyright 2015 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Transport code.
6
7package http2
8
9import (
10 "bufio"
11 "bytes"
12 "compress/gzip"
13 "crypto/rand"
14 "crypto/tls"
15 "errors"
16 "fmt"
17 "io"
18 "io/ioutil"
19 "log"
20 "math"
21 mathrand "math/rand"
22 "net"
23 "net/http"
24 "net/textproto"
25 "sort"
26 "strconv"
27 "strings"
28 "sync"
29 "time"
30
31 "golang.org/x/net/http/httpguts"
32 "golang.org/x/net/http2/hpack"
33 "golang.org/x/net/idna"
34)
35
36const (
37 // transportDefaultConnFlow is how many connection-level flow control
38 // tokens we give the server at start-up, past the default 64k.
39 transportDefaultConnFlow = 1 << 30
40
41 // transportDefaultStreamFlow is how many stream-level flow
42 // control tokens we announce to the peer, and how many bytes
43 // we buffer per stream.
44 transportDefaultStreamFlow = 4 << 20
45
46 // transportDefaultStreamMinRefresh is the minimum number of bytes we'll send
47 // a stream-level WINDOW_UPDATE for at a time.
48 transportDefaultStreamMinRefresh = 4 << 10
49
50 defaultUserAgent = "Go-http-client/2.0"
51)
52
53// Transport is an HTTP/2 Transport.
54//
55// A Transport internally caches connections to servers. It is safe
56// for concurrent use by multiple goroutines.
57type Transport struct {
58 // DialTLS specifies an optional dial function for creating
59 // TLS connections for requests.
60 //
61 // If DialTLS is nil, tls.Dial is used.
62 //
63 // If the returned net.Conn has a ConnectionState method like tls.Conn,
64 // it will be used to set http.Response.TLS.
65 DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
66
67 // TLSClientConfig specifies the TLS configuration to use with
68 // tls.Client. If nil, the default configuration is used.
69 TLSClientConfig *tls.Config
70
71 // ConnPool optionally specifies an alternate connection pool to use.
72 // If nil, the default is used.
73 ConnPool ClientConnPool
74
75 // DisableCompression, if true, prevents the Transport from
76 // requesting compression with an "Accept-Encoding: gzip"
77 // request header when the Request contains no existing
78 // Accept-Encoding value. If the Transport requests gzip on
79 // its own and gets a gzipped response, it's transparently
80 // decoded in the Response.Body. However, if the user
81 // explicitly requested gzip it is not automatically
82 // uncompressed.
83 DisableCompression bool
84
85 // AllowHTTP, if true, permits HTTP/2 requests using the insecure,
86 // plain-text "http" scheme. Note that this does not enable h2c support.
87 AllowHTTP bool
88
89 // MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
90 // send in the initial settings frame. It is how many bytes
91 // of response headers are allowed. Unlike the http2 spec, zero here
92 // means to use a default limit (currently 10MB). If you actually
93 // want to advertise an ulimited value to the peer, Transport
94 // interprets the highest possible value here (0xffffffff or 1<<32-1)
95 // to mean no limit.
96 MaxHeaderListSize uint32
97
98 // t1, if non-nil, is the standard library Transport using
99 // this transport. Its settings are used (but not its
100 // RoundTrip method, etc).
101 t1 *http.Transport
102
103 connPoolOnce sync.Once
104 connPoolOrDef ClientConnPool // non-nil version of ConnPool
105}
106
107func (t *Transport) maxHeaderListSize() uint32 {
108 if t.MaxHeaderListSize == 0 {
109 return 10 << 20
110 }
111 if t.MaxHeaderListSize == 0xffffffff {
112 return 0
113 }
114 return t.MaxHeaderListSize
115}
116
117func (t *Transport) disableCompression() bool {
118 return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
119}
120
121var errTransportVersion = errors.New("http2: ConfigureTransport is only supported starting at Go 1.6")
122
123// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
124// It requires Go 1.6 or later and returns an error if the net/http package is too old
125// or if t1 has already been HTTP/2-enabled.
126func ConfigureTransport(t1 *http.Transport) error {
127 _, err := configureTransport(t1) // in configure_transport.go (go1.6) or not_go16.go
128 return err
129}
130
131func (t *Transport) connPool() ClientConnPool {
132 t.connPoolOnce.Do(t.initConnPool)
133 return t.connPoolOrDef
134}
135
136func (t *Transport) initConnPool() {
137 if t.ConnPool != nil {
138 t.connPoolOrDef = t.ConnPool
139 } else {
140 t.connPoolOrDef = &clientConnPool{t: t}
141 }
142}
143
144// ClientConn is the state of a single HTTP/2 client connection to an
145// HTTP/2 server.
146type ClientConn struct {
147 t *Transport
148 tconn net.Conn // usually *tls.Conn, except specialized impls
149 tlsState *tls.ConnectionState // nil only for specialized impls
150 singleUse bool // whether being used for a single http.Request
151
152 // readLoop goroutine fields:
153 readerDone chan struct{} // closed on error
154 readerErr error // set before readerDone is closed
155
156 idleTimeout time.Duration // or 0 for never
157 idleTimer *time.Timer
158
159 mu sync.Mutex // guards following
160 cond *sync.Cond // hold mu; broadcast on flow/closed changes
161 flow flow // our conn-level flow control quota (cs.flow is per stream)
162 inflow flow // peer's conn-level flow control
163 closing bool
164 closed bool
165 wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
166 goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
167 goAwayDebug string // goAway frame's debug data, retained as a string
168 streams map[uint32]*clientStream // client-initiated
169 nextStreamID uint32
170 pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
171 pings map[[8]byte]chan struct{} // in flight ping data to notification channel
172 bw *bufio.Writer
173 br *bufio.Reader
174 fr *Framer
175 lastActive time.Time
176 // Settings from peer: (also guarded by mu)
177 maxFrameSize uint32
178 maxConcurrentStreams uint32
179 peerMaxHeaderListSize uint64
180 initialWindowSize uint32
181
182 hbuf bytes.Buffer // HPACK encoder writes into this
183 henc *hpack.Encoder
184 freeBuf [][]byte
185
186 wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
187 werr error // first write error that has occurred
188}
189
190// clientStream is the state for a single HTTP/2 stream. One of these
191// is created for each Transport.RoundTrip call.
192type clientStream struct {
193 cc *ClientConn
194 req *http.Request
195 trace *clientTrace // or nil
196 ID uint32
197 resc chan resAndError
198 bufPipe pipe // buffered pipe with the flow-controlled response payload
199 startedWrite bool // started request body write; guarded by cc.mu
200 requestedGzip bool
201 on100 func() // optional code to run if get a 100 continue response
202
203 flow flow // guarded by cc.mu
204 inflow flow // guarded by cc.mu
205 bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
206 readErr error // sticky read error; owned by transportResponseBody.Read
207 stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
208 didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu
209
210 peerReset chan struct{} // closed on peer reset
211 resetErr error // populated before peerReset is closed
212
213 done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu
214
215 // owned by clientConnReadLoop:
216 firstByte bool // got the first response byte
217 pastHeaders bool // got first MetaHeadersFrame (actual headers)
218 pastTrailers bool // got optional second MetaHeadersFrame (trailers)
219 num1xx uint8 // number of 1xx responses seen
220
221 trailer http.Header // accumulated trailers
222 resTrailer *http.Header // client's Response.Trailer
223}
224
225// awaitRequestCancel waits for the user to cancel a request or for the done
226// channel to be signaled. A non-nil error is returned only if the request was
227// canceled.
228func awaitRequestCancel(req *http.Request, done <-chan struct{}) error {
229 ctx := reqContext(req)
230 if req.Cancel == nil && ctx.Done() == nil {
231 return nil
232 }
233 select {
234 case <-req.Cancel:
235 return errRequestCanceled
236 case <-ctx.Done():
237 return ctx.Err()
238 case <-done:
239 return nil
240 }
241}
242
243var got1xxFuncForTests func(int, textproto.MIMEHeader) error
244
245// get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
246// if any. It returns nil if not set or if the Go version is too old.
247func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
248 if fn := got1xxFuncForTests; fn != nil {
249 return fn
250 }
251 return traceGot1xxResponseFunc(cs.trace)
252}
253
254// awaitRequestCancel waits for the user to cancel a request, its context to
255// expire, or for the request to be done (any way it might be removed from the
256// cc.streams map: peer reset, successful completion, TCP connection breakage,
257// etc). If the request is canceled, then cs will be canceled and closed.
258func (cs *clientStream) awaitRequestCancel(req *http.Request) {
259 if err := awaitRequestCancel(req, cs.done); err != nil {
260 cs.cancelStream()
261 cs.bufPipe.CloseWithError(err)
262 }
263}
264
265func (cs *clientStream) cancelStream() {
266 cc := cs.cc
267 cc.mu.Lock()
268 didReset := cs.didReset
269 cs.didReset = true
270 cc.mu.Unlock()
271
272 if !didReset {
273 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
274 cc.forgetStreamID(cs.ID)
275 }
276}
277
278// checkResetOrDone reports any error sent in a RST_STREAM frame by the
279// server, or errStreamClosed if the stream is complete.
280func (cs *clientStream) checkResetOrDone() error {
281 select {
282 case <-cs.peerReset:
283 return cs.resetErr
284 case <-cs.done:
285 return errStreamClosed
286 default:
287 return nil
288 }
289}
290
291func (cs *clientStream) getStartedWrite() bool {
292 cc := cs.cc
293 cc.mu.Lock()
294 defer cc.mu.Unlock()
295 return cs.startedWrite
296}
297
298func (cs *clientStream) abortRequestBodyWrite(err error) {
299 if err == nil {
300 panic("nil error")
301 }
302 cc := cs.cc
303 cc.mu.Lock()
304 cs.stopReqBody = err
305 cc.cond.Broadcast()
306 cc.mu.Unlock()
307}
308
309type stickyErrWriter struct {
310 w io.Writer
311 err *error
312}
313
314func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
315 if *sew.err != nil {
316 return 0, *sew.err
317 }
318 n, err = sew.w.Write(p)
319 *sew.err = err
320 return
321}
322
323// noCachedConnError is the concrete type of ErrNoCachedConn, which
324// needs to be detected by net/http regardless of whether it's its
325// bundled version (in h2_bundle.go with a rewritten type name) or
326// from a user's x/net/http2. As such, as it has a unique method name
327// (IsHTTP2NoCachedConnError) that net/http sniffs for via func
328// isNoCachedConnError.
329type noCachedConnError struct{}
330
331func (noCachedConnError) IsHTTP2NoCachedConnError() {}
332func (noCachedConnError) Error() string { return "http2: no cached connection was available" }
333
334// isNoCachedConnError reports whether err is of type noCachedConnError
335// or its equivalent renamed type in net/http2's h2_bundle.go. Both types
336// may coexist in the same running program.
337func isNoCachedConnError(err error) bool {
338 _, ok := err.(interface{ IsHTTP2NoCachedConnError() })
339 return ok
340}
341
342var ErrNoCachedConn error = noCachedConnError{}
343
344// RoundTripOpt are options for the Transport.RoundTripOpt method.
345type RoundTripOpt struct {
346 // OnlyCachedConn controls whether RoundTripOpt may
347 // create a new TCP connection. If set true and
348 // no cached connection is available, RoundTripOpt
349 // will return ErrNoCachedConn.
350 OnlyCachedConn bool
351}
352
353func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
354 return t.RoundTripOpt(req, RoundTripOpt{})
355}
356
357// authorityAddr returns a given authority (a host/IP, or host:port / ip:port)
358// and returns a host:port. The port 443 is added if needed.
359func authorityAddr(scheme string, authority string) (addr string) {
360 host, port, err := net.SplitHostPort(authority)
361 if err != nil { // authority didn't have a port
362 port = "443"
363 if scheme == "http" {
364 port = "80"
365 }
366 host = authority
367 }
368 if a, err := idna.ToASCII(host); err == nil {
369 host = a
370 }
371 // IPv6 address literal, without a port:
372 if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
373 return host + ":" + port
374 }
375 return net.JoinHostPort(host, port)
376}
377
378// RoundTripOpt is like RoundTrip, but takes options.
379func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
380 if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) {
381 return nil, errors.New("http2: unsupported scheme")
382 }
383
384 addr := authorityAddr(req.URL.Scheme, req.URL.Host)
385 for retry := 0; ; retry++ {
386 cc, err := t.connPool().GetClientConn(req, addr)
387 if err != nil {
388 t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
389 return nil, err
390 }
391 traceGotConn(req, cc)
392 res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req)
393 if err != nil && retry <= 6 {
394 if req, err = shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil {
395 // After the first retry, do exponential backoff with 10% jitter.
396 if retry == 0 {
397 continue
398 }
399 backoff := float64(uint(1) << (uint(retry) - 1))
400 backoff += backoff * (0.1 * mathrand.Float64())
401 select {
402 case <-time.After(time.Second * time.Duration(backoff)):
403 continue
404 case <-reqContext(req).Done():
405 return nil, reqContext(req).Err()
406 }
407 }
408 }
409 if err != nil {
410 t.vlogf("RoundTrip failure: %v", err)
411 return nil, err
412 }
413 return res, nil
414 }
415}
416
417// CloseIdleConnections closes any connections which were previously
418// connected from previous requests but are now sitting idle.
419// It does not interrupt any connections currently in use.
420func (t *Transport) CloseIdleConnections() {
421 if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
422 cp.closeIdleConnections()
423 }
424}
425
426var (
427 errClientConnClosed = errors.New("http2: client conn is closed")
428 errClientConnUnusable = errors.New("http2: client conn not usable")
429 errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
430)
431
432// shouldRetryRequest is called by RoundTrip when a request fails to get
433// response headers. It is always called with a non-nil error.
434// It returns either a request to retry (either the same request, or a
435// modified clone), or an error if the request can't be replayed.
436func shouldRetryRequest(req *http.Request, err error, afterBodyWrite bool) (*http.Request, error) {
437 if !canRetryError(err) {
438 return nil, err
439 }
440 // If the Body is nil (or http.NoBody), it's safe to reuse
441 // this request and its Body.
442 if req.Body == nil || reqBodyIsNoBody(req.Body) {
443 return req, nil
444 }
445
446 // If the request body can be reset back to its original
447 // state via the optional req.GetBody, do that.
448 getBody := reqGetBody(req) // Go 1.8: getBody = req.GetBody
449 if getBody != nil {
450 // TODO: consider a req.Body.Close here? or audit that all caller paths do?
451 body, err := getBody()
452 if err != nil {
453 return nil, err
454 }
455 newReq := *req
456 newReq.Body = body
457 return &newReq, nil
458 }
459
460 // The Request.Body can't reset back to the beginning, but we
461 // don't seem to have started to read from it yet, so reuse
462 // the request directly. The "afterBodyWrite" means the
463 // bodyWrite process has started, which becomes true before
464 // the first Read.
465 if !afterBodyWrite {
466 return req, nil
467 }
468
469 return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
470}
471
472func canRetryError(err error) bool {
473 if err == errClientConnUnusable || err == errClientConnGotGoAway {
474 return true
475 }
476 if se, ok := err.(StreamError); ok {
477 return se.Code == ErrCodeRefusedStream
478 }
479 return false
480}
481
482func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) {
483 host, _, err := net.SplitHostPort(addr)
484 if err != nil {
485 return nil, err
486 }
487 tconn, err := t.dialTLS()("tcp", addr, t.newTLSConfig(host))
488 if err != nil {
489 return nil, err
490 }
491 return t.newClientConn(tconn, singleUse)
492}
493
494func (t *Transport) newTLSConfig(host string) *tls.Config {
495 cfg := new(tls.Config)
496 if t.TLSClientConfig != nil {
497 *cfg = *cloneTLSConfig(t.TLSClientConfig)
498 }
499 if !strSliceContains(cfg.NextProtos, NextProtoTLS) {
500 cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
501 }
502 if cfg.ServerName == "" {
503 cfg.ServerName = host
504 }
505 return cfg
506}
507
508func (t *Transport) dialTLS() func(string, string, *tls.Config) (net.Conn, error) {
509 if t.DialTLS != nil {
510 return t.DialTLS
511 }
512 return t.dialTLSDefault
513}
514
515func (t *Transport) dialTLSDefault(network, addr string, cfg *tls.Config) (net.Conn, error) {
516 cn, err := tls.Dial(network, addr, cfg)
517 if err != nil {
518 return nil, err
519 }
520 if err := cn.Handshake(); err != nil {
521 return nil, err
522 }
523 if !cfg.InsecureSkipVerify {
524 if err := cn.VerifyHostname(cfg.ServerName); err != nil {
525 return nil, err
526 }
527 }
528 state := cn.ConnectionState()
529 if p := state.NegotiatedProtocol; p != NextProtoTLS {
530 return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
531 }
532 if !state.NegotiatedProtocolIsMutual {
533 return nil, errors.New("http2: could not negotiate protocol mutually")
534 }
535 return cn, nil
536}
537
538// disableKeepAlives reports whether connections should be closed as
539// soon as possible after handling the first request.
540func (t *Transport) disableKeepAlives() bool {
541 return t.t1 != nil && t.t1.DisableKeepAlives
542}
543
544func (t *Transport) expectContinueTimeout() time.Duration {
545 if t.t1 == nil {
546 return 0
547 }
548 return transportExpectContinueTimeout(t.t1)
549}
550
551func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
552 return t.newClientConn(c, false)
553}
554
555func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
556 cc := &ClientConn{
557 t: t,
558 tconn: c,
559 readerDone: make(chan struct{}),
560 nextStreamID: 1,
561 maxFrameSize: 16 << 10, // spec default
562 initialWindowSize: 65535, // spec default
563 maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
564 peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
565 streams: make(map[uint32]*clientStream),
566 singleUse: singleUse,
567 wantSettingsAck: true,
568 pings: make(map[[8]byte]chan struct{}),
569 }
570 if d := t.idleConnTimeout(); d != 0 {
571 cc.idleTimeout = d
572 cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
573 }
574 if VerboseLogs {
575 t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
576 }
577
578 cc.cond = sync.NewCond(&cc.mu)
579 cc.flow.add(int32(initialWindowSize))
580
581 // TODO: adjust this writer size to account for frame size +
582 // MTU + crypto/tls record padding.
583 cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr})
584 cc.br = bufio.NewReader(c)
585 cc.fr = NewFramer(cc.bw, cc.br)
586 cc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
587 cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
588
589 // TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on
590 // henc in response to SETTINGS frames?
591 cc.henc = hpack.NewEncoder(&cc.hbuf)
592
593 if t.AllowHTTP {
594 cc.nextStreamID = 3
595 }
596
597 if cs, ok := c.(connectionStater); ok {
598 state := cs.ConnectionState()
599 cc.tlsState = &state
600 }
601
602 initialSettings := []Setting{
603 {ID: SettingEnablePush, Val: 0},
604 {ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
605 }
606 if max := t.maxHeaderListSize(); max != 0 {
607 initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
608 }
609
610 cc.bw.Write(clientPreface)
611 cc.fr.WriteSettings(initialSettings...)
612 cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
613 cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
614 cc.bw.Flush()
615 if cc.werr != nil {
616 return nil, cc.werr
617 }
618
619 go cc.readLoop()
620 return cc, nil
621}
622
623func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
624 cc.mu.Lock()
625 defer cc.mu.Unlock()
626
627 old := cc.goAway
628 cc.goAway = f
629
630 // Merge the previous and current GoAway error frames.
631 if cc.goAwayDebug == "" {
632 cc.goAwayDebug = string(f.DebugData())
633 }
634 if old != nil && old.ErrCode != ErrCodeNo {
635 cc.goAway.ErrCode = old.ErrCode
636 }
637 last := f.LastStreamID
638 for streamID, cs := range cc.streams {
639 if streamID > last {
640 select {
641 case cs.resc <- resAndError{err: errClientConnGotGoAway}:
642 default:
643 }
644 }
645 }
646}
647
648// CanTakeNewRequest reports whether the connection can take a new request,
649// meaning it has not been closed or received or sent a GOAWAY.
650func (cc *ClientConn) CanTakeNewRequest() bool {
651 cc.mu.Lock()
652 defer cc.mu.Unlock()
653 return cc.canTakeNewRequestLocked()
654}
655
656// clientConnIdleState describes the suitability of a client
657// connection to initiate a new RoundTrip request.
658type clientConnIdleState struct {
659 canTakeNewRequest bool
660 freshConn bool // whether it's unused by any previous request
661}
662
663func (cc *ClientConn) idleState() clientConnIdleState {
664 cc.mu.Lock()
665 defer cc.mu.Unlock()
666 return cc.idleStateLocked()
667}
668
669func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
670 if cc.singleUse && cc.nextStreamID > 1 {
671 return
672 }
673 st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing &&
674 int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32
675 st.freshConn = cc.nextStreamID == 1 && st.canTakeNewRequest
676 return
677}
678
679func (cc *ClientConn) canTakeNewRequestLocked() bool {
680 st := cc.idleStateLocked()
681 return st.canTakeNewRequest
682}
683
684// onIdleTimeout is called from a time.AfterFunc goroutine. It will
685// only be called when we're idle, but because we're coming from a new
686// goroutine, there could be a new request coming in at the same time,
687// so this simply calls the synchronized closeIfIdle to shut down this
688// connection. The timer could just call closeIfIdle, but this is more
689// clear.
690func (cc *ClientConn) onIdleTimeout() {
691 cc.closeIfIdle()
692}
693
694func (cc *ClientConn) closeIfIdle() {
695 cc.mu.Lock()
696 if len(cc.streams) > 0 {
697 cc.mu.Unlock()
698 return
699 }
700 cc.closed = true
701 nextID := cc.nextStreamID
702 // TODO: do clients send GOAWAY too? maybe? Just Close:
703 cc.mu.Unlock()
704
705 if VerboseLogs {
706 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
707 }
708 cc.tconn.Close()
709}
710
711var shutdownEnterWaitStateHook = func() {}
712
713// Shutdown gracefully close the client connection, waiting for running streams to complete.
714// Public implementation is in go17.go and not_go17.go
715func (cc *ClientConn) shutdown(ctx contextContext) error {
716 if err := cc.sendGoAway(); err != nil {
717 return err
718 }
719 // Wait for all in-flight streams to complete or connection to close
720 done := make(chan error, 1)
721 cancelled := false // guarded by cc.mu
722 go func() {
723 cc.mu.Lock()
724 defer cc.mu.Unlock()
725 for {
726 if len(cc.streams) == 0 || cc.closed {
727 cc.closed = true
728 done <- cc.tconn.Close()
729 break
730 }
731 if cancelled {
732 break
733 }
734 cc.cond.Wait()
735 }
736 }()
737 shutdownEnterWaitStateHook()
738 select {
739 case err := <-done:
740 return err
741 case <-ctx.Done():
742 cc.mu.Lock()
743 // Free the goroutine above
744 cancelled = true
745 cc.cond.Broadcast()
746 cc.mu.Unlock()
747 return ctx.Err()
748 }
749}
750
751func (cc *ClientConn) sendGoAway() error {
752 cc.mu.Lock()
753 defer cc.mu.Unlock()
754 cc.wmu.Lock()
755 defer cc.wmu.Unlock()
756 if cc.closing {
757 // GOAWAY sent already
758 return nil
759 }
760 // Send a graceful shutdown frame to server
761 maxStreamID := cc.nextStreamID
762 if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
763 return err
764 }
765 if err := cc.bw.Flush(); err != nil {
766 return err
767 }
768 // Prevent new requests
769 cc.closing = true
770 return nil
771}
772
773// Close closes the client connection immediately.
774//
775// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
776func (cc *ClientConn) Close() error {
777 cc.mu.Lock()
778 defer cc.cond.Broadcast()
779 defer cc.mu.Unlock()
780 err := errors.New("http2: client connection force closed via ClientConn.Close")
781 for id, cs := range cc.streams {
782 select {
783 case cs.resc <- resAndError{err: err}:
784 default:
785 }
786 cs.bufPipe.CloseWithError(err)
787 delete(cc.streams, id)
788 }
789 cc.closed = true
790 return cc.tconn.Close()
791}
792
793const maxAllocFrameSize = 512 << 10
794
795// frameBuffer returns a scratch buffer suitable for writing DATA frames.
796// They're capped at the min of the peer's max frame size or 512KB
797// (kinda arbitrarily), but definitely capped so we don't allocate 4GB
798// bufers.
799func (cc *ClientConn) frameScratchBuffer() []byte {
800 cc.mu.Lock()
801 size := cc.maxFrameSize
802 if size > maxAllocFrameSize {
803 size = maxAllocFrameSize
804 }
805 for i, buf := range cc.freeBuf {
806 if len(buf) >= int(size) {
807 cc.freeBuf[i] = nil
808 cc.mu.Unlock()
809 return buf[:size]
810 }
811 }
812 cc.mu.Unlock()
813 return make([]byte, size)
814}
815
816func (cc *ClientConn) putFrameScratchBuffer(buf []byte) {
817 cc.mu.Lock()
818 defer cc.mu.Unlock()
819 const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate.
820 if len(cc.freeBuf) < maxBufs {
821 cc.freeBuf = append(cc.freeBuf, buf)
822 return
823 }
824 for i, old := range cc.freeBuf {
825 if old == nil {
826 cc.freeBuf[i] = buf
827 return
828 }
829 }
830 // forget about it.
831}
832
833// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
834// exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests.
835var errRequestCanceled = errors.New("net/http: request canceled")
836
837func commaSeparatedTrailers(req *http.Request) (string, error) {
838 keys := make([]string, 0, len(req.Trailer))
839 for k := range req.Trailer {
840 k = http.CanonicalHeaderKey(k)
841 switch k {
842 case "Transfer-Encoding", "Trailer", "Content-Length":
843 return "", &badStringError{"invalid Trailer key", k}
844 }
845 keys = append(keys, k)
846 }
847 if len(keys) > 0 {
848 sort.Strings(keys)
849 return strings.Join(keys, ","), nil
850 }
851 return "", nil
852}
853
854func (cc *ClientConn) responseHeaderTimeout() time.Duration {
855 if cc.t.t1 != nil {
856 return cc.t.t1.ResponseHeaderTimeout
857 }
858 // No way to do this (yet?) with just an http2.Transport. Probably
859 // no need. Request.Cancel this is the new way. We only need to support
860 // this for compatibility with the old http.Transport fields when
861 // we're doing transparent http2.
862 return 0
863}
864
865// checkConnHeaders checks whether req has any invalid connection-level headers.
866// per RFC 7540 section 8.1.2.2: Connection-Specific Header Fields.
867// Certain headers are special-cased as okay but not transmitted later.
868func checkConnHeaders(req *http.Request) error {
869 if v := req.Header.Get("Upgrade"); v != "" {
870 return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
871 }
872 if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
873 return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
874 }
875 if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) {
876 return fmt.Errorf("http2: invalid Connection request header: %q", vv)
877 }
878 return nil
879}
880
881// actualContentLength returns a sanitized version of
882// req.ContentLength, where 0 actually means zero (not unknown) and -1
883// means unknown.
884func actualContentLength(req *http.Request) int64 {
885 if req.Body == nil || reqBodyIsNoBody(req.Body) {
886 return 0
887 }
888 if req.ContentLength != 0 {
889 return req.ContentLength
890 }
891 return -1
892}
893
894func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
895 resp, _, err := cc.roundTrip(req)
896 return resp, err
897}
898
899func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAfterReqBodyWrite bool, err error) {
900 if err := checkConnHeaders(req); err != nil {
901 return nil, false, err
902 }
903 if cc.idleTimer != nil {
904 cc.idleTimer.Stop()
905 }
906
907 trailers, err := commaSeparatedTrailers(req)
908 if err != nil {
909 return nil, false, err
910 }
911 hasTrailers := trailers != ""
912
913 cc.mu.Lock()
914 if err := cc.awaitOpenSlotForRequest(req); err != nil {
915 cc.mu.Unlock()
916 return nil, false, err
917 }
918
919 body := req.Body
920 contentLen := actualContentLength(req)
921 hasBody := contentLen != 0
922
923 // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
924 var requestedGzip bool
925 if !cc.t.disableCompression() &&
926 req.Header.Get("Accept-Encoding") == "" &&
927 req.Header.Get("Range") == "" &&
928 req.Method != "HEAD" {
929 // Request gzip only, not deflate. Deflate is ambiguous and
930 // not as universally supported anyway.
931 // See: http://www.gzip.org/zlib/zlib_faq.html#faq38
932 //
933 // Note that we don't request this for HEAD requests,
934 // due to a bug in nginx:
935 // http://trac.nginx.org/nginx/ticket/358
936 // https://golang.org/issue/5522
937 //
938 // We don't request gzip if the request is for a range, since
939 // auto-decoding a portion of a gzipped document will just fail
940 // anyway. See https://golang.org/issue/8923
941 requestedGzip = true
942 }
943
944 // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
945 // sent by writeRequestBody below, along with any Trailers,
946 // again in form HEADERS{1}, CONTINUATION{0,})
947 hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen)
948 if err != nil {
949 cc.mu.Unlock()
950 return nil, false, err
951 }
952
953 cs := cc.newStream()
954 cs.req = req
955 cs.trace = requestTrace(req)
956 cs.requestedGzip = requestedGzip
957 bodyWriter := cc.t.getBodyWriterState(cs, body)
958 cs.on100 = bodyWriter.on100
959
960 cc.wmu.Lock()
961 endStream := !hasBody && !hasTrailers
962 werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
963 cc.wmu.Unlock()
964 traceWroteHeaders(cs.trace)
965 cc.mu.Unlock()
966
967 if werr != nil {
968 if hasBody {
969 req.Body.Close() // per RoundTripper contract
970 bodyWriter.cancel()
971 }
972 cc.forgetStreamID(cs.ID)
973 // Don't bother sending a RST_STREAM (our write already failed;
974 // no need to keep writing)
975 traceWroteRequest(cs.trace, werr)
976 return nil, false, werr
977 }
978
979 var respHeaderTimer <-chan time.Time
980 if hasBody {
981 bodyWriter.scheduleBodyWrite()
982 } else {
983 traceWroteRequest(cs.trace, nil)
984 if d := cc.responseHeaderTimeout(); d != 0 {
985 timer := time.NewTimer(d)
986 defer timer.Stop()
987 respHeaderTimer = timer.C
988 }
989 }
990
991 readLoopResCh := cs.resc
992 bodyWritten := false
993 ctx := reqContext(req)
994
995 handleReadLoopResponse := func(re resAndError) (*http.Response, bool, error) {
996 res := re.res
997 if re.err != nil || res.StatusCode > 299 {
998 // On error or status code 3xx, 4xx, 5xx, etc abort any
999 // ongoing write, assuming that the server doesn't care
1000 // about our request body. If the server replied with 1xx or
1001 // 2xx, however, then assume the server DOES potentially
1002 // want our body (e.g. full-duplex streaming:
1003 // golang.org/issue/13444). If it turns out the server
1004 // doesn't, they'll RST_STREAM us soon enough. This is a
1005 // heuristic to avoid adding knobs to Transport. Hopefully
1006 // we can keep it.
1007 bodyWriter.cancel()
1008 cs.abortRequestBodyWrite(errStopReqBodyWrite)
1009 }
1010 if re.err != nil {
1011 cc.forgetStreamID(cs.ID)
1012 return nil, cs.getStartedWrite(), re.err
1013 }
1014 res.Request = req
1015 res.TLS = cc.tlsState
1016 return res, false, nil
1017 }
1018
1019 for {
1020 select {
1021 case re := <-readLoopResCh:
1022 return handleReadLoopResponse(re)
1023 case <-respHeaderTimer:
1024 if !hasBody || bodyWritten {
1025 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
1026 } else {
1027 bodyWriter.cancel()
1028 cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
1029 }
1030 cc.forgetStreamID(cs.ID)
1031 return nil, cs.getStartedWrite(), errTimeout
1032 case <-ctx.Done():
1033 if !hasBody || bodyWritten {
1034 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
1035 } else {
1036 bodyWriter.cancel()
1037 cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
1038 }
1039 cc.forgetStreamID(cs.ID)
1040 return nil, cs.getStartedWrite(), ctx.Err()
1041 case <-req.Cancel:
1042 if !hasBody || bodyWritten {
1043 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
1044 } else {
1045 bodyWriter.cancel()
1046 cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
1047 }
1048 cc.forgetStreamID(cs.ID)
1049 return nil, cs.getStartedWrite(), errRequestCanceled
1050 case <-cs.peerReset:
1051 // processResetStream already removed the
1052 // stream from the streams map; no need for
1053 // forgetStreamID.
1054 return nil, cs.getStartedWrite(), cs.resetErr
1055 case err := <-bodyWriter.resc:
1056 // Prefer the read loop's response, if available. Issue 16102.
1057 select {
1058 case re := <-readLoopResCh:
1059 return handleReadLoopResponse(re)
1060 default:
1061 }
1062 if err != nil {
1063 return nil, cs.getStartedWrite(), err
1064 }
1065 bodyWritten = true
1066 if d := cc.responseHeaderTimeout(); d != 0 {
1067 timer := time.NewTimer(d)
1068 defer timer.Stop()
1069 respHeaderTimer = timer.C
1070 }
1071 }
1072 }
1073}
1074
1075// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
1076// Must hold cc.mu.
1077func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error {
1078 var waitingForConn chan struct{}
1079 var waitingForConnErr error // guarded by cc.mu
1080 for {
1081 cc.lastActive = time.Now()
1082 if cc.closed || !cc.canTakeNewRequestLocked() {
1083 if waitingForConn != nil {
1084 close(waitingForConn)
1085 }
1086 return errClientConnUnusable
1087 }
1088 if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
1089 if waitingForConn != nil {
1090 close(waitingForConn)
1091 }
1092 return nil
1093 }
1094 // Unfortunately, we cannot wait on a condition variable and channel at
1095 // the same time, so instead, we spin up a goroutine to check if the
1096 // request is canceled while we wait for a slot to open in the connection.
1097 if waitingForConn == nil {
1098 waitingForConn = make(chan struct{})
1099 go func() {
1100 if err := awaitRequestCancel(req, waitingForConn); err != nil {
1101 cc.mu.Lock()
1102 waitingForConnErr = err
1103 cc.cond.Broadcast()
1104 cc.mu.Unlock()
1105 }
1106 }()
1107 }
1108 cc.pendingRequests++
1109 cc.cond.Wait()
1110 cc.pendingRequests--
1111 if waitingForConnErr != nil {
1112 return waitingForConnErr
1113 }
1114 }
1115}
1116
1117// requires cc.wmu be held
1118func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
1119 first := true // first frame written (HEADERS is first, then CONTINUATION)
1120 for len(hdrs) > 0 && cc.werr == nil {
1121 chunk := hdrs
1122 if len(chunk) > maxFrameSize {
1123 chunk = chunk[:maxFrameSize]
1124 }
1125 hdrs = hdrs[len(chunk):]
1126 endHeaders := len(hdrs) == 0
1127 if first {
1128 cc.fr.WriteHeaders(HeadersFrameParam{
1129 StreamID: streamID,
1130 BlockFragment: chunk,
1131 EndStream: endStream,
1132 EndHeaders: endHeaders,
1133 })
1134 first = false
1135 } else {
1136 cc.fr.WriteContinuation(streamID, endHeaders, chunk)
1137 }
1138 }
1139 // TODO(bradfitz): this Flush could potentially block (as
1140 // could the WriteHeaders call(s) above), which means they
1141 // wouldn't respond to Request.Cancel being readable. That's
1142 // rare, but this should probably be in a goroutine.
1143 cc.bw.Flush()
1144 return cc.werr
1145}
1146
1147// internal error values; they don't escape to callers
1148var (
1149 // abort request body write; don't send cancel
1150 errStopReqBodyWrite = errors.New("http2: aborting request body write")
1151
1152 // abort request body write, but send stream reset of cancel.
1153 errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
1154)
1155
1156func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
1157 cc := cs.cc
1158 sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
1159 buf := cc.frameScratchBuffer()
1160 defer cc.putFrameScratchBuffer(buf)
1161
1162 defer func() {
1163 traceWroteRequest(cs.trace, err)
1164 // TODO: write h12Compare test showing whether
1165 // Request.Body is closed by the Transport,
1166 // and in multiple cases: server replies <=299 and >299
1167 // while still writing request body
1168 cerr := bodyCloser.Close()
1169 if err == nil {
1170 err = cerr
1171 }
1172 }()
1173
1174 req := cs.req
1175 hasTrailers := req.Trailer != nil
1176
1177 var sawEOF bool
1178 for !sawEOF {
1179 n, err := body.Read(buf)
1180 if err == io.EOF {
1181 sawEOF = true
1182 err = nil
1183 } else if err != nil {
1184 return err
1185 }
1186
1187 remain := buf[:n]
1188 for len(remain) > 0 && err == nil {
1189 var allowed int32
1190 allowed, err = cs.awaitFlowControl(len(remain))
1191 switch {
1192 case err == errStopReqBodyWrite:
1193 return err
1194 case err == errStopReqBodyWriteAndCancel:
1195 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
1196 return err
1197 case err != nil:
1198 return err
1199 }
1200 cc.wmu.Lock()
1201 data := remain[:allowed]
1202 remain = remain[allowed:]
1203 sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
1204 err = cc.fr.WriteData(cs.ID, sentEnd, data)
1205 if err == nil {
1206 // TODO(bradfitz): this flush is for latency, not bandwidth.
1207 // Most requests won't need this. Make this opt-in or
1208 // opt-out? Use some heuristic on the body type? Nagel-like
1209 // timers? Based on 'n'? Only last chunk of this for loop,
1210 // unless flow control tokens are low? For now, always.
1211 // If we change this, see comment below.
1212 err = cc.bw.Flush()
1213 }
1214 cc.wmu.Unlock()
1215 }
1216 if err != nil {
1217 return err
1218 }
1219 }
1220
1221 if sentEnd {
1222 // Already sent END_STREAM (which implies we have no
1223 // trailers) and flushed, because currently all
1224 // WriteData frames above get a flush. So we're done.
1225 return nil
1226 }
1227
1228 var trls []byte
1229 if hasTrailers {
1230 cc.mu.Lock()
1231 trls, err = cc.encodeTrailers(req)
1232 cc.mu.Unlock()
1233 if err != nil {
1234 cc.writeStreamReset(cs.ID, ErrCodeInternal, err)
1235 cc.forgetStreamID(cs.ID)
1236 return err
1237 }
1238 }
1239
1240 cc.mu.Lock()
1241 maxFrameSize := int(cc.maxFrameSize)
1242 cc.mu.Unlock()
1243
1244 cc.wmu.Lock()
1245 defer cc.wmu.Unlock()
1246
1247 // Two ways to send END_STREAM: either with trailers, or
1248 // with an empty DATA frame.
1249 if len(trls) > 0 {
1250 err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
1251 } else {
1252 err = cc.fr.WriteData(cs.ID, true, nil)
1253 }
1254 if ferr := cc.bw.Flush(); ferr != nil && err == nil {
1255 err = ferr
1256 }
1257 return err
1258}
1259
1260// awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow
1261// control tokens from the server.
1262// It returns either the non-zero number of tokens taken or an error
1263// if the stream is dead.
1264func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
1265 cc := cs.cc
1266 cc.mu.Lock()
1267 defer cc.mu.Unlock()
1268 for {
1269 if cc.closed {
1270 return 0, errClientConnClosed
1271 }
1272 if cs.stopReqBody != nil {
1273 return 0, cs.stopReqBody
1274 }
1275 if err := cs.checkResetOrDone(); err != nil {
1276 return 0, err
1277 }
1278 if a := cs.flow.available(); a > 0 {
1279 take := a
1280 if int(take) > maxBytes {
1281
1282 take = int32(maxBytes) // can't truncate int; take is int32
1283 }
1284 if take > int32(cc.maxFrameSize) {
1285 take = int32(cc.maxFrameSize)
1286 }
1287 cs.flow.take(take)
1288 return take, nil
1289 }
1290 cc.cond.Wait()
1291 }
1292}
1293
1294type badStringError struct {
1295 what string
1296 str string
1297}
1298
1299func (e *badStringError) Error() string { return fmt.Sprintf("%s %q", e.what, e.str) }
1300
1301// requires cc.mu be held.
1302func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) {
1303 cc.hbuf.Reset()
1304
1305 host := req.Host
1306 if host == "" {
1307 host = req.URL.Host
1308 }
1309 host, err := httpguts.PunycodeHostPort(host)
1310 if err != nil {
1311 return nil, err
1312 }
1313
1314 var path string
1315 if req.Method != "CONNECT" {
1316 path = req.URL.RequestURI()
1317 if !validPseudoPath(path) {
1318 orig := path
1319 path = strings.TrimPrefix(path, req.URL.Scheme+"://"+host)
1320 if !validPseudoPath(path) {
1321 if req.URL.Opaque != "" {
1322 return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", orig, req.URL.Opaque)
1323 } else {
1324 return nil, fmt.Errorf("invalid request :path %q", orig)
1325 }
1326 }
1327 }
1328 }
1329
1330 // Check for any invalid headers and return an error before we
1331 // potentially pollute our hpack state. (We want to be able to
1332 // continue to reuse the hpack encoder for future requests)
1333 for k, vv := range req.Header {
1334 if !httpguts.ValidHeaderFieldName(k) {
1335 return nil, fmt.Errorf("invalid HTTP header name %q", k)
1336 }
1337 for _, v := range vv {
1338 if !httpguts.ValidHeaderFieldValue(v) {
1339 return nil, fmt.Errorf("invalid HTTP header value %q for header %q", v, k)
1340 }
1341 }
1342 }
1343
1344 enumerateHeaders := func(f func(name, value string)) {
1345 // 8.1.2.3 Request Pseudo-Header Fields
1346 // The :path pseudo-header field includes the path and query parts of the
1347 // target URI (the path-absolute production and optionally a '?' character
1348 // followed by the query production (see Sections 3.3 and 3.4 of
1349 // [RFC3986]).
1350 f(":authority", host)
1351 f(":method", req.Method)
1352 if req.Method != "CONNECT" {
1353 f(":path", path)
1354 f(":scheme", req.URL.Scheme)
1355 }
1356 if trailers != "" {
1357 f("trailer", trailers)
1358 }
1359
1360 var didUA bool
1361 for k, vv := range req.Header {
1362 if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") {
1363 // Host is :authority, already sent.
1364 // Content-Length is automatic, set below.
1365 continue
1366 } else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") ||
1367 strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") ||
1368 strings.EqualFold(k, "keep-alive") {
1369 // Per 8.1.2.2 Connection-Specific Header
1370 // Fields, don't send connection-specific
1371 // fields. We have already checked if any
1372 // are error-worthy so just ignore the rest.
1373 continue
1374 } else if strings.EqualFold(k, "user-agent") {
1375 // Match Go's http1 behavior: at most one
1376 // User-Agent. If set to nil or empty string,
1377 // then omit it. Otherwise if not mentioned,
1378 // include the default (below).
1379 didUA = true
1380 if len(vv) < 1 {
1381 continue
1382 }
1383 vv = vv[:1]
1384 if vv[0] == "" {
1385 continue
1386 }
1387
1388 }
1389
1390 for _, v := range vv {
1391 f(k, v)
1392 }
1393 }
1394 if shouldSendReqContentLength(req.Method, contentLength) {
1395 f("content-length", strconv.FormatInt(contentLength, 10))
1396 }
1397 if addGzipHeader {
1398 f("accept-encoding", "gzip")
1399 }
1400 if !didUA {
1401 f("user-agent", defaultUserAgent)
1402 }
1403 }
1404
1405 // Do a first pass over the headers counting bytes to ensure
1406 // we don't exceed cc.peerMaxHeaderListSize. This is done as a
1407 // separate pass before encoding the headers to prevent
1408 // modifying the hpack state.
1409 hlSize := uint64(0)
1410 enumerateHeaders(func(name, value string) {
1411 hf := hpack.HeaderField{Name: name, Value: value}
1412 hlSize += uint64(hf.Size())
1413 })
1414
1415 if hlSize > cc.peerMaxHeaderListSize {
1416 return nil, errRequestHeaderListSize
1417 }
1418
1419 trace := requestTrace(req)
1420 traceHeaders := traceHasWroteHeaderField(trace)
1421
1422 // Header list size is ok. Write the headers.
1423 enumerateHeaders(func(name, value string) {
1424 name = strings.ToLower(name)
1425 cc.writeHeader(name, value)
1426 if traceHeaders {
1427 traceWroteHeaderField(trace, name, value)
1428 }
1429 })
1430
1431 return cc.hbuf.Bytes(), nil
1432}
1433
1434// shouldSendReqContentLength reports whether the http2.Transport should send
1435// a "content-length" request header. This logic is basically a copy of the net/http
1436// transferWriter.shouldSendContentLength.
1437// The contentLength is the corrected contentLength (so 0 means actually 0, not unknown).
1438// -1 means unknown.
1439func shouldSendReqContentLength(method string, contentLength int64) bool {
1440 if contentLength > 0 {
1441 return true
1442 }
1443 if contentLength < 0 {
1444 return false
1445 }
1446 // For zero bodies, whether we send a content-length depends on the method.
1447 // It also kinda doesn't matter for http2 either way, with END_STREAM.
1448 switch method {
1449 case "POST", "PUT", "PATCH":
1450 return true
1451 default:
1452 return false
1453 }
1454}
1455
1456// requires cc.mu be held.
1457func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
1458 cc.hbuf.Reset()
1459
1460 hlSize := uint64(0)
1461 for k, vv := range req.Trailer {
1462 for _, v := range vv {
1463 hf := hpack.HeaderField{Name: k, Value: v}
1464 hlSize += uint64(hf.Size())
1465 }
1466 }
1467 if hlSize > cc.peerMaxHeaderListSize {
1468 return nil, errRequestHeaderListSize
1469 }
1470
1471 for k, vv := range req.Trailer {
1472 // Transfer-Encoding, etc.. have already been filtered at the
1473 // start of RoundTrip
1474 lowKey := strings.ToLower(k)
1475 for _, v := range vv {
1476 cc.writeHeader(lowKey, v)
1477 }
1478 }
1479 return cc.hbuf.Bytes(), nil
1480}
1481
1482func (cc *ClientConn) writeHeader(name, value string) {
1483 if VerboseLogs {
1484 log.Printf("http2: Transport encoding header %q = %q", name, value)
1485 }
1486 cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
1487}
1488
1489type resAndError struct {
1490 res *http.Response
1491 err error
1492}
1493
1494// requires cc.mu be held.
1495func (cc *ClientConn) newStream() *clientStream {
1496 cs := &clientStream{
1497 cc: cc,
1498 ID: cc.nextStreamID,
1499 resc: make(chan resAndError, 1),
1500 peerReset: make(chan struct{}),
1501 done: make(chan struct{}),
1502 }
1503 cs.flow.add(int32(cc.initialWindowSize))
1504 cs.flow.setConnFlow(&cc.flow)
1505 cs.inflow.add(transportDefaultStreamFlow)
1506 cs.inflow.setConnFlow(&cc.inflow)
1507 cc.nextStreamID += 2
1508 cc.streams[cs.ID] = cs
1509 return cs
1510}
1511
1512func (cc *ClientConn) forgetStreamID(id uint32) {
1513 cc.streamByID(id, true)
1514}
1515
1516func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
1517 cc.mu.Lock()
1518 defer cc.mu.Unlock()
1519 cs := cc.streams[id]
1520 if andRemove && cs != nil && !cc.closed {
1521 cc.lastActive = time.Now()
1522 delete(cc.streams, id)
1523 if len(cc.streams) == 0 && cc.idleTimer != nil {
1524 cc.idleTimer.Reset(cc.idleTimeout)
1525 }
1526 close(cs.done)
1527 // Wake up checkResetOrDone via clientStream.awaitFlowControl and
1528 // wake up RoundTrip if there is a pending request.
1529 cc.cond.Broadcast()
1530 }
1531 return cs
1532}
1533
1534// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
1535type clientConnReadLoop struct {
1536 cc *ClientConn
1537 closeWhenIdle bool
1538}
1539
1540// readLoop runs in its own goroutine and reads and dispatches frames.
1541func (cc *ClientConn) readLoop() {
1542 rl := &clientConnReadLoop{cc: cc}
1543 defer rl.cleanup()
1544 cc.readerErr = rl.run()
1545 if ce, ok := cc.readerErr.(ConnectionError); ok {
1546 cc.wmu.Lock()
1547 cc.fr.WriteGoAway(0, ErrCode(ce), nil)
1548 cc.wmu.Unlock()
1549 }
1550}
1551
1552// GoAwayError is returned by the Transport when the server closes the
1553// TCP connection after sending a GOAWAY frame.
1554type GoAwayError struct {
1555 LastStreamID uint32
1556 ErrCode ErrCode
1557 DebugData string
1558}
1559
1560func (e GoAwayError) Error() string {
1561 return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
1562 e.LastStreamID, e.ErrCode, e.DebugData)
1563}
1564
1565func isEOFOrNetReadError(err error) bool {
1566 if err == io.EOF {
1567 return true
1568 }
1569 ne, ok := err.(*net.OpError)
1570 return ok && ne.Op == "read"
1571}
1572
1573func (rl *clientConnReadLoop) cleanup() {
1574 cc := rl.cc
1575 defer cc.tconn.Close()
1576 defer cc.t.connPool().MarkDead(cc)
1577 defer close(cc.readerDone)
1578
1579 if cc.idleTimer != nil {
1580 cc.idleTimer.Stop()
1581 }
1582
1583 // Close any response bodies if the server closes prematurely.
1584 // TODO: also do this if we've written the headers but not
1585 // gotten a response yet.
1586 err := cc.readerErr
1587 cc.mu.Lock()
1588 if cc.goAway != nil && isEOFOrNetReadError(err) {
1589 err = GoAwayError{
1590 LastStreamID: cc.goAway.LastStreamID,
1591 ErrCode: cc.goAway.ErrCode,
1592 DebugData: cc.goAwayDebug,
1593 }
1594 } else if err == io.EOF {
1595 err = io.ErrUnexpectedEOF
1596 }
1597 for _, cs := range cc.streams {
1598 cs.bufPipe.CloseWithError(err) // no-op if already closed
1599 select {
1600 case cs.resc <- resAndError{err: err}:
1601 default:
1602 }
1603 close(cs.done)
1604 }
1605 cc.closed = true
1606 cc.cond.Broadcast()
1607 cc.mu.Unlock()
1608}
1609
1610func (rl *clientConnReadLoop) run() error {
1611 cc := rl.cc
1612 rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
1613 gotReply := false // ever saw a HEADERS reply
1614 gotSettings := false
1615 for {
1616 f, err := cc.fr.ReadFrame()
1617 if err != nil {
1618 cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
1619 }
1620 if se, ok := err.(StreamError); ok {
1621 if cs := cc.streamByID(se.StreamID, false); cs != nil {
1622 cs.cc.writeStreamReset(cs.ID, se.Code, err)
1623 cs.cc.forgetStreamID(cs.ID)
1624 if se.Cause == nil {
1625 se.Cause = cc.fr.errDetail
1626 }
1627 rl.endStreamError(cs, se)
1628 }
1629 continue
1630 } else if err != nil {
1631 return err
1632 }
1633 if VerboseLogs {
1634 cc.vlogf("http2: Transport received %s", summarizeFrame(f))
1635 }
1636 if !gotSettings {
1637 if _, ok := f.(*SettingsFrame); !ok {
1638 cc.logf("protocol error: received %T before a SETTINGS frame", f)
1639 return ConnectionError(ErrCodeProtocol)
1640 }
1641 gotSettings = true
1642 }
1643 maybeIdle := false // whether frame might transition us to idle
1644
1645 switch f := f.(type) {
1646 case *MetaHeadersFrame:
1647 err = rl.processHeaders(f)
1648 maybeIdle = true
1649 gotReply = true
1650 case *DataFrame:
1651 err = rl.processData(f)
1652 maybeIdle = true
1653 case *GoAwayFrame:
1654 err = rl.processGoAway(f)
1655 maybeIdle = true
1656 case *RSTStreamFrame:
1657 err = rl.processResetStream(f)
1658 maybeIdle = true
1659 case *SettingsFrame:
1660 err = rl.processSettings(f)
1661 case *PushPromiseFrame:
1662 err = rl.processPushPromise(f)
1663 case *WindowUpdateFrame:
1664 err = rl.processWindowUpdate(f)
1665 case *PingFrame:
1666 err = rl.processPing(f)
1667 default:
1668 cc.logf("Transport: unhandled response frame type %T", f)
1669 }
1670 if err != nil {
1671 if VerboseLogs {
1672 cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
1673 }
1674 return err
1675 }
1676 if rl.closeWhenIdle && gotReply && maybeIdle {
1677 cc.closeIfIdle()
1678 }
1679 }
1680}
1681
1682func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
1683 cc := rl.cc
1684 cs := cc.streamByID(f.StreamID, false)
1685 if cs == nil {
1686 // We'd get here if we canceled a request while the
1687 // server had its response still in flight. So if this
1688 // was just something we canceled, ignore it.
1689 return nil
1690 }
1691 if f.StreamEnded() {
1692 // Issue 20521: If the stream has ended, streamByID() causes
1693 // clientStream.done to be closed, which causes the request's bodyWriter
1694 // to be closed with an errStreamClosed, which may be received by
1695 // clientConn.RoundTrip before the result of processing these headers.
1696 // Deferring stream closure allows the header processing to occur first.
1697 // clientConn.RoundTrip may still receive the bodyWriter error first, but
1698 // the fix for issue 16102 prioritises any response.
1699 //
1700 // Issue 22413: If there is no request body, we should close the
1701 // stream before writing to cs.resc so that the stream is closed
1702 // immediately once RoundTrip returns.
1703 if cs.req.Body != nil {
1704 defer cc.forgetStreamID(f.StreamID)
1705 } else {
1706 cc.forgetStreamID(f.StreamID)
1707 }
1708 }
1709 if !cs.firstByte {
1710 if cs.trace != nil {
1711 // TODO(bradfitz): move first response byte earlier,
1712 // when we first read the 9 byte header, not waiting
1713 // until all the HEADERS+CONTINUATION frames have been
1714 // merged. This works for now.
1715 traceFirstResponseByte(cs.trace)
1716 }
1717 cs.firstByte = true
1718 }
1719 if !cs.pastHeaders {
1720 cs.pastHeaders = true
1721 } else {
1722 return rl.processTrailers(cs, f)
1723 }
1724
1725 res, err := rl.handleResponse(cs, f)
1726 if err != nil {
1727 if _, ok := err.(ConnectionError); ok {
1728 return err
1729 }
1730 // Any other error type is a stream error.
1731 cs.cc.writeStreamReset(f.StreamID, ErrCodeProtocol, err)
1732 cc.forgetStreamID(cs.ID)
1733 cs.resc <- resAndError{err: err}
1734 return nil // return nil from process* funcs to keep conn alive
1735 }
1736 if res == nil {
1737 // (nil, nil) special case. See handleResponse docs.
1738 return nil
1739 }
1740 cs.resTrailer = &res.Trailer
1741 cs.resc <- resAndError{res: res}
1742 return nil
1743}
1744
1745// may return error types nil, or ConnectionError. Any other error value
1746// is a StreamError of type ErrCodeProtocol. The returned error in that case
1747// is the detail.
1748//
1749// As a special case, handleResponse may return (nil, nil) to skip the
1750// frame (currently only used for 1xx responses).
1751func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) {
1752 if f.Truncated {
1753 return nil, errResponseHeaderListSize
1754 }
1755
1756 status := f.PseudoValue("status")
1757 if status == "" {
1758 return nil, errors.New("malformed response from server: missing status pseudo header")
1759 }
1760 statusCode, err := strconv.Atoi(status)
1761 if err != nil {
1762 return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
1763 }
1764
1765 header := make(http.Header)
1766 res := &http.Response{
1767 Proto: "HTTP/2.0",
1768 ProtoMajor: 2,
1769 Header: header,
1770 StatusCode: statusCode,
1771 Status: status + " " + http.StatusText(statusCode),
1772 }
1773 for _, hf := range f.RegularFields() {
1774 key := http.CanonicalHeaderKey(hf.Name)
1775 if key == "Trailer" {
1776 t := res.Trailer
1777 if t == nil {
1778 t = make(http.Header)
1779 res.Trailer = t
1780 }
1781 foreachHeaderElement(hf.Value, func(v string) {
1782 t[http.CanonicalHeaderKey(v)] = nil
1783 })
1784 } else {
1785 header[key] = append(header[key], hf.Value)
1786 }
1787 }
1788
1789 if statusCode >= 100 && statusCode <= 199 {
1790 cs.num1xx++
1791 const max1xxResponses = 5 // arbitrary bound on number of informational responses, same as net/http
1792 if cs.num1xx > max1xxResponses {
1793 return nil, errors.New("http2: too many 1xx informational responses")
1794 }
1795 if fn := cs.get1xxTraceFunc(); fn != nil {
1796 if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
1797 return nil, err
1798 }
1799 }
1800 if statusCode == 100 {
1801 traceGot100Continue(cs.trace)
1802 if cs.on100 != nil {
1803 cs.on100() // forces any write delay timer to fire
1804 }
1805 }
1806 cs.pastHeaders = false // do it all again
1807 return nil, nil
1808 }
1809
1810 streamEnded := f.StreamEnded()
1811 isHead := cs.req.Method == "HEAD"
1812 if !streamEnded || isHead {
1813 res.ContentLength = -1
1814 if clens := res.Header["Content-Length"]; len(clens) == 1 {
1815 if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {
1816 res.ContentLength = clen64
1817 } else {
1818 // TODO: care? unlike http/1, it won't mess up our framing, so it's
1819 // more safe smuggling-wise to ignore.
1820 }
1821 } else if len(clens) > 1 {
1822 // TODO: care? unlike http/1, it won't mess up our framing, so it's
1823 // more safe smuggling-wise to ignore.
1824 }
1825 }
1826
1827 if streamEnded || isHead {
1828 res.Body = noBody
1829 return res, nil
1830 }
1831
1832 cs.bufPipe = pipe{b: &dataBuffer{expected: res.ContentLength}}
1833 cs.bytesRemain = res.ContentLength
1834 res.Body = transportResponseBody{cs}
1835 go cs.awaitRequestCancel(cs.req)
1836
1837 if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
1838 res.Header.Del("Content-Encoding")
1839 res.Header.Del("Content-Length")
1840 res.ContentLength = -1
1841 res.Body = &gzipReader{body: res.Body}
1842 setResponseUncompressed(res)
1843 }
1844 return res, nil
1845}
1846
1847func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
1848 if cs.pastTrailers {
1849 // Too many HEADERS frames for this stream.
1850 return ConnectionError(ErrCodeProtocol)
1851 }
1852 cs.pastTrailers = true
1853 if !f.StreamEnded() {
1854 // We expect that any headers for trailers also
1855 // has END_STREAM.
1856 return ConnectionError(ErrCodeProtocol)
1857 }
1858 if len(f.PseudoFields()) > 0 {
1859 // No pseudo header fields are defined for trailers.
1860 // TODO: ConnectionError might be overly harsh? Check.
1861 return ConnectionError(ErrCodeProtocol)
1862 }
1863
1864 trailer := make(http.Header)
1865 for _, hf := range f.RegularFields() {
1866 key := http.CanonicalHeaderKey(hf.Name)
1867 trailer[key] = append(trailer[key], hf.Value)
1868 }
1869 cs.trailer = trailer
1870
1871 rl.endStream(cs)
1872 return nil
1873}
1874
1875// transportResponseBody is the concrete type of Transport.RoundTrip's
1876// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
1877// On Close it sends RST_STREAM if EOF wasn't already seen.
1878type transportResponseBody struct {
1879 cs *clientStream
1880}
1881
1882func (b transportResponseBody) Read(p []byte) (n int, err error) {
1883 cs := b.cs
1884 cc := cs.cc
1885
1886 if cs.readErr != nil {
1887 return 0, cs.readErr
1888 }
1889 n, err = b.cs.bufPipe.Read(p)
1890 if cs.bytesRemain != -1 {
1891 if int64(n) > cs.bytesRemain {
1892 n = int(cs.bytesRemain)
1893 if err == nil {
1894 err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
1895 cc.writeStreamReset(cs.ID, ErrCodeProtocol, err)
1896 }
1897 cs.readErr = err
1898 return int(cs.bytesRemain), err
1899 }
1900 cs.bytesRemain -= int64(n)
1901 if err == io.EOF && cs.bytesRemain > 0 {
1902 err = io.ErrUnexpectedEOF
1903 cs.readErr = err
1904 return n, err
1905 }
1906 }
1907 if n == 0 {
1908 // No flow control tokens to send back.
1909 return
1910 }
1911
1912 cc.mu.Lock()
1913 defer cc.mu.Unlock()
1914
1915 var connAdd, streamAdd int32
1916 // Check the conn-level first, before the stream-level.
1917 if v := cc.inflow.available(); v < transportDefaultConnFlow/2 {
1918 connAdd = transportDefaultConnFlow - v
1919 cc.inflow.add(connAdd)
1920 }
1921 if err == nil { // No need to refresh if the stream is over or failed.
1922 // Consider any buffered body data (read from the conn but not
1923 // consumed by the client) when computing flow control for this
1924 // stream.
1925 v := int(cs.inflow.available()) + cs.bufPipe.Len()
1926 if v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh {
1927 streamAdd = int32(transportDefaultStreamFlow - v)
1928 cs.inflow.add(streamAdd)
1929 }
1930 }
1931 if connAdd != 0 || streamAdd != 0 {
1932 cc.wmu.Lock()
1933 defer cc.wmu.Unlock()
1934 if connAdd != 0 {
1935 cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
1936 }
1937 if streamAdd != 0 {
1938 cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
1939 }
1940 cc.bw.Flush()
1941 }
1942 return
1943}
1944
1945var errClosedResponseBody = errors.New("http2: response body closed")
1946
1947func (b transportResponseBody) Close() error {
1948 cs := b.cs
1949 cc := cs.cc
1950
1951 serverSentStreamEnd := cs.bufPipe.Err() == io.EOF
1952 unread := cs.bufPipe.Len()
1953
1954 if unread > 0 || !serverSentStreamEnd {
1955 cc.mu.Lock()
1956 cc.wmu.Lock()
1957 if !serverSentStreamEnd {
1958 cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
1959 cs.didReset = true
1960 }
1961 // Return connection-level flow control.
1962 if unread > 0 {
1963 cc.inflow.add(int32(unread))
1964 cc.fr.WriteWindowUpdate(0, uint32(unread))
1965 }
1966 cc.bw.Flush()
1967 cc.wmu.Unlock()
1968 cc.mu.Unlock()
1969 }
1970
1971 cs.bufPipe.BreakWithError(errClosedResponseBody)
1972 cc.forgetStreamID(cs.ID)
1973 return nil
1974}
1975
1976func (rl *clientConnReadLoop) processData(f *DataFrame) error {
1977 cc := rl.cc
1978 cs := cc.streamByID(f.StreamID, f.StreamEnded())
1979 data := f.Data()
1980 if cs == nil {
1981 cc.mu.Lock()
1982 neverSent := cc.nextStreamID
1983 cc.mu.Unlock()
1984 if f.StreamID >= neverSent {
1985 // We never asked for this.
1986 cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
1987 return ConnectionError(ErrCodeProtocol)
1988 }
1989 // We probably did ask for this, but canceled. Just ignore it.
1990 // TODO: be stricter here? only silently ignore things which
1991 // we canceled, but not things which were closed normally
1992 // by the peer? Tough without accumulating too much state.
1993
1994 // But at least return their flow control:
1995 if f.Length > 0 {
1996 cc.mu.Lock()
1997 cc.inflow.add(int32(f.Length))
1998 cc.mu.Unlock()
1999
2000 cc.wmu.Lock()
2001 cc.fr.WriteWindowUpdate(0, uint32(f.Length))
2002 cc.bw.Flush()
2003 cc.wmu.Unlock()
2004 }
2005 return nil
2006 }
2007 if !cs.firstByte {
2008 cc.logf("protocol error: received DATA before a HEADERS frame")
2009 rl.endStreamError(cs, StreamError{
2010 StreamID: f.StreamID,
2011 Code: ErrCodeProtocol,
2012 })
2013 return nil
2014 }
2015 if f.Length > 0 {
2016 if cs.req.Method == "HEAD" && len(data) > 0 {
2017 cc.logf("protocol error: received DATA on a HEAD request")
2018 rl.endStreamError(cs, StreamError{
2019 StreamID: f.StreamID,
2020 Code: ErrCodeProtocol,
2021 })
2022 return nil
2023 }
2024 // Check connection-level flow control.
2025 cc.mu.Lock()
2026 if cs.inflow.available() >= int32(f.Length) {
2027 cs.inflow.take(int32(f.Length))
2028 } else {
2029 cc.mu.Unlock()
2030 return ConnectionError(ErrCodeFlowControl)
2031 }
2032 // Return any padded flow control now, since we won't
2033 // refund it later on body reads.
2034 var refund int
2035 if pad := int(f.Length) - len(data); pad > 0 {
2036 refund += pad
2037 }
2038 // Return len(data) now if the stream is already closed,
2039 // since data will never be read.
2040 didReset := cs.didReset
2041 if didReset {
2042 refund += len(data)
2043 }
2044 if refund > 0 {
2045 cc.inflow.add(int32(refund))
2046 cc.wmu.Lock()
2047 cc.fr.WriteWindowUpdate(0, uint32(refund))
2048 if !didReset {
2049 cs.inflow.add(int32(refund))
2050 cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
2051 }
2052 cc.bw.Flush()
2053 cc.wmu.Unlock()
2054 }
2055 cc.mu.Unlock()
2056
2057 if len(data) > 0 && !didReset {
2058 if _, err := cs.bufPipe.Write(data); err != nil {
2059 rl.endStreamError(cs, err)
2060 return err
2061 }
2062 }
2063 }
2064
2065 if f.StreamEnded() {
2066 rl.endStream(cs)
2067 }
2068 return nil
2069}
2070
2071var errInvalidTrailers = errors.New("http2: invalid trailers")
2072
2073func (rl *clientConnReadLoop) endStream(cs *clientStream) {
2074 // TODO: check that any declared content-length matches, like
2075 // server.go's (*stream).endStream method.
2076 rl.endStreamError(cs, nil)
2077}
2078
2079func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
2080 var code func()
2081 if err == nil {
2082 err = io.EOF
2083 code = cs.copyTrailers
2084 }
2085 if isConnectionCloseRequest(cs.req) {
2086 rl.closeWhenIdle = true
2087 }
2088 cs.bufPipe.closeWithErrorAndCode(err, code)
2089
2090 select {
2091 case cs.resc <- resAndError{err: err}:
2092 default:
2093 }
2094}
2095
2096func (cs *clientStream) copyTrailers() {
2097 for k, vv := range cs.trailer {
2098 t := cs.resTrailer
2099 if *t == nil {
2100 *t = make(http.Header)
2101 }
2102 (*t)[k] = vv
2103 }
2104}
2105
2106func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
2107 cc := rl.cc
2108 cc.t.connPool().MarkDead(cc)
2109 if f.ErrCode != 0 {
2110 // TODO: deal with GOAWAY more. particularly the error code
2111 cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
2112 }
2113 cc.setGoAway(f)
2114 return nil
2115}
2116
2117func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
2118 cc := rl.cc
2119 cc.mu.Lock()
2120 defer cc.mu.Unlock()
2121
2122 if f.IsAck() {
2123 if cc.wantSettingsAck {
2124 cc.wantSettingsAck = false
2125 return nil
2126 }
2127 return ConnectionError(ErrCodeProtocol)
2128 }
2129
2130 err := f.ForeachSetting(func(s Setting) error {
2131 switch s.ID {
2132 case SettingMaxFrameSize:
2133 cc.maxFrameSize = s.Val
2134 case SettingMaxConcurrentStreams:
2135 cc.maxConcurrentStreams = s.Val
2136 case SettingMaxHeaderListSize:
2137 cc.peerMaxHeaderListSize = uint64(s.Val)
2138 case SettingInitialWindowSize:
2139 // Values above the maximum flow-control
2140 // window size of 2^31-1 MUST be treated as a
2141 // connection error (Section 5.4.1) of type
2142 // FLOW_CONTROL_ERROR.
2143 if s.Val > math.MaxInt32 {
2144 return ConnectionError(ErrCodeFlowControl)
2145 }
2146
2147 // Adjust flow control of currently-open
2148 // frames by the difference of the old initial
2149 // window size and this one.
2150 delta := int32(s.Val) - int32(cc.initialWindowSize)
2151 for _, cs := range cc.streams {
2152 cs.flow.add(delta)
2153 }
2154 cc.cond.Broadcast()
2155
2156 cc.initialWindowSize = s.Val
2157 default:
2158 // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
2159 cc.vlogf("Unhandled Setting: %v", s)
2160 }
2161 return nil
2162 })
2163 if err != nil {
2164 return err
2165 }
2166
2167 cc.wmu.Lock()
2168 defer cc.wmu.Unlock()
2169
2170 cc.fr.WriteSettingsAck()
2171 cc.bw.Flush()
2172 return cc.werr
2173}
2174
2175func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
2176 cc := rl.cc
2177 cs := cc.streamByID(f.StreamID, false)
2178 if f.StreamID != 0 && cs == nil {
2179 return nil
2180 }
2181
2182 cc.mu.Lock()
2183 defer cc.mu.Unlock()
2184
2185 fl := &cc.flow
2186 if cs != nil {
2187 fl = &cs.flow
2188 }
2189 if !fl.add(int32(f.Increment)) {
2190 return ConnectionError(ErrCodeFlowControl)
2191 }
2192 cc.cond.Broadcast()
2193 return nil
2194}
2195
2196func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
2197 cs := rl.cc.streamByID(f.StreamID, true)
2198 if cs == nil {
2199 // TODO: return error if server tries to RST_STEAM an idle stream
2200 return nil
2201 }
2202 select {
2203 case <-cs.peerReset:
2204 // Already reset.
2205 // This is the only goroutine
2206 // which closes this, so there
2207 // isn't a race.
2208 default:
2209 err := streamError(cs.ID, f.ErrCode)
2210 cs.resetErr = err
2211 close(cs.peerReset)
2212 cs.bufPipe.CloseWithError(err)
2213 cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
2214 }
2215 return nil
2216}
2217
2218// Ping sends a PING frame to the server and waits for the ack.
2219// Public implementation is in go17.go and not_go17.go
2220func (cc *ClientConn) ping(ctx contextContext) error {
2221 c := make(chan struct{})
2222 // Generate a random payload
2223 var p [8]byte
2224 for {
2225 if _, err := rand.Read(p[:]); err != nil {
2226 return err
2227 }
2228 cc.mu.Lock()
2229 // check for dup before insert
2230 if _, found := cc.pings[p]; !found {
2231 cc.pings[p] = c
2232 cc.mu.Unlock()
2233 break
2234 }
2235 cc.mu.Unlock()
2236 }
2237 cc.wmu.Lock()
2238 if err := cc.fr.WritePing(false, p); err != nil {
2239 cc.wmu.Unlock()
2240 return err
2241 }
2242 if err := cc.bw.Flush(); err != nil {
2243 cc.wmu.Unlock()
2244 return err
2245 }
2246 cc.wmu.Unlock()
2247 select {
2248 case <-c:
2249 return nil
2250 case <-ctx.Done():
2251 return ctx.Err()
2252 case <-cc.readerDone:
2253 // connection closed
2254 return cc.readerErr
2255 }
2256}
2257
2258func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
2259 if f.IsAck() {
2260 cc := rl.cc
2261 cc.mu.Lock()
2262 defer cc.mu.Unlock()
2263 // If ack, notify listener if any
2264 if c, ok := cc.pings[f.Data]; ok {
2265 close(c)
2266 delete(cc.pings, f.Data)
2267 }
2268 return nil
2269 }
2270 cc := rl.cc
2271 cc.wmu.Lock()
2272 defer cc.wmu.Unlock()
2273 if err := cc.fr.WritePing(true, f.Data); err != nil {
2274 return err
2275 }
2276 return cc.bw.Flush()
2277}
2278
2279func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
2280 // We told the peer we don't want them.
2281 // Spec says:
2282 // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
2283 // setting of the peer endpoint is set to 0. An endpoint that
2284 // has set this setting and has received acknowledgement MUST
2285 // treat the receipt of a PUSH_PROMISE frame as a connection
2286 // error (Section 5.4.1) of type PROTOCOL_ERROR."
2287 return ConnectionError(ErrCodeProtocol)
2288}
2289
2290func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
2291 // TODO: map err to more interesting error codes, once the
2292 // HTTP community comes up with some. But currently for
2293 // RST_STREAM there's no equivalent to GOAWAY frame's debug
2294 // data, and the error codes are all pretty vague ("cancel").
2295 cc.wmu.Lock()
2296 cc.fr.WriteRSTStream(streamID, code)
2297 cc.bw.Flush()
2298 cc.wmu.Unlock()
2299}
2300
2301var (
2302 errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
2303 errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit")
2304 errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers")
2305)
2306
2307func (cc *ClientConn) logf(format string, args ...interface{}) {
2308 cc.t.logf(format, args...)
2309}
2310
2311func (cc *ClientConn) vlogf(format string, args ...interface{}) {
2312 cc.t.vlogf(format, args...)
2313}
2314
2315func (t *Transport) vlogf(format string, args ...interface{}) {
2316 if VerboseLogs {
2317 t.logf(format, args...)
2318 }
2319}
2320
2321func (t *Transport) logf(format string, args ...interface{}) {
2322 log.Printf(format, args...)
2323}
2324
2325var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
2326
2327func strSliceContains(ss []string, s string) bool {
2328 for _, v := range ss {
2329 if v == s {
2330 return true
2331 }
2332 }
2333 return false
2334}
2335
2336type erringRoundTripper struct{ err error }
2337
2338func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err }
2339
2340// gzipReader wraps a response body so it can lazily
2341// call gzip.NewReader on the first call to Read
2342type gzipReader struct {
2343 body io.ReadCloser // underlying Response.Body
2344 zr *gzip.Reader // lazily-initialized gzip reader
2345 zerr error // sticky error
2346}
2347
2348func (gz *gzipReader) Read(p []byte) (n int, err error) {
2349 if gz.zerr != nil {
2350 return 0, gz.zerr
2351 }
2352 if gz.zr == nil {
2353 gz.zr, err = gzip.NewReader(gz.body)
2354 if err != nil {
2355 gz.zerr = err
2356 return 0, err
2357 }
2358 }
2359 return gz.zr.Read(p)
2360}
2361
2362func (gz *gzipReader) Close() error {
2363 return gz.body.Close()
2364}
2365
2366type errorReader struct{ err error }
2367
2368func (r errorReader) Read(p []byte) (int, error) { return 0, r.err }
2369
2370// bodyWriterState encapsulates various state around the Transport's writing
2371// of the request body, particularly regarding doing delayed writes of the body
2372// when the request contains "Expect: 100-continue".
2373type bodyWriterState struct {
2374 cs *clientStream
2375 timer *time.Timer // if non-nil, we're doing a delayed write
2376 fnonce *sync.Once // to call fn with
2377 fn func() // the code to run in the goroutine, writing the body
2378 resc chan error // result of fn's execution
2379 delay time.Duration // how long we should delay a delayed write for
2380}
2381
2382func (t *Transport) getBodyWriterState(cs *clientStream, body io.Reader) (s bodyWriterState) {
2383 s.cs = cs
2384 if body == nil {
2385 return
2386 }
2387 resc := make(chan error, 1)
2388 s.resc = resc
2389 s.fn = func() {
2390 cs.cc.mu.Lock()
2391 cs.startedWrite = true
2392 cs.cc.mu.Unlock()
2393 resc <- cs.writeRequestBody(body, cs.req.Body)
2394 }
2395 s.delay = t.expectContinueTimeout()
2396 if s.delay == 0 ||
2397 !httpguts.HeaderValuesContainsToken(
2398 cs.req.Header["Expect"],
2399 "100-continue") {
2400 return
2401 }
2402 s.fnonce = new(sync.Once)
2403
2404 // Arm the timer with a very large duration, which we'll
2405 // intentionally lower later. It has to be large now because
2406 // we need a handle to it before writing the headers, but the
2407 // s.delay value is defined to not start until after the
2408 // request headers were written.
2409 const hugeDuration = 365 * 24 * time.Hour
2410 s.timer = time.AfterFunc(hugeDuration, func() {
2411 s.fnonce.Do(s.fn)
2412 })
2413 return
2414}
2415
2416func (s bodyWriterState) cancel() {
2417 if s.timer != nil {
2418 s.timer.Stop()
2419 }
2420}
2421
2422func (s bodyWriterState) on100() {
2423 if s.timer == nil {
2424 // If we didn't do a delayed write, ignore the server's
2425 // bogus 100 continue response.
2426 return
2427 }
2428 s.timer.Stop()
2429 go func() { s.fnonce.Do(s.fn) }()
2430}
2431
2432// scheduleBodyWrite starts writing the body, either immediately (in
2433// the common case) or after the delay timeout. It should not be
2434// called until after the headers have been written.
2435func (s bodyWriterState) scheduleBodyWrite() {
2436 if s.timer == nil {
2437 // We're not doing a delayed write (see
2438 // getBodyWriterState), so just start the writing
2439 // goroutine immediately.
2440 go s.fn()
2441 return
2442 }
2443 traceWait100Continue(s.cs.trace)
2444 if s.timer.Stop() {
2445 s.timer.Reset(s.delay)
2446 }
2447}
2448
2449// isConnectionCloseRequest reports whether req should use its own
2450// connection for a single request and then close the connection.
2451func isConnectionCloseRequest(req *http.Request) bool {
2452 return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
2453}