blob: a5f39777ac55de6a6c10356d11df7caf1c722d18 [file] [log] [blame]
package main
import (
"context"
"regexp"
"testing"
"time"
)
func makeDut() (*dispatcher, context.CancelFunc, context.Context) {
dut := newDispatcher()
ctx := context.Background()
ctxC, cancelCtx := context.WithCancel(ctx)
go dut.run(ctxC)
return dut, cancelCtx, ctx
}
func expectReceived(t *testing.T, s *sms, data chan *sms) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
select {
case d := <-data:
if d.from != s.from {
t.Errorf("Received SMS from %q, wanted %q", d.from, s.from)
}
if d.body != s.body {
t.Errorf("Received SMS body %q, wanted %q", d.body, s.body)
}
if d.timestamp != s.timestamp {
t.Errorf("Received SMS timestamp %v, wanted %v", d.timestamp, s.timestamp)
}
case <-ticker.C:
t.Fatalf("Timed out waiting for message")
}
}
func expectEmpty(t *testing.T, data chan *sms) {
ticker := time.NewTicker(1 * time.Millisecond)
defer ticker.Stop()
select {
case <-data:
t.Fatalf("Received unwanted message")
case <-ticker.C:
}
}
func TestDispatcher(t *testing.T) {
dut, cancelDut, _ := makeDut()
defer cancelDut()
data := make(chan *sms)
cancel := make(chan struct{})
dut.subscribe(&subscriber{
re: regexp.MustCompile(".*"),
data: data,
cancel: cancel,
})
in := &sms{
from: "+4821372137",
body: "foo",
timestamp: time.Now(),
}
dut.publish(in)
// Make sure we ge the message.
expectReceived(t, in, data)
// Make sure we don't receive the message again.
expectEmpty(t, data)
// Publish a new message, but this time close our subscriber.
close(cancel)
// Hack: yield.
time.Sleep(1 * time.Millisecond)
dut.publish(in)
expectEmpty(t, data)
}
type testSubscriber struct {
re *regexp.Regexp
data chan *sms
cancel chan struct{}
}
func TestDispatcherFilters(t *testing.T) {
dut, cancelDut, _ := makeDut()
defer cancelDut()
subscribers := []*testSubscriber{
{re: regexp.MustCompile(".*")},
{re: regexp.MustCompile("foo")},
{re: regexp.MustCompile("bar")},
}
for _, s := range subscribers {
s.data = make(chan *sms)
s.cancel = make(chan struct{})
dut.subscribe(&subscriber{
re: s.re,
data: s.data,
cancel: s.cancel,
})
defer func(c chan struct{}) {
close(c)
}(s.cancel)
}
in := &sms{
from: "+4821372137",
body: "foo",
timestamp: time.Now(),
}
dut.publish(in)
expectReceived(t, in, subscribers[0].data)
expectReceived(t, in, subscribers[1].data)
expectEmpty(t, subscribers[2].data)
in = &sms{
from: "+4821372137",
body: "bar",
timestamp: time.Now(),
}
dut.publish(in)
expectReceived(t, in, subscribers[0].data)
expectEmpty(t, subscribers[1].data)
expectReceived(t, in, subscribers[2].data)
in = &sms{
from: "+4821372137",
body: "foobar",
timestamp: time.Now(),
}
dut.publish(in)
expectReceived(t, in, subscribers[0].data)
expectReceived(t, in, subscribers[1].data)
expectReceived(t, in, subscribers[2].data)
}
func TestDispatcherMany(t *testing.T) {
dut, cancelDut, _ := makeDut()
defer cancelDut()
subscribers := make([]*testSubscriber, 10000)
for i, _ := range subscribers {
s := &testSubscriber{
re: regexp.MustCompile(".*"),
data: make(chan *sms),
cancel: make(chan struct{}),
}
subscribers[i] = s
dut.subscribe(&subscriber{
re: s.re,
data: s.data,
cancel: s.cancel,
})
defer func(c chan struct{}) {
close(c)
}(s.cancel)
}
in := &sms{
from: "+4821372137",
body: "foo",
timestamp: time.Now(),
}
dut.publish(in)
for _, s := range subscribers {
expectReceived(t, in, s.data)
}
}
func TestDispatcherHammer(t *testing.T) {
dut, cancelDut, _ := makeDut()
defer cancelDut()
for i := 0; i < 1000000; i += 1 {
s := &testSubscriber{
re: regexp.MustCompile(".*"),
data: make(chan *sms),
cancel: make(chan struct{}),
}
dut.subscribe(&subscriber{
re: s.re,
data: s.data,
cancel: s.cancel,
})
in := &sms{
from: "+4821372137",
body: "foo",
timestamp: time.Now(),
}
dut.publish(in)
expectReceived(t, in, s.data)
close(s.cancel)
}
}