blob: 591ee331f1a81df1696502436e9d8eef0ca08551 [file] [log] [blame]
package main
import (
"context"
"flag"
"fmt"
"sync"
"time"
pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
"code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
"code.hackerspace.pl/hscloud/go/mirko"
"code.hackerspace.pl/hscloud/go/statusz"
"github.com/golang/glog"
"github.com/lib/pq"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// processorState pairs a registered processor with the bookkeeping of its most
// recent run.
type processorState struct {
	// name is the processor's name, as returned by its Name method at
	// registration time.
	name string
	// p is the processor itself.
	p processor
	// lastRun is the timestamp recorded for the most recent run, or nil if the
	// processor has not been run yet.
	lastRun *time.Time
	// lastErr is the error returned by the most recent run, or nil if that run
	// succeeded (or no run has happened yet).
	lastErr error
}
// nextRun reports when the processor wants to run again, as computed by the
// processor's NextRun from the last recorded run time. It returns nil when the
// processor has never been run.
func (p *processorState) nextRun() *time.Time {
	if last := p.lastRun; last != nil {
		next := p.p.NextRun(*last)
		return &next
	}
	return nil
}
// service holds the state shared by the processor scheduler and the RPC
// handlers defined on it.
type service struct {
	// model is the data store handed to processors and queried by the RPC
	// handlers.
	model model.Model

	// processors maps a processor name to its state.
	processors map[string]*processorState
	// processorsMu is locked around accesses to processors.
	processorsMu sync.RWMutex

	// requiredChecks lists the check names that PeerSummary requires to have
	// passed; a peer with check results but a missing or non-passing required
	// check is reported as failed.
	requiredChecks []string
}
// run is the scheduler loop: once per second it calls runProcessors, and it
// returns when ctx is done.
func (s *service) run(ctx context.Context) {
	t := time.NewTicker(time.Second)
	// Release the ticker's resources once the loop exits.
	defer t.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
		}
		s.runProcessors(ctx)
	}
}
// runProcessors runs every registered processor that is due: one that has
// never run, or whose next run time lies before the current time. After each
// run it records the run's error (nil on success) and the timestamp taken at
// the start of this call.
//
// The processor state is only mutated while holding the write lock, so that
// readers holding the read lock (such as ProcessorStatus) never observe a
// concurrent write. The lock is not held while a processor is running.
func (s *service) runProcessors(ctx context.Context) {
	now := time.Now()

	// Pick the due processors under the read lock.
	s.processorsMu.RLock()
	due := make([]*processorState, 0, len(s.processors))
	for _, p := range s.processors {
		if nr := p.nextRun(); nr == nil || nr.Before(now) {
			due = append(due, p)
		}
	}
	s.processorsMu.RUnlock()

	for _, p := range due {
		glog.Infof("Running processor %q...", p.name)
		tr := trace.New(fmt.Sprintf("processor.%s", p.name), "Run")
		pctx := trace.NewContext(ctx, tr)
		err := p.p.RunAll(pctx, s.model)
		tr.LazyPrintf("Processor done: %v", err)
		tr.Finish()
		if err != nil {
			glog.Errorf("Running processor %q failed: %v", p.name, err)
		}

		// Publish the outcome under the write lock.
		s.processorsMu.Lock()
		p.lastErr = err
		p.lastRun = &now
		s.processorsMu.Unlock()
	}
}
// Command-line settings. main binds each variable to a flag and parses the
// command line before reading them.
var (
	// flagDSN is bound to -dsn, the database connection string.
	flagDSN string

	// flagPeeringDB, flagIRR and flagOctoRPKI are bound to -peeringdb, -irr and
	// -octorpki: the addresses of the respective backing services.
	flagPeeringDB, flagIRR, flagOctoRPKI string
)
// main parses the command-line flags, connects to the database and applies
// migrations, registers the processors, starts the scheduler loop and serves
// the Verifier gRPC service. Any setup failure terminates the process.
func main() {
	flag.StringVar(&flagDSN, "dsn", "", "PostgreSQL connection string")
	flag.StringVar(&flagPeeringDB, "peeringdb", "", "Address of peeringdb service")
	flag.StringVar(&flagIRR, "irr", "", "Address of irr service")
	flag.StringVar(&flagOctoRPKI, "octorpki", "", "Address of octorpki service")
	flag.Parse()

	// Picking an existing postgres-like driver for sqlx.BindType to work
	// See: https://github.com/jmoiron/sqlx/blob/ed7c52c43ee1e12a35efbcfea8dbae2d62a90370/bind.go#L24
	mirko.TraceSQL(&pq.Driver{}, "pgx")

	mi := mirko.New()

	m, err := model.Connect(mi.Context(), "pgx", flagDSN)
	if err != nil {
		glog.Exitf("Failed to create model: %v", err)
	}

	err = m.MigrateUp()
	if err != nil {
		glog.Exitf("Failed to migrate up: %v", err)
	}

	if err := mi.Listen(); err != nil {
		glog.Exitf("Listen failed: %v", err)
	}

	s := &service{
		model:          m,
		processors:     make(map[string]*processorState),
		requiredChecks: []string{"irr"},
	}

	// must unwraps a processor constructor's result, aborting startup if the
	// constructor returned an error.
	must := func(p processor, err error) processor {
		if err != nil {
			panic(err)
		}
		return p
	}
	s.addProcessor(must(newPeeringDB(flagPeeringDB)))
	s.addProcessor(must(newIRR(flagIRR)))
	s.addProcessor(must(newSecretGen()))
	s.addProcessor(must(newRPKI(flagOctoRPKI)))
	statusz.AddStatusPart("Processors", processorsFragment, s.statuszProcessors)

	// The scheduler loop runs in the background until mirko's context ends.
	go s.run(mi.Context())

	pb.RegisterVerifierServer(mi.GRPC(), s)

	if err := mi.Serve(); err != nil {
		glog.Exitf("Serve failed: %v", err)
	}

	<-mi.Done()
}
// addProcessor registers p under the name it reports, with no run recorded
// yet. It panics if a processor with the same name is already registered.
func (s *service) addProcessor(p processor) {
	name := p.Name()

	s.processorsMu.Lock()
	defer s.processorsMu.Unlock()

	if _, exists := s.processors[name]; exists {
		panic(fmt.Sprintf("duplicated processor %q", name))
	}
	s.processors[name] = &processorState{name: name, p: p}
}
// ProcessorStatus reports, for every registered processor, its name, whether
// its last run returned an error, and its last/next run times in Unix
// nanoseconds. Both times are left at 0 for a processor that has not run yet.
// Entries follow map iteration order, so their order is unspecified.
func (s *service) ProcessorStatus(ctx context.Context, req *pb.ProcessorStatusRequest) (*pb.ProcessorStatusResponse, error) {
	s.processorsMu.RLock()
	defer s.processorsMu.RUnlock()

	entries := make([]*pb.ProcessorStatusResponse_Processor, 0, len(s.processors))
	for _, state := range s.processors {
		entry := &pb.ProcessorStatusResponse_Processor{
			Name:   state.name,
			Status: pb.ProcessorStatusResponse_Processor_STATUS_OK,
		}
		if state.lastErr != nil {
			entry.Status = pb.ProcessorStatusResponse_Processor_STATUS_ERROR
		}
		if last := state.lastRun; last != nil {
			entry.LastRun = last.UnixNano()
			entry.NextRun = state.p.NextRun(*last).UnixNano()
		}
		entries = append(entries, entry)
	}

	return &pb.ProcessorStatusResponse{Processors: entries}, nil
}
// PeerSummary streams one PeerSummaryResponse per checkable peer, carrying the
// peer's ASN, name and router addresses together with an overall check status.
//
// A peer starts as STATUS_OK and is marked STATUS_FAILED when it has check
// results but any check named in s.requiredChecks is missing or not okay. If
// the check results cannot be loaded, every peer is reported as
// STATUS_UNKNOWN instead of failing the call. If the peer list itself cannot
// be loaded, the call fails with codes.Unavailable. Responses are sent in map
// iteration order, so their order is unspecified.
func (s *service) PeerSummary(req *pb.PeerSummaryRequest, stream pb.Verifier_PeerSummaryServer) error {
	ctx := stream.Context()

	peers, err := s.model.GetCheckablePeers(ctx)
	if err != nil {
		glog.Errorf("model.GetCheckablePeers: %v", err)
		return status.Error(codes.Unavailable, "model error")
	}

	asns := make([]int64, len(peers))
	asnToRes := make(map[int64]*pb.PeerSummaryResponse)

	for i, peer := range peers {
		routers := make([]*pb.PeeringDBMember_Router, len(peer.Routers))
		for j, router := range peer.Routers {
			routers[j] = &pb.PeeringDBMember_Router{}
			if router.V4 != nil {
				routers[j].Ipv4 = router.V4.String()
			}
			if router.V6 != nil {
				routers[j].Ipv6 = router.V6.String()
			}
		}
		asnToRes[peer.ASN] = &pb.PeerSummaryResponse{
			PeeringdbInfo: &pb.PeeringDBMember{
				Asn:     peer.ASN,
				Name:    peer.Name,
				Routers: routers,
			},
			CheckStatus: pb.PeerSummaryResponse_STATUS_OK,
		}
		asns[i] = peer.ASN
	}

	checkres, err := s.model.GetPeerCheckResults(ctx, asns)
	if err != nil {
		glog.Errorf("GetPeerCheckResults(%v): %v", asns, err)
		for _, res := range asnToRes {
			res.CheckStatus = pb.PeerSummaryResponse_STATUS_UNKNOWN
		}
	} else {
		// passedChecks[asn][check] is true when that check's status is okay.
		passedChecks := make(map[int64]map[string]bool)
		for _, c := range checkres {
			if _, ok := passedChecks[c.PeerASN]; !ok {
				passedChecks[c.PeerASN] = make(map[string]bool)
			}
			passedChecks[c.PeerASN][c.CheckName] = c.Status == model.PeerCheckStatus_Okay
		}

		// NOTE(review): only peers that have at least one check result are
		// evaluated here, so a peer with no results at all stays STATUS_OK.
		// Confirm that this is intended.
		for asn, checks := range passedChecks {
			res, ok := asnToRes[asn]
			if !ok {
				// A result for an ASN that is not in the peer list; there is
				// no response to update, and dereferencing would panic.
				glog.Warningf("GetPeerCheckResults returned result for unrequested ASN %d", asn)
				continue
			}
			for _, required := range s.requiredChecks {
				if !checks[required] {
					res.CheckStatus = pb.PeerSummaryResponse_STATUS_FAILED
					break
				}
			}
		}
	}

	for _, res := range asnToRes {
		if err := stream.Send(res); err != nil {
			return err
		}
	}

	return nil
}
// PeerDetails returns the PeeringDB information, the individual check results
// and the allowed prefixes for the peer with the requested ASN.
//
// It fails with codes.InvalidArgument when req.Asn is not positive, with
// codes.NotFound when the model returns a peer whose ASN differs from the
// requested one, and with codes.Unavailable when any model query fails.
func (s *service) PeerDetails(ctx context.Context, req *pb.PeerDetailsRequest) (*pb.PeerDetailsResponse, error) {
	if req.Asn <= 0 {
		return nil, status.Error(codes.InvalidArgument, "asn must be set")
	}

	res := &pb.PeerDetailsResponse{}

	peeringdb, err := s.model.GetPeeringDBPeer(ctx, req.Asn)
	if err != nil {
		glog.Errorf("GetPeeringDBPeer(%v): %v", req.Asn, err)
		return nil, status.Error(codes.Unavailable, "could not get peeringdb info")
	}

	if peeringdb.Asn != req.Asn {
		return nil, status.Error(codes.NotFound, "no such ASN")
	}

	res.PeeringdbInfo = peeringdb

	checkres, err := s.model.GetPeerCheckResults(ctx, []int64{req.Asn})
	if err != nil {
		glog.Errorf("GetPeerCheckResults(%v): %v", req.Asn, err)
		return nil, status.Error(codes.Unavailable, "could not get check results")
	}

	res.Checks = make([]*pb.PeerDetailsResponse_Check, len(checkres))
	for i, check := range checkres {
		// Statuses not handled below are reported as STATUS_INVALID.
		checkStatus := pb.PeerDetailsResponse_Check_STATUS_INVALID
		switch check.Status {
		case model.PeerCheckStatus_Okay:
			checkStatus = pb.PeerDetailsResponse_Check_STATUS_OK
		case model.PeerCheckStatus_SoftFailed:
			// Soft failures are reported to the caller as OK.
			checkStatus = pb.PeerDetailsResponse_Check_STATUS_OK
		case model.PeerCheckStatus_Failed:
			checkStatus = pb.PeerDetailsResponse_Check_STATUS_FAILED
		}
		res.Checks[i] = &pb.PeerDetailsResponse_Check{
			Name:   check.CheckName,
			Status: checkStatus,
			Time:   check.Time.UnixNano(),
			Msg:    check.Message,
		}
	}

	prefixes, err := s.model.GetAllowedPrefixes(ctx, req.Asn)
	if err != nil {
		glog.Errorf("GetAllowedPrefixes(%v): %v", req.Asn, err)
		return nil, status.Error(codes.Unavailable, "could not get allowed prefixes")
	}

	res.AllowedPrefixes = make([]*pb.PeerDetailsResponse_AllowedPrefix, len(prefixes))
	for i, prefix := range prefixes {
		res.AllowedPrefixes[i] = &pb.PeerDetailsResponse_AllowedPrefix{
			Prefix:    prefix.Prefix.String(),
			MaxLength: prefix.MaxLength,
			Ta:        prefix.TA,
		}
	}

	return res, nil
}