cluster/identd/kubenat: implement
This is a library to find pod information for a given TCP 4-tuple.
Change-Id: I254983e579e3aaa04c0c5491851f4af94a3f4249
diff --git a/cluster/identd/kubenat/kubenat.go b/cluster/identd/kubenat/kubenat.go
new file mode 100644
index 0000000..38a68d9
--- /dev/null
+++ b/cluster/identd/kubenat/kubenat.go
@@ -0,0 +1,130 @@
+// kubenat implements a data source for undoing NAT on hosts running
+// Kubernetes/containerd workloads.
+//
+// It parses the kernel conntrack NAT translation table to figure out the IP
+// address of the pod that was making the connection.
+//
+// It then uses the containerd API to figure out what pod runs under what IP
+// address.
+//
+// Access to both conntrack and containerd is cached and only updated when
+// needed. This means that as long as a TCP connection is open, identd will be
+// able to answer queries about it without having to perform any OS/containerd
+// queries.
+//
+// Unfortunately, there is very little in terms of development/test harnesses
+// for kubenat. You will have to have a locally running containerd, or do some
+// mounts/forwards from a remote host.
+package kubenat
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net"
+ "time"
+
+ "github.com/cenkalti/backoff"
+ "github.com/golang/glog"
+)
+
+// Resolver is the main entry point of kubenat. It runs background workers that
+// update conntrack/containerd state, and resolves Tuple4s into PodInfo.
+type Resolver struct {
+ conntrackPath string // conntrack table path, as passed to NewResolver
+ criPath string // CRI gRPC domain socket path, as passed to NewResolver
+
+ translationC chan *translationReq // unbuffered; presumably feeds the translation worker (confirm)
+ podInfoC chan *podInfoReq // unbuffered; presumably feeds the pod worker (confirm)
+}
+
+// Tuple4 is a 4-tuple of a TCP connection. Local describes the machine running
+// this code, not the listen/connect 'ends' of TCP.
+type Tuple4 struct {
+ RemoteIP net.IP // IP address of the peer, i.e. the end that is not this machine
+ RemotePort uint16 // TCP port of the peer
+ LocalIP net.IP // IP address of the end on the machine running this code
+ LocalPort uint16 // TCP port of the end on the machine running this code
+}
+
+func (t *Tuple4) String() string { // String renders t as "L: <local host:port> R: <remote host:port>".
+ local := net.JoinHostPort(t.LocalIP.String(), fmt.Sprintf("%d", t.LocalPort)) // JoinHostPort brackets hosts containing ':' (IPv6)
+ remote := net.JoinHostPort(t.RemoteIP.String(), fmt.Sprintf("%d", t.RemotePort))
+ return fmt.Sprintf("L: %s R: %s", local, remote) // local end first, then remote end
+}
+
+// PodInfo describes a Kubernetes pod which terminates a given Tuple4 connection.
+type PodInfo struct {
+ // PodIP is the IP address of the pod within the pod network.
+ PodIP net.IP // filled by ResolvePod from the translated address
+ // PodTranslatedPort is the port on the PodIP corresponding to the Tuple4
+ // that this PodInfo was requested for.
+ PodTranslatedPort uint16 // filled by ResolvePod from the translated address
+ // KubernetesNamespace is the Kubernetes namespace in which this pod is
+ // running.
+ KubernetesNamespace string
+ // Name is the name of the pod, as seen by Kubernetes.
+ Name string
+}
+
+// NewResolver starts a resolver with a given path to /proc/net/nf_conntrack
+// and a CRI gRPC domain socket. The returned error is currently always nil.
+func NewResolver(ctx context.Context, conntrackPath, criPath string) (*Resolver, error) {
+ r := Resolver{
+ conntrackPath: conntrackPath,
+ criPath: criPath,
+
+ translationC: make(chan *translationReq), // unbuffered
+ podInfoC: make(chan *podInfoReq), // unbuffered
+ }
+ // TODO(q3k): bubble up errors from the translation worker into here?
+ go r.runTranslationWorker(ctx) // started once; unlike the pod worker it is never restarted here
+ // The pod worker might fail on CRI connectivity issues, so we attempt to
+ // restart it with a backoff if needed.
+ go func() {
+ bo := backoff.NewExponentialBackOff()
+ bo.MaxElapsedTime = 0 // 0 = never give up retrying (per cenkalti/backoff docs)
+ bo.Reset()
+ for {
+ err := r.runPodWorker(ctx)
+ if err == nil || errors.Is(err, ctx.Err()) { // clean return or ctx's own error: stop restarting
+ glog.Infof("podWorker exiting")
+ return
+ }
+ glog.Errorf("podWorker failed: %v", err)
+ wait := bo.NextBackOff() // NOTE(review): bo is never Reset between restarts, even after a long healthy run
+ glog.Errorf("restarting podWorker in %v", wait)
+ time.Sleep(wait) // NOTE(review): not interruptible by ctx; cancellation is only noticed after the next runPodWorker call
+ }
+ }()
+
+ return &r, nil
+}
+
+// ResolvePod returns information about a running pod for a given TCP 4-tuple.
+// If the 4-tuple or pod cannot be resolved, an error will be returned.
+func (r *Resolver) ResolvePod(ctx context.Context, t *Tuple4) (*PodInfo, error) {
+ // TODO(q3k): expose translation/pod not found errors as package-level
+ // vars, or use gRPC statuses?
+ podAddr, err := r.translate(ctx, t) // step 1: map the 4-tuple to the pod-side IP/port
+ if err != nil {
+ return nil, fmt.Errorf("translate: %w", err)
+ }
+ if podAddr == nil { // nil result without an error is reported as a missing translation
+ return nil, fmt.Errorf("translation not found")
+ }
+ podInfo, err := r.getPodInfo(ctx, podAddr.localIP) // step 2: look up pod metadata by the pod-side IP
+ if err != nil {
+ return nil, fmt.Errorf("getPodInfo: %w", err)
+ }
+ if podInfo == nil { // nil result without an error is reported as a missing pod
+ return nil, fmt.Errorf("pod not found")
+ }
+
+ return &PodInfo{
+ PodIP: podAddr.localIP,
+ PodTranslatedPort: podAddr.localPort,
+ KubernetesNamespace: podInfo.namespace,
+ Name: podInfo.name,
+ }, nil
+}