bgpwtf/cccampix: draw the rest of the fucking owl

Change-Id: I49fd5906e69512e8f2d414f406edc0179522f225
diff --git a/bgpwtf/cccampix/verifier/main.go b/bgpwtf/cccampix/verifier/main.go
new file mode 100644
index 0000000..591ee33
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/main.go
@@ -0,0 +1,319 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"sync"
+	"time"
+
+	pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
+	"code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
+	"code.hackerspace.pl/hscloud/go/mirko"
+	"code.hackerspace.pl/hscloud/go/statusz"
+	"github.com/golang/glog"
+	"github.com/lib/pq"
+	"golang.org/x/net/trace"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// processorState is the bookkeeping record the service keeps for a single
+// registered processor.
+type processorState struct {
+	// name is the key under which the processor is stored in
+	// service.processors.
+	name string
+
+	// p is the processor being scheduled.
+	p processor
+
+	// lastRun is the timestamp recorded for the most recent run, or nil if
+	// the processor has not been run yet.
+	lastRun *time.Time
+
+	// lastErr is the error returned by the most recent run; nil on success.
+	lastErr error
+}
+
+// nextRun reports when the processor is due to run again, as computed by the
+// processor's NextRun from the last run time. It returns nil if the processor
+// has never been run.
+func (p *processorState) nextRun() *time.Time {
+	if last := p.lastRun; last != nil {
+		due := p.p.NextRun(*last)
+		return &due
+	}
+	return nil
+}
+
+// service is the verifier's state: the backing model, the set of registered
+// processors and the names of the checks consulted by PeerSummary.
+type service struct {
+	// model is passed to every processor run and queried by the RPC handlers.
+	model model.Model
+
+	// processorsMu guards processors.
+	processorsMu sync.RWMutex
+	// processors maps a processor's name to its scheduling state.
+	processors map[string]*processorState
+
+	// requiredChecks lists the check names that PeerSummary requires to have
+	// passed before it leaves a peer's status as OK.
+	requiredChecks []string
+}
+
+// run calls runProcessors on every tick of a one-second ticker and returns
+// once ctx is cancelled.
+func (s *service) run(ctx context.Context) {
+	t := time.NewTicker(time.Second)
+	// Stop the ticker on exit so its resources are released.
+	defer t.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+		}
+
+		s.runProcessors(ctx)
+	}
+}
+
+func (s *service) runProcessors(ctx context.Context) {
+	s.processorsMu.RLock()
+	defer s.processorsMu.RUnlock()
+
+	now := time.Now()
+
+	for _, p := range s.processors {
+		nr := p.nextRun()
+		if nr == nil || nr.Before(now) {
+			glog.Infof("Running processor %q...", p.name)
+			tr := trace.New(fmt.Sprintf("processor.%s", p.name), "Run")
+			pctx := trace.NewContext(ctx, tr)
+			err := p.p.RunAll(pctx, s.model)
+			tr.LazyPrintf("Processor done: %v", err)
+			tr.Finish()
+			if err != nil {
+				glog.Errorf("Running processor %q failed: %v", p.name, err)
+			}
+			p.lastErr = err
+			p.lastRun = &now
+		}
+	}
+}
+
+// Command-line flag values; they are bound to their flags in main.
+var (
+	// flagDSN is the database connection string (-dsn).
+	flagDSN string
+	// flagPeeringDB is the address of the peeringdb service (-peeringdb).
+	flagPeeringDB string
+	// flagIRR is the address of the irr service (-irr).
+	flagIRR string
+	// flagOctoRPKI is the address of the octorpki service (-octorpki).
+	flagOctoRPKI string
+)
+
+// main parses the command-line flags, connects to the model and migrates it
+// up, registers the processors and the Verifier gRPC service, starts the
+// processor scheduling loop and then serves until the mirko instance is done.
+// Any failure during startup terminates the process.
+func main() {
+	flag.StringVar(&flagDSN, "dsn", "", "PostgreSQL connection string")
+	flag.StringVar(&flagPeeringDB, "peeringdb", "", "Address of peeringdb service")
+	flag.StringVar(&flagIRR, "irr", "", "Address of irr service")
+	flag.StringVar(&flagOctoRPKI, "octorpki", "", "Address of octorpki service")
+	flag.Parse()
+
+	// Picking an existing postgres-like driver for sqlx.BindType to work
+	// See: https://github.com/jmoiron/sqlx/blob/ed7c52c43ee1e12a35efbcfea8dbae2d62a90370/bind.go#L24
+	mirko.TraceSQL(&pq.Driver{}, "pgx")
+	mi := mirko.New()
+
+	m, err := model.Connect(mi.Context(), "pgx", flagDSN)
+	if err != nil {
+		glog.Exitf("Failed to create model: %v", err)
+	}
+
+	if err := m.MigrateUp(); err != nil {
+		glog.Exitf("Failed to migrate up: %v", err)
+	}
+
+	if err := mi.Listen(); err != nil {
+		glog.Exitf("Listen failed: %v", err)
+	}
+
+	s := &service{
+		model:          m,
+		processors:     make(map[string]*processorState),
+		requiredChecks: []string{"irr"},
+	}
+
+	// must unwraps a processor constructor result, panicking if construction
+	// failed.
+	must := func(p processor, err error) processor {
+		if err != nil {
+			panic(err)
+		}
+		return p
+	}
+	s.addProcessor(must(newPeeringDB(flagPeeringDB)))
+	s.addProcessor(must(newIRR(flagIRR)))
+	s.addProcessor(must(newSecretGen()))
+	s.addProcessor(must(newRPKI(flagOctoRPKI)))
+	statusz.AddStatusPart("Processors", processorsFragment, s.statuszProcessors)
+
+	// The scheduling loop runs in the background until the mirko context is
+	// cancelled.
+	go s.run(mi.Context())
+
+	pb.RegisterVerifierServer(mi.GRPC(), s)
+
+	if err := mi.Serve(); err != nil {
+		glog.Exitf("Serve failed: %v", err)
+	}
+
+	<-mi.Done()
+}
+
+// addProcessor registers p with the service under the name reported by
+// p.Name(), starting with no recorded run. It panics if a processor with the
+// same name is already registered.
+func (s *service) addProcessor(p processor) {
+	s.processorsMu.Lock()
+	defer s.processorsMu.Unlock()
+
+	key := p.Name()
+	if _, dup := s.processors[key]; dup {
+		panic(fmt.Sprintf("duplicated processor %q", key))
+	}
+
+	// lastRun and lastErr are left at their zero values: never run, no error.
+	s.processors[key] = &processorState{name: key, p: p}
+}
+
+// ProcessorStatus reports, for every registered processor, its name, whether
+// its last run returned an error, and its last and next run times as Unix
+// nanoseconds. Both times are left at 0 for a processor that has not run yet.
+// Entries follow map iteration order, so their order is not stable. The
+// returned error is always nil.
+func (s *service) ProcessorStatus(ctx context.Context, req *pb.ProcessorStatusRequest) (*pb.ProcessorStatusResponse, error) {
+	s.processorsMu.RLock()
+	defer s.processorsMu.RUnlock()
+
+	entries := make([]*pb.ProcessorStatusResponse_Processor, 0, len(s.processors))
+	for _, state := range s.processors {
+		entry := &pb.ProcessorStatusResponse_Processor{
+			Name:   state.name,
+			Status: pb.ProcessorStatusResponse_Processor_STATUS_OK,
+		}
+		if state.lastErr != nil {
+			entry.Status = pb.ProcessorStatusResponse_Processor_STATUS_ERROR
+		}
+		if last := state.lastRun; last != nil {
+			entry.LastRun = last.UnixNano()
+			entry.NextRun = state.p.NextRun(*last).UnixNano()
+		}
+		entries = append(entries, entry)
+	}
+
+	return &pb.ProcessorStatusResponse{Processors: entries}, nil
+}
+
+// PeerSummary streams one PeerSummaryResponse for every checkable peer
+// returned by the model, carrying the peer's ASN, name and router addresses
+// together with an overall check status.
+//
+// The status starts as OK. If the check results cannot be loaded, every peer
+// is reported as UNKNOWN. Otherwise a peer that has check results is reported
+// as FAILED when any of s.requiredChecks is absent from its results or has a
+// status other than Okay.
+//
+// It returns a codes.Unavailable error if the peers cannot be loaded, or the
+// error from stream.Send if sending a response fails.
+func (s *service) PeerSummary(req *pb.PeerSummaryRequest, stream pb.Verifier_PeerSummaryServer) error {
+	ctx := stream.Context()
+
+	peers, err := s.model.GetCheckablePeers(ctx)
+	if err != nil {
+		glog.Errorf("model.GetCheckablePeers: %v", err)
+		return status.Error(codes.Unavailable, "model error")
+	}
+
+	asns := make([]int64, len(peers))
+	asnToRes := make(map[int64]*pb.PeerSummaryResponse, len(peers))
+
+	for i, peer := range peers {
+		routers := make([]*pb.PeeringDBMember_Router, len(peer.Routers))
+		for j, router := range peer.Routers {
+			r := &pb.PeeringDBMember_Router{}
+			if router.V4 != nil {
+				r.Ipv4 = router.V4.String()
+			}
+			if router.V6 != nil {
+				r.Ipv6 = router.V6.String()
+			}
+			routers[j] = r
+		}
+
+		asns[i] = peer.ASN
+		asnToRes[peer.ASN] = &pb.PeerSummaryResponse{
+			PeeringdbInfo: &pb.PeeringDBMember{
+				Asn:     peer.ASN,
+				Name:    peer.Name,
+				Routers: routers,
+			},
+			CheckStatus: pb.PeerSummaryResponse_STATUS_OK,
+		}
+	}
+
+	checkres, err := s.model.GetPeerCheckResults(ctx, asns)
+	if err != nil {
+		glog.Errorf("GetPeerCheckResults(%v): %v", asns, err)
+		for _, res := range asnToRes {
+			res.CheckStatus = pb.PeerSummaryResponse_STATUS_UNKNOWN
+		}
+	} else {
+		// passedChecks[asn][check] is true iff that check's status is Okay.
+		passedChecks := make(map[int64]map[string]bool)
+		for _, c := range checkres {
+			if _, ok := passedChecks[c.PeerASN]; !ok {
+				passedChecks[c.PeerASN] = make(map[string]bool)
+			}
+			passedChecks[c.PeerASN][c.CheckName] = c.Status == model.PeerCheckStatus_Okay
+		}
+
+		// NOTE(review): only peers with at least one check result are
+		// examined here, so a peer with no results at all keeps STATUS_OK.
+		// Confirm that this is intended.
+		for asn, checks := range passedChecks {
+			res, ok := asnToRes[asn]
+			if !ok {
+				// A result for an ASN that is not among the checkable peers
+				// has no response to update; skip it instead of
+				// dereferencing a nil entry.
+				continue
+			}
+			for _, required := range s.requiredChecks {
+				if !checks[required] {
+					res.CheckStatus = pb.PeerSummaryResponse_STATUS_FAILED
+					break
+				}
+			}
+		}
+	}
+
+	for _, res := range asnToRes {
+		if err := stream.Send(res); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// PeerDetails returns the PeeringDB information, the individual check results
+// and the allowed prefixes stored in the model for the peer with the requested
+// ASN.
+//
+// Errors:
+//   - codes.InvalidArgument if req.Asn is not positive,
+//   - codes.NotFound if the model has no peer with that ASN,
+//   - codes.Unavailable if any of the model queries fails.
+func (s *service) PeerDetails(ctx context.Context, req *pb.PeerDetailsRequest) (*pb.PeerDetailsResponse, error) {
+	if req.Asn <= 0 {
+		return nil, status.Error(codes.InvalidArgument, "asn must be set")
+	}
+
+	res := &pb.PeerDetailsResponse{}
+
+	peeringdb, err := s.model.GetPeeringDBPeer(ctx, req.Asn)
+	if err != nil {
+		glog.Errorf("GetPeeringDBPeer(%v): %v", req.Asn, err)
+		return nil, status.Error(codes.Unavailable, "could not get peeringdb peer")
+	}
+
+	// A missing (nil) record or one for a different ASN both mean that the
+	// requested peer is unknown.
+	if peeringdb == nil || peeringdb.Asn != req.Asn {
+		return nil, status.Error(codes.NotFound, "no such ASN")
+	}
+
+	res.PeeringdbInfo = peeringdb
+
+	checkres, err := s.model.GetPeerCheckResults(ctx, []int64{req.Asn})
+	if err != nil {
+		glog.Errorf("GetPeerCheckResults(%v): %v", req.Asn, err)
+		return nil, status.Error(codes.Unavailable, "could not get check results")
+	}
+
+	res.Checks = make([]*pb.PeerDetailsResponse_Check, len(checkres))
+	for i, check := range checkres {
+		// Named st rather than status so that the grpc status package is not
+		// shadowed inside the loop.
+		st := pb.PeerDetailsResponse_Check_STATUS_INVALID
+		switch check.Status {
+		case model.PeerCheckStatus_Okay, model.PeerCheckStatus_SoftFailed:
+			// Soft failures are reported to the caller as OK.
+			st = pb.PeerDetailsResponse_Check_STATUS_OK
+		case model.PeerCheckStatus_Failed:
+			st = pb.PeerDetailsResponse_Check_STATUS_FAILED
+		}
+		res.Checks[i] = &pb.PeerDetailsResponse_Check{
+			Name:   check.CheckName,
+			Status: st,
+			Time:   check.Time.UnixNano(),
+			Msg:    check.Message,
+		}
+	}
+
+	prefixes, err := s.model.GetAllowedPrefixes(ctx, req.Asn)
+	if err != nil {
+		glog.Errorf("GetAllowedPrefixes(%v): %v", req.Asn, err)
+		return nil, status.Error(codes.Unavailable, "could not get allowed prefixes")
+	}
+
+	res.AllowedPrefixes = make([]*pb.PeerDetailsResponse_AllowedPrefix, len(prefixes))
+	for i, prefix := range prefixes {
+		res.AllowedPrefixes[i] = &pb.PeerDetailsResponse_AllowedPrefix{
+			Prefix:    prefix.Prefix.String(),
+			MaxLength: prefix.MaxLength,
+			Ta:        prefix.TA,
+		}
+	}
+
+	return res, nil
+}