blob: 60fba32379ad80615415fa7222ba36b36a6a8ab6 [file] [log] [blame]
package main
import (
"context"
"regexp"
"time"
"github.com/golang/glog"
)
// dispatcher is responsible for dispatching incoming SMS messages to subscribers
// that have chosen to receive them, filtering accordingly.
type dispatcher struct {
// New SMS messages to be dispatched.
incoming chan *sms
// New subscribers to send messages to.
subscribers chan *subscriber
}
// newDispatcher creates a new dispatcher.
func newDispatcher() *dispatcher {
return &dispatcher{
incoming: make(chan *sms),
subscribers: make(chan *subscriber),
}
}
// sms received from the upstream provider.
type sms struct {
from string
body string
timestamp time.Time
}
// subscriber that wants to receive messages with a given body filter.
type subscriber struct {
// regexp to filter message body by
re *regexp.Regexp
// channel to which messages will be sent, must be emptied regularly by the
// subscriber.
data chan *sms
// channel that needs to be closed when the subscriber doesn't want to receive
// any more messages.
cancel chan struct{}
}
func (p *dispatcher) publish(msg *sms) {
p.incoming <- msg
}
func (p *dispatcher) subscribe(sub *subscriber) {
p.subscribers <- sub
}
func (p *dispatcher) run(ctx context.Context) {
// Map of internal IDs to subscribers. Internal IDs are used to remove
// canceled subscribers easily.
subscriberMap := make(map[int64]*subscriber)
// Internal channel that will emit SIDs of subscribers that needs to be
// removed.
subscriberCancel := make(chan int64)
for {
select {
// Should the processor close?
case <-ctx.Done():
return
// Do we need to remove a given subscriber?
case sid := <-subscriberCancel:
delete(subscriberMap, sid)
// Do we have a new subscriber?
case sub := <-p.subscribers:
// Generate a SID. A UNIX nanosecond timestamp is enough, since
// we're not running in parallel.
sid := time.Now().UnixNano()
glog.V(5).Infof("New subscriber %x, regexp %v", sid, sub.re)
// Add to subscriber map.
subscriberMap[sid] = sub
// On sub.cancel closed, emit info that we need to delete that
// subscriber.
go func() {
_, _ = <-sub.cancel
subscriberCancel <- sid
}()
// Do we have a new message to dispatch?
case in := <-p.incoming:
for sid, s := range subscriberMap {
glog.V(10).Infof("Considering %x", sid)
// If this subscriber doesn't care, ignore.
if !s.re.MatchString(in.body) {
continue
}
// Send, non-blocking, to subscriber. This ensures that we
// don't get stuck if a subscriber doesn't drain fast enough.
go func(to *subscriber, sid int64) {
glog.V(10).Infof("Dispatching to %x, %v", sid, to.data)
to.data <- in
glog.V(10).Infof("Dispatched to %x", sid)
}(s, sid)
}
}
}
}