hswaw/smsgw: implement

The SMS gateway service allows consumers to subscribe to SMS messages
received by a Twilio phone number.

This is useful for receiving SMS auth messages.

Change-Id: Ib02a4306ad0d856dd10c7ca9241d9163809e7084
diff --git a/hswaw/smsgw/dispatcher.go b/hswaw/smsgw/dispatcher.go
new file mode 100644
index 0000000..60fba32
--- /dev/null
+++ b/hswaw/smsgw/dispatcher.go
@@ -0,0 +1,110 @@
+package main
+
+import (
+	"context"
+	"regexp"
+	"time"
+
+	"github.com/golang/glog"
+)
+
+// dispatcher fans incoming SMS messages out to the subscribers whose body
+// filter matches them. Both channels are received from by the run loop.
+type dispatcher struct {
+	// incoming carries newly received SMS messages that await dispatch.
+	incoming chan *sms
+	// subscribers carries new subscribers that want to be registered.
+	subscribers chan *subscriber
+}
+
+// newDispatcher creates a dispatcher whose incoming and subscribers channels
+// are both unbuffered, so sends on them block until a receiver takes them.
+func newDispatcher() *dispatcher {
+	incoming := make(chan *sms)
+	subscribers := make(chan *subscriber)
+	return &dispatcher{incoming: incoming, subscribers: subscribers}
+}
+
+// sms is a single text message received from the upstream provider.
+type sms struct {
+	from      string    // presumably the sender's number; confirm at the call site
+	body      string    // message text, matched against each subscriber's regexp
+	timestamp time.Time // NOTE(review): unclear if this is send or receive time
+}
+
+// subscriber wants to receive every message whose body matches a given filter.
+type subscriber struct {
+	// re is matched against each message body; non-matching messages are skipped.
+	re *regexp.Regexp
+	// data receives the matching messages. The subscriber must keep draining it,
+	// as each delivery holds a goroutine while it waits to be received.
+	data chan *sms
+	// cancel must be closed by the subscriber once it no longer wants to receive
+	// messages; the dispatcher then forgets the subscriber.
+	cancel chan struct{}
+}
+
+// publish hands msg to the run loop for dispatch. With the unbuffered channel
+// made by newDispatcher this blocks until run receives the message.
+func (p *dispatcher) publish(msg *sms) { p.incoming <- msg }
+
+// subscribe registers sub with the run loop. With the unbuffered channel made
+// by newDispatcher this blocks until run receives the subscriber.
+func (p *dispatcher) subscribe(sub *subscriber) { p.subscribers <- sub }
+
+func (p *dispatcher) run(ctx context.Context) {
+	// Map of internal IDs to subscribers. Internal IDs are used to remove
+	// canceled subscribers easily.
+	subscriberMap := make(map[int64]*subscriber)
+	// Internal channel that will emit SIDs of subscribers that needs to be
+	// removed.
+	subscriberCancel := make(chan int64)
+
+	for {
+		select {
+
+		// Should the processor close?
+		case <-ctx.Done():
+			return
+
+		// Do we need to remove a given subscriber?
+		case sid := <-subscriberCancel:
+			delete(subscriberMap, sid)
+
+		// Do we have a new subscriber?
+		case sub := <-p.subscribers:
+			// Generate a SID. A UNIX nanosecond timestamp is enough, since
+			// we're not running in parallel.
+			sid := time.Now().UnixNano()
+			glog.V(5).Infof("New subscriber %x, regexp %v", sid, sub.re)
+
+			// Add to subscriber map.
+			subscriberMap[sid] = sub
+
+			// On sub.cancel closed, emit info that we need to delete that
+			// subscriber.
+			go func() {
+				_, _ = <-sub.cancel
+				subscriberCancel <- sid
+			}()
+
+		// Do we have a new message to dispatch?
+		case in := <-p.incoming:
+			for sid, s := range subscriberMap {
+				glog.V(10).Infof("Considering %x", sid)
+				// If this subscriber doesn't care, ignore.
+				if !s.re.MatchString(in.body) {
+					continue
+				}
+
+				// Send, non-blocking, to subscriber. This ensures that we
+				// don't get stuck if a subscriber doesn't drain fast enough.
+				go func(to *subscriber, sid int64) {
+					glog.V(10).Infof("Dispatching to %x, %v", sid, to.data)
+					to.data <- in
+					glog.V(10).Infof("Dispatched to %x", sid)
+				}(s, sid)
+			}
+		}
+	}
+}