hswaw/smsgw: implement
The SMS gateway service allows consumers to subscribe to SMS messages
received by a Twilio phone number.
This is useful for receiving SMS auth messages.
Change-Id: Ib02a4306ad0d856dd10c7ca9241d9163809e7084
diff --git a/hswaw/smsgw/dispatcher.go b/hswaw/smsgw/dispatcher.go
new file mode 100644
index 0000000..60fba32
--- /dev/null
+++ b/hswaw/smsgw/dispatcher.go
@@ -0,0 +1,110 @@
+package main
+
+import (
+ "context"
+ "regexp"
+ "time"
+
+ "github.com/golang/glog"
+)
+
+// dispatcher fans incoming SMS messages out to the subscribers whose body
+// filter matches. newDispatcher creates both channels unbuffered; run reads them.
+type dispatcher struct {
+ // New SMS messages to be dispatched; written by publish, read by run.
+ incoming chan *sms
+ // New subscribers to send messages to; written by subscribe, read by run.
+ subscribers chan *subscriber
+}
+
+// newDispatcher returns a dispatcher with unbuffered channels; it starts nothing.
+func newDispatcher() *dispatcher {
+	d := new(dispatcher)
+	d.incoming = make(chan *sms)
+	d.subscribers = make(chan *subscriber)
+	return d
+}
+
+// sms is a single text message received from the upstream provider.
+type sms struct {
+ from string // NOTE(review): presumably the sender's phone number — confirm
+ body string // message text; subscriber regexps are matched against this
+ timestamp time.Time // NOTE(review): not set in the visible code; receive time? confirm
+}
+
+// subscriber wants to receive every message whose body matches its filter.
+type subscriber struct {
+ // re is matched against each message body by the dispatcher; must be non-nil.
+ re *regexp.Regexp
+ // data receives matching messages. Every pending delivery occupies a goroutine
+ // in the dispatcher, so the subscriber must drain this channel regularly.
+ data chan *sms
+ // cancel needs to be closed when the subscriber doesn't want to receive any
+ // more messages; the dispatcher then removes the subscriber.
+ cancel chan struct{}
+}
+
+func (p *dispatcher) publish(msg *sms) {
+ p.incoming <- msg // hands msg to run; blocks until run receives it
+}
+
+func (p *dispatcher) subscribe(sub *subscriber) {
+ p.subscribers <- sub // registers sub with run; blocks until run receives it
+}
+
+func (p *dispatcher) run(ctx context.Context) {
+ // Map of internal IDs to subscribers. Internal IDs are used to remove
+ // canceled subscribers easily.
+ subscriberMap := make(map[int64]*subscriber)
+ // Internal channel that will emit SIDs of subscribers that needs to be
+ // removed.
+ subscriberCancel := make(chan int64)
+
+ for {
+ select {
+
+ // Should the processor close?
+ case <-ctx.Done():
+ return
+
+ // Do we need to remove a given subscriber?
+ case sid := <-subscriberCancel:
+ delete(subscriberMap, sid)
+
+ // Do we have a new subscriber?
+ case sub := <-p.subscribers:
+ // Generate a SID. A UNIX nanosecond timestamp is enough, since
+ // we're not running in parallel.
+ sid := time.Now().UnixNano()
+ glog.V(5).Infof("New subscriber %x, regexp %v", sid, sub.re)
+
+ // Add to subscriber map.
+ subscriberMap[sid] = sub
+
+ // On sub.cancel closed, emit info that we need to delete that
+ // subscriber.
+ go func() {
+ _, _ = <-sub.cancel
+ subscriberCancel <- sid
+ }()
+
+ // Do we have a new message to dispatch?
+ case in := <-p.incoming:
+ for sid, s := range subscriberMap {
+ glog.V(10).Infof("Considering %x", sid)
+ // If this subscriber doesn't care, ignore.
+ if !s.re.MatchString(in.body) {
+ continue
+ }
+
+ // Send, non-blocking, to subscriber. This ensures that we
+ // don't get stuck if a subscriber doesn't drain fast enough.
+ go func(to *subscriber, sid int64) {
+ glog.V(10).Infof("Dispatching to %x, %v", sid, to.data)
+ to.data <- in
+ glog.V(10).Infof("Dispatched to %x", sid)
+ }(s, sid)
+ }
+ }
+ }
+}