| package main |
| |
| import ( |
| "context" |
| "flag" |
| "fmt" |
| "sync" |
| "time" |
| |
| pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto" |
| "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model" |
| "code.hackerspace.pl/hscloud/go/mirko" |
| "code.hackerspace.pl/hscloud/go/pki" |
| "code.hackerspace.pl/hscloud/go/statusz" |
| "github.com/golang/glog" |
| "github.com/lib/pq" |
| "golang.org/x/net/trace" |
| "google.golang.org/grpc" |
| ) |
| |
// processorState tracks the runtime state of a single registered processor:
// when it last ran and whether that run succeeded.
type processorState struct {
	// name is the processor's unique name, as returned by p.Name().
	name string
	// p is the processor implementation itself.
	p processor
	// lastRun is the completion time of the most recent run, or nil if
	// the processor has never run yet (nil means "run immediately").
	lastRun *time.Time
	// lastErr is the error returned by the most recent run; nil on success.
	lastErr error
}
| |
// routerState tracks per-router bookkeeping. It is not referenced in this
// chunk beyond its declaration and the service.routers map; presumably it is
// populated/read by the Verifier gRPC handlers — verify against the rest of
// the file.
type routerState struct {
	// name identifies the router.
	name string
	// last is the time of the last observed update, or nil if never seen.
	last *time.Time
	// version is the last version string reported for this router.
	version string
}
| |
| func (p *processorState) nextRun() *time.Time { |
| if p.lastRun == nil { |
| return nil |
| } |
| nr := p.p.NextRun(*p.lastRun, p.lastErr != nil) |
| return &nr |
| } |
| |
// service implements the Verifier gRPC server (registered in main via
// pb.RegisterVerifierServer) and drives the background processor loop.
type service struct {
	// model is the backing database model.
	model model.Model

	// processors maps processor name to its runtime state.
	processors map[string]*processorState
	// processorsMu guards the processors map.
	processorsMu sync.RWMutex

	// routers maps router name to its runtime state.
	routers map[string]*routerState
	// routersLastVersion is the most recently seen router version string.
	routersLastVersion string
	// routersMu guards routers and routersLastVersion.
	routersMu sync.RWMutex

	// requiredChecks lists the check names that must pass; set once in
	// main ({"irr", "pgp"}) and read-only afterwards.
	requiredChecks []string

	// pgp is the client stub for the external pgpencryptor service.
	pgp pb.PGPEncryptorClient
}
| |
| func (s *service) run(ctx context.Context) { |
| t := time.NewTicker(time.Second) |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case <-t.C: |
| break |
| } |
| |
| s.runProcessors(ctx) |
| } |
| } |
| |
| func (s *service) runProcessors(ctx context.Context) { |
| s.processorsMu.RLock() |
| defer s.processorsMu.RUnlock() |
| |
| now := time.Now() |
| |
| for _, p := range s.processors { |
| nr := p.nextRun() |
| if nr == nil || nr.Before(now) { |
| glog.Infof("Running processor %q...", p.name) |
| start := time.Now() |
| |
| tr := trace.New(fmt.Sprintf("processor.%s", p.name), "Run") |
| pctx := trace.NewContext(ctx, tr) |
| err := p.p.RunAll(pctx, s.model) |
| tr.LazyPrintf("Processor done: %v", err) |
| tr.Finish() |
| |
| if err != nil { |
| glog.Errorf("Running processor %q failed: %v", p.name, err) |
| } else { |
| diff := time.Since(start) |
| tr.LazyPrintf("Took %s", diff.String()) |
| glog.Infof("Processor %q took %s", p.name, diff.String()) |
| } |
| p.lastErr = err |
| p.lastRun = &now |
| } |
| } |
| } |
| |
// Command-line flags, bound in main.
var (
	// flagDSN is the PostgreSQL connection string for the model database.
	flagDSN string
	// flagIRR is the address of the irr service.
	flagIRR string
	// flagOctoRPKI is the address of the octorpki service.
	flagOctoRPKI string
	// flagPGPEncryptor is the address of the pgpencryptor service.
	flagPGPEncryptor string
	// flagPeeringDB is the address of the peeringdb service.
	flagPeeringDB string
)
| |
| func main() { |
| flag.StringVar(&flagDSN, "dsn", "", "PostrgreSQL connection string") |
| flag.StringVar(&flagIRR, "irr", "", "Address of irr service") |
| flag.StringVar(&flagOctoRPKI, "octorpki", "", "Address of octorpki service") |
| flag.StringVar(&flagPGPEncryptor, "pgpencryptor", "", "Address of pgpencryptor service") |
| flag.StringVar(&flagPeeringDB, "peeringdb", "", "Address of peeringdb service") |
| flag.Parse() |
| |
| // Picking an existing postgres-like driver for sqlx.BindType to work |
| // See: https://github.com/jmoiron/sqlx/blob/ed7c52c43ee1e12a35efbcfea8dbae2d62a90370/bind.go#L24 |
| mirko.TraceSQL(&pq.Driver{}, "pgx") |
| mi := mirko.New() |
| |
| m, err := model.Connect(mi.Context(), "pgx", flagDSN) |
| if err != nil { |
| glog.Exitf("Failed to create model: %v", err) |
| } |
| |
| err = m.MigrateUp() |
| if err != nil { |
| glog.Exitf("Failed to migrate up: %v", err) |
| } |
| |
| if err := mi.Listen(); err != nil { |
| glog.Exitf("Listen failed: %v", err) |
| } |
| |
| conn, err := grpc.Dial(flagPGPEncryptor, pki.WithClientHSPKI()) |
| if err != nil { |
| glog.Exitf("could not connect to pgpencryptor service: %v", err) |
| } |
| |
| s := &service{ |
| model: m, |
| processors: make(map[string]*processorState), |
| requiredChecks: []string{"irr", "pgp"}, |
| routers: make(map[string]*routerState), |
| pgp: pb.NewPGPEncryptorClient(conn), |
| } |
| |
| must := func(p processor, err error) processor { |
| if err != nil { |
| panic(err) |
| } |
| return p |
| } |
| s.addProcessor(must(newPeeringDB(flagPeeringDB))) |
| s.addProcessor(must(newIRR(flagIRR))) |
| s.addProcessor(must(newSecretGen())) |
| s.addProcessor(must(newRPKI(flagOctoRPKI))) |
| s.addProcessor(must(newPGP(s.pgp))) |
| statusz.AddStatusPart("Processors", processorsFragment, s.statuszProcessors) |
| |
| go s.run(mi.Context()) |
| |
| pb.RegisterVerifierServer(mi.GRPC(), s) |
| |
| if err := mi.Serve(); err != nil { |
| glog.Exitf("Serve failed: %v", err) |
| } |
| |
| <-mi.Done() |
| } |
| |
| func (s *service) addProcessor(p processor) { |
| s.processorsMu.Lock() |
| defer s.processorsMu.Unlock() |
| |
| name := p.Name() |
| if _, ok := s.processors[name]; ok { |
| panic(fmt.Sprintf("duplicated processor %q", name)) |
| } |
| s.processors[name] = &processorState{ |
| name: name, |
| p: p, |
| lastRun: nil, |
| } |
| } |