hswaw/smsgw: implement

The SMS gateway service allows consumers to subscribe to SMS messages
received by a Twilio phone number.

This is useful for receiving SMS auth messages.

Change-Id: Ib02a4306ad0d856dd10c7ca9241d9163809e7084
diff --git a/hswaw/smsgw/dispatcher_test.go b/hswaw/smsgw/dispatcher_test.go
new file mode 100644
index 0000000..a5f3977
--- /dev/null
+++ b/hswaw/smsgw/dispatcher_test.go
@@ -0,0 +1,206 @@
+package main
+
+import (
+	"context"
+	"regexp"
+	"testing"
+	"time"
+)
+
+func makeDut() (*dispatcher, context.CancelFunc, context.Context) {
+	dut := newDispatcher()
+
+	ctx := context.Background()
+	ctxC, cancelCtx := context.WithCancel(ctx)
+	go dut.run(ctxC)
+
+	return dut, cancelCtx, ctx
+}
+
+func expectReceived(t *testing.T, s *sms, data chan *sms) {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+	select {
+	case d := <-data:
+		if d.from != s.from {
+			t.Errorf("Received SMS from %q, wanted %q", d.from, s.from)
+		}
+		if d.body != s.body {
+			t.Errorf("Received SMS body %q, wanted %q", d.body, s.body)
+		}
+		if d.timestamp != s.timestamp {
+			t.Errorf("Received SMS timestamp %v, wanted %v", d.timestamp, s.timestamp)
+		}
+	case <-ticker.C:
+		t.Fatalf("Timed out waiting for message")
+	}
+}
+
+func expectEmpty(t *testing.T, data chan *sms) {
+	ticker := time.NewTicker(1 * time.Millisecond)
+	defer ticker.Stop()
+	select {
+	case <-data:
+		t.Fatalf("Received unwanted message")
+	case <-ticker.C:
+	}
+}
+
+func TestDispatcher(t *testing.T) {
+	dut, cancelDut, _ := makeDut()
+	defer cancelDut()
+
+	data := make(chan *sms)
+	cancel := make(chan struct{})
+
+	dut.subscribe(&subscriber{
+		re:     regexp.MustCompile(".*"),
+		data:   data,
+		cancel: cancel,
+	})
+
+	in := &sms{
+		from:      "+4821372137",
+		body:      "foo",
+		timestamp: time.Now(),
+	}
+	dut.publish(in)
+
+	// Make sure we ge the message.
+	expectReceived(t, in, data)
+
+	// Make sure we don't receive the message again.
+	expectEmpty(t, data)
+
+	// Publish a new message, but this time close our subscriber.
+	close(cancel)
+	// Hack: yield.
+	time.Sleep(1 * time.Millisecond)
+
+	dut.publish(in)
+	expectEmpty(t, data)
+}
+
+type testSubscriber struct {
+	re     *regexp.Regexp
+	data   chan *sms
+	cancel chan struct{}
+}
+
+func TestDispatcherFilters(t *testing.T) {
+	dut, cancelDut, _ := makeDut()
+	defer cancelDut()
+
+	subscribers := []*testSubscriber{
+		{re: regexp.MustCompile(".*")},
+		{re: regexp.MustCompile("foo")},
+		{re: regexp.MustCompile("bar")},
+	}
+
+	for _, s := range subscribers {
+		s.data = make(chan *sms)
+		s.cancel = make(chan struct{})
+		dut.subscribe(&subscriber{
+			re:     s.re,
+			data:   s.data,
+			cancel: s.cancel,
+		})
+		defer func(c chan struct{}) {
+			close(c)
+		}(s.cancel)
+	}
+
+	in := &sms{
+		from:      "+4821372137",
+		body:      "foo",
+		timestamp: time.Now(),
+	}
+	dut.publish(in)
+	expectReceived(t, in, subscribers[0].data)
+	expectReceived(t, in, subscribers[1].data)
+	expectEmpty(t, subscribers[2].data)
+
+	in = &sms{
+		from:      "+4821372137",
+		body:      "bar",
+		timestamp: time.Now(),
+	}
+	dut.publish(in)
+	expectReceived(t, in, subscribers[0].data)
+	expectEmpty(t, subscribers[1].data)
+	expectReceived(t, in, subscribers[2].data)
+
+	in = &sms{
+		from:      "+4821372137",
+		body:      "foobar",
+		timestamp: time.Now(),
+	}
+	dut.publish(in)
+	expectReceived(t, in, subscribers[0].data)
+	expectReceived(t, in, subscribers[1].data)
+	expectReceived(t, in, subscribers[2].data)
+}
+
+func TestDispatcherMany(t *testing.T) {
+	dut, cancelDut, _ := makeDut()
+	defer cancelDut()
+
+	subscribers := make([]*testSubscriber, 10000)
+
+	for i, _ := range subscribers {
+		s := &testSubscriber{
+			re:     regexp.MustCompile(".*"),
+			data:   make(chan *sms),
+			cancel: make(chan struct{}),
+		}
+		subscribers[i] = s
+		dut.subscribe(&subscriber{
+			re:     s.re,
+			data:   s.data,
+			cancel: s.cancel,
+		})
+		defer func(c chan struct{}) {
+			close(c)
+		}(s.cancel)
+	}
+
+	in := &sms{
+		from:      "+4821372137",
+		body:      "foo",
+		timestamp: time.Now(),
+	}
+	dut.publish(in)
+
+	for _, s := range subscribers {
+		expectReceived(t, in, s.data)
+	}
+}
+
+func TestDispatcherHammer(t *testing.T) {
+	dut, cancelDut, _ := makeDut()
+	defer cancelDut()
+
+	for i := 0; i < 1000000; i += 1 {
+		s := &testSubscriber{
+			re:     regexp.MustCompile(".*"),
+			data:   make(chan *sms),
+			cancel: make(chan struct{}),
+		}
+
+		dut.subscribe(&subscriber{
+			re:     s.re,
+			data:   s.data,
+			cancel: s.cancel,
+		})
+
+		in := &sms{
+			from:      "+4821372137",
+			body:      "foo",
+			timestamp: time.Now(),
+		}
+		dut.publish(in)
+		expectReceived(t, in, s.data)
+
+		close(s.cancel)
+	}
+}