blob: 1be518a62fb48819259fa15b1d6999d984658c05 [file] [log] [blame]
Serge Bazanskicc25bdf2018-10-25 14:02:58 +02001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package transport defines and implements message oriented communication
20// channel to complete various transactions (e.g., an RPC). It is meant for
21// grpc-internal usage and is not intended to be imported directly by users.
22package transport
23
24import (
25 "errors"
26 "fmt"
27 "io"
28 "net"
29 "sync"
30 "sync/atomic"
31
32 "golang.org/x/net/context"
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/credentials"
35 "google.golang.org/grpc/keepalive"
36 "google.golang.org/grpc/metadata"
37 "google.golang.org/grpc/stats"
38 "google.golang.org/grpc/status"
39 "google.golang.org/grpc/tap"
40)
41
// recvMsg is one unit of inbound stream data handed over by the transport,
// with all protocol-specific details already stripped away.
//
// The err field selects the meaning of the message:
//   - nil: data holds bytes that were received;
//   - io.EOF: the stream completed and data is nil;
//   - any other error: the transport failed and data is nil.
type recvMsg struct {
	data []byte
	err  error
}
50}
51
52// recvBuffer is an unbounded channel of recvMsg structs.
53// Note recvBuffer differs from controlBuffer only in that recvBuffer
54// holds a channel of only recvMsg structs instead of objects implementing "item" interface.
55// recvBuffer is written to much more often than
56// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
57type recvBuffer struct {
58 c chan recvMsg
59 mu sync.Mutex
60 backlog []recvMsg
61 err error
62}
63
64func newRecvBuffer() *recvBuffer {
65 b := &recvBuffer{
66 c: make(chan recvMsg, 1),
67 }
68 return b
69}
70
71func (b *recvBuffer) put(r recvMsg) {
72 b.mu.Lock()
73 if b.err != nil {
74 b.mu.Unlock()
75 // An error had occurred earlier, don't accept more
76 // data or errors.
77 return
78 }
79 b.err = r.err
80 if len(b.backlog) == 0 {
81 select {
82 case b.c <- r:
83 b.mu.Unlock()
84 return
85 default:
86 }
87 }
88 b.backlog = append(b.backlog, r)
89 b.mu.Unlock()
90}
91
92func (b *recvBuffer) load() {
93 b.mu.Lock()
94 if len(b.backlog) > 0 {
95 select {
96 case b.c <- b.backlog[0]:
97 b.backlog[0] = recvMsg{}
98 b.backlog = b.backlog[1:]
99 default:
100 }
101 }
102 b.mu.Unlock()
103}
104
105// get returns the channel that receives a recvMsg in the buffer.
106//
107// Upon receipt of a recvMsg, the caller should call load to send another
108// recvMsg onto the channel if there is any.
109func (b *recvBuffer) get() <-chan recvMsg {
110 return b.c
111}
112
113//
114// recvBufferReader implements io.Reader interface to read the data from
115// recvBuffer.
116type recvBufferReader struct {
117 ctx context.Context
118 ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
119 recv *recvBuffer
120 last []byte // Stores the remaining data in the previous calls.
121 err error
122}
123
124// Read reads the next len(p) bytes from last. If last is drained, it tries to
125// read additional data from recv. It blocks if there no additional data available
126// in recv. If Read returns any non-nil error, it will continue to return that error.
127func (r *recvBufferReader) Read(p []byte) (n int, err error) {
128 if r.err != nil {
129 return 0, r.err
130 }
131 n, r.err = r.read(p)
132 return n, r.err
133}
134
135func (r *recvBufferReader) read(p []byte) (n int, err error) {
136 if r.last != nil && len(r.last) > 0 {
137 // Read remaining data left in last call.
138 copied := copy(p, r.last)
139 r.last = r.last[copied:]
140 return copied, nil
141 }
142 select {
143 case <-r.ctxDone:
144 return 0, ContextErr(r.ctx.Err())
145 case m := <-r.recv.get():
146 r.recv.load()
147 if m.err != nil {
148 return 0, m.err
149 }
150 copied := copy(p, m.data)
151 r.last = m.data[copied:]
152 return copied, nil
153 }
154}
155
// streamState records how far a stream has progressed towards completion. The
// Stream state helpers read and write it atomically.
type streamState uint32

// Stream states, with their numeric values spelled out explicitly.
const (
	streamActive    streamState = 0 // initial state
	streamWriteDone streamState = 1 // EndStream sent
	streamReadDone  streamState = 2 // EndStream received
	streamDone      streamState = 3 // the entire stream is finished
)
164
165// Stream represents an RPC in the transport layer.
166type Stream struct {
167 id uint32
168 st ServerTransport // nil for client side Stream
169 ctx context.Context // the associated context of the stream
170 cancel context.CancelFunc // always nil for client side Stream
171 done chan struct{} // closed at the end of stream to unblock writers. On the client side.
172 ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
173 method string // the associated RPC method of the stream
174 recvCompress string
175 sendCompress string
176 buf *recvBuffer
177 trReader io.Reader
178 fc *inFlow
179 wq *writeQuota
180
181 // Callback to state application's intentions to read data. This
182 // is used to adjust flow control, if needed.
183 requestRead func(int)
184
185 headerChan chan struct{} // closed to indicate the end of header metadata.
186 headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
187
188 // hdrMu protects header and trailer metadata on the server-side.
189 hdrMu sync.Mutex
190 header metadata.MD // the received header metadata.
191 trailer metadata.MD // the key-value map of trailer metadata.
192
193 noHeaders bool // set if the client never received headers (set only after the stream is done).
194
195 // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
196 headerSent uint32
197
198 state streamState
199
200 // On client-side it is the status error received from the server.
201 // On server-side it is unused.
202 status *status.Status
203
204 bytesReceived uint32 // indicates whether any bytes have been received on this stream
205 unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
206
207 // contentSubtype is the content-subtype for requests.
208 // this must be lowercase or the behavior is undefined.
209 contentSubtype string
210}
211
212// isHeaderSent is only valid on the server-side.
213func (s *Stream) isHeaderSent() bool {
214 return atomic.LoadUint32(&s.headerSent) == 1
215}
216
217// updateHeaderSent updates headerSent and returns true
218// if it was alreay set. It is valid only on server-side.
219func (s *Stream) updateHeaderSent() bool {
220 return atomic.SwapUint32(&s.headerSent, 1) == 1
221}
222
223func (s *Stream) swapState(st streamState) streamState {
224 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
225}
226
227func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
228 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
229}
230
231func (s *Stream) getState() streamState {
232 return streamState(atomic.LoadUint32((*uint32)(&s.state)))
233}
234
235func (s *Stream) waitOnHeader() error {
236 if s.headerChan == nil {
237 // On the server headerChan is always nil since a stream originates
238 // only after having received headers.
239 return nil
240 }
241 select {
242 case <-s.ctx.Done():
243 return ContextErr(s.ctx.Err())
244 case <-s.headerChan:
245 return nil
246 }
247}
248
249// RecvCompress returns the compression algorithm applied to the inbound
250// message. It is empty string if there is no compression applied.
251func (s *Stream) RecvCompress() string {
252 if err := s.waitOnHeader(); err != nil {
253 return ""
254 }
255 return s.recvCompress
256}
257
258// SetSendCompress sets the compression algorithm to the stream.
259func (s *Stream) SetSendCompress(str string) {
260 s.sendCompress = str
261}
262
263// Done returns a channel which is closed when it receives the final status
264// from the server.
265func (s *Stream) Done() <-chan struct{} {
266 return s.done
267}
268
269// Header acquires the key-value pairs of header metadata once it
270// is available. It blocks until i) the metadata is ready or ii) there is no
271// header metadata or iii) the stream is canceled/expired.
272func (s *Stream) Header() (metadata.MD, error) {
273 err := s.waitOnHeader()
274 // Even if the stream is closed, header is returned if available.
275 select {
276 case <-s.headerChan:
277 if s.header == nil {
278 return nil, nil
279 }
280 return s.header.Copy(), nil
281 default:
282 }
283 return nil, err
284}
285
286// TrailersOnly blocks until a header or trailers-only frame is received and
287// then returns true if the stream was trailers-only. If the stream ends
288// before headers are received, returns true, nil. If a context error happens
289// first, returns it as a status error. Client-side only.
290func (s *Stream) TrailersOnly() (bool, error) {
291 err := s.waitOnHeader()
292 if err != nil {
293 return false, err
294 }
295 // if !headerDone, some other connection error occurred.
296 return s.noHeaders && atomic.LoadUint32(&s.headerDone) == 1, nil
297}
298
299// Trailer returns the cached trailer metedata. Note that if it is not called
300// after the entire stream is done, it could return an empty MD. Client
301// side only.
302// It can be safely read only after stream has ended that is either read
303// or write have returned io.EOF.
304func (s *Stream) Trailer() metadata.MD {
305 c := s.trailer.Copy()
306 return c
307}
308
309// ContentSubtype returns the content-subtype for a request. For example, a
310// content-subtype of "proto" will result in a content-type of
311// "application/grpc+proto". This will always be lowercase. See
312// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
313// more details.
314func (s *Stream) ContentSubtype() string {
315 return s.contentSubtype
316}
317
318// Context returns the context of the stream.
319func (s *Stream) Context() context.Context {
320 return s.ctx
321}
322
323// Method returns the method for the stream.
324func (s *Stream) Method() string {
325 return s.method
326}
327
328// Status returns the status received from the server.
329// Status can be read safely only after the stream has ended,
330// that is, after Done() is closed.
331func (s *Stream) Status() *status.Status {
332 return s.status
333}
334
335// SetHeader sets the header metadata. This can be called multiple times.
336// Server side only.
337// This should not be called in parallel to other data writes.
338func (s *Stream) SetHeader(md metadata.MD) error {
339 if md.Len() == 0 {
340 return nil
341 }
342 if s.isHeaderSent() || s.getState() == streamDone {
343 return ErrIllegalHeaderWrite
344 }
345 s.hdrMu.Lock()
346 s.header = metadata.Join(s.header, md)
347 s.hdrMu.Unlock()
348 return nil
349}
350
351// SendHeader sends the given header metadata. The given metadata is
352// combined with any metadata set by previous calls to SetHeader and
353// then written to the transport stream.
354func (s *Stream) SendHeader(md metadata.MD) error {
355 return s.st.WriteHeader(s, md)
356}
357
358// SetTrailer sets the trailer metadata which will be sent with the RPC status
359// by the server. This can be called multiple times. Server side only.
360// This should not be called parallel to other data writes.
361func (s *Stream) SetTrailer(md metadata.MD) error {
362 if md.Len() == 0 {
363 return nil
364 }
365 if s.getState() == streamDone {
366 return ErrIllegalHeaderWrite
367 }
368 s.hdrMu.Lock()
369 s.trailer = metadata.Join(s.trailer, md)
370 s.hdrMu.Unlock()
371 return nil
372}
373
374func (s *Stream) write(m recvMsg) {
375 s.buf.put(m)
376}
377
378// Read reads all p bytes from the wire for this stream.
379func (s *Stream) Read(p []byte) (n int, err error) {
380 // Don't request a read if there was an error earlier
381 if er := s.trReader.(*transportReader).er; er != nil {
382 return 0, er
383 }
384 s.requestRead(len(p))
385 return io.ReadFull(s.trReader, p)
386}
387
// transportReader reads all the data available for a Stream from the
// transport and passes it to the decoder, which turns it into a stream of
// gRPC messages. Its error is io.EOF once the stream is done, or another
// non-nil error if the stream broke.
type transportReader struct {
	reader io.Reader
	// windowHandler drives the window-update procedure for both this stream
	// and its transport. Read passes it the byte count of every successful
	// read.
	windowHandler func(int)
	er            error // error returned by reader, if any
}
399
400func (t *transportReader) Read(p []byte) (n int, err error) {
401 n, err = t.reader.Read(p)
402 if err != nil {
403 t.er = err
404 return
405 }
406 t.windowHandler(n)
407 return
408}
409
410// BytesReceived indicates whether any bytes have been received on this stream.
411func (s *Stream) BytesReceived() bool {
412 return atomic.LoadUint32(&s.bytesReceived) == 1
413}
414
415// Unprocessed indicates whether the server did not process this stream --
416// i.e. it sent a refused stream or GOAWAY including this stream ID.
417func (s *Stream) Unprocessed() bool {
418 return atomic.LoadUint32(&s.unprocessed) == 1
419}
420
421// GoString is implemented by Stream so context.String() won't
422// race when printing %#v.
423func (s *Stream) GoString() string {
424 return fmt.Sprintf("<stream: %p, %v>", s, s.method)
425}
426
// transportState describes where a transport is in its lifecycle.
type transportState int

// Transport states, with their numeric values spelled out explicitly.
const (
	reachable transportState = 0
	closing   transportState = 1
	draining  transportState = 2
)
435
436// ServerConfig consists of all the configurations to establish a server transport.
437type ServerConfig struct {
438 MaxStreams uint32
439 AuthInfo credentials.AuthInfo
440 InTapHandle tap.ServerInHandle
441 StatsHandler stats.Handler
442 KeepaliveParams keepalive.ServerParameters
443 KeepalivePolicy keepalive.EnforcementPolicy
444 InitialWindowSize int32
445 InitialConnWindowSize int32
446 WriteBufferSize int
447 ReadBufferSize int
448 ChannelzParentID int64
449 MaxHeaderListSize *uint32
450}
451
452// NewServerTransport creates a ServerTransport with conn or non-nil error
453// if it fails.
454func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
455 return newHTTP2Server(conn, config)
456}
457
458// ConnectOptions covers all relevant options for communicating with the server.
459type ConnectOptions struct {
460 // UserAgent is the application user agent.
461 UserAgent string
462 // Dialer specifies how to dial a network address.
463 Dialer func(context.Context, string) (net.Conn, error)
464 // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
465 FailOnNonTempDialError bool
466 // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
467 PerRPCCredentials []credentials.PerRPCCredentials
468 // TransportCredentials stores the Authenticator required to setup a client
469 // connection. Only one of TransportCredentials and CredsBundle is non-nil.
470 TransportCredentials credentials.TransportCredentials
471 // CredsBundle is the credentials bundle to be used. Only one of
472 // TransportCredentials and CredsBundle is non-nil.
473 CredsBundle credentials.Bundle
474 // KeepaliveParams stores the keepalive parameters.
475 KeepaliveParams keepalive.ClientParameters
476 // StatsHandler stores the handler for stats.
477 StatsHandler stats.Handler
478 // InitialWindowSize sets the initial window size for a stream.
479 InitialWindowSize int32
480 // InitialConnWindowSize sets the initial window size for a connection.
481 InitialConnWindowSize int32
482 // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
483 WriteBufferSize int
484 // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
485 ReadBufferSize int
486 // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
487 ChannelzParentID int64
488 // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
489 MaxHeaderListSize *uint32
490}
491
// TargetInfo describes the target to connect to, such as its network address
// and metadata.
type TargetInfo struct {
	Addr      string      // network address of the target
	Metadata  interface{} // target metadata; opaque to this package
	Authority string      // NOTE(review): presumably the authority used for requests — confirm
}
498
499// NewClientTransport establishes the transport with the required ConnectOptions
500// and returns it to the caller.
501func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
502 return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess, onGoAway, onClose)
503}
504
// Options carries additional hints and information about how a message is
// transmitted.
type Options struct {
	// Last reports whether this write is the final piece of data for the
	// stream.
	Last bool
}
512
513// CallHdr carries the information of a particular RPC.
514type CallHdr struct {
515 // Host specifies the peer's host.
516 Host string
517
518 // Method specifies the operation to perform.
519 Method string
520
521 // SendCompress specifies the compression algorithm applied on
522 // outbound message.
523 SendCompress string
524
525 // Creds specifies credentials.PerRPCCredentials for a call.
526 Creds credentials.PerRPCCredentials
527
528 // ContentSubtype specifies the content-subtype for a request. For example, a
529 // content-subtype of "proto" will result in a content-type of
530 // "application/grpc+proto". The value of ContentSubtype must be all
531 // lowercase, otherwise the behavior is undefined. See
532 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
533 // for more details.
534 ContentSubtype string
535
536 PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
537}
538
539// ClientTransport is the common interface for all gRPC client-side transport
540// implementations.
541type ClientTransport interface {
542 // Close tears down this transport. Once it returns, the transport
543 // should not be accessed any more. The caller must make sure this
544 // is called only once.
545 Close() error
546
547 // GracefulClose starts to tear down the transport. It stops accepting
548 // new RPCs and wait the completion of the pending RPCs.
549 GracefulClose() error
550
551 // Write sends the data for the given stream. A nil stream indicates
552 // the write is to be performed on the transport as a whole.
553 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
554
555 // NewStream creates a Stream for an RPC.
556 NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
557
558 // CloseStream clears the footprint of a stream when the stream is
559 // not needed any more. The err indicates the error incurred when
560 // CloseStream is called. Must be called when a stream is finished
561 // unless the associated transport is closing.
562 CloseStream(stream *Stream, err error)
563
564 // Error returns a channel that is closed when some I/O error
565 // happens. Typically the caller should have a goroutine to monitor
566 // this in order to take action (e.g., close the current transport
567 // and create a new one) in error case. It should not return nil
568 // once the transport is initiated.
569 Error() <-chan struct{}
570
571 // GoAway returns a channel that is closed when ClientTransport
572 // receives the draining signal from the server (e.g., GOAWAY frame in
573 // HTTP/2).
574 GoAway() <-chan struct{}
575
576 // GetGoAwayReason returns the reason why GoAway frame was received.
577 GetGoAwayReason() GoAwayReason
578
579 // IncrMsgSent increments the number of message sent through this transport.
580 IncrMsgSent()
581
582 // IncrMsgRecv increments the number of message received through this transport.
583 IncrMsgRecv()
584}
585
586// ServerTransport is the common interface for all gRPC server-side transport
587// implementations.
588//
589// Methods may be called concurrently from multiple goroutines, but
590// Write methods for a given Stream will be called serially.
591type ServerTransport interface {
592 // HandleStreams receives incoming streams using the given handler.
593 HandleStreams(func(*Stream), func(context.Context, string) context.Context)
594
595 // WriteHeader sends the header metadata for the given stream.
596 // WriteHeader may not be called on all streams.
597 WriteHeader(s *Stream, md metadata.MD) error
598
599 // Write sends the data for the given stream.
600 // Write may not be called on all streams.
601 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
602
603 // WriteStatus sends the status of a stream to the client. WriteStatus is
604 // the final call made on a stream and always occurs.
605 WriteStatus(s *Stream, st *status.Status) error
606
607 // Close tears down the transport. Once it is called, the transport
608 // should not be accessed any more. All the pending streams and their
609 // handlers will be terminated asynchronously.
610 Close() error
611
612 // RemoteAddr returns the remote network address.
613 RemoteAddr() net.Addr
614
615 // Drain notifies the client this ServerTransport stops accepting new RPCs.
616 Drain()
617
618 // IncrMsgSent increments the number of message sent through this transport.
619 IncrMsgSent()
620
621 // IncrMsgRecv increments the number of message received through this transport.
622 IncrMsgRecv()
623}
624
625// connectionErrorf creates an ConnectionError with the specified error description.
626func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
627 return ConnectionError{
628 Desc: fmt.Sprintf(format, a...),
629 temp: temp,
630 err: e,
631 }
632}
633
// ConnectionError is an error that terminates the entire connection and causes
// all of its active streams to be retried.
type ConnectionError struct {
	Desc string // description reported by Error
	temp bool   // reported by Temporary
	err  error  // underlying cause, may be nil; reported by Origin
}
641
642func (e ConnectionError) Error() string {
643 return fmt.Sprintf("connection error: desc = %q", e.Desc)
644}
645
646// Temporary indicates if this connection error is temporary or fatal.
647func (e ConnectionError) Temporary() bool {
648 return e.temp
649}
650
651// Origin returns the original error of this connection error.
652func (e ConnectionError) Origin() error {
653 // Never return nil error here.
654 // If the original error is nil, return itself.
655 if e.err == nil {
656 return e
657 }
658 return e.err
659}
660
661var (
662 // ErrConnClosing indicates that the transport is closing.
663 ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
664 // errStreamDrain indicates that the stream is rejected because the
665 // connection is draining. This could be caused by goaway or balancer
666 // removing the address.
667 errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
668 // errStreamDone is returned from write at the client side to indiacte application
669 // layer of an error.
670 errStreamDone = errors.New("the stream is done")
671 // StatusGoAway indicates that the server sent a GOAWAY that included this
672 // stream's ID in unprocessed RPCs.
673 statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
674)
675
// GoAwayReason identifies why a GOAWAY frame was received.
type GoAwayReason uint8

const (
	// GoAwayInvalid means that no GOAWAY frame has been received.
	GoAwayInvalid = GoAwayReason(0)
	// GoAwayNoReason is the default reason once a GOAWAY frame is received.
	GoAwayNoReason = GoAwayReason(1)
	// GoAwayTooManyPings means a GOAWAY frame with ErrCodeEnhanceYourCalm was
	// received and its debug data said "too_many_pings".
	GoAwayTooManyPings = GoAwayReason(2)
)
689
// channelzData stores channelz-related data for http2Client and http2Server.
//
// These fields cannot live directly in the owning structs (e.g. http2Client).
// On 32-bit machines, atomic operations on an int64 require the user to
// guarantee 64-bit alignment. Grouping all the int64 fields in their own struct
// enforces that alignment. Every field is an int64, and the original order is
// preserved.
type channelzData struct {
	kpCount int64 // NOTE(review): presumably the number of keepalive pings sent — confirm
	// streamsStarted counts streams that have started, finished ones included.
	streamsStarted int64
	// streamsSucceeded counts streams that ended successfully: on the client,
	// by receiving a frame with the EoS bit set from the server; on the server,
	// by sending a frame with the EoS bit set. streamsFailed presumably counts
	// streams that ended otherwise.
	streamsSucceeded, streamsFailed int64
	// lastStreamCreatedTime is the timestamp at which the last stream was
	// created. It is an int64 rather than a time.Time because atomically
	// updating a time.Time is more costly; the same applies to lastMsgSentTime
	// and lastMsgRecvTime.
	lastStreamCreatedTime            int64
	msgSent, msgRecv                 int64
	lastMsgSentTime, lastMsgRecvTime int64
}