blob: e742512fd3a83f04e560dd8438e1048366fc9821 [file] [log] [blame]
Serge Bazanskicc25bdf2018-10-25 14:02:58 +02001package backoff
2
3import (
4 "runtime"
5 "sync"
6 "time"
7)
8
9// Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff.
10//
11// Ticks will continue to arrive when the previous operation is still running,
12// so operations that take a while to fail could run in quick succession.
13type Ticker struct {
14 C <-chan time.Time
15 c chan time.Time
16 b BackOffContext
17 stop chan struct{}
18 stopOnce sync.Once
19}
20
21// NewTicker returns a new Ticker containing a channel that will send
22// the time at times specified by the BackOff argument. Ticker is
23// guaranteed to tick at least once. The channel is closed when Stop
24// method is called or BackOff stops. It is not safe to manipulate the
25// provided backoff policy (notably calling NextBackOff or Reset)
26// while the ticker is running.
27func NewTicker(b BackOff) *Ticker {
28 c := make(chan time.Time)
29 t := &Ticker{
30 C: c,
31 c: c,
32 b: ensureContext(b),
33 stop: make(chan struct{}),
34 }
35 t.b.Reset()
36 go t.run()
37 runtime.SetFinalizer(t, (*Ticker).Stop)
38 return t
39}
40
41// Stop turns off a ticker. After Stop, no more ticks will be sent.
42func (t *Ticker) Stop() {
43 t.stopOnce.Do(func() { close(t.stop) })
44}
45
46func (t *Ticker) run() {
47 c := t.c
48 defer close(c)
49
50 // Ticker is guaranteed to tick at least once.
51 afterC := t.send(time.Now())
52
53 for {
54 if afterC == nil {
55 return
56 }
57
58 select {
59 case tick := <-afterC:
60 afterC = t.send(tick)
61 case <-t.stop:
62 t.c = nil // Prevent future ticks from being sent to the channel.
63 return
64 case <-t.b.Context().Done():
65 return
66 }
67 }
68}
69
70func (t *Ticker) send(tick time.Time) <-chan time.Time {
71 select {
72 case t.c <- tick:
73 case <-t.stop:
74 return nil
75 }
76
77 next := t.b.NextBackOff()
78 if next == Stop {
79 t.Stop()
80 return nil
81 }
82
83 return time.After(next)
84}