cluster/identd/kubenat: implement

This is a library to find pod information for a given TCP 4-tuple.

Change-Id: I254983e579e3aaa04c0c5491851f4af94a3f4249
diff --git a/cluster/identd/kubenat/kubenat.go b/cluster/identd/kubenat/kubenat.go
new file mode 100644
index 0000000..38a68d9
--- /dev/null
+++ b/cluster/identd/kubenat/kubenat.go
@@ -0,0 +1,130 @@
+// Package kubenat implements a data source for undoing NAT on hosts running
+// Kubernetes/containerd workloads.
+//
+// It parses the kernel conntrack NAT translation table to figure out the IP
+// address of the pod that was making the connection.
+//
+// It then uses the containerd API to figure out what pod runs under what IP
+// address.
+//
+// Access to both conntrack and containerd is cached and only updated when needed.
+// This means that as long as a TCP connection is open, identd will be able to
+// respond about its information without having to perform any OS/containerd
+// queries.
+//
+// Unfortunately, there is very little in terms of development/test harnesses
+// for kubenat. You will have to have a locally running containerd, or do some
+// mounts/forwards from a remote host.
+package kubenat
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"time"
+
+	"github.com/cenkalti/backoff"
+	"github.com/golang/glog"
+)
+
+// Resolver is the main interface for kubenat. It runs background processing to
+// update conntrack/containerd state, and resolves Tuple4s into PodInfo.
+//
+// Construct it with NewResolver, which allocates the request channels and
+// starts the background workers; a zero Resolver has nil channels and no
+// workers running.
+type Resolver struct {
+	// conntrackPath is the path to the kernel conntrack table (normally
+	// /proc/net/nf_conntrack), criPath the path to the CRI gRPC domain socket.
+	conntrackPath string
+	criPath       string
+
+	// Request channels for the background workers started by NewResolver.
+	// NOTE(review): presumably translationC is serviced by
+	// runTranslationWorker and podInfoC by runPodWorker — confirm.
+	translationC chan *translationReq
+	podInfoC     chan *podInfoReq
+}
+
+// Tuple4 is a 4-tuple of a TCP connection. Local describes the machine running
+// this code, not the listen/connect 'ends' of TCP: regardless of which side
+// initiated the connection, Local is always this host's end and Remote is
+// always the peer's end.
+type Tuple4 struct {
+	RemoteIP   net.IP // address of the peer
+	RemotePort uint16 // TCP port of the peer
+	LocalIP    net.IP // address on this machine
+	LocalPort  uint16 // TCP port on this machine
+}
+
+// String renders the tuple as "L: <local> R: <remote>", where each end is
+// formatted with net.JoinHostPort (so IPv6 hosts are bracketed).
+func (t *Tuple4) String() string {
+	hostPort := func(ip net.IP, port uint16) string {
+		return net.JoinHostPort(ip.String(), fmt.Sprint(port))
+	}
+	return fmt.Sprintf("L: %s R: %s", hostPort(t.LocalIP, t.LocalPort), hostPort(t.RemoteIP, t.RemotePort))
+}
+
+// PodInfo describes a Kubernetes pod which terminates a given Tuple4
+// connection. It is what Resolver.ResolvePod returns on success.
+type PodInfo struct {
+	// PodIP is the IP address of the pod within the pod network.
+	PodIP net.IP
+	// PodTranslatedPort is the port on the PodIP corresponding to the Tuple4
+	// that this PodInfo was requested for, i.e. the pod-side port once NAT
+	// has been undone.
+	PodTranslatedPort uint16
+	// KubernetesNamespace is the Kubernetes namespace in which this pod is
+	// running.
+	KubernetesNamespace string
+	// Name is the name of the pod, as seen by Kubernetes.
+	Name string
+}
+
+// NewResolver starts a resolver with a given path to /proc/net/nf_conntrack
+// and a CRI gRPC domain socket.
+//
+// The background workers started here are handed ctx; canceling it is how the
+// resolver is shut down. The pod worker is restarted with exponential backoff
+// when it fails, and the restart loop exits once ctx is canceled.
+func NewResolver(ctx context.Context, conntrackPath, criPath string) (*Resolver, error) {
+	r := Resolver{
+		conntrackPath: conntrackPath,
+		criPath:       criPath,
+
+		translationC: make(chan *translationReq),
+		podInfoC:     make(chan *podInfoReq),
+	}
+	// TODO(q3k): bubble up errors from the translation worker into here?
+	go r.runTranslationWorker(ctx)
+	// The pod worker might fail on CRI connectivity issues, so we attempt to
+	// restart it with a backoff if needed.
+	go func() {
+		bo := backoff.NewExponentialBackOff()
+		// A zero MaxElapsedTime means the backoff never returns Stop, i.e. we
+		// keep retrying for as long as ctx is alive.
+		bo.MaxElapsedTime = 0
+		bo.Reset()
+		for {
+			err := r.runPodWorker(ctx)
+			if err == nil || errors.Is(err, ctx.Err()) {
+				glog.Infof("podWorker exiting")
+				return
+			}
+			glog.Errorf("podWorker failed: %v", err)
+			wait := bo.NextBackOff()
+			glog.Errorf("restarting podWorker in %v", wait)
+			// Wait out the backoff, but bail out promptly if ctx is canceled
+			// meanwhile. This also catches workers that, after cancellation,
+			// return an error not wrapping ctx.Err() (e.g. a gRPC status):
+			// those would otherwise be restarted forever, leaking this
+			// goroutine.
+			timer := time.NewTimer(wait)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				glog.Infof("podWorker exiting")
+				return
+			case <-timer.C:
+			}
+		}
+	}()
+
+	return &r, nil
+}
+
+// ResolvePod returns information about a running pod for a given TCP 4-tuple.
+//
+// The 4-tuple is first translated into the pod-side address (undoing NAT),
+// which is then mapped to the pod's namespace and name. An error is returned
+// if either step fails, or if no translation or no pod could be found.
+func (r *Resolver) ResolvePod(ctx context.Context, t *Tuple4) (*PodInfo, error) {
+	// TODO(q3k): expose translation/pod not found errors as package-level
+	// vars, or use gRPC statuses?
+	addr, err := r.translate(ctx, t)
+	switch {
+	case err != nil:
+		return nil, fmt.Errorf("translate: %w", err)
+	case addr == nil:
+		return nil, errors.New("translation not found")
+	}
+
+	pod, err := r.getPodInfo(ctx, addr.localIP)
+	switch {
+	case err != nil:
+		return nil, fmt.Errorf("getPodInfo: %w", err)
+	case pod == nil:
+		return nil, errors.New("pod not found")
+	}
+
+	res := &PodInfo{
+		PodIP:               addr.localIP,
+		PodTranslatedPort:   addr.localPort,
+		KubernetesNamespace: pod.namespace,
+		Name:                pod.name,
+	}
+	return res, nil
+}