blob: f49ac3f9b839bccb8570fdd4bc83156a8411354d [file] [log] [blame]
Serge Bazanskicc25bdf2018-10-25 14:02:58 +02001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "errors"
23 "fmt"
24 "math"
25 "net"
26 "reflect"
27 "strings"
28 "sync"
29 "sync/atomic"
30 "time"
31
32 "golang.org/x/net/context"
33 "google.golang.org/grpc/balancer"
34 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/grpclog"
39 "google.golang.org/grpc/internal/backoff"
40 "google.golang.org/grpc/internal/channelz"
41 "google.golang.org/grpc/internal/transport"
42 "google.golang.org/grpc/keepalive"
43 "google.golang.org/grpc/metadata"
44 "google.golang.org/grpc/resolver"
45 _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
46 _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
47 "google.golang.org/grpc/status"
48)
49
50const (
51 // minimum time to give a connection to complete
52 minConnectTimeout = 20 * time.Second
53 // must match grpclbName in grpclb/grpclb.go
54 grpclbName = "grpclb"
55)
56
57var (
58 // ErrClientConnClosing indicates that the operation is illegal because
59 // the ClientConn is closing.
60 //
61 // Deprecated: this error should not be relied upon by users; use the status
62 // code of Canceled instead.
63 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
64 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
65 errConnDrain = errors.New("grpc: the connection is drained")
66 // errConnClosing indicates that the connection is closing.
67 errConnClosing = errors.New("grpc: the connection is closing")
68 // errBalancerClosed indicates that the balancer is closed.
69 errBalancerClosed = errors.New("grpc: balancer is closed")
70 // We use an accessor so that minConnectTimeout can be
71 // atomically read and updated while testing.
72 getMinConnectTimeout = func() time.Duration {
73 return minConnectTimeout
74 }
75)
76
77// The following errors are returned from Dial and DialContext
78var (
79 // errNoTransportSecurity indicates that there is no transport security
80 // being set for ClientConn. Users should either set one or explicitly
81 // call WithInsecure DialOption to disable security.
82 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
83 // errTransportCredsAndBundle indicates that creds bundle is used together
84 // with other individual Transport Credentials.
85 errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
86 // errTransportCredentialsMissing indicates that users want to transmit security
87 // information (e.g., oauth2 token) which requires secure connection on an insecure
88 // connection.
89 errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
90 // errCredentialsConflict indicates that grpc.WithTransportCredentials()
91 // and grpc.WithInsecure() are both called for a connection.
92 errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
93)
94
95const (
96 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
97 defaultClientMaxSendMessageSize = math.MaxInt32
98 // http2IOBufSize specifies the buffer size for sending frames.
99 defaultWriteBufSize = 32 * 1024
100 defaultReadBufSize = 32 * 1024
101)
102
103// Dial creates a client connection to the given target.
104func Dial(target string, opts ...DialOption) (*ClientConn, error) {
105 return DialContext(context.Background(), target, opts...)
106}
107
108// DialContext creates a client connection to the given target. By default, it's
109// a non-blocking dial (the function won't wait for connections to be
110// established, and connecting happens in the background). To make it a blocking
111// dial, use WithBlock() dial option.
112//
113// In the non-blocking case, the ctx does not act against the connection. It
114// only controls the setup steps.
115//
116// In the blocking case, ctx can be used to cancel or expire the pending
117// connection. Once this function returns, the cancellation and expiration of
118// ctx will be noop. Users should call ClientConn.Close to terminate all the
119// pending operations after this function returns.
120//
121// The target name syntax is defined in
122// https://github.com/grpc/grpc/blob/master/doc/naming.md.
123// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
124func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
125 cc := &ClientConn{
126 target: target,
127 csMgr: &connectivityStateManager{},
128 conns: make(map[*addrConn]struct{}),
129 dopts: defaultDialOptions(),
130 blockingpicker: newPickerWrapper(),
131 czData: new(channelzData),
132 }
133 cc.retryThrottler.Store((*retryThrottler)(nil))
134 cc.ctx, cc.cancel = context.WithCancel(context.Background())
135
136 for _, opt := range opts {
137 opt.apply(&cc.dopts)
138 }
139
140 if channelz.IsOn() {
141 if cc.dopts.channelzParentID != 0 {
142 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
143 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
144 Desc: "Channel Created",
145 Severity: channelz.CtINFO,
146 Parent: &channelz.TraceEventDesc{
147 Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
148 Severity: channelz.CtINFO,
149 },
150 })
151 } else {
152 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
153 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
154 Desc: "Channel Created",
155 Severity: channelz.CtINFO,
156 })
157 }
158 cc.csMgr.channelzID = cc.channelzID
159 }
160
161 if !cc.dopts.insecure {
162 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
163 return nil, errNoTransportSecurity
164 }
165 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
166 return nil, errTransportCredsAndBundle
167 }
168 } else {
169 if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
170 return nil, errCredentialsConflict
171 }
172 for _, cd := range cc.dopts.copts.PerRPCCredentials {
173 if cd.RequireTransportSecurity() {
174 return nil, errTransportCredentialsMissing
175 }
176 }
177 }
178
179 cc.mkp = cc.dopts.copts.KeepaliveParams
180
181 if cc.dopts.copts.Dialer == nil {
182 cc.dopts.copts.Dialer = newProxyDialer(
183 func(ctx context.Context, addr string) (net.Conn, error) {
184 network, addr := parseDialTarget(addr)
185 return dialContext(ctx, network, addr)
186 },
187 )
188 }
189
190 if cc.dopts.copts.UserAgent != "" {
191 cc.dopts.copts.UserAgent += " " + grpcUA
192 } else {
193 cc.dopts.copts.UserAgent = grpcUA
194 }
195
196 if cc.dopts.timeout > 0 {
197 var cancel context.CancelFunc
198 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
199 defer cancel()
200 }
201
202 defer func() {
203 select {
204 case <-ctx.Done():
205 conn, err = nil, ctx.Err()
206 default:
207 }
208
209 if err != nil {
210 cc.Close()
211 }
212 }()
213
214 scSet := false
215 if cc.dopts.scChan != nil {
216 // Try to get an initial service config.
217 select {
218 case sc, ok := <-cc.dopts.scChan:
219 if ok {
220 cc.sc = sc
221 scSet = true
222 }
223 default:
224 }
225 }
226 if cc.dopts.bs == nil {
227 cc.dopts.bs = backoff.Exponential{
228 MaxDelay: DefaultBackoffConfig.MaxDelay,
229 }
230 }
231 if cc.dopts.resolverBuilder == nil {
232 // Only try to parse target when resolver builder is not already set.
233 cc.parsedTarget = parseTarget(cc.target)
234 grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
235 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
236 if cc.dopts.resolverBuilder == nil {
237 // If resolver builder is still nil, the parse target's scheme is
238 // not registered. Fallback to default resolver and set Endpoint to
239 // the original unparsed target.
240 grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
241 cc.parsedTarget = resolver.Target{
242 Scheme: resolver.GetDefaultScheme(),
243 Endpoint: target,
244 }
245 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
246 }
247 } else {
248 cc.parsedTarget = resolver.Target{Endpoint: target}
249 }
250 creds := cc.dopts.copts.TransportCredentials
251 if creds != nil && creds.Info().ServerName != "" {
252 cc.authority = creds.Info().ServerName
253 } else if cc.dopts.insecure && cc.dopts.authority != "" {
254 cc.authority = cc.dopts.authority
255 } else {
256 // Use endpoint from "scheme://authority/endpoint" as the default
257 // authority for ClientConn.
258 cc.authority = cc.parsedTarget.Endpoint
259 }
260
261 if cc.dopts.scChan != nil && !scSet {
262 // Blocking wait for the initial service config.
263 select {
264 case sc, ok := <-cc.dopts.scChan:
265 if ok {
266 cc.sc = sc
267 }
268 case <-ctx.Done():
269 return nil, ctx.Err()
270 }
271 }
272 if cc.dopts.scChan != nil {
273 go cc.scWatcher()
274 }
275
276 var credsClone credentials.TransportCredentials
277 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
278 credsClone = creds.Clone()
279 }
280 cc.balancerBuildOpts = balancer.BuildOptions{
281 DialCreds: credsClone,
282 CredsBundle: cc.dopts.copts.CredsBundle,
283 Dialer: cc.dopts.copts.Dialer,
284 ChannelzParentID: cc.channelzID,
285 }
286
287 // Build the resolver.
288 cc.resolverWrapper, err = newCCResolverWrapper(cc)
289 if err != nil {
290 return nil, fmt.Errorf("failed to build resolver: %v", err)
291 }
292 // Start the resolver wrapper goroutine after resolverWrapper is created.
293 //
294 // If the goroutine is started before resolverWrapper is ready, the
295 // following may happen: The goroutine sends updates to cc. cc forwards
296 // those to balancer. Balancer creates new addrConn. addrConn fails to
297 // connect, and calls resolveNow(). resolveNow() tries to use the non-ready
298 // resolverWrapper.
299 cc.resolverWrapper.start()
300
301 // A blocking dial blocks until the clientConn is ready.
302 if cc.dopts.block {
303 for {
304 s := cc.GetState()
305 if s == connectivity.Ready {
306 break
307 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
308 if err = cc.blockingpicker.connectionError(); err != nil {
309 terr, ok := err.(interface{ Temporary() bool })
310 if ok && !terr.Temporary() {
311 return nil, err
312 }
313 }
314 }
315 if !cc.WaitForStateChange(ctx, s) {
316 // ctx got timeout or canceled.
317 return nil, ctx.Err()
318 }
319 }
320 }
321
322 return cc, nil
323}
324
325// connectivityStateManager keeps the connectivity.State of ClientConn.
326// This struct will eventually be exported so the balancers can access it.
327type connectivityStateManager struct {
328 mu sync.Mutex
329 state connectivity.State
330 notifyChan chan struct{}
331 channelzID int64
332}
333
334// updateState updates the connectivity.State of ClientConn.
335// If there's a change it notifies goroutines waiting on state change to
336// happen.
337func (csm *connectivityStateManager) updateState(state connectivity.State) {
338 csm.mu.Lock()
339 defer csm.mu.Unlock()
340 if csm.state == connectivity.Shutdown {
341 return
342 }
343 if csm.state == state {
344 return
345 }
346 csm.state = state
347 if channelz.IsOn() {
348 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
349 Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
350 Severity: channelz.CtINFO,
351 })
352 }
353 if csm.notifyChan != nil {
354 // There are other goroutines waiting on this channel.
355 close(csm.notifyChan)
356 csm.notifyChan = nil
357 }
358}
359
360func (csm *connectivityStateManager) getState() connectivity.State {
361 csm.mu.Lock()
362 defer csm.mu.Unlock()
363 return csm.state
364}
365
366func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
367 csm.mu.Lock()
368 defer csm.mu.Unlock()
369 if csm.notifyChan == nil {
370 csm.notifyChan = make(chan struct{})
371 }
372 return csm.notifyChan
373}
374
375// ClientConn represents a client connection to an RPC server.
376type ClientConn struct {
377 ctx context.Context
378 cancel context.CancelFunc
379
380 target string
381 parsedTarget resolver.Target
382 authority string
383 dopts dialOptions
384 csMgr *connectivityStateManager
385
386 balancerBuildOpts balancer.BuildOptions
387 resolverWrapper *ccResolverWrapper
388 blockingpicker *pickerWrapper
389
390 mu sync.RWMutex
391 sc ServiceConfig
392 scRaw string
393 conns map[*addrConn]struct{}
394 // Keepalive parameter can be updated if a GoAway is received.
395 mkp keepalive.ClientParameters
396 curBalancerName string
397 preBalancerName string // previous balancer name.
398 curAddresses []resolver.Address
399 balancerWrapper *ccBalancerWrapper
400 retryThrottler atomic.Value
401
402 channelzID int64 // channelz unique identification number
403 czData *channelzData
404}
405
406// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
407// ctx expires. A true value is returned in former case and false in latter.
408// This is an EXPERIMENTAL API.
409func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
410 ch := cc.csMgr.getNotifyChan()
411 if cc.csMgr.getState() != sourceState {
412 return true
413 }
414 select {
415 case <-ctx.Done():
416 return false
417 case <-ch:
418 return true
419 }
420}
421
422// GetState returns the connectivity.State of ClientConn.
423// This is an EXPERIMENTAL API.
424func (cc *ClientConn) GetState() connectivity.State {
425 return cc.csMgr.getState()
426}
427
428func (cc *ClientConn) scWatcher() {
429 for {
430 select {
431 case sc, ok := <-cc.dopts.scChan:
432 if !ok {
433 return
434 }
435 cc.mu.Lock()
436 // TODO: load balance policy runtime change is ignored.
437 // We may revist this decision in the future.
438 cc.sc = sc
439 cc.scRaw = ""
440 cc.mu.Unlock()
441 case <-cc.ctx.Done():
442 return
443 }
444 }
445}
446
447func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
448 cc.mu.Lock()
449 defer cc.mu.Unlock()
450 if cc.conns == nil {
451 // cc was closed.
452 return
453 }
454
455 if reflect.DeepEqual(cc.curAddresses, addrs) {
456 return
457 }
458
459 cc.curAddresses = addrs
460
461 if cc.dopts.balancerBuilder == nil {
462 // Only look at balancer types and switch balancer if balancer dial
463 // option is not set.
464 var isGRPCLB bool
465 for _, a := range addrs {
466 if a.Type == resolver.GRPCLB {
467 isGRPCLB = true
468 break
469 }
470 }
471 var newBalancerName string
472 if isGRPCLB {
473 newBalancerName = grpclbName
474 } else {
475 // Address list doesn't contain grpclb address. Try to pick a
476 // non-grpclb balancer.
477 newBalancerName = cc.curBalancerName
478 // If current balancer is grpclb, switch to the previous one.
479 if newBalancerName == grpclbName {
480 newBalancerName = cc.preBalancerName
481 }
482 // The following could be true in two cases:
483 // - the first time handling resolved addresses
484 // (curBalancerName="")
485 // - the first time handling non-grpclb addresses
486 // (curBalancerName="grpclb", preBalancerName="")
487 if newBalancerName == "" {
488 newBalancerName = PickFirstBalancerName
489 }
490 }
491 cc.switchBalancer(newBalancerName)
492 } else if cc.balancerWrapper == nil {
493 // Balancer dial option was set, and this is the first time handling
494 // resolved addresses. Build a balancer with dopts.balancerBuilder.
495 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
496 }
497
498 cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
499}
500
501// switchBalancer starts the switching from current balancer to the balancer
502// with the given name.
503//
504// It will NOT send the current address list to the new balancer. If needed,
505// caller of this function should send address list to the new balancer after
506// this function returns.
507//
508// Caller must hold cc.mu.
509func (cc *ClientConn) switchBalancer(name string) {
510 if cc.conns == nil {
511 return
512 }
513
514 if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
515 return
516 }
517
518 grpclog.Infof("ClientConn switching balancer to %q", name)
519 if cc.dopts.balancerBuilder != nil {
520 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
521 return
522 }
523 // TODO(bar switching) change this to two steps: drain and close.
524 // Keep track of sc in wrapper.
525 if cc.balancerWrapper != nil {
526 cc.balancerWrapper.close()
527 }
528
529 builder := balancer.Get(name)
530 // TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
531 // we reuse previous one?
532 if channelz.IsOn() {
533 if builder == nil {
534 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
535 Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
536 Severity: channelz.CtWarning,
537 })
538 } else {
539 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
540 Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
541 Severity: channelz.CtINFO,
542 })
543 }
544 }
545 if builder == nil {
546 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
547 builder = newPickfirstBuilder()
548 }
549
550 cc.preBalancerName = cc.curBalancerName
551 cc.curBalancerName = builder.Name()
552 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
553}
554
555func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
556 cc.mu.Lock()
557 if cc.conns == nil {
558 cc.mu.Unlock()
559 return
560 }
561 // TODO(bar switching) send updates to all balancer wrappers when balancer
562 // gracefully switching is supported.
563 cc.balancerWrapper.handleSubConnStateChange(sc, s)
564 cc.mu.Unlock()
565}
566
567// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
568//
569// Caller needs to make sure len(addrs) > 0.
570func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
571 ac := &addrConn{
572 cc: cc,
573 addrs: addrs,
574 scopts: opts,
575 dopts: cc.dopts,
576 czData: new(channelzData),
577 successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
578 resetBackoff: make(chan struct{}),
579 }
580 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
581 // Track ac in cc. This needs to be done before any getTransport(...) is called.
582 cc.mu.Lock()
583 if cc.conns == nil {
584 cc.mu.Unlock()
585 return nil, ErrClientConnClosing
586 }
587 if channelz.IsOn() {
588 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
589 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
590 Desc: "Subchannel Created",
591 Severity: channelz.CtINFO,
592 Parent: &channelz.TraceEventDesc{
593 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
594 Severity: channelz.CtINFO,
595 },
596 })
597 }
598 cc.conns[ac] = struct{}{}
599 cc.mu.Unlock()
600 return ac, nil
601}
602
603// removeAddrConn removes the addrConn in the subConn from clientConn.
604// It also tears down the ac with the given error.
605func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
606 cc.mu.Lock()
607 if cc.conns == nil {
608 cc.mu.Unlock()
609 return
610 }
611 delete(cc.conns, ac)
612 cc.mu.Unlock()
613 ac.tearDown(err)
614}
615
616func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
617 return &channelz.ChannelInternalMetric{
618 State: cc.GetState(),
619 Target: cc.target,
620 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
621 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
622 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
623 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
624 }
625}
626
627// Target returns the target string of the ClientConn.
628// This is an EXPERIMENTAL API.
629func (cc *ClientConn) Target() string {
630 return cc.target
631}
632
633func (cc *ClientConn) incrCallsStarted() {
634 atomic.AddInt64(&cc.czData.callsStarted, 1)
635 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
636}
637
638func (cc *ClientConn) incrCallsSucceeded() {
639 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
640}
641
642func (cc *ClientConn) incrCallsFailed() {
643 atomic.AddInt64(&cc.czData.callsFailed, 1)
644}
645
646// connect starts creating a transport.
647// It does nothing if the ac is not IDLE.
648// TODO(bar) Move this to the addrConn section.
649func (ac *addrConn) connect() error {
650 ac.mu.Lock()
651 if ac.state == connectivity.Shutdown {
652 ac.mu.Unlock()
653 return errConnClosing
654 }
655 if ac.state != connectivity.Idle {
656 ac.mu.Unlock()
657 return nil
658 }
659 ac.updateConnectivityState(connectivity.Connecting)
660 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
661 ac.mu.Unlock()
662
663 // Start a goroutine connecting to the server asynchronously.
664 go ac.resetTransport(false)
665 return nil
666}
667
668// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
669//
670// It checks whether current connected address of ac is in the new addrs list.
671// - If true, it updates ac.addrs and returns true. The ac will keep using
672// the existing connection.
673// - If false, it does nothing and returns false.
674func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
675 ac.mu.Lock()
676 defer ac.mu.Unlock()
677 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
678 if ac.state == connectivity.Shutdown {
679 ac.addrs = addrs
680 return true
681 }
682
683 var curAddrFound bool
684 for _, a := range addrs {
685 if reflect.DeepEqual(ac.curAddr, a) {
686 curAddrFound = true
687 break
688 }
689 }
690 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
691 if curAddrFound {
692 ac.addrs = addrs
693 ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
694 }
695
696 return curAddrFound
697}
698
699// GetMethodConfig gets the method config of the input method.
700// If there's an exact match for input method (i.e. /service/method), we return
701// the corresponding MethodConfig.
702// If there isn't an exact match for the input method, we look for the default config
703// under the service (i.e /service/). If there is a default MethodConfig for
704// the service, we return it.
705// Otherwise, we return an empty MethodConfig.
706func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
707 // TODO: Avoid the locking here.
708 cc.mu.RLock()
709 defer cc.mu.RUnlock()
710 m, ok := cc.sc.Methods[method]
711 if !ok {
712 i := strings.LastIndex(method, "/")
713 m = cc.sc.Methods[method[:i+1]]
714 }
715 return m
716}
717
718func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
719 hdr, _ := metadata.FromOutgoingContext(ctx)
720 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
721 FullMethodName: method,
722 Header: hdr,
723 })
724 if err != nil {
725 return nil, nil, toRPCErr(err)
726 }
727 return t, done, nil
728}
729
730// handleServiceConfig parses the service config string in JSON format to Go native
731// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
732func (cc *ClientConn) handleServiceConfig(js string) error {
733 if cc.dopts.disableServiceConfig {
734 return nil
735 }
736 if cc.scRaw == js {
737 return nil
738 }
739 if channelz.IsOn() {
740 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
741 // The special formatting of \"%s\" instead of %q is to provide nice printing of service config
742 // for human consumption.
743 Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
744 Severity: channelz.CtINFO,
745 })
746 }
747 sc, err := parseServiceConfig(js)
748 if err != nil {
749 return err
750 }
751 cc.mu.Lock()
752 // Check if the ClientConn is already closed. Some fields (e.g.
753 // balancerWrapper) are set to nil when closing the ClientConn, and could
754 // cause nil pointer panic if we don't have this check.
755 if cc.conns == nil {
756 cc.mu.Unlock()
757 return nil
758 }
759 cc.scRaw = js
760 cc.sc = sc
761
762 if sc.retryThrottling != nil {
763 newThrottler := &retryThrottler{
764 tokens: sc.retryThrottling.MaxTokens,
765 max: sc.retryThrottling.MaxTokens,
766 thresh: sc.retryThrottling.MaxTokens / 2,
767 ratio: sc.retryThrottling.TokenRatio,
768 }
769 cc.retryThrottler.Store(newThrottler)
770 } else {
771 cc.retryThrottler.Store((*retryThrottler)(nil))
772 }
773
774 if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
775 if cc.curBalancerName == grpclbName {
776 // If current balancer is grpclb, there's at least one grpclb
777 // balancer address in the resolved list. Don't switch the balancer,
778 // but change the previous balancer name, so if a new resolved
779 // address list doesn't contain grpclb address, balancer will be
780 // switched to *sc.LB.
781 cc.preBalancerName = *sc.LB
782 } else {
783 cc.switchBalancer(*sc.LB)
784 cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
785 }
786 }
787
788 cc.mu.Unlock()
789 return nil
790}
791
792func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
793 cc.mu.RLock()
794 r := cc.resolverWrapper
795 cc.mu.RUnlock()
796 if r == nil {
797 return
798 }
799 go r.resolveNow(o)
800}
801
802// ResetConnectBackoff wakes up all subchannels in transient failure and causes
803// them to attempt another connection immediately. It also resets the backoff
804// times used for subsequent attempts regardless of the current state.
805//
806// In general, this function should not be used. Typical service or network
807// outages result in a reasonable client reconnection strategy by default.
808// However, if a previously unavailable network becomes available, this may be
809// used to trigger an immediate reconnect.
810//
811// This API is EXPERIMENTAL.
812func (cc *ClientConn) ResetConnectBackoff() {
813 cc.mu.Lock()
814 defer cc.mu.Unlock()
815 for ac := range cc.conns {
816 ac.resetConnectBackoff()
817 }
818}
819
820// Close tears down the ClientConn and all underlying connections.
821func (cc *ClientConn) Close() error {
822 defer cc.cancel()
823
824 cc.mu.Lock()
825 if cc.conns == nil {
826 cc.mu.Unlock()
827 return ErrClientConnClosing
828 }
829 conns := cc.conns
830 cc.conns = nil
831 cc.csMgr.updateState(connectivity.Shutdown)
832
833 rWrapper := cc.resolverWrapper
834 cc.resolverWrapper = nil
835 bWrapper := cc.balancerWrapper
836 cc.balancerWrapper = nil
837 cc.mu.Unlock()
838
839 cc.blockingpicker.close()
840
841 if rWrapper != nil {
842 rWrapper.close()
843 }
844 if bWrapper != nil {
845 bWrapper.close()
846 }
847
848 for ac := range conns {
849 ac.tearDown(ErrClientConnClosing)
850 }
851 if channelz.IsOn() {
852 ted := &channelz.TraceEventDesc{
853 Desc: "Channel Deleted",
854 Severity: channelz.CtINFO,
855 }
856 if cc.dopts.channelzParentID != 0 {
857 ted.Parent = &channelz.TraceEventDesc{
858 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
859 Severity: channelz.CtINFO,
860 }
861 }
862 channelz.AddTraceEvent(cc.channelzID, ted)
863 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
864 // the entity beng deleted, and thus prevent it from being deleted right away.
865 channelz.RemoveEntry(cc.channelzID)
866 }
867 return nil
868}
869
870// addrConn is a network connection to a given address.
871type addrConn struct {
872 ctx context.Context
873 cancel context.CancelFunc
874
875 cc *ClientConn
876 dopts dialOptions
877 acbw balancer.SubConn
878 scopts balancer.NewSubConnOptions
879
880 transport transport.ClientTransport // The current transport.
881
882 mu sync.Mutex
883 addrIdx int // The index in addrs list to start reconnecting from.
884 curAddr resolver.Address // The current address.
885 addrs []resolver.Address // All addresses that the resolver resolved to.
886
887 // Use updateConnectivityState for updating addrConn's connectivity state.
888 state connectivity.State
889
890 tearDownErr error // The reason this addrConn is torn down.
891
892 backoffIdx int
893 // backoffDeadline is the time until which resetTransport needs to
894 // wait before increasing backoffIdx count.
895 backoffDeadline time.Time
896 // connectDeadline is the time by which all connection
897 // negotiations must complete.
898 connectDeadline time.Time
899
900 resetBackoff chan struct{}
901
902 channelzID int64 // channelz unique identification number
903 czData *channelzData
904
905 successfulHandshake bool
906}
907
908// Note: this requires a lock on ac.mu.
909func (ac *addrConn) updateConnectivityState(s connectivity.State) {
910 ac.state = s
911 if channelz.IsOn() {
912 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
913 Desc: fmt.Sprintf("Subchannel Connectivity change to %v", s),
914 Severity: channelz.CtINFO,
915 })
916 }
917}
918
919// adjustParams updates parameters used to create transports upon
920// receiving a GoAway.
921func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
922 switch r {
923 case transport.GoAwayTooManyPings:
924 v := 2 * ac.dopts.copts.KeepaliveParams.Time
925 ac.cc.mu.Lock()
926 if v > ac.cc.mkp.Time {
927 ac.cc.mkp.Time = v
928 }
929 ac.cc.mu.Unlock()
930 }
931}
932
933// resetTransport makes sure that a healthy ac.transport exists.
934//
935// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
936// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
937// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
938// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
939// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
940// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
941//
942// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
943func (ac *addrConn) resetTransport(resolveNow bool) {
944 for {
945 // If this is the first in a line of resets, we want to resolve immediately. The only other time we
946 // want to reset is if we have tried all the addresses handed to us.
947 if resolveNow {
948 ac.mu.Lock()
949 ac.cc.resolveNow(resolver.ResolveNowOption{})
950 ac.mu.Unlock()
951 }
952
953 ac.mu.Lock()
954 if ac.state == connectivity.Shutdown {
955 ac.mu.Unlock()
956 return
957 }
958
959 // If the connection is READY, a failure must have occurred.
960 // Otherwise, we'll consider this is a transient failure when:
961 // We've exhausted all addresses
962 // We're in CONNECTING
963 // And it's not the very first addr to try TODO(deklerk) find a better way to do this than checking ac.successfulHandshake
964 if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
965 ac.updateConnectivityState(connectivity.TransientFailure)
966 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
967 }
968 ac.transport = nil
969 ac.mu.Unlock()
970
971 if err := ac.nextAddr(); err != nil {
972 return
973 }
974
975 ac.mu.Lock()
976 if ac.state == connectivity.Shutdown {
977 ac.mu.Unlock()
978 return
979 }
980
981 backoffIdx := ac.backoffIdx
982 backoffFor := ac.dopts.bs.Backoff(backoffIdx)
983
984 // This will be the duration that dial gets to finish.
985 dialDuration := getMinConnectTimeout()
986 if backoffFor > dialDuration {
987 // Give dial more time as we keep failing to connect.
988 dialDuration = backoffFor
989 }
990 start := time.Now()
991 connectDeadline := start.Add(dialDuration)
992 ac.backoffDeadline = start.Add(backoffFor)
993 ac.connectDeadline = connectDeadline
994
995 ac.mu.Unlock()
996
997 ac.cc.mu.RLock()
998 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
999 ac.cc.mu.RUnlock()
1000
1001 ac.mu.Lock()
1002
1003 if ac.state == connectivity.Shutdown {
1004 ac.mu.Unlock()
1005 return
1006 }
1007
1008 if ac.state != connectivity.Connecting {
1009 ac.updateConnectivityState(connectivity.Connecting)
1010 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
1011 }
1012
1013 addr := ac.addrs[ac.addrIdx]
1014 copts := ac.dopts.copts
1015 if ac.scopts.CredsBundle != nil {
1016 copts.CredsBundle = ac.scopts.CredsBundle
1017 }
1018 ac.mu.Unlock()
1019
1020 if channelz.IsOn() {
1021 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1022 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1023 Severity: channelz.CtINFO,
1024 })
1025 }
1026
1027 if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
1028 continue
1029 }
1030
1031 return
1032 }
1033}
1034
1035// createTransport creates a connection to one of the backends in addrs.
1036func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
1037 oneReset := sync.Once{}
1038 skipReset := make(chan struct{})
1039 allowedToReset := make(chan struct{})
1040 prefaceReceived := make(chan struct{})
1041 onCloseCalled := make(chan struct{})
1042
1043 var prefaceMu sync.Mutex
1044 var serverPrefaceReceived bool
1045 var clientPrefaceWrote bool
1046
1047 onGoAway := func(r transport.GoAwayReason) {
1048 ac.mu.Lock()
1049 ac.adjustParams(r)
1050 ac.mu.Unlock()
1051 select {
1052 case <-skipReset: // The outer resetTransport loop will handle reconnection.
1053 return
1054 case <-allowedToReset: // We're in the clear to reset.
1055 go oneReset.Do(func() { ac.resetTransport(false) })
1056 }
1057 }
1058
1059 prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
1060
1061 onClose := func() {
1062 close(onCloseCalled)
1063 prefaceTimer.Stop()
1064
1065 select {
1066 case <-skipReset: // The outer resetTransport loop will handle reconnection.
1067 return
1068 case <-allowedToReset: // We're in the clear to reset.
1069 oneReset.Do(func() { ac.resetTransport(false) })
1070 }
1071 }
1072
1073 target := transport.TargetInfo{
1074 Addr: addr.Addr,
1075 Metadata: addr.Metadata,
1076 Authority: ac.cc.authority,
1077 }
1078
1079 onPrefaceReceipt := func() {
1080 close(prefaceReceived)
1081 prefaceTimer.Stop()
1082
1083 // TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
1084 ac.mu.Lock()
1085
1086 prefaceMu.Lock()
1087 serverPrefaceReceived = true
1088 if clientPrefaceWrote {
1089 ac.successfulHandshake = true
1090 ac.backoffDeadline = time.Time{}
1091 ac.connectDeadline = time.Time{}
1092 ac.addrIdx = 0
1093 ac.backoffIdx = 0
1094 }
1095 prefaceMu.Unlock()
1096
1097 ac.mu.Unlock()
1098 }
1099
1100 // Do not cancel in the success path because of this issue in Go1.6: https://github.com/golang/go/issues/15078.
1101 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1102 if channelz.IsOn() {
1103 copts.ChannelzParentID = ac.channelzID
1104 }
1105
1106 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1107
1108 if err == nil {
1109 prefaceMu.Lock()
1110 clientPrefaceWrote = true
1111 if serverPrefaceReceived {
1112 ac.successfulHandshake = true
1113 }
1114 prefaceMu.Unlock()
1115
1116 if ac.dopts.waitForHandshake {
1117 select {
1118 case <-prefaceTimer.C:
1119 // We didn't get the preface in time.
1120 newTr.Close()
1121 err = errors.New("timed out waiting for server handshake")
1122 case <-prefaceReceived:
1123 // We got the preface - huzzah! things are good.
1124 case <-onCloseCalled:
1125 // The transport has already closed - noop.
1126 close(allowedToReset)
1127 return nil
1128 }
1129 } else {
1130 go func() {
1131 select {
1132 case <-prefaceTimer.C:
1133 // We didn't get the preface in time.
1134 newTr.Close()
1135 case <-prefaceReceived:
1136 // We got the preface just in the nick of time - huzzah!
1137 case <-onCloseCalled:
1138 // The transport has already closed - noop.
1139 }
1140 }()
1141 }
1142 }
1143
1144 if err != nil {
1145 // newTr is either nil, or closed.
1146 cancel()
1147 ac.cc.blockingpicker.updateConnectionError(err)
1148 ac.mu.Lock()
1149 if ac.state == connectivity.Shutdown {
1150 // ac.tearDown(...) has been invoked.
1151 ac.mu.Unlock()
1152
1153 // We don't want to reset during this close because we prefer to kick out of this function and let the loop
1154 // in resetTransport take care of reconnecting.
1155 close(skipReset)
1156
1157 return errConnClosing
1158 }
1159 ac.mu.Unlock()
1160 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1161
1162 // We don't want to reset during this close because we prefer to kick out of this function and let the loop
1163 // in resetTransport take care of reconnecting.
1164 close(skipReset)
1165
1166 return err
1167 }
1168
1169 ac.mu.Lock()
1170
1171 if ac.state == connectivity.Shutdown {
1172 ac.mu.Unlock()
1173
1174 // We don't want to reset during this close because we prefer to kick out of this function and let the loop
1175 // in resetTransport take care of reconnecting.
1176 close(skipReset)
1177
1178 newTr.Close()
1179 return errConnClosing
1180 }
1181
1182 ac.updateConnectivityState(connectivity.Ready)
1183 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
1184 ac.transport = newTr
1185 ac.curAddr = addr
1186
1187 ac.mu.Unlock()
1188
1189 // Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
1190 // goroutine failing races with all the code in this method that sets the connection to "ready".
1191 close(allowedToReset)
1192 return nil
1193}
1194
1195// nextAddr increments the addrIdx if there are more addresses to try. If
1196// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
1197// increment the backoffIdx.
1198//
1199// nextAddr must be called without ac.mu being held.
1200func (ac *addrConn) nextAddr() error {
1201 ac.mu.Lock()
1202
1203 // If a handshake has been observed, we expect the counters to have manually
1204 // been reset so we'll just return, since we want the next usage to start
1205 // at index 0.
1206 if ac.successfulHandshake {
1207 ac.successfulHandshake = false
1208 ac.mu.Unlock()
1209 return nil
1210 }
1211
1212 if ac.addrIdx < len(ac.addrs)-1 {
1213 ac.addrIdx++
1214 ac.mu.Unlock()
1215 return nil
1216 }
1217
1218 ac.addrIdx = 0
1219 ac.backoffIdx++
1220
1221 if ac.state == connectivity.Shutdown {
1222 ac.mu.Unlock()
1223 return errConnClosing
1224 }
1225 ac.cc.resolveNow(resolver.ResolveNowOption{})
1226 backoffDeadline := ac.backoffDeadline
1227 b := ac.resetBackoff
1228 ac.mu.Unlock()
1229 timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
1230 select {
1231 case <-timer.C:
1232 case <-b:
1233 timer.Stop()
1234 case <-ac.ctx.Done():
1235 timer.Stop()
1236 return ac.ctx.Err()
1237 }
1238 return nil
1239}
1240
1241func (ac *addrConn) resetConnectBackoff() {
1242 ac.mu.Lock()
1243 close(ac.resetBackoff)
1244 ac.backoffIdx = 0
1245 ac.resetBackoff = make(chan struct{})
1246 ac.mu.Unlock()
1247}
1248
1249// getReadyTransport returns the transport if ac's state is READY.
1250// Otherwise it returns nil, false.
1251// If ac's state is IDLE, it will trigger ac to connect.
1252func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1253 ac.mu.Lock()
1254 if ac.state == connectivity.Ready && ac.transport != nil {
1255 t := ac.transport
1256 ac.mu.Unlock()
1257 return t, true
1258 }
1259 var idle bool
1260 if ac.state == connectivity.Idle {
1261 idle = true
1262 }
1263 ac.mu.Unlock()
1264 // Trigger idle ac to connect.
1265 if idle {
1266 ac.connect()
1267 }
1268 return nil, false
1269}
1270
1271// tearDown starts to tear down the addrConn.
1272// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1273// some edge cases (e.g., the caller opens and closes many addrConn's in a
1274// tight loop.
1275// tearDown doesn't remove ac from ac.cc.conns.
1276func (ac *addrConn) tearDown(err error) {
1277 ac.mu.Lock()
1278 if ac.state == connectivity.Shutdown {
1279 ac.mu.Unlock()
1280 return
1281 }
1282 // We have to set the state to Shutdown before anything else to prevent races
1283 // between setting the state and logic that waits on context cancelation / etc.
1284 ac.updateConnectivityState(connectivity.Shutdown)
1285 ac.cancel()
1286 ac.tearDownErr = err
1287 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
1288 ac.curAddr = resolver.Address{}
1289 if err == errConnDrain && ac.transport != nil {
1290 // GracefulClose(...) may be executed multiple times when
1291 // i) receiving multiple GoAway frames from the server; or
1292 // ii) there are concurrent name resolver/Balancer triggered
1293 // address removal and GoAway.
1294 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1295 ac.mu.Unlock()
1296 ac.transport.GracefulClose()
1297 ac.mu.Lock()
1298 }
1299 if channelz.IsOn() {
1300 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1301 Desc: "Subchannel Deleted",
1302 Severity: channelz.CtINFO,
1303 Parent: &channelz.TraceEventDesc{
1304 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1305 Severity: channelz.CtINFO,
1306 },
1307 })
1308 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1309 // the entity beng deleted, and thus prevent it from being deleted right away.
1310 channelz.RemoveEntry(ac.channelzID)
1311 }
1312 ac.mu.Unlock()
1313}
1314
1315func (ac *addrConn) getState() connectivity.State {
1316 ac.mu.Lock()
1317 defer ac.mu.Unlock()
1318 return ac.state
1319}
1320
1321func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1322 ac.mu.Lock()
1323 addr := ac.curAddr.Addr
1324 ac.mu.Unlock()
1325 return &channelz.ChannelInternalMetric{
1326 State: ac.getState(),
1327 Target: addr,
1328 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1329 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1330 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1331 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1332 }
1333}
1334
1335func (ac *addrConn) incrCallsStarted() {
1336 atomic.AddInt64(&ac.czData.callsStarted, 1)
1337 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1338}
1339
1340func (ac *addrConn) incrCallsSucceeded() {
1341 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1342}
1343
1344func (ac *addrConn) incrCallsFailed() {
1345 atomic.AddInt64(&ac.czData.callsFailed, 1)
1346}
1347
1348type retryThrottler struct {
1349 max float64
1350 thresh float64
1351 ratio float64
1352
1353 mu sync.Mutex
1354 tokens float64 // TODO(dfawley): replace with atomic and remove lock.
1355}
1356
1357// throttle subtracts a retry token from the pool and returns whether a retry
1358// should be throttled (disallowed) based upon the retry throttling policy in
1359// the service config.
1360func (rt *retryThrottler) throttle() bool {
1361 if rt == nil {
1362 return false
1363 }
1364 rt.mu.Lock()
1365 defer rt.mu.Unlock()
1366 rt.tokens--
1367 if rt.tokens < 0 {
1368 rt.tokens = 0
1369 }
1370 return rt.tokens <= rt.thresh
1371}
1372
1373func (rt *retryThrottler) successfulRPC() {
1374 if rt == nil {
1375 return
1376 }
1377 rt.mu.Lock()
1378 defer rt.mu.Unlock()
1379 rt.tokens += rt.ratio
1380 if rt.tokens > rt.max {
1381 rt.tokens = rt.max
1382 }
1383}
1384
1385type channelzChannel struct {
1386 cc *ClientConn
1387}
1388
1389func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1390 return c.cc.channelzMetric()
1391}
1392
1393// ErrClientConnTimeout indicates that the ClientConn cannot establish the
1394// underlying connections within the specified timeout.
1395//
1396// Deprecated: This error is never returned by grpc and should not be
1397// referenced by users.
1398var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")