blob: 591ee331f1a81df1696502436e9d8eef0ca08551 [file] [log] [blame]
Sergiusz Bazanski1fad2e52019-08-01 20:16:27 +02001package main
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "sync"
8 "time"
9
10 pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
11 "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
12 "code.hackerspace.pl/hscloud/go/mirko"
13 "code.hackerspace.pl/hscloud/go/statusz"
14 "github.com/golang/glog"
15 "github.com/lib/pq"
16 "golang.org/x/net/trace"
17 "google.golang.org/grpc/codes"
18 "google.golang.org/grpc/status"
19)
20
21type processorState struct {
22 name string
23 p processor
24 lastRun *time.Time
25 lastErr error
26}
27
28func (p *processorState) nextRun() *time.Time {
29 if p.lastRun == nil {
30 return nil
31 }
32 nr := p.p.NextRun(*p.lastRun)
33 return &nr
34}
35
36type service struct {
37 model model.Model
38
39 processors map[string]*processorState
40 processorsMu sync.RWMutex
41
42 requiredChecks []string
43}
44
45func (s *service) run(ctx context.Context) {
46 t := time.NewTicker(time.Second)
47 for {
48 select {
49 case <-ctx.Done():
50 return
51 case <-t.C:
52 break
53 }
54
55 s.runProcessors(ctx)
56 }
57}
58
59func (s *service) runProcessors(ctx context.Context) {
60 s.processorsMu.RLock()
61 defer s.processorsMu.RUnlock()
62
63 now := time.Now()
64
65 for _, p := range s.processors {
66 nr := p.nextRun()
67 if nr == nil || nr.Before(now) {
68 glog.Infof("Running processor %q...", p.name)
69 tr := trace.New(fmt.Sprintf("processor.%s", p.name), "Run")
70 pctx := trace.NewContext(ctx, tr)
71 err := p.p.RunAll(pctx, s.model)
72 tr.LazyPrintf("Processor done: %v", err)
73 tr.Finish()
74 if err != nil {
75 glog.Errorf("Running processor %q failed: %v", p.name, err)
76 }
77 p.lastErr = err
78 p.lastRun = &now
79 }
80 }
81}
82
83var (
84 flagDSN string
85 flagPeeringDB string
86 flagIRR string
87 flagOctoRPKI string
88)
89
90func main() {
91 flag.StringVar(&flagDSN, "dsn", "", "PostrgreSQL connection string")
92 flag.StringVar(&flagPeeringDB, "peeringdb", "", "Address of peeringdb service")
93 flag.StringVar(&flagIRR, "irr", "", "Address of irr service")
94 flag.StringVar(&flagOctoRPKI, "octorpki", "", "Address of octorpki service")
95 flag.Parse()
96
97 // Picking an existing postgres-like driver for sqlx.BindType to work
98 // See: https://github.com/jmoiron/sqlx/blob/ed7c52c43ee1e12a35efbcfea8dbae2d62a90370/bind.go#L24
99 mirko.TraceSQL(&pq.Driver{}, "pgx")
100 mi := mirko.New()
101
102 m, err := model.Connect(mi.Context(), "pgx", flagDSN)
103 if err != nil {
104 glog.Exitf("Failed to create model: %v", err)
105 }
106
107 err = m.MigrateUp()
108 if err != nil {
109 glog.Exitf("Failed to migrate up: %v", err)
110 }
111
112 if err := mi.Listen(); err != nil {
113 glog.Exitf("Listen failed: %v", err)
114 }
115
116 s := &service{
117 model: m,
118 processors: make(map[string]*processorState),
119 requiredChecks: []string{"irr"},
120 }
121
122 must := func(p processor, err error) processor {
123 if err != nil {
124 panic(err)
125 }
126 return p
127 }
128 s.addProcessor(must(newPeeringDB(flagPeeringDB)))
129 s.addProcessor(must(newIRR(flagIRR)))
130 s.addProcessor(must(newSecretGen()))
131 s.addProcessor(must(newRPKI(flagOctoRPKI)))
132 statusz.AddStatusPart("Processors", processorsFragment, s.statuszProcessors)
133
134 go s.run(mi.Context())
135
136 pb.RegisterVerifierServer(mi.GRPC(), s)
137
138 if err := mi.Serve(); err != nil {
139 glog.Exitf("Serve failed: %v", err)
140 }
141
142 <-mi.Done()
143}
144
145func (s *service) addProcessor(p processor) {
146 s.processorsMu.Lock()
147 defer s.processorsMu.Unlock()
148
149 name := p.Name()
150 if _, ok := s.processors[name]; ok {
151 panic(fmt.Sprintf("duplicated processor %q", name))
152 }
153 s.processors[name] = &processorState{
154 name: name,
155 p: p,
156 lastRun: nil,
157 }
158}
159
160func (s *service) ProcessorStatus(ctx context.Context, req *pb.ProcessorStatusRequest) (*pb.ProcessorStatusResponse, error) {
161 s.processorsMu.RLock()
162 defer s.processorsMu.RUnlock()
163
164 res := &pb.ProcessorStatusResponse{
165 Processors: make([]*pb.ProcessorStatusResponse_Processor, len(s.processors)),
166 }
167
168 i := 0
169 for _, p := range s.processors {
170 res.Processors[i] = &pb.ProcessorStatusResponse_Processor{
171 Name: p.name,
172 Status: pb.ProcessorStatusResponse_Processor_STATUS_OK,
173 LastRun: 0,
174 NextRun: 0,
175 }
176
177 if p.lastRun != nil {
178 res.Processors[i].LastRun = p.lastRun.UnixNano()
179 res.Processors[i].NextRun = p.p.NextRun(*p.lastRun).UnixNano()
180 }
181
182 if p.lastErr != nil {
183 res.Processors[i].Status = pb.ProcessorStatusResponse_Processor_STATUS_ERROR
184 }
185
186 i += 1
187 }
188 return res, nil
189}
190
191func (s *service) PeerSummary(req *pb.PeerSummaryRequest, stream pb.Verifier_PeerSummaryServer) error {
192 peers, err := s.model.GetCheckablePeers(stream.Context())
193 if err != nil {
194 glog.Errorf("model.GetCheckablePeers: %v", err)
195 return status.Error(codes.Unavailable, "model error")
196 }
197
198 asns := make([]int64, len(peers))
199 asnToRes := make(map[int64]*pb.PeerSummaryResponse)
200
201 for i, peer := range peers {
202 routers := make([]*pb.PeeringDBMember_Router, len(peer.Routers))
203 for i, router := range peer.Routers {
204 routers[i] = &pb.PeeringDBMember_Router{}
205 if router.V4 != nil {
206 routers[i].Ipv4 = router.V4.String()
207 }
208 if router.V6 != nil {
209 routers[i].Ipv6 = router.V6.String()
210 }
211 }
212 p := &pb.PeeringDBMember{
213 Asn: peer.ASN,
214 Name: peer.Name,
215 Routers: routers,
216 }
217 res := &pb.PeerSummaryResponse{
218 PeeringdbInfo: p,
219 CheckStatus: pb.PeerSummaryResponse_STATUS_OK,
220 }
221 asnToRes[peer.ASN] = res
222 asns[i] = peer.ASN
223 }
224
225 checkres, err := s.model.GetPeerCheckResults(stream.Context(), asns)
226 if err != nil {
227 glog.Errorf("GetPeerCheckResults(%v): %v", asns, err)
228 for _, res := range asnToRes {
229 res.CheckStatus = pb.PeerSummaryResponse_STATUS_UNKNOWN
230 }
231 } else {
232 passedChecks := make(map[int64]map[string]bool)
233 for _, c := range checkres {
234 if _, ok := passedChecks[c.PeerASN]; !ok {
235 passedChecks[c.PeerASN] = make(map[string]bool)
236 }
237 passedChecks[c.PeerASN][c.CheckName] = c.Status == model.PeerCheckStatus_Okay
238 }
239
240 for asn, checks := range passedChecks {
241 for _, required := range s.requiredChecks {
242 if !checks[required] {
243 asnToRes[asn].CheckStatus = pb.PeerSummaryResponse_STATUS_FAILED
244 break
245 }
246 }
247 }
248 }
249
250 for _, res := range asnToRes {
251 if err := stream.Send(res); err != nil {
252 return err
253 }
254 }
255
256 return nil
257}
258
259func (s *service) PeerDetails(ctx context.Context, req *pb.PeerDetailsRequest) (*pb.PeerDetailsResponse, error) {
260 if req.Asn <= 0 {
261 return nil, status.Error(codes.InvalidArgument, "asn must be set")
262 }
263
264 res := &pb.PeerDetailsResponse{}
265
266 peeringdb, err := s.model.GetPeeringDBPeer(ctx, req.Asn)
267 if err != nil {
268 glog.Errorf("GetPeeringDBPeer(%v): %v", req.Asn, err)
269 return nil, status.Error(codes.Unavailable, "could not get allowed prefixes")
270 }
271
272 if peeringdb.Asn != req.Asn {
273 return nil, status.Error(codes.NotFound, "no such ASN")
274 }
275
276 res.PeeringdbInfo = peeringdb
277
278 checkres, err := s.model.GetPeerCheckResults(ctx, []int64{req.Asn})
279 if err != nil {
280 glog.Errorf("GetPeerCheckResults(%v): %v", req.Asn, err)
281 return nil, status.Error(codes.Unavailable, "could not get check results")
282 }
283
284 res.Checks = make([]*pb.PeerDetailsResponse_Check, len(checkres))
285 for i, check := range checkres {
286 status := pb.PeerDetailsResponse_Check_STATUS_INVALID
287 switch check.Status {
288 case model.PeerCheckStatus_Okay:
289 status = pb.PeerDetailsResponse_Check_STATUS_OK
290 case model.PeerCheckStatus_SoftFailed:
291 status = pb.PeerDetailsResponse_Check_STATUS_OK
292 case model.PeerCheckStatus_Failed:
293 status = pb.PeerDetailsResponse_Check_STATUS_FAILED
294 }
295 res.Checks[i] = &pb.PeerDetailsResponse_Check{
296 Name: check.CheckName,
297 Status: status,
298 Time: check.Time.UnixNano(),
299 Msg: check.Message,
300 }
301 }
302
303 prefixes, err := s.model.GetAllowedPrefixes(ctx, req.Asn)
304 if err != nil {
305 glog.Errorf("GetAllowedPrefixes(%v): %v", req.Asn, err)
306 return nil, status.Error(codes.Unavailable, "could not get allowed prefixes")
307 }
308
309 res.AllowedPrefixes = make([]*pb.PeerDetailsResponse_AllowedPrefix, len(prefixes))
310 for i, prefix := range prefixes {
311 res.AllowedPrefixes[i] = &pb.PeerDetailsResponse_AllowedPrefix{
312 Prefix: prefix.Prefix.String(),
313 MaxLength: prefix.MaxLength,
314 Ta: prefix.TA,
315 }
316 }
317
318 return res, nil
319}