/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"errors"
	"io"
	"math"
	"strconv"
	"sync"
	"time"

	"golang.org/x/net/context"
	"golang.org/x/net/trace"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)

// StreamHandler defines the handler called by a gRPC server to complete the
// execution of a streaming RPC. If a StreamHandler returns an error, it
// should be produced by the status package, or else gRPC will use
// codes.Unknown as the status code and err.Error() as the status message
// of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error

// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
	StreamName string
	Handler    StreamHandler

	// At least one of these is true.
	ServerStreams bool
	ClientStreams bool
}
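
// Illustration only: generated code typically registers each streaming method
// with a StreamDesc. A hypothetical descriptor for a bidirectional-streaming
// method might look like the following (the names are placeholders, not real
// generated identifiers):
//
//	var routeChatStreamDesc = StreamDesc{
//		StreamName:    "RouteChat",
//		Handler:       routeChatHandler, // func(srv interface{}, stream ServerStream) error
//		ServerStreams: true,
//		ClientStreams: true,
//	}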

// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m interface{}) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m interface{}) error
}

// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the
	// stream when a non-nil error is encountered.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//  - There is sufficient flow control to schedule m with the transport, or
	//  - The stream is done, or
	//  - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
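
// A minimal usage sketch (illustrative only; message values and error
// handling are elided): a bidirectional-streaming call typically sends from
// one goroutine, receives in another, and treats io.EOF from RecvMsg as
// successful completion.
//
//	stream, err := cc.NewStream(ctx, desc, "/pkg.Service/Method")
//	// handle err
//	go func() {
//		for _, m := range msgs {
//			if err := stream.SendMsg(m); err != nil {
//				return // the real status will be surfaced by RecvMsg
//			}
//		}
//		stream.CloseSend()
//	}()
//	for {
//		if err := stream.RecvMsg(reply); err != nil {
//			break // io.EOF on success; otherwise err carries the RPC status
//		}
//	}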

// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
// 1. Call Close on the ClientConn.
// 2. Cancel the context provided.
// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
//    client-streaming RPC, for instance, might use the helper function
//    CloseAndRecv (note that CloseSend does not Recv, therefore is not
//    guaranteed to release all resources).
// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
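//
// For example, option 3 for a server-streaming RPC amounts to draining the
// stream with a receive loop (an illustrative sketch, not generated code):
//
//	for {
//		if err := stream.RecvMsg(m); err != nil {
//			break // io.EOF means success; any other error carries the status
//		}
//	}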
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
	// allow interceptor to see all applicable call options, which means those
	// configured as defaults from dial option as well as per-call options
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.streamInt != nil {
		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
	}
	return newClientStream(ctx, desc, cc, method, opts...)
}

// NewClientStream is a wrapper for ClientConn.NewStream.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
	return cc.NewStream(ctx, desc, method, opts...)
}

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	if channelz.IsOn() {
		cc.incrCallsStarted()
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
	c := defaultCallInfo()
	mc := cc.GetMethodConfig(method)
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.cp != nil {
		callHdr.SendCompress = cc.dopts.cp.Type()
		cp = cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}
	var trInfo traceInfo
	if EnableTracing {
		trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
		trInfo.firstLine.client = true
		if deadline, ok := ctx.Deadline(); ok {
			trInfo.firstLine.deadline = deadline.Sub(time.Now())
		}
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		ctx = trace.NewContext(ctx, trInfo.tr)
	}
	ctx = newContextWithRPCInfo(ctx, c.failFast)
	sh := cc.dopts.copts.StatsHandler
	var beginTime time.Time
	if sh != nil {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
		beginTime = time.Now()
		begin := &stats.Begin{
			Client:    true,
			BeginTime: beginTime,
			FailFast:  c.failFast,
		}
		sh.HandleRPC(ctx, begin)
	}

	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		beginTime:    beginTime,
		firstAttempt: true,
	}
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}

	cs.callInfo.stream = cs
	// Only this initial attempt has stats/tracing.
	// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
	if err := cs.newAttemptLocked(sh, trInfo); err != nil {
		cs.finish(err)
		return nil, err
	}

	op := func(a *csAttempt) error { return a.newStream() }
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to clean up when the user closes the
		// ClientConn or cancels the stream context. In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}

func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error {
	cs.attempt = &csAttempt{
		cs:           cs,
		dc:           cs.cc.dopts.dc,
		statsHandler: sh,
		trInfo:       trInfo,
	}

	if err := cs.ctx.Err(); err != nil {
		return toRPCErr(err)
	}
	t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
	if err != nil {
		return err
	}
	cs.attempt.t = t
	cs.attempt.done = done
	return nil
}

func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries
	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
	if err != nil {
		return toRPCErr(err)
	}
	cs.attempt.s = s
	cs.attempt.p = &parser{r: s}
	return nil
}

// clientStream implements a client side Stream.
type clientStream struct {
	callHdr  *transport.CallHdr
	opts     []CallOption
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	codec baseCodec
	cp    Compressor
	comp  encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

	sentLast  bool // sent an end stream
	beginTime time.Time

	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing

	retryThrottler *retryThrottler // The throttler active when the RPC began.

	mu                      sync.Mutex
	firstAttempt            bool       // if true, transparent retry is valid
	numRetries              int        // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int        // retries since pushback; to reset backoff
	finished                bool       // TODO: replace with atomic cmpxchg or sync.Once?
	attempt                 *csAttempt // the active client stream attempt
	// TODO(hedging): hedging will have multiple attempts simultaneously.
	committed  bool                       // active attempt committed for retry?
	buffer     []func(a *csAttempt) error // operations to replay on retry
	bufferSize int                        // current size of buffer
}

// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
	cs   *clientStream
	t    transport.ClientTransport
	s    *transport.Stream
	p    *parser
	done func(balancer.DoneInfo)

	finished  bool
	dc        Decompressor
	decomp    encoding.Compressor
	decompSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
	trInfo traceInfo

	statsHandler stats.Handler
}

func (cs *clientStream) commitAttemptLocked() {
	cs.committed = true
	cs.buffer = nil
}

func (cs *clientStream) commitAttempt() {
	cs.mu.Lock()
	cs.commitAttemptLocked()
	cs.mu.Unlock()
}

// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.
func (cs *clientStream) shouldRetry(err error) error {
	if cs.attempt.s == nil && !cs.callInfo.failFast {
		// In the event of any error from NewStream (attempt.s == nil), we
		// never attempted to write anything to the wire, so we can retry
		// indefinitely for non-fail-fast RPCs.
		return nil
	}
	if cs.finished || cs.committed {
		// RPC is finished or committed; cannot retry.
		return err
	}
	// Wait for the trailers.
	if cs.attempt.s != nil {
		<-cs.attempt.s.Done()
	}
	if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
		// First attempt, wait-for-ready, stream unprocessed: transparently retry.
		cs.firstAttempt = false
		return nil
	}
	cs.firstAttempt = false
	if cs.cc.dopts.disableRetry {
		return err
	}

	pushback := 0
	hasPushback := false
	if cs.attempt.s != nil {
		if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil {
			// Context error; stop now.
			return toErr
		} else if !to {
			return err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return err
		}
	}

	var code codes.Code
	if cs.attempt.s != nil {
		code = cs.attempt.s.Status().Code()
	} else {
		code = status.Convert(err).Code()
	}

	rp := cs.methodConfig.retryPolicy
	if rp == nil || !rp.retryableStatusCodes[code] {
		return err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return err
	}
	if cs.numRetries+1 >= rp.maxAttempts {
		return err
	}

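	// Compute the retry delay: honor server pushback when present; otherwise
	// use randomized exponential backoff, i.e. a uniformly random duration in
	// [0, min(initialBackoff*backoffMultiplier^n, maxBackoff)), where n is the
	// number of retries since the last pushback.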
	var dur time.Duration
	if hasPushback {
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := float64(rp.initialBackoff) * fact
		if max := float64(rp.maxBackoff); cur > max {
			cur = max
		}
		dur = time.Duration(grpcrand.Int63n(int64(cur)))
		cs.numRetriesSincePushback++
	}

	// TODO(dfawley): we could eagerly fail here if dur puts us past the
	// deadline, but unsure if it is worth doing.
	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return nil
	case <-cs.ctx.Done():
		t.Stop()
		return status.FromContextError(cs.ctx.Err()).Err()
	}
}

// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error {
	for {
		cs.attempt.finish(lastErr)
		if err := cs.shouldRetry(lastErr); err != nil {
			cs.commitAttemptLocked()
			return err
		}
		if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
			return err
		}
		if lastErr = cs.replayBufferLocked(); lastErr == nil {
			return nil
		}
	}
}

func (cs *clientStream) Context() context.Context {
	cs.commitAttempt()
	// No need to lock before using attempt, since we know it is committed and
	// cannot change.
	return cs.attempt.s.Context()
}

func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			return op(cs.attempt)
		}
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}

func (cs *clientStream) Header() (metadata.MD, error) {
	var m metadata.MD
	err := cs.withRetry(func(a *csAttempt) error {
		var err error
		m, err = a.s.Header()
		return toRPCErr(err)
	}, cs.commitAttemptLocked)
	if err != nil {
		cs.finish(err)
	}
	return m, err
}

func (cs *clientStream) Trailer() metadata.MD {
	// On RPC failure, we never need to retry: calling Trailer() is only valid
	// after RecvMsg() has returned a non-nil error, so any necessary retry
	// would already have happened by then.
	//
	// Commit the attempt anyway, just in case users are not following those
	// directions -- it will prevent races and should not meaningfully impact
	// performance.
	cs.commitAttempt()
	if cs.attempt.s == nil {
		return nil
	}
	return cs.attempt.s.Trailer()
}

func (cs *clientStream) replayBufferLocked() error {
	a := cs.attempt
	for _, f := range cs.buffer {
		if err := f(a); err != nil {
			return err
		}
	}
	return nil
}

func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
	// Note: we still will buffer if retry is disabled (for transparent retries).
	if cs.committed {
		return
	}
	cs.bufferSize += sz
	if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
		cs.commitAttemptLocked()
		return
	}
	cs.buffer = append(cs.buffer, op)
}

func (cs *clientStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client. (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {
		cs.sentLast = true
	}
	data, err := encode(cs.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, cs.cp, cs.comp)
	if err != nil {
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	op := func(a *csAttempt) error {
		err := a.sendMsg(m, hdr, payload, data)
		// nil out the message and uncomp when replaying; they are only needed for
		// stats which is disabled for subsequent attempts.
		m, data = nil, nil
		return err
	}
	return cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
}

func (cs *clientStream) RecvMsg(m interface{}) error {
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m)
	}, cs.commitAttemptLocked)
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)
	}
	return err
}

func (cs *clientStream) CloseSend() error {
	if cs.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	cs.sentLast = true
	op := func(a *csAttempt) error {
		a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
		// Always return nil; io.EOF is the only error that might make sense
		// instead, but there is no need to signal the client to call RecvMsg
		// as the only use left for the stream after CloseSend is to call
		// RecvMsg. This also matches historical behavior.
		return nil
	}
	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
	// Errors from op are intentionally ignored; see the comment in op above.
	return nil
}

func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
		cs.mu.Unlock()
		return
	}
	cs.finished = true
	cs.commitAttemptLocked()
	cs.mu.Unlock()
	if err == nil {
		cs.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
	}
	if cs.attempt != nil {
		cs.attempt.finish(err)
	}
	// The after() call options all rely upon having a stream.
	if cs.attempt != nil && cs.attempt.s != nil {
		for _, o := range cs.opts {
			o.after(cs.callInfo)
		}
	}
	cs.cancel()
}

func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
	cs := a.cs
	if EnableTracing {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
		if !cs.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it. finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
	}
	if channelz.IsOn() {
		a.t.IncrMsgSent()
	}
	return nil
}

func (a *csAttempt) recvMsg(m interface{}) (err error) {
	cs := a.cs
	var inPayload *stats.InPayload
	if a.statsHandler != nil {
		inPayload = &stats.InPayload{
			Client: true,
		}
	}
	if !a.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.dc == nil || a.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				a.dc = nil
				a.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.dc = nil
		}
		// Only initialize this state once per stream.
		a.decompSet = true
	}
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, inPayload, a.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := a.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}
	if EnableTracing {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if inPayload != nil {
		a.statsHandler.HandleRPC(cs.ctx, inPayload)
	}
	if channelz.IsOn() {
		a.t.IncrMsgRecv()
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}

	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return a.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}

func (a *csAttempt) finish(err error) {
	a.mu.Lock()
	if a.finished {
		a.mu.Unlock()
		return
	}
	a.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	if a.s != nil {
		a.t.CloseStream(a.s, err)
	}

	if a.done != nil {
		br := false
		var tr metadata.MD
		if a.s != nil {
			br = a.s.BytesReceived()
			tr = a.s.Trailer()
		}
		a.done(balancer.DoneInfo{
			Err:           err,
			Trailer:       tr,
			BytesSent:     a.s != nil,
			BytesReceived: br,
		})
	}
	if a.statsHandler != nil {
		end := &stats.End{
			Client:    true,
			BeginTime: a.cs.beginTime,
			EndTime:   time.Now(),
			Error:     err,
		}
		a.statsHandler.HandleRPC(a.cs.ctx, end)
	}
	if a.trInfo.tr != nil {
		if err == nil {
			a.trInfo.tr.LazyPrintf("RPC: [OK]")
		} else {
			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
			a.trInfo.tr.SetError()
		}
		a.trInfo.tr.Finish()
		a.trInfo.tr = nil
	}
	a.mu.Unlock()
}

// ServerStream defines the server-side behavior of a streaming RPC.
//
// All errors returned from ServerStream methods are compatible with the
// status package.
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// When called multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	//  - ServerStream.SendHeader() is called;
	//  - The first response is sent out;
	//  - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message. On error, SendMsg aborts the stream and the
	// error is returned directly.
	//
	// SendMsg blocks until:
	//  - There is sufficient flow control to schedule m with the transport, or
	//  - The stream is done, or
	//  - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the client. An
	// untimely stream closure may result in lost messages.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the client has performed a CloseSend. On
	// any non-EOF error, the stream is aborted and the error contains the
	// RPC status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
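
// A minimal handler sketch (illustrative only; request and response are
// hypothetical message types): an echo-style bidirectional handler receives
// until io.EOF and sends one response per request.
//
//	func echoHandler(srv interface{}, stream ServerStream) error {
//		for {
//			req := new(request)
//			if err := stream.RecvMsg(req); err != nil {
//				if err == io.EOF {
//					return nil // client called CloseSend; finish with OK status
//				}
//				return err
//			}
//			if err := stream.SendMsg(&response{ /* ... */ }); err != nil {
//				return err
//			}
//		}
//	}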

// serverStream implements a server side Stream.
type serverStream struct {
	ctx   context.Context
	t     transport.ServerTransport
	s     *transport.Stream
	p     *parser
	codec baseCodec

	cp     Compressor
	dc     Decompressor
	comp   encoding.Compressor
	decomp encoding.Compressor

	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo

	statsHandler stats.Handler

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}

func (ss *serverStream) Context() context.Context {
	return ss.ctx
}

func (ss *serverStream) SetHeader(md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	return ss.s.SetHeader(md)
}

func (ss *serverStream) SendHeader(md metadata.MD) error {
	return ss.t.WriteHeader(ss.s, md)
}

func (ss *serverStream) SetTrailer(md metadata.MD) {
	if md.Len() == 0 {
		return
	}
	ss.s.SetTrailer(md)
}

func (ss *serverStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
				} else {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgSent()
		}
	}()
	data, err := encode(ss.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, ss.cp, ss.comp)
	if err != nil {
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > ss.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
	}
	if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
		return toRPCErr(err)
	}
	if ss.statsHandler != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
	}
	return nil
}

func (ss *serverStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
				} else if err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgRecv()
		}
	}()
	var inPayload *stats.InPayload
	if ss.statsHandler != nil {
		inPayload = &stats.InPayload{}
	}
	if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload, ss.decomp); err != nil {
		if err == io.EOF {
			return err
		}
		if err == io.ErrUnexpectedEOF {
			err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
		}
		return toRPCErr(err)
	}
	if inPayload != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), inPayload)
	}
	return nil
}

// MethodFromServerStream returns the method string for the input stream.
// The returned string is in the format of "/service/method".
func MethodFromServerStream(stream ServerStream) (string, bool) {
	return Method(stream.Context())
}
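
// An illustrative use of MethodFromServerStream inside a hypothetical stream
// interceptor (the interceptor itself is a sketch, not part of this file):
//
//	func loggingInterceptor(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
//		if m, ok := MethodFromServerStream(ss); ok {
//			grpclog.Infof("streaming call: %s", m)
//		}
//		return handler(srv, ss)
//	}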