| package main |
| |
| import ( |
| "context" |
| "flag" |
| "fmt" |
| "sync" |
| "time" |
| |
| pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto" |
| "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model" |
| "code.hackerspace.pl/hscloud/go/mirko" |
| "code.hackerspace.pl/hscloud/go/statusz" |
| "github.com/golang/glog" |
| "github.com/lib/pq" |
| "golang.org/x/net/trace" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
// processorState tracks a registered processor together with the
// outcome of its most recent execution. Access is guarded by
// service.processorsMu.
type processorState struct {
	// name is the processor's unique identifier, as returned by its Name().
	name string
	// p is the processor implementation itself.
	p processor
	// lastRun is the wall-clock time at which the processor last
	// finished running, or nil if it has never run.
	lastRun *time.Time
	// lastErr is the error returned by the last run; nil on success.
	lastErr error
}
| |
| func (p *processorState) nextRun() *time.Time { |
| if p.lastRun == nil { |
| return nil |
| } |
| nr := p.p.NextRun(*p.lastRun) |
| return &nr |
| } |
| |
// service implements the Verifier gRPC server. It owns the backing
// data model and a set of periodically-executed processors.
type service struct {
	// model is the database-backed data model used by all handlers.
	model model.Model

	// processors maps processor name to its runtime state.
	// Guarded by processorsMu.
	processors map[string]*processorState
	processorsMu sync.RWMutex

	// requiredChecks lists the check names that a peer must pass for
	// PeerSummary to report it as passing (see PeerSummary).
	requiredChecks []string
}
| |
| func (s *service) run(ctx context.Context) { |
| t := time.NewTicker(time.Second) |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case <-t.C: |
| break |
| } |
| |
| s.runProcessors(ctx) |
| } |
| } |
| |
| func (s *service) runProcessors(ctx context.Context) { |
| s.processorsMu.RLock() |
| defer s.processorsMu.RUnlock() |
| |
| now := time.Now() |
| |
| for _, p := range s.processors { |
| nr := p.nextRun() |
| if nr == nil || nr.Before(now) { |
| glog.Infof("Running processor %q...", p.name) |
| tr := trace.New(fmt.Sprintf("processor.%s", p.name), "Run") |
| pctx := trace.NewContext(ctx, tr) |
| err := p.p.RunAll(pctx, s.model) |
| tr.LazyPrintf("Processor done: %v", err) |
| tr.Finish() |
| if err != nil { |
| glog.Errorf("Running processor %q failed: %v", p.name, err) |
| } |
| p.lastErr = err |
| p.lastRun = &now |
| } |
| } |
| } |
| |
// Command-line flags; see main() for descriptions and defaults.
var (
	flagDSN string
	flagPeeringDB string
	flagIRR string
	flagOctoRPKI string
)
| |
| func main() { |
| flag.StringVar(&flagDSN, "dsn", "", "PostrgreSQL connection string") |
| flag.StringVar(&flagPeeringDB, "peeringdb", "", "Address of peeringdb service") |
| flag.StringVar(&flagIRR, "irr", "", "Address of irr service") |
| flag.StringVar(&flagOctoRPKI, "octorpki", "", "Address of octorpki service") |
| flag.Parse() |
| |
| // Picking an existing postgres-like driver for sqlx.BindType to work |
| // See: https://github.com/jmoiron/sqlx/blob/ed7c52c43ee1e12a35efbcfea8dbae2d62a90370/bind.go#L24 |
| mirko.TraceSQL(&pq.Driver{}, "pgx") |
| mi := mirko.New() |
| |
| m, err := model.Connect(mi.Context(), "pgx", flagDSN) |
| if err != nil { |
| glog.Exitf("Failed to create model: %v", err) |
| } |
| |
| err = m.MigrateUp() |
| if err != nil { |
| glog.Exitf("Failed to migrate up: %v", err) |
| } |
| |
| if err := mi.Listen(); err != nil { |
| glog.Exitf("Listen failed: %v", err) |
| } |
| |
| s := &service{ |
| model: m, |
| processors: make(map[string]*processorState), |
| requiredChecks: []string{"irr"}, |
| } |
| |
| must := func(p processor, err error) processor { |
| if err != nil { |
| panic(err) |
| } |
| return p |
| } |
| s.addProcessor(must(newPeeringDB(flagPeeringDB))) |
| s.addProcessor(must(newIRR(flagIRR))) |
| s.addProcessor(must(newSecretGen())) |
| s.addProcessor(must(newRPKI(flagOctoRPKI))) |
| statusz.AddStatusPart("Processors", processorsFragment, s.statuszProcessors) |
| |
| go s.run(mi.Context()) |
| |
| pb.RegisterVerifierServer(mi.GRPC(), s) |
| |
| if err := mi.Serve(); err != nil { |
| glog.Exitf("Serve failed: %v", err) |
| } |
| |
| <-mi.Done() |
| } |
| |
| func (s *service) addProcessor(p processor) { |
| s.processorsMu.Lock() |
| defer s.processorsMu.Unlock() |
| |
| name := p.Name() |
| if _, ok := s.processors[name]; ok { |
| panic(fmt.Sprintf("duplicated processor %q", name)) |
| } |
| s.processors[name] = &processorState{ |
| name: name, |
| p: p, |
| lastRun: nil, |
| } |
| } |
| |
| func (s *service) ProcessorStatus(ctx context.Context, req *pb.ProcessorStatusRequest) (*pb.ProcessorStatusResponse, error) { |
| s.processorsMu.RLock() |
| defer s.processorsMu.RUnlock() |
| |
| res := &pb.ProcessorStatusResponse{ |
| Processors: make([]*pb.ProcessorStatusResponse_Processor, len(s.processors)), |
| } |
| |
| i := 0 |
| for _, p := range s.processors { |
| res.Processors[i] = &pb.ProcessorStatusResponse_Processor{ |
| Name: p.name, |
| Status: pb.ProcessorStatusResponse_Processor_STATUS_OK, |
| LastRun: 0, |
| NextRun: 0, |
| } |
| |
| if p.lastRun != nil { |
| res.Processors[i].LastRun = p.lastRun.UnixNano() |
| res.Processors[i].NextRun = p.p.NextRun(*p.lastRun).UnixNano() |
| } |
| |
| if p.lastErr != nil { |
| res.Processors[i].Status = pb.ProcessorStatusResponse_Processor_STATUS_ERROR |
| } |
| |
| i += 1 |
| } |
| return res, nil |
| } |
| |
| func (s *service) PeerSummary(req *pb.PeerSummaryRequest, stream pb.Verifier_PeerSummaryServer) error { |
| peers, err := s.model.GetCheckablePeers(stream.Context()) |
| if err != nil { |
| glog.Errorf("model.GetCheckablePeers: %v", err) |
| return status.Error(codes.Unavailable, "model error") |
| } |
| |
| asns := make([]int64, len(peers)) |
| asnToRes := make(map[int64]*pb.PeerSummaryResponse) |
| |
| for i, peer := range peers { |
| routers := make([]*pb.PeeringDBMember_Router, len(peer.Routers)) |
| for i, router := range peer.Routers { |
| routers[i] = &pb.PeeringDBMember_Router{} |
| if router.V4 != nil { |
| routers[i].Ipv4 = router.V4.String() |
| } |
| if router.V6 != nil { |
| routers[i].Ipv6 = router.V6.String() |
| } |
| } |
| p := &pb.PeeringDBMember{ |
| Asn: peer.ASN, |
| Name: peer.Name, |
| Routers: routers, |
| } |
| res := &pb.PeerSummaryResponse{ |
| PeeringdbInfo: p, |
| CheckStatus: pb.PeerSummaryResponse_STATUS_OK, |
| } |
| asnToRes[peer.ASN] = res |
| asns[i] = peer.ASN |
| } |
| |
| checkres, err := s.model.GetPeerCheckResults(stream.Context(), asns) |
| if err != nil { |
| glog.Errorf("GetPeerCheckResults(%v): %v", asns, err) |
| for _, res := range asnToRes { |
| res.CheckStatus = pb.PeerSummaryResponse_STATUS_UNKNOWN |
| } |
| } else { |
| passedChecks := make(map[int64]map[string]bool) |
| for _, c := range checkres { |
| if _, ok := passedChecks[c.PeerASN]; !ok { |
| passedChecks[c.PeerASN] = make(map[string]bool) |
| } |
| passedChecks[c.PeerASN][c.CheckName] = c.Status == model.PeerCheckStatus_Okay |
| } |
| |
| for asn, checks := range passedChecks { |
| for _, required := range s.requiredChecks { |
| if !checks[required] { |
| asnToRes[asn].CheckStatus = pb.PeerSummaryResponse_STATUS_FAILED |
| break |
| } |
| } |
| } |
| } |
| |
| for _, res := range asnToRes { |
| if err := stream.Send(res); err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| func (s *service) PeerDetails(ctx context.Context, req *pb.PeerDetailsRequest) (*pb.PeerDetailsResponse, error) { |
| if req.Asn <= 0 { |
| return nil, status.Error(codes.InvalidArgument, "asn must be set") |
| } |
| |
| res := &pb.PeerDetailsResponse{} |
| |
| peeringdb, err := s.model.GetPeeringDBPeer(ctx, req.Asn) |
| if err != nil { |
| glog.Errorf("GetPeeringDBPeer(%v): %v", req.Asn, err) |
| return nil, status.Error(codes.Unavailable, "could not get allowed prefixes") |
| } |
| |
| if peeringdb.Asn != req.Asn { |
| return nil, status.Error(codes.NotFound, "no such ASN") |
| } |
| |
| res.PeeringdbInfo = peeringdb |
| |
| checkres, err := s.model.GetPeerCheckResults(ctx, []int64{req.Asn}) |
| if err != nil { |
| glog.Errorf("GetPeerCheckResults(%v): %v", req.Asn, err) |
| return nil, status.Error(codes.Unavailable, "could not get check results") |
| } |
| |
| res.Checks = make([]*pb.PeerDetailsResponse_Check, len(checkres)) |
| for i, check := range checkres { |
| status := pb.PeerDetailsResponse_Check_STATUS_INVALID |
| switch check.Status { |
| case model.PeerCheckStatus_Okay: |
| status = pb.PeerDetailsResponse_Check_STATUS_OK |
| case model.PeerCheckStatus_SoftFailed: |
| status = pb.PeerDetailsResponse_Check_STATUS_OK |
| case model.PeerCheckStatus_Failed: |
| status = pb.PeerDetailsResponse_Check_STATUS_FAILED |
| } |
| res.Checks[i] = &pb.PeerDetailsResponse_Check{ |
| Name: check.CheckName, |
| Status: status, |
| Time: check.Time.UnixNano(), |
| Msg: check.Message, |
| } |
| } |
| |
| prefixes, err := s.model.GetAllowedPrefixes(ctx, req.Asn) |
| if err != nil { |
| glog.Errorf("GetAllowedPrefixes(%v): %v", req.Asn, err) |
| return nil, status.Error(codes.Unavailable, "could not get allowed prefixes") |
| } |
| |
| res.AllowedPrefixes = make([]*pb.PeerDetailsResponse_AllowedPrefix, len(prefixes)) |
| for i, prefix := range prefixes { |
| res.AllowedPrefixes[i] = &pb.PeerDetailsResponse_AllowedPrefix{ |
| Prefix: prefix.Prefix.String(), |
| MaxLength: prefix.MaxLength, |
| Ta: prefix.TA, |
| } |
| } |
| |
| return res, nil |
| } |