bgpwtf/cccampix: draw the rest of the fucking owl
Change-Id: I49fd5906e69512e8f2d414f406edc0179522f225
diff --git a/bgpwtf/cccampix/verifier/BUILD.bazel b/bgpwtf/cccampix/verifier/BUILD.bazel
new file mode 100644
index 0000000..f7d15e3
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/BUILD.bazel
@@ -0,0 +1,38 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "main.go",
+ "processor_irr.go",
+ "processor_peeringdb.go",
+ "processor_rpki.go",
+ "processor_secretgen.go",
+ "processors.go",
+ "state.go",
+ "statusz.go",
+ ],
+ importpath = "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier",
+ visibility = ["//visibility:private"],
+ deps = [
+ "//bgpwtf/cccampix/proto:go_default_library",
+ "//bgpwtf/cccampix/verifier/model:go_default_library",
+ "//go/mirko:go_default_library",
+ "//go/pki:go_default_library",
+ "//go/statusz:go_default_library",
+ "@com_github_dustin_go_humanize//:go_default_library",
+ "@com_github_golang_glog//:go_default_library",
+ "@com_github_lib_pq//:go_default_library",
+ "@com_github_sethvargo_go_password//password:go_default_library",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//codes:go_default_library",
+ "@org_golang_google_grpc//status:go_default_library",
+ "@org_golang_x_net//trace:go_default_library",
+ ],
+)
+
+go_binary(
+ name = "verifier",
+ embed = [":go_default_library"],
+ visibility = ["//visibility:public"],
+)
diff --git a/bgpwtf/cccampix/verifier/main.go b/bgpwtf/cccampix/verifier/main.go
new file mode 100644
index 0000000..591ee33
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/main.go
@@ -0,0 +1,319 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "sync"
+ "time"
+
+ pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
+ "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
+ "code.hackerspace.pl/hscloud/go/mirko"
+ "code.hackerspace.pl/hscloud/go/statusz"
+ "github.com/golang/glog"
+ "github.com/lib/pq"
+ "golang.org/x/net/trace"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type processorState struct {
+ name string
+ p processor
+ lastRun *time.Time
+ lastErr error
+}
+
+func (p *processorState) nextRun() *time.Time {
+ if p.lastRun == nil {
+ return nil
+ }
+ nr := p.p.NextRun(*p.lastRun)
+ return &nr
+}
+
+type service struct {
+ model model.Model
+
+ processors map[string]*processorState
+ processorsMu sync.RWMutex
+
+ requiredChecks []string
+}
+
+func (s *service) run(ctx context.Context) {
+ t := time.NewTicker(time.Second)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-t.C:
+ break
+ }
+
+ s.runProcessors(ctx)
+ }
+}
+
+func (s *service) runProcessors(ctx context.Context) {
+ s.processorsMu.RLock()
+ defer s.processorsMu.RUnlock()
+
+ now := time.Now()
+
+ for _, p := range s.processors {
+ nr := p.nextRun()
+ if nr == nil || nr.Before(now) {
+ glog.Infof("Running processor %q...", p.name)
+ tr := trace.New(fmt.Sprintf("processor.%s", p.name), "Run")
+ pctx := trace.NewContext(ctx, tr)
+ err := p.p.RunAll(pctx, s.model)
+ tr.LazyPrintf("Processor done: %v", err)
+ tr.Finish()
+ if err != nil {
+ glog.Errorf("Running processor %q failed: %v", p.name, err)
+ }
+ p.lastErr = err
+ p.lastRun = &now
+ }
+ }
+}
+
+var (
+ flagDSN string
+ flagPeeringDB string
+ flagIRR string
+ flagOctoRPKI string
+)
+
+func main() {
+ flag.StringVar(&flagDSN, "dsn", "", "PostrgreSQL connection string")
+ flag.StringVar(&flagPeeringDB, "peeringdb", "", "Address of peeringdb service")
+ flag.StringVar(&flagIRR, "irr", "", "Address of irr service")
+ flag.StringVar(&flagOctoRPKI, "octorpki", "", "Address of octorpki service")
+ flag.Parse()
+
+ // Picking an existing postgres-like driver for sqlx.BindType to work
+ // See: https://github.com/jmoiron/sqlx/blob/ed7c52c43ee1e12a35efbcfea8dbae2d62a90370/bind.go#L24
+ mirko.TraceSQL(&pq.Driver{}, "pgx")
+ mi := mirko.New()
+
+ m, err := model.Connect(mi.Context(), "pgx", flagDSN)
+ if err != nil {
+ glog.Exitf("Failed to create model: %v", err)
+ }
+
+ err = m.MigrateUp()
+ if err != nil {
+ glog.Exitf("Failed to migrate up: %v", err)
+ }
+
+ if err := mi.Listen(); err != nil {
+ glog.Exitf("Listen failed: %v", err)
+ }
+
+ s := &service{
+ model: m,
+ processors: make(map[string]*processorState),
+ requiredChecks: []string{"irr"},
+ }
+
+ must := func(p processor, err error) processor {
+ if err != nil {
+ panic(err)
+ }
+ return p
+ }
+ s.addProcessor(must(newPeeringDB(flagPeeringDB)))
+ s.addProcessor(must(newIRR(flagIRR)))
+ s.addProcessor(must(newSecretGen()))
+ s.addProcessor(must(newRPKI(flagOctoRPKI)))
+ statusz.AddStatusPart("Processors", processorsFragment, s.statuszProcessors)
+
+ go s.run(mi.Context())
+
+ pb.RegisterVerifierServer(mi.GRPC(), s)
+
+ if err := mi.Serve(); err != nil {
+ glog.Exitf("Serve failed: %v", err)
+ }
+
+ <-mi.Done()
+}
+
+func (s *service) addProcessor(p processor) {
+ s.processorsMu.Lock()
+ defer s.processorsMu.Unlock()
+
+ name := p.Name()
+ if _, ok := s.processors[name]; ok {
+ panic(fmt.Sprintf("duplicated processor %q", name))
+ }
+ s.processors[name] = &processorState{
+ name: name,
+ p: p,
+ lastRun: nil,
+ }
+}
+
+func (s *service) ProcessorStatus(ctx context.Context, req *pb.ProcessorStatusRequest) (*pb.ProcessorStatusResponse, error) {
+ s.processorsMu.RLock()
+ defer s.processorsMu.RUnlock()
+
+ res := &pb.ProcessorStatusResponse{
+ Processors: make([]*pb.ProcessorStatusResponse_Processor, len(s.processors)),
+ }
+
+ i := 0
+ for _, p := range s.processors {
+ res.Processors[i] = &pb.ProcessorStatusResponse_Processor{
+ Name: p.name,
+ Status: pb.ProcessorStatusResponse_Processor_STATUS_OK,
+ LastRun: 0,
+ NextRun: 0,
+ }
+
+ if p.lastRun != nil {
+ res.Processors[i].LastRun = p.lastRun.UnixNano()
+ res.Processors[i].NextRun = p.p.NextRun(*p.lastRun).UnixNano()
+ }
+
+ if p.lastErr != nil {
+ res.Processors[i].Status = pb.ProcessorStatusResponse_Processor_STATUS_ERROR
+ }
+
+ i += 1
+ }
+ return res, nil
+}
+
+func (s *service) PeerSummary(req *pb.PeerSummaryRequest, stream pb.Verifier_PeerSummaryServer) error {
+ peers, err := s.model.GetCheckablePeers(stream.Context())
+ if err != nil {
+ glog.Errorf("model.GetCheckablePeers: %v", err)
+ return status.Error(codes.Unavailable, "model error")
+ }
+
+ asns := make([]int64, len(peers))
+ asnToRes := make(map[int64]*pb.PeerSummaryResponse)
+
+ for i, peer := range peers {
+ routers := make([]*pb.PeeringDBMember_Router, len(peer.Routers))
+ for i, router := range peer.Routers {
+ routers[i] = &pb.PeeringDBMember_Router{}
+ if router.V4 != nil {
+ routers[i].Ipv4 = router.V4.String()
+ }
+ if router.V6 != nil {
+ routers[i].Ipv6 = router.V6.String()
+ }
+ }
+ p := &pb.PeeringDBMember{
+ Asn: peer.ASN,
+ Name: peer.Name,
+ Routers: routers,
+ }
+ res := &pb.PeerSummaryResponse{
+ PeeringdbInfo: p,
+ CheckStatus: pb.PeerSummaryResponse_STATUS_OK,
+ }
+ asnToRes[peer.ASN] = res
+ asns[i] = peer.ASN
+ }
+
+ checkres, err := s.model.GetPeerCheckResults(stream.Context(), asns)
+ if err != nil {
+ glog.Errorf("GetPeerCheckResults(%v): %v", asns, err)
+ for _, res := range asnToRes {
+ res.CheckStatus = pb.PeerSummaryResponse_STATUS_UNKNOWN
+ }
+ } else {
+ passedChecks := make(map[int64]map[string]bool)
+ for _, c := range checkres {
+ if _, ok := passedChecks[c.PeerASN]; !ok {
+ passedChecks[c.PeerASN] = make(map[string]bool)
+ }
+ passedChecks[c.PeerASN][c.CheckName] = c.Status == model.PeerCheckStatus_Okay
+ }
+
+ for asn, checks := range passedChecks {
+ for _, required := range s.requiredChecks {
+ if !checks[required] {
+ asnToRes[asn].CheckStatus = pb.PeerSummaryResponse_STATUS_FAILED
+ break
+ }
+ }
+ }
+ }
+
+ for _, res := range asnToRes {
+ if err := stream.Send(res); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (s *service) PeerDetails(ctx context.Context, req *pb.PeerDetailsRequest) (*pb.PeerDetailsResponse, error) {
+ if req.Asn <= 0 {
+ return nil, status.Error(codes.InvalidArgument, "asn must be set")
+ }
+
+ res := &pb.PeerDetailsResponse{}
+
+ peeringdb, err := s.model.GetPeeringDBPeer(ctx, req.Asn)
+ if err != nil {
+ glog.Errorf("GetPeeringDBPeer(%v): %v", req.Asn, err)
+ return nil, status.Error(codes.Unavailable, "could not get allowed prefixes")
+ }
+
+ if peeringdb.Asn != req.Asn {
+ return nil, status.Error(codes.NotFound, "no such ASN")
+ }
+
+ res.PeeringdbInfo = peeringdb
+
+ checkres, err := s.model.GetPeerCheckResults(ctx, []int64{req.Asn})
+ if err != nil {
+ glog.Errorf("GetPeerCheckResults(%v): %v", req.Asn, err)
+ return nil, status.Error(codes.Unavailable, "could not get check results")
+ }
+
+ res.Checks = make([]*pb.PeerDetailsResponse_Check, len(checkres))
+ for i, check := range checkres {
+ status := pb.PeerDetailsResponse_Check_STATUS_INVALID
+ switch check.Status {
+ case model.PeerCheckStatus_Okay:
+ status = pb.PeerDetailsResponse_Check_STATUS_OK
+ case model.PeerCheckStatus_SoftFailed:
+ status = pb.PeerDetailsResponse_Check_STATUS_OK
+ case model.PeerCheckStatus_Failed:
+ status = pb.PeerDetailsResponse_Check_STATUS_FAILED
+ }
+ res.Checks[i] = &pb.PeerDetailsResponse_Check{
+ Name: check.CheckName,
+ Status: status,
+ Time: check.Time.UnixNano(),
+ Msg: check.Message,
+ }
+ }
+
+ prefixes, err := s.model.GetAllowedPrefixes(ctx, req.Asn)
+ if err != nil {
+ glog.Errorf("GetAllowedPrefixes(%v): %v", req.Asn, err)
+ return nil, status.Error(codes.Unavailable, "could not get allowed prefixes")
+ }
+
+ res.AllowedPrefixes = make([]*pb.PeerDetailsResponse_AllowedPrefix, len(prefixes))
+ for i, prefix := range prefixes {
+ res.AllowedPrefixes[i] = &pb.PeerDetailsResponse_AllowedPrefix{
+ Prefix: prefix.Prefix.String(),
+ MaxLength: prefix.MaxLength,
+ Ta: prefix.TA,
+ }
+ }
+
+ return res, nil
+}
diff --git a/bgpwtf/cccampix/verifier/model/BUILD.bazel b/bgpwtf/cccampix/verifier/model/BUILD.bazel
new file mode 100644
index 0000000..bff1eeb
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/BUILD.bazel
@@ -0,0 +1,28 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "allowed_prefixes.go",
+ "checkable_peers.go",
+ "config.go",
+ "get_checks.go",
+ "model.go",
+ "peer_routers.go",
+ "peers.go",
+ "pgp.go",
+ "schema.go",
+ "submit_checks.go",
+ ],
+ importpath = "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//bgpwtf/cccampix/proto:go_default_library",
+ "//bgpwtf/cccampix/verifier/model/migrations:go_default_library",
+ "@com_github_golang_glog//:go_default_library",
+ "@com_github_golang_migrate_migrate_v4//:go_default_library",
+ "@com_github_golang_migrate_migrate_v4//database/cockroachdb:go_default_library",
+ "@com_github_jmoiron_sqlx//:go_default_library",
+ "@com_github_lib_pq//:go_default_library",
+ ],
+)
diff --git a/bgpwtf/cccampix/verifier/model/allowed_prefixes.go b/bgpwtf/cccampix/verifier/model/allowed_prefixes.go
new file mode 100644
index 0000000..e6b38d0
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/allowed_prefixes.go
@@ -0,0 +1,87 @@
+package model
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "net"
+ "time"
+)
+
+func (s *sqlModel) UpdateAllowedPrefixes(ctx context.Context, asn int64, prefixes []*AllowedPrefix) error {
+ tx := s.db.MustBeginTx(ctx, &sql.TxOptions{})
+ defer tx.Rollback()
+
+ timestamp := time.Now().UnixNano()
+
+ for _, prefix := range prefixes {
+ q := `
+ INSERT INTO allowed_prefixes
+ (peer_id, timestamp, prefix, max_length, ta)
+ SELECT
+ peers.id, :timestamp, :prefix, :max_length, :ta
+ FROM peers
+ WHERE peers.asn = :asn
+ ON CONFLICT (peer_id, prefix)
+ DO UPDATE SET
+ timestamp = :timestamp,
+ max_length = :max_length,
+ ta = :ta
+ `
+ ap := sqlAllowedPrefix{
+ Timestamp: timestamp,
+ Prefix: prefix.Prefix.String(),
+ MaxLength: prefix.MaxLength,
+ TA: prefix.TA,
+ ASN: fmt.Sprintf("%d", asn),
+ }
+
+ if _, err := tx.NamedExecContext(ctx, q, ap); err != nil {
+ return fmt.Errorf("INSERT allowed_prefixes: %v", err)
+ }
+ }
+
+ q := `
+ DELETE FROM allowed_prefixes
+ WHERE timestamp != $1
+ AND peer_id = (SELECT peers.id FROM peers WHERE peers.asn = $2)
+ `
+ if _, err := tx.ExecContext(ctx, q, timestamp, asn); err != nil {
+ return fmt.Errorf("DELETE FROM allowed_prefixes: %v", err)
+ }
+
+ return tx.Commit()
+}
+
+func (s *sqlModel) GetAllowedPrefixes(ctx context.Context, asn int64) ([]*AllowedPrefix, error) {
+ q := `
+ SELECT
+ allowed_prefixes.prefix,
+ allowed_prefixes.max_length,
+ allowed_prefixes.ta
+ FROM
+ allowed_prefixes
+ LEFT JOIN peers
+ ON peers.id = allowed_prefixes.peer_id
+ WHERE peers.asn = $1
+ `
+ data := []sqlAllowedPrefix{}
+ if err := s.db.SelectContext(ctx, &data, q, asn); err != nil {
+ return nil, fmt.Errorf("SELECT allowed_prefixes: %v", err)
+ }
+
+ res := make([]*AllowedPrefix, len(data))
+ for i, d := range data {
+ _, prefix, err := net.ParseCIDR(d.Prefix)
+ if err != nil {
+ return nil, fmt.Errorf("corrupted CIDR in database: %v", err)
+ }
+ res[i] = &AllowedPrefix{
+ Prefix: *prefix,
+ MaxLength: d.MaxLength,
+ TA: d.TA,
+ }
+ }
+
+ return res, nil
+}
diff --git a/bgpwtf/cccampix/verifier/model/checkable_peers.go b/bgpwtf/cccampix/verifier/model/checkable_peers.go
new file mode 100644
index 0000000..2fe133f
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/checkable_peers.go
@@ -0,0 +1,76 @@
+package model
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strconv"
+)
+
+func (m *sqlModel) GetCheckablePeers(ctx context.Context) ([]*Peer, error) {
+
+ data := []struct {
+ sqlPeer `db:"peers"`
+ sqlPeerRouter `db:"peer_routers"`
+ }{}
+ q := `
+ SELECT
+ peers.id "peers.id",
+ peers.asn "peers.asn",
+ peers.name "peers.name",
+
+ peer_routers.peer_id "peer_routers.peer_id",
+ peer_routers.v6 "peer_routers.v6",
+ peer_routers.v4 "peer_routers.v4"
+ FROM peers
+ LEFT JOIN peer_routers
+ ON peer_routers.peer_id = peers.id
+ `
+ if err := m.db.SelectContext(ctx, &data, q); err != nil {
+ return nil, fmt.Errorf("SELECT peers/peerRouters: %v", err)
+ }
+
+ // Collapse peers into map
+ // ID -> Peer
+ peers := make(map[string]*Peer)
+
+ for _, row := range data {
+ peer, ok := peers[row.sqlPeer.ID]
+ if !ok {
+ asn, err := strconv.ParseInt(row.sqlPeer.ASN, 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("data corruption: invalid ASN %q", row.sqlPeer.ASN)
+ }
+ peer = &Peer{
+ ASN: asn,
+ Name: row.sqlPeer.Name,
+ Routers: []*Router{},
+ }
+ peers[row.sqlPeer.ID] = peer
+ }
+
+ var v6 net.IP
+ var v4 net.IP
+
+ if row.sqlPeerRouter.V6.Valid {
+ v6 = net.ParseIP(row.sqlPeerRouter.V6.String)
+ }
+ if row.sqlPeerRouter.V4.Valid {
+ v4 = net.ParseIP(row.sqlPeerRouter.V4.String)
+ }
+
+ peer.Routers = append(peer.Routers, &Router{
+ V6: v6,
+ V4: v4,
+ })
+ }
+
+ res := make([]*Peer, len(peers))
+ i := 0
+ for _, peer := range peers {
+ res[i] = peer
+ i += 1
+ }
+
+ return res, nil
+}
diff --git a/bgpwtf/cccampix/verifier/model/config.go b/bgpwtf/cccampix/verifier/model/config.go
new file mode 100644
index 0000000..bafd46f
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/config.go
@@ -0,0 +1,51 @@
+package model
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+)
+
+func (m *sqlModel) ConfigureMissingSessions(ctx context.Context, gen func() SessionConfig) error {
+ tx := m.db.MustBeginTx(ctx, &sql.TxOptions{})
+ defer tx.Rollback()
+
+ q := `
+ SELECT
+ peer_routers.peer_id "peer_id",
+ peer_routers.id "id"
+ FROM peer_routers
+ WHERE peer_routers.id NOT IN (
+ SELECT session_configs.peer_router_id
+ FROM session_configs
+ )
+ `
+ missing := []struct {
+ PeerID string `db:"peer_id"`
+ ID string `db:"id"`
+ }{}
+ if err := m.db.SelectContext(ctx, &missing, q); err != nil {
+ return fmt.Errorf("SELECT peerRouters: %v", err)
+ }
+
+ for _, m := range missing {
+ config := gen()
+ q = `
+ INSERT INTO
+ session_configs
+ (peer_id, peer_router_id, bgp_secret)
+ VALUES
+ (:peer_id, :peer_router_id, :bgp_secret)
+ `
+ data := sqlSessionConfig{
+ PeerID: m.PeerID,
+ PeerRouterID: m.ID,
+ BGPSecret: config.BGPSecret,
+ }
+ if _, err := tx.NamedExecContext(ctx, q, data); err != nil {
+ return err
+ }
+ }
+
+ return tx.Commit()
+}
diff --git a/bgpwtf/cccampix/verifier/model/get_checks.go b/bgpwtf/cccampix/verifier/model/get_checks.go
new file mode 100644
index 0000000..0879427
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/get_checks.go
@@ -0,0 +1,70 @@
+package model
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "time"
+
+ "github.com/golang/glog"
+ "github.com/jmoiron/sqlx"
+)
+
+func (s *sqlModel) GetPeerCheckResults(ctx context.Context, asn []int64) ([]*PeerCheckResult, error) {
+ asns := make([]string, len(asn))
+ for i, asn := range asn {
+ asns[i] = fmt.Sprintf("%d", asn)
+ }
+
+ data := []struct {
+ sqlPeer `db:"peers"`
+ sqlPeerCheck `db:"peer_checks"`
+ }{}
+ q := `
+ SELECT
+ peers.asn "peers.asn",
+ peer_checks.check_name "peer_checks.check_name",
+ peer_checks.check_time "peer_checks.check_time",
+ peer_checks.check_status "peer_checks.check_status",
+ peer_checks.check_message "peer_checks.check_message"
+ FROM peers
+ LEFT JOIN peer_checks
+ ON peers.id = peer_checks.peer_id
+ WHERE peers.asn IN (?)
+ `
+ query, args, err := sqlx.In(q, asns)
+ if err != nil {
+ return nil, fmt.Errorf("SELECT peers: %v", err)
+ }
+
+ query = s.db.Rebind(query)
+ if err := s.db.SelectContext(ctx, &data, query, args...); err != nil {
+ return nil, fmt.Errorf("SELECT peers: %v", err)
+ }
+
+ res := make([]*PeerCheckResult, len(data))
+ for i, d := range data {
+ asn, err := strconv.ParseInt(d.sqlPeer.ASN, 10, 64)
+ if err != nil {
+ return nil, err
+ }
+ status := PeerCheckStatus_Invalid
+ switch d.sqlPeerCheck.CheckStatus {
+ case "okay":
+ status = PeerCheckStatus_Okay
+ case "failed":
+ status = PeerCheckStatus_Failed
+ default:
+ glog.Errorf("Unhandled check status %q", d.sqlPeerCheck.CheckStatus)
+ }
+ res[i] = &PeerCheckResult{
+ PeerASN: asn,
+ CheckName: d.sqlPeerCheck.CheckName,
+ Time: time.Unix(0, d.sqlPeerCheck.CheckTime),
+ Status: status,
+ Message: d.sqlPeerCheck.CheckMessage,
+ }
+ }
+
+ return res, nil
+}
diff --git a/bgpwtf/cccampix/verifier/model/migrations/1564664855_initial.down.sql b/bgpwtf/cccampix/verifier/model/migrations/1564664855_initial.down.sql
new file mode 100644
index 0000000..d0ba745
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/1564664855_initial.down.sql
@@ -0,0 +1,2 @@
+DROP TABLE peers;
+DROP TABLE peer_routers;
diff --git a/bgpwtf/cccampix/verifier/model/migrations/1564664855_initial.up.sql b/bgpwtf/cccampix/verifier/model/migrations/1564664855_initial.up.sql
new file mode 100644
index 0000000..bab0c35
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/1564664855_initial.up.sql
@@ -0,0 +1,27 @@
+CREATE TABLE peers (
+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+ asn STRING NOT NULL,
+ name STRING NOT NULL,
+ source STRING check (
+ source = 'from-peeringdb' or
+ source = 'manual'
+ ) NOT NULL,
+ UNIQUE (asn)
+);
+
+CREATE TABLE peer_routers (
+ peer_id UUID NOT NULL,
+ id UUID DEFAULT gen_random_uuid(),
+
+ v6 STRING,
+ v4 STRING,
+ source STRING check (
+ source = 'from-peeringdb' or
+ source = 'manual'
+ ) NOT NULL,
+
+ UNIQUE (v4),
+ UNIQUE (v6),
+ PRIMARY KEY (peer_id, id),
+ CONSTRAINT fk_peer FOREIGN KEY (peer_id) REFERENCES peers (id) ON DELETE CASCADE
+) INTERLEAVE IN PARENT peers (peer_id);
diff --git a/bgpwtf/cccampix/verifier/model/migrations/1564751360_membership.down.sql b/bgpwtf/cccampix/verifier/model/migrations/1564751360_membership.down.sql
new file mode 100644
index 0000000..c47fec6
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/1564751360_membership.down.sql
@@ -0,0 +1 @@
+DROP TABLE peer_checks;
diff --git a/bgpwtf/cccampix/verifier/model/migrations/1564751360_membership.up.sql b/bgpwtf/cccampix/verifier/model/migrations/1564751360_membership.up.sql
new file mode 100644
index 0000000..247c9eb
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/1564751360_membership.up.sql
@@ -0,0 +1,18 @@
+CREATE TABLE peer_checks (
+ peer_id UUID NOT NULL,
+ id UUID DEFAULT gen_random_uuid(),
+
+ check_name STRING NOT NULL,
+ check_time INT NOT NULL,
+ check_status STRING check (
+ check_status = 'unknown' or
+ check_status = 'okay' or
+ check_status = 'failed'
+ ) NOT NULL,
+ check_message STRING NOT NULL,
+ delete BOOL NOT NULL,
+
+ UNIQUE(peer_id, check_name),
+ PRIMARY KEY (peer_id, id),
+ CONSTRAINT fk_peer FOREIGN KEY (peer_id) REFERENCES peers (id) ON DELETE CASCADE
+) INTERLEAVE IN PARENT peers (peer_id);
diff --git a/bgpwtf/cccampix/verifier/model/migrations/1564867563_pgp.down.sql b/bgpwtf/cccampix/verifier/model/migrations/1564867563_pgp.down.sql
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/1564867563_pgp.down.sql
diff --git a/bgpwtf/cccampix/verifier/model/migrations/1564867563_pgp.up.sql b/bgpwtf/cccampix/verifier/model/migrations/1564867563_pgp.up.sql
new file mode 100644
index 0000000..6c6f752
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/1564867563_pgp.up.sql
@@ -0,0 +1,11 @@
+CREATE TABLE peer_pgp_keys (
+ peer_id UUID NOT NULL,
+ id UUID DEFAULT gen_random_uuid(),
+
+ fingerprint STRING NOT NULL,
+ time_created INT NOT NULL,
+
+ UNIQUE (peer_id),
+ PRIMARY KEY (peer_id, id),
+ CONSTRAINT fk_peer FOREIGN KEY (peer_id) REFERENCES peers (id) ON DELETE CASCADE
+) INTERLEAVE IN PARENT peers (peer_id);
diff --git a/bgpwtf/cccampix/verifier/model/migrations/1565471293_session_configs.down.sql b/bgpwtf/cccampix/verifier/model/migrations/1565471293_session_configs.down.sql
new file mode 100644
index 0000000..f286d79
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/1565471293_session_configs.down.sql
@@ -0,0 +1 @@
+DROP TABLE session_configs;
diff --git a/bgpwtf/cccampix/verifier/model/migrations/1565471293_session_configs.up.sql b/bgpwtf/cccampix/verifier/model/migrations/1565471293_session_configs.up.sql
new file mode 100644
index 0000000..7a1d07b
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/1565471293_session_configs.up.sql
@@ -0,0 +1,11 @@
+CREATE TABLE session_configs (
+ peer_id UUID NOT NULL,
+ peer_router_id UUID NOT NULL,
+ id UUID DEFAULT gen_random_uuid(),
+
+ bgp_secret STRING NOT NULL,
+
+ UNIQUE (peer_router_id),
+ PRIMARY KEY (peer_id, peer_router_id, id),
+ CONSTRAINT fk_peer_router FOREIGN KEY (peer_id, peer_router_id) REFERENCES peer_routers ON DELETE CASCADE
+) INTERLEAVE IN PARENT peer_routers (peer_id, peer_router_id);
diff --git a/bgpwtf/cccampix/verifier/model/migrations/1565545845_allowed_prefixes.down.sql b/bgpwtf/cccampix/verifier/model/migrations/1565545845_allowed_prefixes.down.sql
new file mode 100644
index 0000000..d0a98f3
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/1565545845_allowed_prefixes.down.sql
@@ -0,0 +1 @@
+DROP TABLE allowed_prefix;
diff --git a/bgpwtf/cccampix/verifier/model/migrations/1565545845_allowed_prefixes.up.sql b/bgpwtf/cccampix/verifier/model/migrations/1565545845_allowed_prefixes.up.sql
new file mode 100644
index 0000000..c033e3d
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/1565545845_allowed_prefixes.up.sql
@@ -0,0 +1,13 @@
+CREATE TABLE allowed_prefixes (
+ peer_id UUID NOT NULL,
+ id UUID DEFAULT gen_random_uuid(),
+ timestamp INT NOT NULL,
+
+ prefix STRING NOT NULL,
+ max_length INT NOT NULL,
+ ta STRING NOT NULL,
+
+ PRIMARY KEY (peer_id, id),
+ UNIQUE (peer_id, prefix),
+ CONSTRAINT fk_peer FOREIGN KEY (peer_id) REFERENCES peers (id) ON DELETE CASCADE
+) INTERLEAVE IN PARENT peers (peer_id);
diff --git a/bgpwtf/cccampix/verifier/model/migrations/BUILD.bazel b/bgpwtf/cccampix/verifier/model/migrations/BUILD.bazel
new file mode 100644
index 0000000..b20d35f
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/BUILD.bazel
@@ -0,0 +1,23 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//extras:embed_data.bzl", "go_embed_data")
+
+go_embed_data(
+ name = "migrations_data",
+ srcs = glob(["*.sql"]),
+ package = "migrations",
+ flatten = True,
+)
+
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "migrations.go",
+ ":migrations_data", # keep
+ ],
+ importpath = "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model/migrations",
+ visibility = ["//bgpwtf/cccampix/verifier/model:__subpackages__"],
+ deps = [
+ "//go/mirko:go_default_library",
+ "@com_github_golang_migrate_migrate_v4//:go_default_library",
+ ],
+)
diff --git a/bgpwtf/cccampix/verifier/model/migrations/migrations.go b/bgpwtf/cccampix/verifier/model/migrations/migrations.go
new file mode 100644
index 0000000..1782c2e
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/migrations/migrations.go
@@ -0,0 +1,17 @@
+package migrations
+
+import (
+ "fmt"
+
+ "code.hackerspace.pl/hscloud/go/mirko"
+
+ "github.com/golang-migrate/migrate/v4"
+)
+
+func New(dburl string) (*migrate.Migrate, error) {
+ source, err := mirko.NewMigrationsFromBazel(Data)
+ if err != nil {
+ return nil, fmt.Errorf("could not create migrations: %v", err)
+ }
+ return migrate.NewWithSourceInstance("bazel", source, dburl)
+}
diff --git a/bgpwtf/cccampix/verifier/model/model.go b/bgpwtf/cccampix/verifier/model/model.go
new file mode 100644
index 0000000..b9b81c9
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/model.go
@@ -0,0 +1,141 @@
+package model
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strings"
+ "time"
+
+ pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
+ "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model/migrations"
+ migrate "github.com/golang-migrate/migrate/v4"
+ _ "github.com/golang-migrate/migrate/v4/database/cockroachdb"
+ "github.com/jmoiron/sqlx"
+ _ "github.com/lib/pq"
+)
+
+type Model interface {
+ MigrateUp() error
+
+ RecordPeeringDBPeers(ctx context.Context, members []*pb.PeeringDBMember) error
+ RecordPeeringDBPeerRouters(ctx context.Context, members []*pb.PeeringDBMember) error
+ GetPeeringDBPeer(ctx context.Context, asn int64) (*pb.PeeringDBMember, error)
+
+ GetCheckablePeers(ctx context.Context) ([]*Peer, error)
+ SubmitPeerCheckResults(ctx context.Context, res []*PeerCheckResult) error
+ GetPeerCheckResults(ctx context.Context, asn []int64) ([]*PeerCheckResult, error)
+
+ UpdatePGPKey(ctx context.Context, key *PeerPGPKey) error
+
+ ConfigureMissingSessions(ctx context.Context, gen func() SessionConfig) error
+
+ UpdateAllowedPrefixes(ctx context.Context, asn int64, prefixes []*AllowedPrefix) error
+ GetAllowedPrefixes(ctx context.Context, asn int64) ([]*AllowedPrefix, error)
+}
+
+type stringer struct {
+}
+
+func (s *stringer) String() string {
+ if s == nil {
+ return "<nil>"
+ }
+ return fmt.Sprintf("%+v", *s)
+}
+
+type Router struct {
+ stringer
+ V6 net.IP
+ V4 net.IP
+}
+
+type Peer struct {
+ stringer
+ ASN int64
+ Name string
+ Routers []*Router
+}
+
+type PeerCheckStatus int
+
+const (
+ PeerCheckStatus_Invalid PeerCheckStatus = iota
+ PeerCheckStatus_Okay
+ PeerCheckStatus_Failed
+ PeerCheckStatus_SoftFailed
+)
+
+type PeerCheckResult struct {
+ PeerASN int64
+ CheckName string
+ Time time.Time
+ Status PeerCheckStatus
+ Message string
+}
+
+func (p *PeerCheckResult) String() string {
+ if p == nil {
+ return "<nil>"
+ }
+ return fmt.Sprintf("%+v", *p)
+}
+
+type PeerPGPKey struct {
+ stringer
+ PeerASN int64
+ Fingerprint string
+}
+
+type SessionConfig struct {
+ BGPSecret string
+}
+
+type AllowedPrefix struct {
+ Prefix net.IPNet
+ MaxLength int64
+ TA string
+}
+
+func (p *AllowedPrefix) String() string {
+ if p == nil {
+ return "<nil>"
+ }
+ return fmt.Sprintf("%+v", *p)
+}
+
+type sqlModel struct {
+ db *sqlx.DB
+ dsn string
+}
+
+func Connect(ctx context.Context, driver, dsn string) (Model, error) {
+ if dsn == "" {
+ return nil, fmt.Errorf("dsn cannot be empty")
+ }
+
+ db, err := sqlx.ConnectContext(ctx, driver, dsn)
+ if err != nil {
+ return nil, fmt.Errorf("could not connect to database: %v", err)
+ }
+
+ return &sqlModel{
+ db: db,
+ dsn: dsn,
+ }, nil
+}
+
+func (m *sqlModel) MigrateUp() error {
+ dsn := "cockroach://" + strings.TrimPrefix(m.dsn, "postgres://")
+ mig, err := migrations.New(dsn)
+ if err != nil {
+ return err
+ }
+ err = mig.Up()
+ switch err {
+ case migrate.ErrNoChange:
+ return nil
+ default:
+ return err
+ }
+}
diff --git a/bgpwtf/cccampix/verifier/model/peer_routers.go b/bgpwtf/cccampix/verifier/model/peer_routers.go
new file mode 100644
index 0000000..853b9e5
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/peer_routers.go
@@ -0,0 +1,128 @@
+package model
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+
+ pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
+ "github.com/golang/glog"
+ "github.com/jmoiron/sqlx"
+)
+
+func (m *sqlModel) RecordPeeringDBPeerRouters(ctx context.Context, members []*pb.PeeringDBMember) error {
+ tx := m.db.MustBeginTx(ctx, &sql.TxOptions{})
+ defer tx.Rollback()
+
+ for _, member := range members {
+ // Get existing routers for peer.
+ q := `
+ SELECT peer_routers.id, peer_routers.v4, peer_routers.v6
+ FROM peer_routers
+ LEFT JOIN peers ON (peer_routers.peer_id = peers.id)
+ WHERE peer_routers.source = 'from-peeringdb'
+ AND peers.asn = ?
+ `
+ q = tx.Rebind(q)
+ existing := []sqlPeerRouter{}
+ if err := tx.SelectContext(ctx, &existing, q, fmt.Sprintf("%d", member.Asn)); err != nil {
+ return fmt.Errorf("SELECT peerRouters: %v", err)
+ }
+
+ // Find all routers that need to be deleted because they're outdated.
+ // We do not attempt updates, only removals/recreations.
+
+ // UUID -> bool
+ toDelete := make(map[string]bool)
+
+ for _, ex := range existing {
+ // Try to find a requested router with same IP addresses.
+ found := false
+ for _, router := range member.Routers {
+ if router.Ipv4 == ex.V4.String && router.Ipv6 == ex.V6.String {
+ found = true
+ break
+ }
+ }
+
+ // Not found, mark for deletion.
+ if !found {
+ toDelete[ex.ID] = true
+ }
+ }
+
+ // Find all routers that need to be created.
+ toAdd := []sqlPeerRouter{}
+ for _, router := range member.Routers {
+ // Try to find an existing router with same IP addresses.
+ found := false
+ for _, ex := range existing {
+ if router.Ipv4 == ex.V4.String && router.Ipv6 == ex.V6.String {
+ found = true
+ break
+ }
+ }
+ // Not found, mark for creation.
+ if !found {
+ ta := sqlPeerRouter{
+ Source: "from-peeringdb",
+ ASN: fmt.Sprintf("%d", member.Asn),
+ }
+ if router.Ipv6 != "" {
+ ta.V6.String = router.Ipv6
+ ta.V6.Valid = true
+ }
+ if router.Ipv4 != "" {
+ ta.V4.String = router.Ipv4
+ ta.V4.Valid = true
+ }
+ toAdd = append(toAdd, ta)
+ }
+ }
+
+ if len(toDelete) > 0 {
+ glog.Infof("RecordPeeringDBPeers: deleting %v", toDelete)
+ }
+ if len(toAdd) > 0 {
+ glog.Infof("RecordPeeringDBPeers: adding %+v", toAdd)
+ }
+
+ // Delete any routers, if needed.
+ if len(toDelete) > 0 {
+ // Get list of IDs to delete.
+ deleteIds := make([]string, len(toDelete))
+ i := 0
+ for id, _ := range toDelete {
+ deleteIds[i] = id
+ i += 1
+ }
+ query, args, err := sqlx.In("DELETE FROM peer_Routers WHERE id IN (?)", deleteIds)
+ if err != nil {
+ return fmt.Errorf("DELETE peerRouters: %v", err)
+ }
+ query = tx.Rebind(query)
+ _, err = tx.ExecContext(ctx, query, args...)
+ if err != nil {
+ return fmt.Errorf("DELETE peerRouters: %v", err)
+ }
+ }
+
+ // Add any routers, if needed.
+ for _, ta := range toAdd {
+ q := `
+ INSERT INTO peer_routers
+ (peer_id, v6, v4, source)
+ SELECT
+ peers.id, :v6, :v4, :source
+ FROM
+ peers
+ WHERE peers.asn = :asn
+ `
+ if _, err := tx.NamedExecContext(ctx, q, ta); err != nil {
+ return fmt.Errorf("INSERT peerRouters: %v", err)
+ }
+ }
+ }
+
+ return tx.Commit()
+}
diff --git a/bgpwtf/cccampix/verifier/model/peers.go b/bgpwtf/cccampix/verifier/model/peers.go
new file mode 100644
index 0000000..1dc0e8f
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/peers.go
@@ -0,0 +1,199 @@
+package model
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "strconv"
+ "strings"
+
+ pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
+ "github.com/golang/glog"
+ "github.com/jmoiron/sqlx"
+)
+
+func (m *sqlModel) RecordPeeringDBPeers(ctx context.Context, members []*pb.PeeringDBMember) error {
+ tx := m.db.MustBeginTx(ctx, &sql.TxOptions{})
+ defer tx.Rollback()
+
+ wanted := make(map[string]*pb.PeeringDBMember)
+ for _, member := range members {
+ wanted[fmt.Sprintf("%d", member.Asn)] = member
+ }
+
+ toDelete := make(map[string]bool)
+ toAdd := make(map[string]bool)
+ toUpdate := make(map[string]bool)
+
+ existing := []sqlPeer{}
+ existingMap := make(map[string]*sqlPeer)
+
+ q := `
+ SELECT peers.id, peers.asn, peers.name, peers.source
+ FROM peers
+ `
+ if err := tx.SelectContext(ctx, &existing, q); err != nil {
+ return fmt.Errorf("SELECT peers: %v", err)
+ }
+
+ // Mark ASs to delete and note existing ASs
+ for _, ex := range existing {
+ ex := ex
+ if wanted[ex.ASN] == nil && ex.Source == "from-peeringdb" {
+ toDelete[ex.ASN] = true
+ }
+ existingMap[ex.ASN] = &ex
+ }
+
+ // Mark ASs to add
+ for k, _ := range wanted {
+ if existingMap[k] == nil {
+ toAdd[k] = true
+ }
+ }
+
+ // Mark ASs to update
+ for k, wd := range wanted {
+ if existingMap[k] == nil {
+ continue
+ }
+ if existingMap[k].Source != "from-peeringdb" {
+ continue
+ }
+ if wd.Name != existingMap[k].Name {
+ toUpdate[k] = true
+ continue
+ }
+ }
+
+ if len(toAdd) > 0 {
+ glog.Infof("RecordPeeringDBPeers: adding %v", toAdd)
+ }
+ if len(toDelete) > 0 {
+ glog.Infof("RecordPeeringDBPeers: deleting %v", toDelete)
+ }
+ if len(toUpdate) > 0 {
+ glog.Infof("RecordPeeringDBPeers: updating %v", toUpdate)
+ }
+
+ // Run INSERT to add new ASNs
+ if len(toAdd) > 0 {
+ q = `
+ INSERT INTO peers
+ (asn, name, source)
+ VALUES
+ (:asn, :name, :source)
+ `
+
+ add := make([]*sqlPeer, len(toAdd))
+ i := 0
+ for ta, _ := range toAdd {
+ add[i] = &sqlPeer{
+ ASN: ta,
+ Name: wanted[ta].Name,
+ Source: "from-peeringdb",
+ }
+ i += 1
+ }
+
+ if _, err := tx.NamedExecContext(ctx, q, add); err != nil {
+ return fmt.Errorf("INSERT peers: %v", err)
+ }
+ }
+
+ // Run DELETE to remove nonexistent ASNs
+ if len(toDelete) > 0 {
+ deleteIds := make([]string, len(toDelete))
+ i := 0
+ for td, _ := range toDelete {
+ deleteIds[i] = existingMap[td].ID
+ i += 1
+ }
+ query, args, err := sqlx.In("DELETE FROM peers WHERE id IN (?)", deleteIds)
+ if err != nil {
+ return fmt.Errorf("DELETE peers: %v", err)
+ }
+ query = tx.Rebind(query)
+ _, err = tx.ExecContext(ctx, query, args...)
+ if err != nil {
+ return fmt.Errorf("DELETE peers: %v", err)
+ }
+ }
+
+ // Run UPDATE to update existing ASNs
+ for k, _ := range toUpdate {
+ want := wanted[k]
+ got := existingMap[k]
+
+ fields := []string{}
+ args := []interface{}{}
+ if want.Name != got.Name {
+ fields = append(fields, "name = ?")
+ args = append(args, want.Name)
+ }
+
+ q = fmt.Sprintf(`
+ UPDATE peers
+ SET
+ %s
+ WHERE
+ id = ?
+ `, strings.Join(fields, ",\n"))
+ q = tx.Rebind(q)
+ args = append(args, got.ID)
+ _, err := tx.ExecContext(ctx, q, args...)
+ if err != nil {
+ return fmt.Errorf("UPDATE peers: %v", err)
+ }
+ }
+
+ return tx.Commit()
+}
+
+func (s *sqlModel) GetPeeringDBPeer(ctx context.Context, asn int64) (*pb.PeeringDBMember, error) {
+ data := []struct {
+ sqlPeer `db:"peers"`
+ sqlPeerRouter `db:"peer_routers"`
+ }{}
+ q := `
+ SELECT
+ peers.id "peers.id",
+ peers.asn "peers.asn",
+ peers.name "peers.name",
+
+ peer_routers.peer_id "peer_routers.peer_id",
+ peer_routers.v6 "peer_routers.v6",
+ peer_routers.v4 "peer_routers.v4"
+ FROM peers
+ LEFT JOIN peer_routers
+ ON peer_routers.peer_id = peers.id
+ WHERE peers.asn = $1
+ `
+ if err := s.db.SelectContext(ctx, &data, q, asn); err != nil {
+ return nil, fmt.Errorf("SELECT peers/peerRouters: %v", err)
+ }
+
+ res := &pb.PeeringDBMember{}
+
+ for i, row := range data {
+ if res.Routers == nil {
+ asn, err := strconv.ParseInt(row.sqlPeer.ASN, 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("data corruption: invalid ASN %q", row.sqlPeer.ASN)
+ }
+ res.Asn = asn
+ res.Name = row.sqlPeer.Name
+ res.Routers = make([]*pb.PeeringDBMember_Router, len(data))
+ }
+
+ res.Routers[i] = &pb.PeeringDBMember_Router{}
+ if row.sqlPeerRouter.V6.Valid {
+ res.Routers[i].Ipv6 = row.sqlPeerRouter.V6.String
+ }
+ if row.sqlPeerRouter.V4.Valid {
+ res.Routers[i].Ipv4 = row.sqlPeerRouter.V4.String
+ }
+ }
+
+ return res, nil
+}
diff --git a/bgpwtf/cccampix/verifier/model/pgp.go b/bgpwtf/cccampix/verifier/model/pgp.go
new file mode 100644
index 0000000..a76186e
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/pgp.go
@@ -0,0 +1,31 @@
+package model
+
+import (
+ "context"
+ "fmt"
+ "time"
+)
+
+func (s *sqlModel) UpdatePGPKey(ctx context.Context, key *PeerPGPKey) error {
+ q := `
+ INSERT INTO peer_pgp_keys
+ (peer_id, fingerprint, time_created)
+ SELECT
+ peers.id, :fingerprint, :time_created
+ FROM peers
+ WHERE peers.asn = :asn
+ ON CONFLICT (peer_id)
+ DO UPDATE SET
+ fingerprint = :fingerprint,
+ time_created = :time_created
+ `
+ data := &sqlPeerPGPKey{
+ Fingerprint: key.Fingerprint,
+ ASN: fmt.Sprintf("%d", key.PeerASN),
+ TimeCreated: time.Now().UnixNano(),
+ }
+ if _, err := s.db.NamedExecContext(ctx, q, data); err != nil {
+ return fmt.Errorf("INSERT peer_pgp_keys: %v", err)
+ }
+ return nil
+}
diff --git a/bgpwtf/cccampix/verifier/model/schema.go b/bgpwtf/cccampix/verifier/model/schema.go
new file mode 100644
index 0000000..093ecc8
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/schema.go
@@ -0,0 +1,65 @@
+package model
+
+import "database/sql"
+
+type sqlPeer struct {
+ ID string `db:"id"`
+ ASN string `db:"asn"`
+ Name string `db:"name"`
+ Source string `db:"source"`
+}
+
+type sqlPeerRouter struct {
+ ID string `db:"id"`
+ PeerID string `db:"peer_id"`
+ V6 sql.NullString `db:"v6"`
+ V4 sql.NullString `db:"v4"`
+ Source string `db:"source"`
+
+ // Fake, used by app logic.
+ ASN string `db:"asn"`
+}
+
+type sqlPeerCheck struct {
+ ID string `db:"id"`
+ PeerID string `db:"peer_id"`
+ CheckName string `db:"check_name"`
+ CheckTime int64 `db:"check_time"`
+ CheckStatus string `db:"check_status"`
+ CheckMessage string `db:"check_message"`
+ Delete bool `db:"delete"`
+
+ // Fake, used by app logic.
+ ASN string `db:"asn"`
+}
+
+type sqlPeerPGPKey struct {
+ ID string `db:"id"`
+ PeerID string `db:"peer_id"`
+ Fingerprint string `db:"fingerprint"`
+ TimeCreated int64 `db:"time_created"`
+
+ // Fake, used by app logic.
+ ASN string `db:"asn"`
+}
+
+type sqlSessionConfig struct {
+ ID string `db:"id"`
+ PeerID string `db:"peer_id"`
+ PeerRouterID string `db:"peer_router_id"`
+
+ BGPSecret string `db:"bgp_secret"`
+}
+
+type sqlAllowedPrefix struct {
+ ID string `db:"id"`
+ PeerID string `db:"peer_id"`
+ Timestamp int64 `db:"timestamp"`
+
+ Prefix string `db:"prefix"`
+ MaxLength int64 `db:"max_length"`
+ TA string `db:"ta"`
+
+ // Fake, used by app logic.
+ ASN string `db:"asn"`
+}
diff --git a/bgpwtf/cccampix/verifier/model/submit_checks.go b/bgpwtf/cccampix/verifier/model/submit_checks.go
new file mode 100644
index 0000000..79e2a84
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/model/submit_checks.go
@@ -0,0 +1,73 @@
+package model
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+
+ "github.com/golang/glog"
+)
+
+func (s *sqlModel) SubmitPeerCheckResults(ctx context.Context, res []*PeerCheckResult) error {
+ tx := s.db.MustBeginTx(ctx, &sql.TxOptions{})
+ defer tx.Rollback()
+
+ q := `
+ UPDATE peer_checks
+ SET delete = true
+ `
+ if _, err := tx.ExecContext(ctx, q); err != nil {
+ return fmt.Errorf("UPDATE for deletion peer_checks: %v", err)
+ }
+
+ seenASNs := make(map[int64]bool)
+
+ for _, pcr := range res {
+ seenASNs[pcr.PeerASN] = true
+
+ q = `
+ INSERT INTO peer_checks
+ (peer_id, check_name, check_time, check_status, check_message, delete)
+ SELECT
+ peers.id, :check_name, :check_time, :check_status, :check_message, false
+ FROM peers
+ WHERE peers.asn = :asn
+ ON CONFLICT (peer_id, check_name)
+ DO UPDATE SET
+ check_time = :check_time,
+ check_status = :check_status,
+ check_message = :check_message,
+ delete = false
+ `
+ status := "uknown"
+ switch pcr.Status {
+ case PeerCheckStatus_Okay:
+ status = "okay"
+ case PeerCheckStatus_Failed:
+ status = "failed"
+ case PeerCheckStatus_SoftFailed:
+ glog.Infof("Skipping soft failure: %+v", pcr)
+ continue
+ }
+ cr := sqlPeerCheck{
+ CheckName: pcr.CheckName,
+ CheckTime: pcr.Time.UnixNano(),
+ CheckStatus: status,
+ CheckMessage: pcr.Message,
+ ASN: fmt.Sprintf("%d", pcr.PeerASN),
+ }
+ if _, err := tx.NamedExecContext(ctx, q, cr); err != nil {
+ return fmt.Errorf("INSERT peer_checks: %v", err)
+ }
+ }
+
+ q = `
+ DELETE FROM peer_checks
+ WHERE delete = true
+ `
+ if _, err := tx.ExecContext(ctx, q); err != nil {
+ return fmt.Errorf("DELETE FROM peer_checks: %v", err)
+ }
+
+ return tx.Commit()
+}
diff --git a/bgpwtf/cccampix/verifier/processor_irr.go b/bgpwtf/cccampix/verifier/processor_irr.go
new file mode 100644
index 0000000..49c4eb1
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/processor_irr.go
@@ -0,0 +1,247 @@
+package main
+
+import (
+ "context"
+ "encoding/hex"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "code.hackerspace.pl/hscloud/go/pki"
+ "github.com/golang/glog"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
+ "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
+)
+
+const (
+ RS_ASN = "AS208521"
+ RS_ASSET = "AS-CCCAMP19-IX"
+)
+
+type irr struct {
+ irrc pb.IRRClient
+}
+
+func newIRR(addr string) (processor, error) {
+ conn, err := grpc.Dial(addr, pki.WithClientHSPKI())
+ if err != nil {
+ return nil, fmt.Errorf("could not connect to irr service: %v", err)
+ }
+
+ return &irr{
+ irrc: pb.NewIRRClient(conn),
+ }, nil
+}
+
+func (i *irr) Name() string {
+ return "IRR"
+}
+
+func (i *irr) NextRun(now time.Time) time.Time {
+ return now.Add(5 * time.Minute)
+}
+
+func (i *irr) RunAll(ctx context.Context, m model.Model) error {
+ peers, err := m.GetCheckablePeers(ctx)
+ if err != nil {
+ return fmt.Errorf("could not retrieve peers: %v", err)
+ }
+
+ results := make(chan *model.PeerCheckResult)
+ pcr := []*model.PeerCheckResult{}
+ pcrDone := make(chan struct{})
+
+ pgpKeys := make(chan *model.PeerPGPKey)
+ pk := []*model.PeerPGPKey{}
+ pkDone := make(chan struct{})
+
+ go func() {
+ for res := range results {
+ pcr = append(pcr, res)
+ }
+ pcrDone <- struct{}{}
+ }()
+ go func() {
+ for res := range pgpKeys {
+ pk = append(pk, res)
+ }
+ pkDone <- struct{}{}
+ }()
+
+ fail := func(p *model.Peer, hard bool, f string, args ...interface{}) {
+ status := model.PeerCheckStatus_SoftFailed
+ if hard {
+ status = model.PeerCheckStatus_Failed
+ }
+ results <- &model.PeerCheckResult{
+ PeerASN: p.ASN,
+ CheckName: "irr",
+ Time: time.Now(),
+ Status: status,
+ Message: fmt.Sprintf(f, args...),
+ }
+
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(len(peers))
+
+ sem := make(chan struct{}, 10)
+
+ for _, peer := range peers {
+ go func(p *model.Peer) {
+ sem <- struct{}{}
+ defer func() {
+ <-sem
+ wg.Done()
+ }()
+
+ req := &pb.IRRQueryRequest{
+ As: fmt.Sprintf("%d", p.ASN),
+ }
+ res, err := i.irrc.Query(ctx, req)
+ if err != nil {
+ s, ok := status.FromError(err)
+ switch {
+ case ok && s.Code() == codes.NotFound:
+ fail(p, true, "ASN %d not found in IRR", p.ASN)
+ case ok && s.Code() == codes.Unimplemented:
+ fail(p, true, "ASN %d belongs to an unknown IRR/RIR", p.ASN)
+ case ok && s.Code() == codes.Unavailable:
+ fail(p, false, "could not contact IRR")
+ default:
+ glog.Errorf("IRR.Query(%d): %v", p.ASN, err)
+ fail(p, false, "unhandled IRR error")
+ }
+ return
+ }
+
+ importOkay := false
+ exportOkay := false
+ pgpKey := ""
+
+ for _, attr := range res.Attributes {
+ switch value := attr.Value.(type) {
+ case *pb.IRRAttribute_Remarks:
+ if ok, key := i.checkRemarks(value.Remarks); ok {
+ pgpKey = key
+ }
+ case *pb.IRRAttribute_Import:
+ if i.checkImport(value.Import) {
+ importOkay = true
+ }
+ case *pb.IRRAttribute_Export:
+ if i.checkExport(value.Export, p.ASN) {
+ exportOkay = true
+ }
+ }
+ }
+
+ switch {
+ case !importOkay:
+ fail(p, true, "no `import: from %s accept %s` entry", RS_ASN, RS_ASSET)
+ return
+ case !exportOkay:
+ fail(p, true, "no `export: to %s announce AS%d` entry", RS_ASN, p.ASN)
+ return
+ case pgpKey == "":
+ fail(p, true, "no `remarks: CCCAMP19-IX PGP: <...>` entry")
+ return
+ }
+
+ pgpKeys <- &model.PeerPGPKey{
+ PeerASN: p.ASN,
+ Fingerprint: pgpKey,
+ }
+
+ results <- &model.PeerCheckResult{
+ PeerASN: p.ASN,
+ CheckName: "irr",
+ Time: time.Now(),
+ Status: model.PeerCheckStatus_Okay,
+ Message: "",
+ }
+ }(peer)
+ }
+
+ wg.Wait()
+ close(results)
+ close(pgpKeys)
+ <-pcrDone
+ <-pkDone
+
+ err = m.SubmitPeerCheckResults(ctx, pcr)
+ if err != nil {
+ return err
+ }
+
+ for _, k := range pk {
+ err = m.UpdatePGPKey(ctx, k)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (i *irr) checkRemarks(remarks string) (bool, string) {
+ label := "cccamp19-ix pgp:"
+ remarks = strings.TrimSpace(strings.ToLower(remarks))
+ if !strings.HasPrefix(remarks, label) {
+ return false, ""
+ }
+
+ data := strings.TrimSpace(strings.TrimPrefix(remarks, label))
+ data = strings.ReplaceAll(data, " ", "")
+ data = strings.ReplaceAll(data, "\t", "")
+
+ if len(data) != 40 {
+ return false, ""
+ }
+
+ if _, err := hex.DecodeString(data); err != nil {
+ return false, ""
+ }
+
+ return true, data
+}
+
+func (i *irr) checkImport(imp *pb.IRRAttribute_ImportExport) bool {
+ if imp.ProtocolFrom != "" && strings.ToLower(imp.ProtocolFrom) != "bgp" {
+ return false
+ }
+ if strings.ToUpper(imp.Filter) != RS_ASSET {
+ return false
+ }
+
+ for _, expression := range imp.Expressions {
+ if strings.ToUpper(expression.Peering) == RS_ASN {
+ return true
+ }
+ }
+
+ return false
+}
+
+func (i *irr) checkExport(exp *pb.IRRAttribute_ImportExport, asn int64) bool {
+ if exp.ProtocolInto != "" && strings.ToLower(exp.ProtocolInto) != "bgp" {
+ return false
+ }
+ if strings.ToUpper(exp.Filter) != fmt.Sprintf("AS%d", asn) {
+ return false
+ }
+
+ for _, expression := range exp.Expressions {
+ if strings.ToUpper(expression.Peering) == RS_ASN {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/bgpwtf/cccampix/verifier/processor_peeringdb.go b/bgpwtf/cccampix/verifier/processor_peeringdb.go
new file mode 100644
index 0000000..8f29110
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/processor_peeringdb.go
@@ -0,0 +1,58 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "code.hackerspace.pl/hscloud/go/pki"
+ "google.golang.org/grpc"
+
+ pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
+ "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
+)
+
+type peeringDB struct {
+ pdb pb.PeeringDBProxyClient
+}
+
+func newPeeringDB(addr string) (processor, error) {
+ conn, err := grpc.Dial(addr, pki.WithClientHSPKI())
+ if err != nil {
+ return nil, fmt.Errorf("could not connect to peeringdb service: %v", err)
+ }
+
+ return &peeringDB{
+ pdb: pb.NewPeeringDBProxyClient(conn),
+ }, nil
+}
+
+func (p *peeringDB) Name() string {
+ return "PeeringDB"
+}
+
+func (p *peeringDB) NextRun(now time.Time) time.Time {
+ return now.Add(5 * time.Minute)
+}
+
+func (p *peeringDB) RunAll(ctx context.Context, m model.Model) error {
+ id := int64(2641)
+ req := &pb.GetIXMembersRequest{
+ Id: id,
+ }
+
+ res, err := p.pdb.GetIXMembers(ctx, req)
+ if err != nil {
+ return fmt.Errorf("GetIXMembers(%d): %v", id, err)
+ }
+
+ err = m.RecordPeeringDBPeers(ctx, res.Members)
+ if err != nil {
+ return fmt.Errorf("RecordPeeringDBPeers: %v", err)
+ }
+ err = m.RecordPeeringDBPeerRouters(ctx, res.Members)
+ if err != nil {
+ return fmt.Errorf("RecordPeeringDBPeerRouters: %v", err)
+ }
+ return nil
+}
diff --git a/bgpwtf/cccampix/verifier/processor_rpki.go b/bgpwtf/cccampix/verifier/processor_rpki.go
new file mode 100644
index 0000000..b00aed2
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/processor_rpki.go
@@ -0,0 +1,130 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+
+ "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
+ "github.com/golang/glog"
+)
+
+type rpki struct {
+ octorpki string
+}
+
+func newRPKI(octorpki string) (processor, error) {
+ return &rpki{
+ octorpki: octorpki,
+ }, nil
+}
+
+func (p *rpki) Name() string {
+ return "RPKI"
+}
+
+func (p *rpki) NextRun(now time.Time) time.Time {
+ return now.Add(1 * time.Minute)
+}
+
+type octorpkiRes struct {
+ Metadata struct {
+ Counts int64 `json:"counts"`
+ Generated int64 `json:"counts"`
+ Valid int64 `json:"counts"`
+ } `json:"metadata"`
+
+ ROAs []octorpkiROA `json:"roas"`
+}
+
+type octorpkiROA struct {
+ Prefix string `json:"prefix"`
+ MaxLength int64 `json:"maxLength"`
+ ASN string `json:"asn"`
+ TA string `json:"ta"`
+}
+
+func (p *rpki) RunAll(ctx context.Context, m model.Model) error {
+ peers, err := m.GetCheckablePeers(ctx)
+ if err != nil {
+ return err
+ }
+
+ wantASNs := make(map[string]bool)
+ for _, peer := range peers {
+ wantASNs[fmt.Sprintf("AS%d", peer.ASN)] = true
+ }
+
+ // Get RPKI data dump from OctoRPKI.
+ url := fmt.Sprintf("http://%s/output.json", p.octorpki)
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return fmt.Errorf("NewRequest(GET %q): %v", url, err)
+ }
+ req = req.WithContext(ctx)
+ client := http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ return fmt.Errorf("GET %q: %v", url, err)
+ }
+ defer resp.Body.Close()
+
+ data, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return fmt.Errorf("GET %q: %v", url, err)
+ }
+
+ if strings.HasPrefix(string(data), "File not ready yet") {
+ return fmt.Errorf("OctoRPKI not yet ready")
+ }
+
+ var res octorpkiRes
+ if err := json.Unmarshal(data, &res); err != nil {
+ return fmt.Errorf("Could not decode OctoRPKI output: %v", err)
+ }
+
+ // Make list of prefixes we should honor.
+ prefixes := make(map[int64][]*model.AllowedPrefix)
+ for _, roa := range res.ROAs {
+ if !wantASNs[strings.ToUpper(roa.ASN)] {
+ continue
+ }
+
+ asn, err := strconv.ParseInt(roa.ASN[2:], 10, 64)
+ if err != nil {
+ glog.Errorf("Invalid ASN: %s %q", roa.ASN, roa.ASN)
+ continue
+ }
+
+ if _, ok := prefixes[asn]; !ok {
+ prefixes[asn] = []*model.AllowedPrefix{}
+ }
+
+ _, prefix, err := net.ParseCIDR(roa.Prefix)
+ if err != nil {
+ glog.Errorf("Invalid prefix: %s %q", roa.ASN, roa.Prefix)
+ continue
+ }
+
+ prefixes[asn] = append(prefixes[asn], &model.AllowedPrefix{
+ Prefix: *prefix,
+ MaxLength: roa.MaxLength,
+ TA: roa.TA,
+ })
+ }
+
+ for asn, p := range prefixes {
+ err := m.UpdateAllowedPrefixes(ctx, asn, p)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/bgpwtf/cccampix/verifier/processor_secretgen.go b/bgpwtf/cccampix/verifier/processor_secretgen.go
new file mode 100644
index 0000000..cefa1cc
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/processor_secretgen.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+ "context"
+ "time"
+
+ "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
+ "github.com/sethvargo/go-password/password"
+)
+
+type secretGen struct {
+}
+
+func newSecretGen() (processor, error) {
+ return &secretGen{}, nil
+}
+
+func (p *secretGen) Name() string {
+ return "SecretGen"
+}
+
+func (p *secretGen) NextRun(now time.Time) time.Time {
+ return now.Add(1 * time.Minute)
+}
+
+func gen() model.SessionConfig {
+ secret := password.MustGenerate(16, 4, 0, false, true)
+ return model.SessionConfig{
+ BGPSecret: secret,
+ }
+}
+
+func (p *secretGen) RunAll(ctx context.Context, m model.Model) error {
+ return m.ConfigureMissingSessions(ctx, gen)
+}
diff --git a/bgpwtf/cccampix/verifier/processors.go b/bgpwtf/cccampix/verifier/processors.go
new file mode 100644
index 0000000..ff70e9d
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/processors.go
@@ -0,0 +1,15 @@
+package main
+
+import (
+ "context"
+ "time"
+
+ "code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
+)
+
+type processor interface {
+ Name() string
+ NextRun(time.Time) time.Time
+
+ RunAll(ctx context.Context, m model.Model) error
+}
diff --git a/bgpwtf/cccampix/verifier/state.go b/bgpwtf/cccampix/verifier/state.go
new file mode 100644
index 0000000..57e40c0
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/state.go
@@ -0,0 +1,4 @@
+package main
+
+type state struct {
+}
diff --git a/bgpwtf/cccampix/verifier/statusz.go b/bgpwtf/cccampix/verifier/statusz.go
new file mode 100644
index 0000000..46e9390
--- /dev/null
+++ b/bgpwtf/cccampix/verifier/statusz.go
@@ -0,0 +1,94 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "sort"
+
+ humanize "github.com/dustin/go-humanize"
+)
+
+const processorsFragment = `
+ <style type="text/css">
+ .table td,th {
+ background-color: #eee;
+ padding: 0.2em 0.4em 0.2em 0.4em;
+ }
+ .table th {
+ background-color: #c0c0c0;
+ }
+ .table {
+ background-color: #fff;
+ border-spacing: 0.2em;
+ margin-left: auto;
+ margin-right: auto;
+ }
+ </style>
+ <div>
+ <table class="table">
+ <tr>
+ <th>Name</th>
+ <th>Status</th>
+ <th>Last Run</th>
+ <th>Next Run</th>
+ </tr>
+ {{range .Processors }}
+ <tr>
+ <td>{{ .Name }}</td>
+ {{ if ne .Status "OK" }}
+ <td style="background-color: #ff3030;">{{ .Status }}</td>
+ {{ else }}
+ <td>{{ .Status }}</td>
+ {{ end }}
+ <td>{{ .LastRun }}</td>
+ <td>{{ .NextRun }}</td>
+ </tr>
+ {{end}}
+ </table>
+ </div>
+`
+
+type processorsFragmentEntry struct {
+ Name string
+ Status string
+ LastRun string
+ NextRun string
+}
+
+func (s *service) statuszProcessors(ctx context.Context) interface{} {
+ s.processorsMu.RLock()
+ defer s.processorsMu.RUnlock()
+
+ res := struct {
+ Processors []*processorsFragmentEntry
+ }{
+ Processors: make([]*processorsFragmentEntry, len(s.processors)),
+ }
+
+ i := 0
+ for _, processor := range s.processors {
+ lastRun := "never"
+ if processor.lastRun != nil {
+ lastRun = humanize.Time(*processor.lastRun)
+ }
+ nextRun := "any second now"
+ if nr := processor.nextRun(); nr != nil {
+ nextRun = humanize.Time(*nr)
+ }
+ status := "OK"
+ if processor.lastErr != nil {
+ status = fmt.Sprintf("%v", processor.lastErr)
+ }
+ res.Processors[i] = &processorsFragmentEntry{
+ Name: processor.name,
+ Status: status,
+ LastRun: lastRun,
+ NextRun: nextRun,
+ }
+ i += 1
+ }
+
+ sort.Slice(res.Processors, func(i, j int) bool { return res.Processors[i].Name < res.Processors[j].Name })
+
+ return res
+}