blob: 38a68d9a5d2c2e5c01a755fbfc6fe4bdced9c504 [file] [log] [blame]
// Package kubenat implements a data source for undoing NAT on hosts running
// Kubernetes/containerd workloads.
//
// It parses the kernel conntrack NAT translation table to figure out the IP
// address of the pod that was making the connection.
//
// It then uses the containerd API to figure out what pod runs under what IP
// address.
//
// Both conntrack and containerd access is cached and only updated when needed.
// This means that as long as a TCP connection is open, identd will be able to
// respond about its information without having to perform any OS/containerd
// queries.
//
// Unfortunately, there is very little in terms of development/test harnesses
// for kubenat. You will have to have a locally running containerd, or do some
// mounts/forwards from a remote host.
package kubenat
import (
	"context"
	"errors"
	"fmt"
	"net"
	"strconv"
	"time"

	"github.com/cenkalti/backoff"
	"github.com/golang/glog"
)
// Resolver is the entry point of kubenat. Once created by NewResolver it has
// background workers keeping conntrack/containerd state up to date, and it
// turns Tuple4s into PodInfo via ResolvePod.
type Resolver struct {
	// conntrackPath and criPath are stored verbatim from the arguments of
	// NewResolver: the conntrack table file and the CRI gRPC domain socket.
	conntrackPath, criPath string

	// translationC carries NAT translation requests.
	// NOTE(review): presumably drained by the translation worker - confirm
	// against its implementation.
	translationC chan *translationReq

	// podInfoC carries pod information requests.
	// NOTE(review): presumably drained by the pod worker - confirm against
	// its implementation.
	podInfoC chan *podInfoReq
}
// Tuple4 is a 4-tuple of a TCP connection. Local describes the machine running
// this code, not the listen/connect 'ends' of TCP.
type Tuple4 struct {
	// RemoteIP and RemotePort identify the far end of the connection.
	RemoteIP   net.IP
	RemotePort uint16
	// LocalIP and LocalPort identify the end on the machine running this
	// code.
	LocalIP   net.IP
	LocalPort uint16
}

// String renders the tuple as "L: <local host:port> R: <remote host:port>".
//
// Host/port pairs are built with net.JoinHostPort, so IPv6 addresses are
// wrapped in brackets. A nil receiver yields "<nil>" instead of panicking,
// which matches what the fmt verbs print for a nil Stringer pointer.
func (t *Tuple4) String() string {
	if t == nil {
		return "<nil>"
	}
	// strconv avoids the reflection-driven formatting of fmt for what is a
	// plain integer-to-decimal conversion.
	local := net.JoinHostPort(t.LocalIP.String(), strconv.FormatUint(uint64(t.LocalPort), 10))
	remote := net.JoinHostPort(t.RemoteIP.String(), strconv.FormatUint(uint64(t.RemotePort), 10))
	return "L: " + local + " R: " + remote
}
// PodInfo describes a Kubernetes pod which terminates a given Tuple4
// connection. It is the value Resolver.ResolvePod returns on success.
type PodInfo struct {
// PodIP is the IP address of the pod within the pod network. ResolvePod
// fills it from the translated local address of the requested Tuple4.
PodIP net.IP
// PodTranslatedPort is the port on the PodIP corresponding to the Tuple4
// that this PodInfo was requested for.
PodTranslatedPort uint16
// KubernetesNamespace is the kubernetes namespace in which this pod is
// running.
KubernetesNamespace string
// Name is the name of the pod, as seen by kubernetes.
Name string
}
// NewResolver starts a resolver with a given path to /proc/net/nf_conntrack
// and a CRI gRPC domain socket.
//
// It launches two background goroutines bound to ctx: the translation worker
// and a supervisor which runs the pod worker, restarting it with exponential
// backoff whenever it fails. Canceling ctx stops the supervisor, including
// while it is waiting between restarts.
//
// The returned error is currently always nil; worker failures are only logged.
func NewResolver(ctx context.Context, conntrackPath, criPath string) (*Resolver, error) {
	r := &Resolver{
		conntrackPath: conntrackPath,
		criPath:       criPath,
		translationC:  make(chan *translationReq),
		podInfoC:      make(chan *podInfoReq),
	}

	// TODO(q3k): bubble up errors from the translation worker into here?
	go r.runTranslationWorker(ctx)

	// The pod worker might fail on CRI connectivity issues, so we attempt to
	// restart it with a backoff if needed.
	go func() {
		bo := backoff.NewExponentialBackOff()
		// A zero MaxElapsedTime means the backoff never gives up.
		bo.MaxElapsedTime = 0
		bo.Reset()

		for {
			err := r.runPodWorker(ctx)
			if err == nil || errors.Is(err, ctx.Err()) {
				glog.Infof("podWorker exiting")
				return
			}
			glog.Errorf("podWorker failed: %v", err)

			wait := bo.NextBackOff()
			glog.Errorf("restarting podWorker in %v", wait)

			// Wait out the backoff, but give up immediately if the context
			// gets canceled in the meantime instead of sleeping through it
			// and starting the worker one more time.
			timer := time.NewTimer(wait)
			select {
			case <-ctx.Done():
				timer.Stop()
				glog.Infof("podWorker exiting")
				return
			case <-timer.C:
			}
		}
	}()

	return r, nil
}
// Sentinel errors returned by Resolver.ResolvePod when a lookup completed
// without failing but found nothing. Match them with errors.Is.
var (
	// ErrTranslationNotFound means no NAT translation was found for the
	// requested Tuple4.
	ErrTranslationNotFound = errors.New("translation not found")
	// ErrPodNotFound means a translation was found, but no pod was found for
	// the translated address.
	ErrPodNotFound = errors.New("pod not found")
)

// ResolvePod returns information about a running pod for a given TCP 4-tuple.
//
// The tuple is first translated into a pod-side address, which is then used to
// look up the pod. On success the returned PodInfo carries the translated
// IP/port together with the pod's namespace and name.
//
// Errors:
//   - ErrTranslationNotFound if the 4-tuple has no translation,
//   - ErrPodNotFound if no pod is found for the translated IP address,
//   - an error wrapping the underlying cause ("translate: ..." or
//     "getPodInfo: ...") if either lookup itself fails.
func (r *Resolver) ResolvePod(ctx context.Context, t *Tuple4) (*PodInfo, error) {
	// TODO(q3k): reconsider whether these should be gRPC statuses instead of
	// package-level vars.
	podAddr, err := r.translate(ctx, t)
	if err != nil {
		return nil, fmt.Errorf("translate: %w", err)
	}
	if podAddr == nil {
		return nil, ErrTranslationNotFound
	}

	podInfo, err := r.getPodInfo(ctx, podAddr.localIP)
	if err != nil {
		return nil, fmt.Errorf("getPodInfo: %w", err)
	}
	if podInfo == nil {
		return nil, ErrPodNotFound
	}

	return &PodInfo{
		PodIP:               podAddr.localIP,
		PodTranslatedPort:   podAddr.localPort,
		KubernetesNamespace: podInfo.namespace,
		Name:                podInfo.name,
	}, nil
}