// Package kubenat implements a data source for undoing NAT on hosts running
// Kubernetes/containerd workloads.
| // |
| // It parses the kernel conntrack NAT translation table to figure out the IP |
| // address of the pod that was making the connection. |
| // |
| // It then uses the containerd API to figure out what pod runs under what IP |
| // address. |
| // |
// Access to both conntrack and containerd is cached and only updated when
// needed. This means that as long as a TCP connection is open, identd will be
// able to respond with information about it without having to perform any
// OS/containerd queries.
| // |
| // Unfortunately, there is very little in terms of development/test harnesses |
| // for kubenat. You will have to have a locally running containerd, or do some |
| // mounts/forwards from a remote host. |
| package kubenat |
| |
import (
	"context"
	"errors"
	"fmt"
	"net"
	"strconv"
	"time"

	"github.com/cenkalti/backoff"
	"github.com/golang/glog"
)
| |
| // Resolver is the main interface for kubenat. It runs background processing to |
| // update conntrack/containerd state, and resolves Tuple4s into PodInfo. |
| type Resolver struct { |
| conntrackPath string |
| criPath string |
| |
| translationC chan *translationReq |
| podInfoC chan *podInfoReq |
| } |
| |
// Tuple4 is a 4-tuple of a TCP connection. Local describes the machine running
// this code, not the listen/connect 'ends' of TCP.
type Tuple4 struct {
	// RemoteIP and RemotePort are the address of the peer machine.
	RemoteIP   net.IP
	RemotePort uint16
	// LocalIP and LocalPort are the address on the machine running this code.
	LocalIP   net.IP
	LocalPort uint16
}

// String renders the tuple as "L: <local host:port> R: <remote host:port>",
// with IPv6 hosts bracketed as net.JoinHostPort does.
//
// It has a value receiver so that both Tuple4 and *Tuple4 implement
// fmt.Stringer. With a pointer receiver, formatting a plain Tuple4 value
// (e.g. with %v) would print the raw struct fields instead.
func (t Tuple4) String() string {
	local := net.JoinHostPort(t.LocalIP.String(), strconv.Itoa(int(t.LocalPort)))
	remote := net.JoinHostPort(t.RemoteIP.String(), strconv.Itoa(int(t.RemotePort)))
	return "L: " + local + " R: " + remote
}
| |
// PodInfo is what ResolvePod returns: the Kubernetes pod that terminates a
// given Tuple4 connection, together with its pod-network address.
type PodInfo struct {
	// PodIP is the pod's address inside the pod network.
	PodIP net.IP
	// PodTranslatedPort is the port on PodIP that corresponds to the Tuple4
	// this PodInfo was resolved for.
	PodTranslatedPort uint16
	// KubernetesNamespace is the namespace the pod runs in, and Name is the
	// pod's name as Kubernetes reports it.
	KubernetesNamespace, Name string
}
| |
| // NewResolver startss a resolver with a given path to /paroc/net/nf_conntrack |
| // and a CRI gRPC domain socket. |
| func NewResolver(ctx context.Context, conntrackPath, criPath string) (*Resolver, error) { |
| r := Resolver{ |
| conntrackPath: conntrackPath, |
| criPath: criPath, |
| |
| translationC: make(chan *translationReq), |
| podInfoC: make(chan *podInfoReq), |
| } |
| // TODO(q3k): bubble up errors from the translation worker into here? |
| go r.runTranslationWorker(ctx) |
| // The pod worker might fail on CRI connectivity issues, so we attempt to |
| // restart it with a backoff if needed. |
| go func() { |
| bo := backoff.NewExponentialBackOff() |
| bo.MaxElapsedTime = 0 |
| bo.Reset() |
| for { |
| err := r.runPodWorker(ctx) |
| if err == nil || errors.Is(err, ctx.Err()) { |
| glog.Infof("podWorker exiting") |
| return |
| } |
| glog.Errorf("podWorker failed: %v", err) |
| wait := bo.NextBackOff() |
| glog.Errorf("restarting podWorker in %v", wait) |
| time.Sleep(wait) |
| } |
| }() |
| |
| return &r, nil |
| } |
| |
| // ResolvePod returns information about a running pod for a given TCP 4-tuple. |
| // If the 4-tuple or pod cannot be resolved, an error will be returned. |
| func (r *Resolver) ResolvePod(ctx context.Context, t *Tuple4) (*PodInfo, error) { |
| // TODO(q3k): expose translation/pod not found errors as package-level |
| // vars, or use gRPC statuses? |
| podAddr, err := r.translate(ctx, t) |
| if err != nil { |
| return nil, fmt.Errorf("translate: %w", err) |
| } |
| if podAddr == nil { |
| return nil, fmt.Errorf("translation not found") |
| } |
| podInfo, err := r.getPodInfo(ctx, podAddr.localIP) |
| if err != nil { |
| return nil, fmt.Errorf("getPodInfo: %w", err) |
| } |
| if podInfo == nil { |
| return nil, fmt.Errorf("pod not found") |
| } |
| |
| return &PodInfo{ |
| PodIP: podAddr.localIP, |
| PodTranslatedPort: podAddr.localPort, |
| KubernetesNamespace: podInfo.namespace, |
| Name: podInfo.name, |
| }, nil |
| } |