blob: 60fba32379ad80615415fa7222ba36b36a6a8ab6 [file] [log] [blame]
Sergiusz Bazanski325e9472019-09-27 02:49:47 +02001package main
2
3import (
4 "context"
5 "regexp"
6 "time"
7
8 "github.com/golang/glog"
9)
10
11// dispatcher is responsible for dispatching incoming SMS messages to subscribers
12// that have chosen to receive them, filtering accordingly.
13type dispatcher struct {
14 // New SMS messages to be dispatched.
15 incoming chan *sms
16 // New subscribers to send messages to.
17 subscribers chan *subscriber
18}
19
20// newDispatcher creates a new dispatcher.
21func newDispatcher() *dispatcher {
22 return &dispatcher{
23 incoming: make(chan *sms),
24 subscribers: make(chan *subscriber),
25 }
26}
27
28// sms received from the upstream provider.
29type sms struct {
30 from string
31 body string
32 timestamp time.Time
33}
34
35// subscriber that wants to receive messages with a given body filter.
36type subscriber struct {
37 // regexp to filter message body by
38 re *regexp.Regexp
39 // channel to which messages will be sent, must be emptied regularly by the
40 // subscriber.
41 data chan *sms
42 // channel that needs to be closed when the subscriber doesn't want to receive
43 // any more messages.
44 cancel chan struct{}
45}
46
47func (p *dispatcher) publish(msg *sms) {
48 p.incoming <- msg
49}
50
51func (p *dispatcher) subscribe(sub *subscriber) {
52 p.subscribers <- sub
53}
54
55func (p *dispatcher) run(ctx context.Context) {
56 // Map of internal IDs to subscribers. Internal IDs are used to remove
57 // canceled subscribers easily.
58 subscriberMap := make(map[int64]*subscriber)
59 // Internal channel that will emit SIDs of subscribers that needs to be
60 // removed.
61 subscriberCancel := make(chan int64)
62
63 for {
64 select {
65
66 // Should the processor close?
67 case <-ctx.Done():
68 return
69
70 // Do we need to remove a given subscriber?
71 case sid := <-subscriberCancel:
72 delete(subscriberMap, sid)
73
74 // Do we have a new subscriber?
75 case sub := <-p.subscribers:
76 // Generate a SID. A UNIX nanosecond timestamp is enough, since
77 // we're not running in parallel.
78 sid := time.Now().UnixNano()
79 glog.V(5).Infof("New subscriber %x, regexp %v", sid, sub.re)
80
81 // Add to subscriber map.
82 subscriberMap[sid] = sub
83
84 // On sub.cancel closed, emit info that we need to delete that
85 // subscriber.
86 go func() {
87 _, _ = <-sub.cancel
88 subscriberCancel <- sid
89 }()
90
91 // Do we have a new message to dispatch?
92 case in := <-p.incoming:
93 for sid, s := range subscriberMap {
94 glog.V(10).Infof("Considering %x", sid)
95 // If this subscriber doesn't care, ignore.
96 if !s.re.MatchString(in.body) {
97 continue
98 }
99
100 // Send, non-blocking, to subscriber. This ensures that we
101 // don't get stuck if a subscriber doesn't drain fast enough.
102 go func(to *subscriber, sid int64) {
103 glog.V(10).Infof("Dispatching to %x, %v", sid, to.data)
104 to.data <- in
105 glog.V(10).Infof("Dispatched to %x", sid)
106 }(s, sid)
107 }
108 }
109 }
110}