/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "errors"
23 "fmt"
24 "io"
25 "math"
26 "net"
27 "net/http"
28 "reflect"
29 "runtime"
30 "strings"
31 "sync"
32 "sync/atomic"
33 "time"
34
35 "golang.org/x/net/context"
36 "golang.org/x/net/trace"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/encoding"
41 "google.golang.org/grpc/encoding/proto"
42 "google.golang.org/grpc/grpclog"
43 "google.golang.org/grpc/internal/channelz"
44 "google.golang.org/grpc/internal/transport"
45 "google.golang.org/grpc/keepalive"
46 "google.golang.org/grpc/metadata"
47 "google.golang.org/grpc/stats"
48 "google.golang.org/grpc/status"
49 "google.golang.org/grpc/tap"
50)
51
// Default per-message size limits used by defaultServerOptions when the
// corresponding ServerOption is not supplied.
const (
	// defaultServerMaxReceiveMessageSize is the inbound limit: 4 MiB.
	defaultServerMaxReceiveMessageSize = 4 * 1024 * 1024
	// defaultServerMaxSendMessageSize is the outbound limit: the largest int32.
	defaultServerMaxSendMessageSize = math.MaxInt32
)
56
57type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
58
59// MethodDesc represents an RPC service's method specification.
60type MethodDesc struct {
61 MethodName string
62 Handler methodHandler
63}
64
65// ServiceDesc represents an RPC service's specification.
66type ServiceDesc struct {
67 ServiceName string
68 // The pointer to the service interface. Used to check whether the user
69 // provided implementation satisfies the interface requirements.
70 HandlerType interface{}
71 Methods []MethodDesc
72 Streams []StreamDesc
73 Metadata interface{}
74}
75
76// service consists of the information of the server serving this service and
77// the methods in this service.
78type service struct {
79 server interface{} // the server for service methods
80 md map[string]*MethodDesc
81 sd map[string]*StreamDesc
82 mdata interface{}
83}
84
85// Server is a gRPC server to serve RPC requests.
86type Server struct {
87 opts options
88
89 mu sync.Mutex // guards following
90 lis map[net.Listener]bool
91 conns map[io.Closer]bool
92 serve bool
93 drain bool
94 cv *sync.Cond // signaled when connections close for GracefulStop
95 m map[string]*service // service name -> service info
96 events trace.EventLog
97
98 quit chan struct{}
99 done chan struct{}
100 quitOnce sync.Once
101 doneOnce sync.Once
102 channelzRemoveOnce sync.Once
103 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
104
105 channelzID int64 // channelz unique identification number
106 czData *channelzData
107}
108
109type options struct {
110 creds credentials.TransportCredentials
111 codec baseCodec
112 cp Compressor
113 dc Decompressor
114 unaryInt UnaryServerInterceptor
115 streamInt StreamServerInterceptor
116 inTapHandle tap.ServerInHandle
117 statsHandler stats.Handler
118 maxConcurrentStreams uint32
119 maxReceiveMessageSize int
120 maxSendMessageSize int
121 unknownStreamDesc *StreamDesc
122 keepaliveParams keepalive.ServerParameters
123 keepalivePolicy keepalive.EnforcementPolicy
124 initialWindowSize int32
125 initialConnWindowSize int32
126 writeBufferSize int
127 readBufferSize int
128 connectionTimeout time.Duration
129 maxHeaderListSize *uint32
130}
131
132var defaultServerOptions = options{
133 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
134 maxSendMessageSize: defaultServerMaxSendMessageSize,
135 connectionTimeout: 120 * time.Second,
136 writeBufferSize: defaultWriteBufSize,
137 readBufferSize: defaultReadBufSize,
138}
139
140// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
141type ServerOption func(*options)
142
143// WriteBufferSize determines how much data can be batched before doing a write on the wire.
144// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
145// The default value for this buffer is 32KB.
146// Zero will disable the write buffer such that each write will be on underlying connection.
147// Note: A Send call may not directly translate to a write.
148func WriteBufferSize(s int) ServerOption {
149 return func(o *options) {
150 o.writeBufferSize = s
151 }
152}
153
154// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
155// for one read syscall.
156// The default value for this buffer is 32KB.
157// Zero will disable read buffer for a connection so data framer can access the underlying
158// conn directly.
159func ReadBufferSize(s int) ServerOption {
160 return func(o *options) {
161 o.readBufferSize = s
162 }
163}
164
165// InitialWindowSize returns a ServerOption that sets window size for stream.
166// The lower bound for window size is 64K and any value smaller than that will be ignored.
167func InitialWindowSize(s int32) ServerOption {
168 return func(o *options) {
169 o.initialWindowSize = s
170 }
171}
172
173// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
174// The lower bound for window size is 64K and any value smaller than that will be ignored.
175func InitialConnWindowSize(s int32) ServerOption {
176 return func(o *options) {
177 o.initialConnWindowSize = s
178 }
179}
180
181// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
182func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
183 return func(o *options) {
184 o.keepaliveParams = kp
185 }
186}
187
188// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
189func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
190 return func(o *options) {
191 o.keepalivePolicy = kep
192 }
193}
194
195// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
196//
197// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
198func CustomCodec(codec Codec) ServerOption {
199 return func(o *options) {
200 o.codec = codec
201 }
202}
203
204// RPCCompressor returns a ServerOption that sets a compressor for outbound
205// messages. For backward compatibility, all outbound messages will be sent
206// using this compressor, regardless of incoming message compression. By
207// default, server messages will be sent using the same compressor with which
208// request messages were sent.
209//
210// Deprecated: use encoding.RegisterCompressor instead.
211func RPCCompressor(cp Compressor) ServerOption {
212 return func(o *options) {
213 o.cp = cp
214 }
215}
216
217// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
218// messages. It has higher priority than decompressors registered via
219// encoding.RegisterCompressor.
220//
221// Deprecated: use encoding.RegisterCompressor instead.
222func RPCDecompressor(dc Decompressor) ServerOption {
223 return func(o *options) {
224 o.dc = dc
225 }
226}
227
228// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
229// If this is not set, gRPC uses the default limit.
230//
231// Deprecated: use MaxRecvMsgSize instead.
232func MaxMsgSize(m int) ServerOption {
233 return MaxRecvMsgSize(m)
234}
235
236// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
237// If this is not set, gRPC uses the default 4MB.
238func MaxRecvMsgSize(m int) ServerOption {
239 return func(o *options) {
240 o.maxReceiveMessageSize = m
241 }
242}
243
244// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
245// If this is not set, gRPC uses the default 4MB.
246func MaxSendMsgSize(m int) ServerOption {
247 return func(o *options) {
248 o.maxSendMessageSize = m
249 }
250}
251
252// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
253// of concurrent streams to each ServerTransport.
254func MaxConcurrentStreams(n uint32) ServerOption {
255 return func(o *options) {
256 o.maxConcurrentStreams = n
257 }
258}
259
260// Creds returns a ServerOption that sets credentials for server connections.
261func Creds(c credentials.TransportCredentials) ServerOption {
262 return func(o *options) {
263 o.creds = c
264 }
265}
266
267// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
268// server. Only one unary interceptor can be installed. The construction of multiple
269// interceptors (e.g., chaining) can be implemented at the caller.
270func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
271 return func(o *options) {
272 if o.unaryInt != nil {
273 panic("The unary server interceptor was already set and may not be reset.")
274 }
275 o.unaryInt = i
276 }
277}
278
279// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
280// server. Only one stream interceptor can be installed.
281func StreamInterceptor(i StreamServerInterceptor) ServerOption {
282 return func(o *options) {
283 if o.streamInt != nil {
284 panic("The stream server interceptor was already set and may not be reset.")
285 }
286 o.streamInt = i
287 }
288}
289
290// InTapHandle returns a ServerOption that sets the tap handle for all the server
291// transport to be created. Only one can be installed.
292func InTapHandle(h tap.ServerInHandle) ServerOption {
293 return func(o *options) {
294 if o.inTapHandle != nil {
295 panic("The tap handle was already set and may not be reset.")
296 }
297 o.inTapHandle = h
298 }
299}
300
301// StatsHandler returns a ServerOption that sets the stats handler for the server.
302func StatsHandler(h stats.Handler) ServerOption {
303 return func(o *options) {
304 o.statsHandler = h
305 }
306}
307
308// UnknownServiceHandler returns a ServerOption that allows for adding a custom
309// unknown service handler. The provided method is a bidi-streaming RPC service
310// handler that will be invoked instead of returning the "unimplemented" gRPC
311// error whenever a request is received for an unregistered service or method.
312// The handling function has full access to the Context of the request and the
313// stream, and the invocation bypasses interceptors.
314func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
315 return func(o *options) {
316 o.unknownStreamDesc = &StreamDesc{
317 StreamName: "unknown_service_handler",
318 Handler: streamHandler,
319 // We need to assume that the users of the streamHandler will want to use both.
320 ClientStreams: true,
321 ServerStreams: true,
322 }
323 }
324}
325
326// ConnectionTimeout returns a ServerOption that sets the timeout for
327// connection establishment (up to and including HTTP/2 handshaking) for all
328// new connections. If this is not set, the default is 120 seconds. A zero or
329// negative value will result in an immediate timeout.
330//
331// This API is EXPERIMENTAL.
332func ConnectionTimeout(d time.Duration) ServerOption {
333 return func(o *options) {
334 o.connectionTimeout = d
335 }
336}
337
338// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
339// of header list that the server is prepared to accept.
340func MaxHeaderListSize(s uint32) ServerOption {
341 return func(o *options) {
342 o.maxHeaderListSize = &s
343 }
344}
345
346// NewServer creates a gRPC server which has no service registered and has not
347// started to accept requests yet.
348func NewServer(opt ...ServerOption) *Server {
349 opts := defaultServerOptions
350 for _, o := range opt {
351 o(&opts)
352 }
353 s := &Server{
354 lis: make(map[net.Listener]bool),
355 opts: opts,
356 conns: make(map[io.Closer]bool),
357 m: make(map[string]*service),
358 quit: make(chan struct{}),
359 done: make(chan struct{}),
360 czData: new(channelzData),
361 }
362 s.cv = sync.NewCond(&s.mu)
363 if EnableTracing {
364 _, file, line, _ := runtime.Caller(1)
365 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
366 }
367
368 if channelz.IsOn() {
369 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
370 }
371 return s
372}
373
374// printf records an event in s's event log, unless s has been stopped.
375// REQUIRES s.mu is held.
376func (s *Server) printf(format string, a ...interface{}) {
377 if s.events != nil {
378 s.events.Printf(format, a...)
379 }
380}
381
382// errorf records an error in s's event log, unless s has been stopped.
383// REQUIRES s.mu is held.
384func (s *Server) errorf(format string, a ...interface{}) {
385 if s.events != nil {
386 s.events.Errorf(format, a...)
387 }
388}
389
390// RegisterService registers a service and its implementation to the gRPC
391// server. It is called from the IDL generated code. This must be called before
392// invoking Serve.
393func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
394 ht := reflect.TypeOf(sd.HandlerType).Elem()
395 st := reflect.TypeOf(ss)
396 if !st.Implements(ht) {
397 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
398 }
399 s.register(sd, ss)
400}
401
402func (s *Server) register(sd *ServiceDesc, ss interface{}) {
403 s.mu.Lock()
404 defer s.mu.Unlock()
405 s.printf("RegisterService(%q)", sd.ServiceName)
406 if s.serve {
407 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
408 }
409 if _, ok := s.m[sd.ServiceName]; ok {
410 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
411 }
412 srv := &service{
413 server: ss,
414 md: make(map[string]*MethodDesc),
415 sd: make(map[string]*StreamDesc),
416 mdata: sd.Metadata,
417 }
418 for i := range sd.Methods {
419 d := &sd.Methods[i]
420 srv.md[d.MethodName] = d
421 }
422 for i := range sd.Streams {
423 d := &sd.Streams[i]
424 srv.sd[d.StreamName] = d
425 }
426 s.m[sd.ServiceName] = srv
427}
428
// MethodInfo describes a single RPC: its name and which directions stream.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream reports whether the client sends a stream of messages.
	IsClientStream bool
	// IsServerStream reports whether the server sends a stream of messages.
	IsServerStream bool
}
438
439// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
440type ServiceInfo struct {
441 Methods []MethodInfo
442 // Metadata is the metadata specified in ServiceDesc when registering service.
443 Metadata interface{}
444}
445
446// GetServiceInfo returns a map from service names to ServiceInfo.
447// Service names include the package names, in the form of <package>.<service>.
448func (s *Server) GetServiceInfo() map[string]ServiceInfo {
449 ret := make(map[string]ServiceInfo)
450 for n, srv := range s.m {
451 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
452 for m := range srv.md {
453 methods = append(methods, MethodInfo{
454 Name: m,
455 IsClientStream: false,
456 IsServerStream: false,
457 })
458 }
459 for m, d := range srv.sd {
460 methods = append(methods, MethodInfo{
461 Name: m,
462 IsClientStream: d.ClientStreams,
463 IsServerStream: d.ServerStreams,
464 })
465 }
466
467 ret[n] = ServiceInfo{
468 Methods: methods,
469 Metadata: srv.mdata,
470 }
471 }
472 return ret
473}
474
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when called on a stopped server.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
478
479func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
480 if s.opts.creds == nil {
481 return rawConn, nil, nil
482 }
483 return s.opts.creds.ServerHandshake(rawConn)
484}
485
// listenSocket wraps a net.Listener together with the channelz ID it was
// registered under, so closing the listener can also remove that entry.
type listenSocket struct {
	net.Listener
	// channelzID is assigned by Serve when channelz is on; zero otherwise.
	channelzID int64
}
490
491func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
492 return &channelz.SocketInternalMetric{
493 SocketOptions: channelz.GetSocketOption(l.Listener),
494 LocalAddr: l.Listener.Addr(),
495 }
496}
497
498func (l *listenSocket) Close() error {
499 err := l.Listener.Close()
500 if channelz.IsOn() {
501 channelz.RemoveEntry(l.channelzID)
502 }
503 return err
504}
505
506// Serve accepts incoming connections on the listener lis, creating a new
507// ServerTransport and service goroutine for each. The service goroutines
508// read gRPC requests and then call the registered handlers to reply to them.
509// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
510// this method returns.
511// Serve will return a non-nil error unless Stop or GracefulStop is called.
512func (s *Server) Serve(lis net.Listener) error {
513 s.mu.Lock()
514 s.printf("serving")
515 s.serve = true
516 if s.lis == nil {
517 // Serve called after Stop or GracefulStop.
518 s.mu.Unlock()
519 lis.Close()
520 return ErrServerStopped
521 }
522
523 s.serveWG.Add(1)
524 defer func() {
525 s.serveWG.Done()
526 select {
527 // Stop or GracefulStop called; block until done and return nil.
528 case <-s.quit:
529 <-s.done
530 default:
531 }
532 }()
533
534 ls := &listenSocket{Listener: lis}
535 s.lis[ls] = true
536
537 if channelz.IsOn() {
538 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, "")
539 }
540 s.mu.Unlock()
541
542 defer func() {
543 s.mu.Lock()
544 if s.lis != nil && s.lis[ls] {
545 ls.Close()
546 delete(s.lis, ls)
547 }
548 s.mu.Unlock()
549 }()
550
551 var tempDelay time.Duration // how long to sleep on accept failure
552
553 for {
554 rawConn, err := lis.Accept()
555 if err != nil {
556 if ne, ok := err.(interface {
557 Temporary() bool
558 }); ok && ne.Temporary() {
559 if tempDelay == 0 {
560 tempDelay = 5 * time.Millisecond
561 } else {
562 tempDelay *= 2
563 }
564 if max := 1 * time.Second; tempDelay > max {
565 tempDelay = max
566 }
567 s.mu.Lock()
568 s.printf("Accept error: %v; retrying in %v", err, tempDelay)
569 s.mu.Unlock()
570 timer := time.NewTimer(tempDelay)
571 select {
572 case <-timer.C:
573 case <-s.quit:
574 timer.Stop()
575 return nil
576 }
577 continue
578 }
579 s.mu.Lock()
580 s.printf("done serving; Accept = %v", err)
581 s.mu.Unlock()
582
583 select {
584 case <-s.quit:
585 return nil
586 default:
587 }
588 return err
589 }
590 tempDelay = 0
591 // Start a new goroutine to deal with rawConn so we don't stall this Accept
592 // loop goroutine.
593 //
594 // Make sure we account for the goroutine so GracefulStop doesn't nil out
595 // s.conns before this conn can be added.
596 s.serveWG.Add(1)
597 go func() {
598 s.handleRawConn(rawConn)
599 s.serveWG.Done()
600 }()
601 }
602}
603
604// handleRawConn forks a goroutine to handle a just-accepted connection that
605// has not had any I/O performed on it yet.
606func (s *Server) handleRawConn(rawConn net.Conn) {
607 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
608 conn, authInfo, err := s.useTransportAuthenticator(rawConn)
609 if err != nil {
610 s.mu.Lock()
611 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
612 s.mu.Unlock()
613 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
614 // If serverHandshake returns ErrConnDispatched, keep rawConn open.
615 if err != credentials.ErrConnDispatched {
616 rawConn.Close()
617 }
618 rawConn.SetDeadline(time.Time{})
619 return
620 }
621
622 s.mu.Lock()
623 if s.conns == nil {
624 s.mu.Unlock()
625 conn.Close()
626 return
627 }
628 s.mu.Unlock()
629
630 // Finish handshaking (HTTP2)
631 st := s.newHTTP2Transport(conn, authInfo)
632 if st == nil {
633 return
634 }
635
636 rawConn.SetDeadline(time.Time{})
637 if !s.addConn(st) {
638 return
639 }
640 go func() {
641 s.serveStreams(st)
642 s.removeConn(st)
643 }()
644}
645
646// newHTTP2Transport sets up a http/2 transport (using the
647// gRPC http2 server transport in transport/http2_server.go).
648func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
649 config := &transport.ServerConfig{
650 MaxStreams: s.opts.maxConcurrentStreams,
651 AuthInfo: authInfo,
652 InTapHandle: s.opts.inTapHandle,
653 StatsHandler: s.opts.statsHandler,
654 KeepaliveParams: s.opts.keepaliveParams,
655 KeepalivePolicy: s.opts.keepalivePolicy,
656 InitialWindowSize: s.opts.initialWindowSize,
657 InitialConnWindowSize: s.opts.initialConnWindowSize,
658 WriteBufferSize: s.opts.writeBufferSize,
659 ReadBufferSize: s.opts.readBufferSize,
660 ChannelzParentID: s.channelzID,
661 MaxHeaderListSize: s.opts.maxHeaderListSize,
662 }
663 st, err := transport.NewServerTransport("http2", c, config)
664 if err != nil {
665 s.mu.Lock()
666 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
667 s.mu.Unlock()
668 c.Close()
669 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
670 return nil
671 }
672
673 return st
674}
675
676func (s *Server) serveStreams(st transport.ServerTransport) {
677 defer st.Close()
678 var wg sync.WaitGroup
679 st.HandleStreams(func(stream *transport.Stream) {
680 wg.Add(1)
681 go func() {
682 defer wg.Done()
683 s.handleStream(st, stream, s.traceInfo(st, stream))
684 }()
685 }, func(ctx context.Context, method string) context.Context {
686 if !EnableTracing {
687 return ctx
688 }
689 tr := trace.New("grpc.Recv."+methodFamily(method), method)
690 return trace.NewContext(ctx, tr)
691 })
692 wg.Wait()
693}
694
695var _ http.Handler = (*Server)(nil)
696
697// ServeHTTP implements the Go standard library's http.Handler
698// interface by responding to the gRPC request r, by looking up
699// the requested gRPC method in the gRPC server s.
700//
701// The provided HTTP request must have arrived on an HTTP/2
702// connection. When using the Go standard library's server,
703// practically this means that the Request must also have arrived
704// over TLS.
705//
706// To share one port (such as 443 for https) between gRPC and an
707// existing http.Handler, use a root http.Handler such as:
708//
709// if r.ProtoMajor == 2 && strings.HasPrefix(
710// r.Header.Get("Content-Type"), "application/grpc") {
711// grpcServer.ServeHTTP(w, r)
712// } else {
713// yourMux.ServeHTTP(w, r)
714// }
715//
716// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
717// separate from grpc-go's HTTP/2 server. Performance and features may vary
718// between the two paths. ServeHTTP does not support some gRPC features
719// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
720// and subject to change.
721func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
722 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
723 if err != nil {
724 http.Error(w, err.Error(), http.StatusInternalServerError)
725 return
726 }
727 if !s.addConn(st) {
728 return
729 }
730 defer s.removeConn(st)
731 s.serveStreams(st)
732}
733
734// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
735// If tracing is not enabled, it returns nil.
736func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
737 tr, ok := trace.FromContext(stream.Context())
738 if !ok {
739 return nil
740 }
741
742 trInfo = &traceInfo{
743 tr: tr,
744 }
745 trInfo.firstLine.client = false
746 trInfo.firstLine.remoteAddr = st.RemoteAddr()
747
748 if dl, ok := stream.Context().Deadline(); ok {
749 trInfo.firstLine.deadline = dl.Sub(time.Now())
750 }
751 return trInfo
752}
753
754func (s *Server) addConn(c io.Closer) bool {
755 s.mu.Lock()
756 defer s.mu.Unlock()
757 if s.conns == nil {
758 c.Close()
759 return false
760 }
761 if s.drain {
762 // Transport added after we drained our existing conns: drain it
763 // immediately.
764 c.(transport.ServerTransport).Drain()
765 }
766 s.conns[c] = true
767 return true
768}
769
770func (s *Server) removeConn(c io.Closer) {
771 s.mu.Lock()
772 defer s.mu.Unlock()
773 if s.conns != nil {
774 delete(s.conns, c)
775 s.cv.Broadcast()
776 }
777}
778
779func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
780 return &channelz.ServerInternalMetric{
781 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
782 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
783 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
784 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
785 }
786}
787
788func (s *Server) incrCallsStarted() {
789 atomic.AddInt64(&s.czData.callsStarted, 1)
790 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
791}
792
793func (s *Server) incrCallsSucceeded() {
794 atomic.AddInt64(&s.czData.callsSucceeded, 1)
795}
796
797func (s *Server) incrCallsFailed() {
798 atomic.AddInt64(&s.czData.callsFailed, 1)
799}
800
801func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
802 data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
803 if err != nil {
804 grpclog.Errorln("grpc: server failed to encode response: ", err)
805 return err
806 }
807 compData, err := compress(data, cp, comp)
808 if err != nil {
809 grpclog.Errorln("grpc: server failed to compress response: ", err)
810 return err
811 }
812 hdr, payload := msgHeader(data, compData)
813 // TODO(dfawley): should we be checking len(data) instead?
814 if len(payload) > s.opts.maxSendMessageSize {
815 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
816 }
817 err = t.Write(stream, hdr, payload, opts)
818 if err == nil && s.opts.statsHandler != nil {
819 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
820 }
821 return err
822}
823
824func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
825 if channelz.IsOn() {
826 s.incrCallsStarted()
827 defer func() {
828 if err != nil && err != io.EOF {
829 s.incrCallsFailed()
830 } else {
831 s.incrCallsSucceeded()
832 }
833 }()
834 }
835 sh := s.opts.statsHandler
836 if sh != nil {
837 beginTime := time.Now()
838 begin := &stats.Begin{
839 BeginTime: beginTime,
840 }
841 sh.HandleRPC(stream.Context(), begin)
842 defer func() {
843 end := &stats.End{
844 BeginTime: beginTime,
845 EndTime: time.Now(),
846 }
847 if err != nil && err != io.EOF {
848 end.Error = toRPCErr(err)
849 }
850 sh.HandleRPC(stream.Context(), end)
851 }()
852 }
853 if trInfo != nil {
854 defer trInfo.tr.Finish()
855 trInfo.firstLine.client = false
856 trInfo.tr.LazyLog(&trInfo.firstLine, false)
857 defer func() {
858 if err != nil && err != io.EOF {
859 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
860 trInfo.tr.SetError()
861 }
862 }()
863 }
864
865 // comp and cp are used for compression. decomp and dc are used for
866 // decompression. If comp and decomp are both set, they are the same;
867 // however they are kept separate to ensure that at most one of the
868 // compressor/decompressor variable pairs are set for use later.
869 var comp, decomp encoding.Compressor
870 var cp Compressor
871 var dc Decompressor
872
873 // If dc is set and matches the stream's compression, use it. Otherwise, try
874 // to find a matching registered compressor for decomp.
875 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
876 dc = s.opts.dc
877 } else if rc != "" && rc != encoding.Identity {
878 decomp = encoding.GetCompressor(rc)
879 if decomp == nil {
880 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
881 t.WriteStatus(stream, st)
882 return st.Err()
883 }
884 }
885
886 // If cp is set, use it. Otherwise, attempt to compress the response using
887 // the incoming message compression method.
888 //
889 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
890 if s.opts.cp != nil {
891 cp = s.opts.cp
892 stream.SetSendCompress(cp.Type())
893 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
894 // Legacy compressor not specified; attempt to respond with same encoding.
895 comp = encoding.GetCompressor(rc)
896 if comp != nil {
897 stream.SetSendCompress(rc)
898 }
899 }
900
901 var inPayload *stats.InPayload
902 if sh != nil {
903 inPayload = &stats.InPayload{
904 RecvTime: time.Now(),
905 }
906 }
907 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, inPayload, decomp)
908 if err != nil {
909 if st, ok := status.FromError(err); ok {
910 if e := t.WriteStatus(stream, st); e != nil {
911 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
912 }
913 }
914 return err
915 }
916 if channelz.IsOn() {
917 t.IncrMsgRecv()
918 }
919 df := func(v interface{}) error {
920 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
921 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
922 }
923 if inPayload != nil {
924 inPayload.Payload = v
925 inPayload.Data = d
926 inPayload.Length = len(d)
927 sh.HandleRPC(stream.Context(), inPayload)
928 }
929 if trInfo != nil {
930 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
931 }
932 return nil
933 }
934 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
935 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
936 if appErr != nil {
937 appStatus, ok := status.FromError(appErr)
938 if !ok {
939 // Convert appErr if it is not a grpc status error.
940 appErr = status.Error(codes.Unknown, appErr.Error())
941 appStatus, _ = status.FromError(appErr)
942 }
943 if trInfo != nil {
944 trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
945 trInfo.tr.SetError()
946 }
947 if e := t.WriteStatus(stream, appStatus); e != nil {
948 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
949 }
950 return appErr
951 }
952 if trInfo != nil {
953 trInfo.tr.LazyLog(stringer("OK"), false)
954 }
955 opts := &transport.Options{Last: true}
956
957 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
958 if err == io.EOF {
959 // The entire stream is done (for unary RPC only).
960 return err
961 }
962 if s, ok := status.FromError(err); ok {
963 if e := t.WriteStatus(stream, s); e != nil {
964 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
965 }
966 } else {
967 switch st := err.(type) {
968 case transport.ConnectionError:
969 // Nothing to do here.
970 default:
971 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
972 }
973 }
974 return err
975 }
976 if channelz.IsOn() {
977 t.IncrMsgSent()
978 }
979 if trInfo != nil {
980 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
981 }
982 // TODO: Should we be logging if writing status failed here, like above?
983 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
984 // error or allow the stats handler to see it?
985 return t.WriteStatus(stream, status.New(codes.OK, ""))
986}
987
988func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
989 if channelz.IsOn() {
990 s.incrCallsStarted()
991 defer func() {
992 if err != nil && err != io.EOF {
993 s.incrCallsFailed()
994 } else {
995 s.incrCallsSucceeded()
996 }
997 }()
998 }
999 sh := s.opts.statsHandler
1000 if sh != nil {
1001 beginTime := time.Now()
1002 begin := &stats.Begin{
1003 BeginTime: beginTime,
1004 }
1005 sh.HandleRPC(stream.Context(), begin)
1006 defer func() {
1007 end := &stats.End{
1008 BeginTime: beginTime,
1009 EndTime: time.Now(),
1010 }
1011 if err != nil && err != io.EOF {
1012 end.Error = toRPCErr(err)
1013 }
1014 sh.HandleRPC(stream.Context(), end)
1015 }()
1016 }
1017 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1018 ss := &serverStream{
1019 ctx: ctx,
1020 t: t,
1021 s: stream,
1022 p: &parser{r: stream},
1023 codec: s.getCodec(stream.ContentSubtype()),
1024 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1025 maxSendMessageSize: s.opts.maxSendMessageSize,
1026 trInfo: trInfo,
1027 statsHandler: sh,
1028 }
1029
1030 // If dc is set and matches the stream's compression, use it. Otherwise, try
1031 // to find a matching registered compressor for decomp.
1032 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1033 ss.dc = s.opts.dc
1034 } else if rc != "" && rc != encoding.Identity {
1035 ss.decomp = encoding.GetCompressor(rc)
1036 if ss.decomp == nil {
1037 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1038 t.WriteStatus(ss.s, st)
1039 return st.Err()
1040 }
1041 }
1042
1043 // If cp is set, use it. Otherwise, attempt to compress the response using
1044 // the incoming message compression method.
1045 //
1046 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1047 if s.opts.cp != nil {
1048 ss.cp = s.opts.cp
1049 stream.SetSendCompress(s.opts.cp.Type())
1050 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1051 // Legacy compressor not specified; attempt to respond with same encoding.
1052 ss.comp = encoding.GetCompressor(rc)
1053 if ss.comp != nil {
1054 stream.SetSendCompress(rc)
1055 }
1056 }
1057
1058 if trInfo != nil {
1059 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1060 defer func() {
1061 ss.mu.Lock()
1062 if err != nil && err != io.EOF {
1063 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1064 ss.trInfo.tr.SetError()
1065 }
1066 ss.trInfo.tr.Finish()
1067 ss.trInfo.tr = nil
1068 ss.mu.Unlock()
1069 }()
1070 }
1071 var appErr error
1072 var server interface{}
1073 if srv != nil {
1074 server = srv.server
1075 }
1076 if s.opts.streamInt == nil {
1077 appErr = sd.Handler(server, ss)
1078 } else {
1079 info := &StreamServerInfo{
1080 FullMethod: stream.Method(),
1081 IsClientStream: sd.ClientStreams,
1082 IsServerStream: sd.ServerStreams,
1083 }
1084 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1085 }
1086 if appErr != nil {
1087 appStatus, ok := status.FromError(appErr)
1088 if !ok {
1089 appStatus = status.New(codes.Unknown, appErr.Error())
1090 appErr = appStatus.Err()
1091 }
1092 if trInfo != nil {
1093 ss.mu.Lock()
1094 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1095 ss.trInfo.tr.SetError()
1096 ss.mu.Unlock()
1097 }
1098 t.WriteStatus(ss.s, appStatus)
1099 // TODO: Should we log an error from WriteStatus here and below?
1100 return appErr
1101 }
1102 if trInfo != nil {
1103 ss.mu.Lock()
1104 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1105 ss.mu.Unlock()
1106 }
1107 return t.WriteStatus(ss.s, status.New(codes.OK, ""))
1108}
1109
1110func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1111 sm := stream.Method()
1112 if sm != "" && sm[0] == '/' {
1113 sm = sm[1:]
1114 }
1115 pos := strings.LastIndex(sm, "/")
1116 if pos == -1 {
1117 if trInfo != nil {
1118 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1119 trInfo.tr.SetError()
1120 }
1121 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1122 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1123 if trInfo != nil {
1124 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1125 trInfo.tr.SetError()
1126 }
1127 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1128 }
1129 if trInfo != nil {
1130 trInfo.tr.Finish()
1131 }
1132 return
1133 }
1134 service := sm[:pos]
1135 method := sm[pos+1:]
1136
1137 if srv, ok := s.m[service]; ok {
1138 if md, ok := srv.md[method]; ok {
1139 s.processUnaryRPC(t, stream, srv, md, trInfo)
1140 return
1141 }
1142 if sd, ok := srv.sd[method]; ok {
1143 s.processStreamingRPC(t, stream, srv, sd, trInfo)
1144 return
1145 }
1146 }
1147 // Unknown service, or known server unknown method.
1148 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1149 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1150 return
1151 }
1152 if trInfo != nil {
1153 trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
1154 trInfo.tr.SetError()
1155 }
1156 errDesc := fmt.Sprintf("unknown service %v", service)
1157 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1158 if trInfo != nil {
1159 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1160 trInfo.tr.SetError()
1161 }
1162 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1163 }
1164 if trInfo != nil {
1165 trInfo.tr.Finish()
1166 }
1167}
1168
// streamKey is the context key type under which
// NewContextWithServerTransportStream stores a ServerTransportStream and
// ServerTransportStreamFromContext looks it up. Being an unexported empty
// struct type, it cannot be produced by code outside this package.
type streamKey struct{}
1171
1172// NewContextWithServerTransportStream creates a new context from ctx and
1173// attaches stream to it.
1174//
1175// This API is EXPERIMENTAL.
1176func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1177 return context.WithValue(ctx, streamKey{}, stream)
1178}
1179
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// The package-level helpers Method, SetHeader, SendHeader and SetTrailer
// fetch the stream from the context and forward to the method of the same
// name declared here.
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	// Method supplies the string returned by grpc.Method.
	Method() string
	// SetHeader is called by grpc.SetHeader when md is non-empty.
	SetHeader(md metadata.MD) error
	// SendHeader is called by grpc.SendHeader.
	SendHeader(md metadata.MD) error
	// SetTrailer is called by grpc.SetTrailer when md is non-empty.
	SetTrailer(md metadata.MD) error
}
1194
1195// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1196// ctx. Returns nil if the given context has no stream associated with it
1197// (which implies it is not an RPC invocation context).
1198//
1199// This API is EXPERIMENTAL.
1200func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1201 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1202 return s
1203}
1204
1205// Stop stops the gRPC server. It immediately closes all open
1206// connections and listeners.
1207// It cancels all active RPCs on the server side and the corresponding
1208// pending RPCs on the client side will get notified by connection
1209// errors.
1210func (s *Server) Stop() {
1211 s.quitOnce.Do(func() {
1212 close(s.quit)
1213 })
1214
1215 defer func() {
1216 s.serveWG.Wait()
1217 s.doneOnce.Do(func() {
1218 close(s.done)
1219 })
1220 }()
1221
1222 s.channelzRemoveOnce.Do(func() {
1223 if channelz.IsOn() {
1224 channelz.RemoveEntry(s.channelzID)
1225 }
1226 })
1227
1228 s.mu.Lock()
1229 listeners := s.lis
1230 s.lis = nil
1231 st := s.conns
1232 s.conns = nil
1233 // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1234 s.cv.Broadcast()
1235 s.mu.Unlock()
1236
1237 for lis := range listeners {
1238 lis.Close()
1239 }
1240 for c := range st {
1241 c.Close()
1242 }
1243
1244 s.mu.Lock()
1245 if s.events != nil {
1246 s.events.Finish()
1247 s.events = nil
1248 }
1249 s.mu.Unlock()
1250}
1251
1252// GracefulStop stops the gRPC server gracefully. It stops the server from
1253// accepting new connections and RPCs and blocks until all the pending RPCs are
1254// finished.
1255func (s *Server) GracefulStop() {
1256 s.quitOnce.Do(func() {
1257 close(s.quit)
1258 })
1259
1260 defer func() {
1261 s.doneOnce.Do(func() {
1262 close(s.done)
1263 })
1264 }()
1265
1266 s.channelzRemoveOnce.Do(func() {
1267 if channelz.IsOn() {
1268 channelz.RemoveEntry(s.channelzID)
1269 }
1270 })
1271 s.mu.Lock()
1272 if s.conns == nil {
1273 s.mu.Unlock()
1274 return
1275 }
1276
1277 for lis := range s.lis {
1278 lis.Close()
1279 }
1280 s.lis = nil
1281 if !s.drain {
1282 for c := range s.conns {
1283 c.(transport.ServerTransport).Drain()
1284 }
1285 s.drain = true
1286 }
1287
1288 // Wait for serving threads to be ready to exit. Only then can we be sure no
1289 // new conns will be created.
1290 s.mu.Unlock()
1291 s.serveWG.Wait()
1292 s.mu.Lock()
1293
1294 for len(s.conns) != 0 {
1295 s.cv.Wait()
1296 }
1297 s.conns = nil
1298 if s.events != nil {
1299 s.events.Finish()
1300 s.events = nil
1301 }
1302 s.mu.Unlock()
1303}
1304
1305// contentSubtype must be lowercase
1306// cannot return nil
1307func (s *Server) getCodec(contentSubtype string) baseCodec {
1308 if s.opts.codec != nil {
1309 return s.opts.codec
1310 }
1311 if contentSubtype == "" {
1312 return encoding.GetCodec(proto.Name)
1313 }
1314 codec := encoding.GetCodec(contentSubtype)
1315 if codec == nil {
1316 return encoding.GetCodec(proto.Name)
1317 }
1318 return codec
1319}
1320
1321// SetHeader sets the header metadata.
1322// When called multiple times, all the provided metadata will be merged.
1323// All the metadata will be sent out when one of the following happens:
1324// - grpc.SendHeader() is called;
1325// - The first response is sent out;
1326// - An RPC status is sent out (error or success).
1327func SetHeader(ctx context.Context, md metadata.MD) error {
1328 if md.Len() == 0 {
1329 return nil
1330 }
1331 stream := ServerTransportStreamFromContext(ctx)
1332 if stream == nil {
1333 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1334 }
1335 return stream.SetHeader(md)
1336}
1337
1338// SendHeader sends header metadata. It may be called at most once.
1339// The provided md and headers set by SetHeader() will be sent.
1340func SendHeader(ctx context.Context, md metadata.MD) error {
1341 stream := ServerTransportStreamFromContext(ctx)
1342 if stream == nil {
1343 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1344 }
1345 if err := stream.SendHeader(md); err != nil {
1346 return toRPCErr(err)
1347 }
1348 return nil
1349}
1350
1351// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1352// When called more than once, all the provided metadata will be merged.
1353func SetTrailer(ctx context.Context, md metadata.MD) error {
1354 if md.Len() == 0 {
1355 return nil
1356 }
1357 stream := ServerTransportStreamFromContext(ctx)
1358 if stream == nil {
1359 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1360 }
1361 return stream.SetTrailer(md)
1362}
1363
1364// Method returns the method string for the server context. The returned
1365// string is in the format of "/service/method".
1366func Method(ctx context.Context) (string, bool) {
1367 s := ServerTransportStreamFromContext(ctx)
1368 if s == nil {
1369 return "", false
1370 }
1371 return s.Method(), true
1372}
1373
// channelzServer wraps a *Server so that its ChannelzMetric method can
// forward to the server's channelzMetric.
type channelzServer struct {
	// s is the server whose metrics are reported.
	s *Server
}
1377
1378func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1379 return c.s.channelzMetric()
1380}