blob: 371545ede4d503c69de3a2f8032db3a1acd2625e [file] [log] [blame]
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +02001package main
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "sync"
8 "time"
9
10 pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
11 "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
12 "code.hackerspace.pl/hscloud/go/mirko"
Serge Bazanskiec71cb52019-08-22 18:13:13 +020013 "code.hackerspace.pl/hscloud/go/pki"
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +020014 "code.hackerspace.pl/hscloud/go/statusz"
15 "github.com/golang/glog"
16 "github.com/lib/pq"
17 "golang.org/x/net/trace"
Serge Bazanskiec71cb52019-08-22 18:13:13 +020018 "google.golang.org/grpc"
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +020019)
20
21type processorState struct {
22 name string
23 p processor
24 lastRun *time.Time
25 lastErr error
26}
27
// routerState is a per-router record kept by the service in its routers map.
//
// NOTE(review): nothing in this part of the file reads or writes these
// fields; the descriptions below are inferred from the names only and should
// be confirmed against the code that populates them.
type routerState struct {
	// name presumably identifies the router.
	name string
	// last is presumably the time the router was last seen; nil when unset.
	last *time.Time
	// version is presumably the configuration version last associated with
	// the router.
	version string
}
33
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +020034func (p *processorState) nextRun() *time.Time {
35 if p.lastRun == nil {
36 return nil
37 }
Serge Bazanskiec71cb52019-08-22 18:13:13 +020038 nr := p.p.NextRun(*p.lastRun, p.lastErr != nil)
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +020039 return &nr
40}
41
42type service struct {
43 model model.Model
44
45 processors map[string]*processorState
46 processorsMu sync.RWMutex
47
Serge Bazanskiec71cb52019-08-22 18:13:13 +020048 routers map[string]*routerState
49 routersLastVersion string
50 routersMu sync.RWMutex
51
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +020052 requiredChecks []string
Serge Bazanskiec71cb52019-08-22 18:13:13 +020053
54 pgp pb.PGPEncryptorClient
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +020055}
56
57func (s *service) run(ctx context.Context) {
58 t := time.NewTicker(time.Second)
59 for {
60 select {
61 case <-ctx.Done():
62 return
63 case <-t.C:
64 break
65 }
66
67 s.runProcessors(ctx)
68 }
69}
70
71func (s *service) runProcessors(ctx context.Context) {
72 s.processorsMu.RLock()
73 defer s.processorsMu.RUnlock()
74
75 now := time.Now()
76
77 for _, p := range s.processors {
78 nr := p.nextRun()
79 if nr == nil || nr.Before(now) {
80 glog.Infof("Running processor %q...", p.name)
Serge Bazanskiec71cb52019-08-22 18:13:13 +020081 start := time.Now()
82
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +020083 tr := trace.New(fmt.Sprintf("processor.%s", p.name), "Run")
84 pctx := trace.NewContext(ctx, tr)
85 err := p.p.RunAll(pctx, s.model)
86 tr.LazyPrintf("Processor done: %v", err)
87 tr.Finish()
Serge Bazanskiec71cb52019-08-22 18:13:13 +020088
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +020089 if err != nil {
90 glog.Errorf("Running processor %q failed: %v", p.name, err)
Serge Bazanskiec71cb52019-08-22 18:13:13 +020091 } else {
92 diff := time.Since(start)
93 tr.LazyPrintf("Took %s", diff.String())
94 glog.Infof("Processor %q took %s", p.name, diff.String())
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +020095 }
96 p.lastErr = err
97 p.lastRun = &now
98 }
99 }
100}
101
// Command line flags, registered and parsed in main.
var (
	// flagDSN is the PostgreSQL connection string (-dsn).
	flagDSN string
	// flagIRR is the address of the irr service (-irr).
	flagIRR string
	// flagOctoRPKI is the address of the octorpki service (-octorpki).
	flagOctoRPKI string
	// flagPGPEncryptor is the address of the pgpencryptor service
	// (-pgpencryptor).
	flagPGPEncryptor string
	// flagPeeringDB is the address of the peeringdb service (-peeringdb).
	flagPeeringDB string
)
109
110func main() {
111 flag.StringVar(&flagDSN, "dsn", "", "PostrgreSQL connection string")
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +0200112 flag.StringVar(&flagIRR, "irr", "", "Address of irr service")
113 flag.StringVar(&flagOctoRPKI, "octorpki", "", "Address of octorpki service")
Serge Bazanskiec71cb52019-08-22 18:13:13 +0200114 flag.StringVar(&flagPGPEncryptor, "pgpencryptor", "", "Address of pgpencryptor service")
115 flag.StringVar(&flagPeeringDB, "peeringdb", "", "Address of peeringdb service")
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +0200116 flag.Parse()
117
118 // Picking an existing postgres-like driver for sqlx.BindType to work
119 // See: https://github.com/jmoiron/sqlx/blob/ed7c52c43ee1e12a35efbcfea8dbae2d62a90370/bind.go#L24
120 mirko.TraceSQL(&pq.Driver{}, "pgx")
121 mi := mirko.New()
122
123 m, err := model.Connect(mi.Context(), "pgx", flagDSN)
124 if err != nil {
125 glog.Exitf("Failed to create model: %v", err)
126 }
127
128 err = m.MigrateUp()
129 if err != nil {
130 glog.Exitf("Failed to migrate up: %v", err)
131 }
132
133 if err := mi.Listen(); err != nil {
134 glog.Exitf("Listen failed: %v", err)
135 }
136
Serge Bazanskiec71cb52019-08-22 18:13:13 +0200137 conn, err := grpc.Dial(flagPGPEncryptor, pki.WithClientHSPKI())
138 if err != nil {
139 glog.Exitf("could not connect to pgpencryptor service: %v", err)
140 }
141
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +0200142 s := &service{
143 model: m,
144 processors: make(map[string]*processorState),
Serge Bazanskiec71cb52019-08-22 18:13:13 +0200145 requiredChecks: []string{"irr", "pgp"},
146 routers: make(map[string]*routerState),
147 pgp: pb.NewPGPEncryptorClient(conn),
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +0200148 }
149
150 must := func(p processor, err error) processor {
151 if err != nil {
152 panic(err)
153 }
154 return p
155 }
156 s.addProcessor(must(newPeeringDB(flagPeeringDB)))
157 s.addProcessor(must(newIRR(flagIRR)))
158 s.addProcessor(must(newSecretGen()))
159 s.addProcessor(must(newRPKI(flagOctoRPKI)))
Serge Bazanskiec71cb52019-08-22 18:13:13 +0200160 s.addProcessor(must(newPGP(s.pgp)))
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +0200161 statusz.AddStatusPart("Processors", processorsFragment, s.statuszProcessors)
162
163 go s.run(mi.Context())
164
165 pb.RegisterVerifierServer(mi.GRPC(), s)
166
167 if err := mi.Serve(); err != nil {
168 glog.Exitf("Serve failed: %v", err)
169 }
170
171 <-mi.Done()
172}
173
174func (s *service) addProcessor(p processor) {
175 s.processorsMu.Lock()
176 defer s.processorsMu.Unlock()
177
178 name := p.Name()
179 if _, ok := s.processors[name]; ok {
180 panic(fmt.Sprintf("duplicated processor %q", name))
181 }
182 s.processors[name] = &processorState{
183 name: name,
184 p: p,
185 lastRun: nil,
186 }
187}