blob: 371545ede4d503c69de3a2f8032db3a1acd2625e [file] [log] [blame]
package main
import (
"context"
"flag"
"fmt"
"sync"
"time"
pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
"code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
"code.hackerspace.pl/hscloud/go/mirko"
"code.hackerspace.pl/hscloud/go/pki"
"code.hackerspace.pl/hscloud/go/statusz"
"github.com/golang/glog"
"github.com/lib/pq"
"golang.org/x/net/trace"
"google.golang.org/grpc"
)
type processorState struct {
name string
p processor
lastRun *time.Time
lastErr error
}
type routerState struct {
name string
last *time.Time
version string
}
func (p *processorState) nextRun() *time.Time {
if p.lastRun == nil {
return nil
}
nr := p.p.NextRun(*p.lastRun, p.lastErr != nil)
return &nr
}
type service struct {
model model.Model
processors map[string]*processorState
processorsMu sync.RWMutex
routers map[string]*routerState
routersLastVersion string
routersMu sync.RWMutex
requiredChecks []string
pgp pb.PGPEncryptorClient
}
func (s *service) run(ctx context.Context) {
t := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
return
case <-t.C:
break
}
s.runProcessors(ctx)
}
}
func (s *service) runProcessors(ctx context.Context) {
s.processorsMu.RLock()
defer s.processorsMu.RUnlock()
now := time.Now()
for _, p := range s.processors {
nr := p.nextRun()
if nr == nil || nr.Before(now) {
glog.Infof("Running processor %q...", p.name)
start := time.Now()
tr := trace.New(fmt.Sprintf("processor.%s", p.name), "Run")
pctx := trace.NewContext(ctx, tr)
err := p.p.RunAll(pctx, s.model)
tr.LazyPrintf("Processor done: %v", err)
tr.Finish()
if err != nil {
glog.Errorf("Running processor %q failed: %v", p.name, err)
} else {
diff := time.Since(start)
tr.LazyPrintf("Took %s", diff.String())
glog.Infof("Processor %q took %s", p.name, diff.String())
}
p.lastErr = err
p.lastRun = &now
}
}
}
var (
flagDSN string
flagIRR string
flagOctoRPKI string
flagPGPEncryptor string
flagPeeringDB string
)
func main() {
flag.StringVar(&flagDSN, "dsn", "", "PostrgreSQL connection string")
flag.StringVar(&flagIRR, "irr", "", "Address of irr service")
flag.StringVar(&flagOctoRPKI, "octorpki", "", "Address of octorpki service")
flag.StringVar(&flagPGPEncryptor, "pgpencryptor", "", "Address of pgpencryptor service")
flag.StringVar(&flagPeeringDB, "peeringdb", "", "Address of peeringdb service")
flag.Parse()
// Picking an existing postgres-like driver for sqlx.BindType to work
// See: https://github.com/jmoiron/sqlx/blob/ed7c52c43ee1e12a35efbcfea8dbae2d62a90370/bind.go#L24
mirko.TraceSQL(&pq.Driver{}, "pgx")
mi := mirko.New()
m, err := model.Connect(mi.Context(), "pgx", flagDSN)
if err != nil {
glog.Exitf("Failed to create model: %v", err)
}
err = m.MigrateUp()
if err != nil {
glog.Exitf("Failed to migrate up: %v", err)
}
if err := mi.Listen(); err != nil {
glog.Exitf("Listen failed: %v", err)
}
conn, err := grpc.Dial(flagPGPEncryptor, pki.WithClientHSPKI())
if err != nil {
glog.Exitf("could not connect to pgpencryptor service: %v", err)
}
s := &service{
model: m,
processors: make(map[string]*processorState),
requiredChecks: []string{"irr", "pgp"},
routers: make(map[string]*routerState),
pgp: pb.NewPGPEncryptorClient(conn),
}
must := func(p processor, err error) processor {
if err != nil {
panic(err)
}
return p
}
s.addProcessor(must(newPeeringDB(flagPeeringDB)))
s.addProcessor(must(newIRR(flagIRR)))
s.addProcessor(must(newSecretGen()))
s.addProcessor(must(newRPKI(flagOctoRPKI)))
s.addProcessor(must(newPGP(s.pgp)))
statusz.AddStatusPart("Processors", processorsFragment, s.statuszProcessors)
go s.run(mi.Context())
pb.RegisterVerifierServer(mi.GRPC(), s)
if err := mi.Serve(); err != nil {
glog.Exitf("Serve failed: %v", err)
}
<-mi.Done()
}
func (s *service) addProcessor(p processor) {
s.processorsMu.Lock()
defer s.processorsMu.Unlock()
name := p.Name()
if _, ok := s.processors[name]; ok {
panic(fmt.Sprintf("duplicated processor %q", name))
}
s.processors[name] = &processorState{
name: name,
p: p,
lastRun: nil,
}
}