| package main |
| |
| import ( |
| "context" |
| "regexp" |
| "time" |
| |
| "github.com/golang/glog" |
| ) |
| |
| // dispatcher is responsible for dispatching incoming SMS messages to subscribers |
| // that have chosen to receive them, filtering accordingly. |
| type dispatcher struct { |
| // New SMS messages to be dispatched. |
| incoming chan *sms |
| // New subscribers to send messages to. |
| subscribers chan *subscriber |
| } |
| |
| // newDispatcher creates a new dispatcher. |
| func newDispatcher() *dispatcher { |
| return &dispatcher{ |
| incoming: make(chan *sms), |
| subscribers: make(chan *subscriber), |
| } |
| } |
| |
// sms is one text message as received from the upstream provider.
type sms struct {
	// from is the originator of the message and body is its text; body is the
	// field subscriber filters are matched against.
	from, body string
	// timestamp is the time attached to the message by whoever constructed it.
	timestamp time.Time
}
| |
| // subscriber that wants to receive messages with a given body filter. |
| type subscriber struct { |
| // regexp to filter message body by |
| re *regexp.Regexp |
| // channel to which messages will be sent, must be emptied regularly by the |
| // subscriber. |
| data chan *sms |
| // channel that needs to be closed when the subscriber doesn't want to receive |
| // any more messages. |
| cancel chan struct{} |
| } |
| |
| func (p *dispatcher) publish(msg *sms) { |
| p.incoming <- msg |
| } |
| |
| func (p *dispatcher) subscribe(sub *subscriber) { |
| p.subscribers <- sub |
| } |
| |
| func (p *dispatcher) run(ctx context.Context) { |
| // Map of internal IDs to subscribers. Internal IDs are used to remove |
| // canceled subscribers easily. |
| subscriberMap := make(map[int64]*subscriber) |
| // Internal channel that will emit SIDs of subscribers that needs to be |
| // removed. |
| subscriberCancel := make(chan int64) |
| |
| for { |
| select { |
| |
| // Should the processor close? |
| case <-ctx.Done(): |
| return |
| |
| // Do we need to remove a given subscriber? |
| case sid := <-subscriberCancel: |
| delete(subscriberMap, sid) |
| |
| // Do we have a new subscriber? |
| case sub := <-p.subscribers: |
| // Generate a SID. A UNIX nanosecond timestamp is enough, since |
| // we're not running in parallel. |
| sid := time.Now().UnixNano() |
| glog.V(5).Infof("New subscriber %x, regexp %v", sid, sub.re) |
| |
| // Add to subscriber map. |
| subscriberMap[sid] = sub |
| |
| // On sub.cancel closed, emit info that we need to delete that |
| // subscriber. |
| go func() { |
| _, _ = <-sub.cancel |
| subscriberCancel <- sid |
| }() |
| |
| // Do we have a new message to dispatch? |
| case in := <-p.incoming: |
| for sid, s := range subscriberMap { |
| glog.V(10).Infof("Considering %x", sid) |
| // If this subscriber doesn't care, ignore. |
| if !s.re.MatchString(in.body) { |
| continue |
| } |
| |
| // Send, non-blocking, to subscriber. This ensures that we |
| // don't get stuck if a subscriber doesn't drain fast enough. |
| go func(to *subscriber, sid int64) { |
| glog.V(10).Infof("Dispatching to %x, %v", sid, to.data) |
| to.data <- in |
| glog.V(10).Infof("Dispatched to %x", sid) |
| }(s, sid) |
| } |
| } |
| } |
| } |