cluster/identd/kubenat: implement
This is a library to find pod information for a given TCP 4-tuple.
Change-Id: I254983e579e3aaa04c0c5491851f4af94a3f4249
diff --git a/cluster/identd/kubenat/BUILD.bazel b/cluster/identd/kubenat/BUILD.bazel
new file mode 100644
index 0000000..eeb97ef
--- /dev/null
+++ b/cluster/identd/kubenat/BUILD.bazel
@@ -0,0 +1,34 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+# Library target for the kubenat Go package, built from kubenat.go, pods.go
+# and translation.go. It is publicly visible and depends on the local CRI
+# bindings plus the backoff, glog and gRPC (core, codes, status) libraries.
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "kubenat.go",
+ "pods.go",
+ "translation.go",
+ ],
+ importpath = "code.hackerspace.pl/hscloud/cluster/identd/kubenat",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//cluster/identd/cri:go_default_library",
+ "@com_github_cenkalti_backoff//:go_default_library",
+ "@com_github_golang_glog//:go_default_library",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//codes:go_default_library",
+ "@org_golang_google_grpc//status:go_default_library",
+ ],
+)
+
+# Test target for the kubenat Go package. It embeds :go_default_library, so the
+# test sources are compiled together with the library sources, and additionally
+# depends on go-test/deep and glog.
+go_test(
+ name = "go_default_test",
+ srcs = [
+ "kubenat_test.go",
+ "pods_test.go",
+ "translation_test.go",
+ ],
+ embed = [":go_default_library"],
+ deps = [
+ "@com_github_go_test_deep//:go_default_library",
+ "@com_github_golang_glog//:go_default_library",
+ ],
+)
diff --git a/cluster/identd/kubenat/kubenat.go b/cluster/identd/kubenat/kubenat.go
new file mode 100644
index 0000000..38a68d9
--- /dev/null
+++ b/cluster/identd/kubenat/kubenat.go
@@ -0,0 +1,130 @@
+// kubenat implements a data source for undoing NAT on hosts running
+// Kubernetes/containerd workloads.
+//
+// It parses the kernel conntrack NAT translation table to figure out the IP
+// address of the pod that was making the connection.
+//
+// It then uses the containerd API to figure out what pod runs under what IP
+// address.
+//
+// Both conntrack and containerd access is cached and only updated when needed.
+// This means that as long as a TCP connection is open, identd will be able to
+// respond about its information without having to perform any OS/containerd
+// queries.
+//
+// Unfortunately, there is very little in terms of development/test harnesses
+// for kubenat. You will have to have a locally running containerd, or do some
+// mounts/forwards from a remote host.
+package kubenat
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net"
+ "time"
+
+ "github.com/cenkalti/backoff"
+ "github.com/golang/glog"
+)
+
+// Resolver is the main interface for kubenat. It runs background processing to
+// update conntrack/containerd state, and resolves Tuple4s into PodInfo.
+//
+// A Resolver is created with NewResolver, which also starts the workers that
+// serve the two request channels below.
+type Resolver struct {
+ // conntrackPath is the path of the conntrack table file read by the
+ // translation worker.
+ conntrackPath string
+ // criPath is the filesystem path of the CRI gRPC unix domain socket dialed
+ // by the pod worker.
+ criPath string
+
+ // translationC carries requests from translate to the translation worker.
+ translationC chan *translationReq
+ // podInfoC carries requests from getPodInfo to the pod worker.
+ podInfoC chan *podInfoReq
+}
+
+// Tuple4 is a 4-tuple of a TCP connection. Local describes the machine running
+// this code, not the listen/connect 'ends' of TCP.
+//
+// When resolving, the remote address/port are matched against the source of
+// the conntrack response direction, and the local address/port against its
+// destination.
+type Tuple4 struct {
+ // RemoteIP is the IP address of the peer outside of this machine.
+ RemoteIP net.IP
+ // RemotePort is the TCP port on RemoteIP.
+ RemotePort uint16
+ // LocalIP is the IP address of the connection on this machine.
+ LocalIP net.IP
+ // LocalPort is the TCP port on LocalIP.
+ LocalPort uint16
+}
+
+// String renders the tuple as "L: <local host:port> R: <remote host:port>",
+// with each endpoint formatted by net.JoinHostPort.
+func (t *Tuple4) String() string {
+	return fmt.Sprintf(
+		"L: %s R: %s",
+		net.JoinHostPort(t.LocalIP.String(), fmt.Sprint(t.LocalPort)),
+		net.JoinHostPort(t.RemoteIP.String(), fmt.Sprint(t.RemotePort)),
+	)
+}
+
+// PodInfo describes a Kubernetes pod which terminates a given Tuple4 connection.
+// It is the result type of Resolver.ResolvePod.
+type PodInfo struct {
+ // PodIP is the IP address of the pod within the pod network, as found in
+ // the conntrack translation for the requested Tuple4.
+ PodIP net.IP
+ // PodTranslatedPort is the port on the PodIP corresponding to the Tuple4
+ // that this PodInfo was requested for.
+ PodTranslatedPort uint16
+ // KubernetesNamespace is the kubernetes namespace in which this pod is
+ // running, as reported by the CRI pod sandbox metadata.
+ KubernetesNamespace string
+ // Name is the name of the pod, as seen by kubernetes (taken from the CRI
+ // pod sandbox metadata).
+ Name string
+}
+
+// NewResolver starts a resolver with a given path to /proc/net/nf_conntrack
+// and a CRI gRPC domain socket.
+//
+// It spawns two goroutines that live until ctx is canceled: the translation
+// worker, and a supervisor which runs the pod worker and restarts it with an
+// exponential backoff whenever it fails. The returned error is currently
+// always nil.
+func NewResolver(ctx context.Context, conntrackPath, criPath string) (*Resolver, error) {
+	r := Resolver{
+		conntrackPath: conntrackPath,
+		criPath:       criPath,
+
+		translationC: make(chan *translationReq),
+		podInfoC:     make(chan *podInfoReq),
+	}
+	// TODO(q3k): bubble up errors from the translation worker into here?
+	go r.runTranslationWorker(ctx)
+	// The pod worker might fail on CRI connectivity issues, so we attempt to
+	// restart it with a backoff if needed.
+	go func() {
+		bo := backoff.NewExponentialBackOff()
+		// Never give up restarting the worker (MaxElapsedTime of zero
+		// disables the backoff's overall deadline).
+		bo.MaxElapsedTime = 0
+		bo.Reset()
+		for {
+			err := r.runPodWorker(ctx)
+			// Stop supervising once the worker exits cleanly or the context
+			// is done. The explicit ctx.Err() check matters: a cancellation
+			// may reach us in a form that does not wrap the context error
+			// (eg. as a gRPC status), and restarting the worker on a dead
+			// context would only make it fail again, forever.
+			if err == nil || errors.Is(err, ctx.Err()) || ctx.Err() != nil {
+				glog.Infof("podWorker exiting")
+				return
+			}
+			glog.Errorf("podWorker failed: %v", err)
+			wait := bo.NextBackOff()
+			glog.Errorf("restarting podWorker in %v", wait)
+			// Wait out the backoff, but wake up immediately on cancellation
+			// instead of lingering for the whole interval.
+			select {
+			case <-time.After(wait):
+			case <-ctx.Done():
+				glog.Infof("podWorker exiting")
+				return
+			}
+		}
+	}()
+
+	return &r, nil
+}
+
+// ResolvePod looks up the pod that owns the TCP connection described by t.
+//
+// The lookup is done in two steps: the 4-tuple is first translated into the
+// pod-side address via the translation worker, and that address is then mapped
+// to a pod via the pod worker. An error is returned when either step fails or
+// finds nothing.
+func (r *Resolver) ResolvePod(ctx context.Context, t *Tuple4) (*PodInfo, error) {
+	// TODO(q3k): expose translation/pod not found errors as package-level
+	// vars, or use gRPC statuses?
+
+	// Step one: undo the NAT to find the pod-side address of the connection.
+	translated, err := r.translate(ctx, t)
+	switch {
+	case err != nil:
+		return nil, fmt.Errorf("translate: %w", err)
+	case translated == nil:
+		return nil, errors.New("translation not found")
+	}
+
+	// Step two: find out which pod runs under that address.
+	pod, err := r.getPodInfo(ctx, translated.localIP)
+	switch {
+	case err != nil:
+		return nil, fmt.Errorf("getPodInfo: %w", err)
+	case pod == nil:
+		return nil, errors.New("pod not found")
+	}
+
+	result := PodInfo{
+		PodIP:               translated.localIP,
+		PodTranslatedPort:   translated.localPort,
+		KubernetesNamespace: pod.namespace,
+		Name:                pod.name,
+	}
+	return &result, nil
+}
diff --git a/cluster/identd/kubenat/kubenat_test.go b/cluster/identd/kubenat/kubenat_test.go
new file mode 100644
index 0000000..78afd22
--- /dev/null
+++ b/cluster/identd/kubenat/kubenat_test.go
@@ -0,0 +1,43 @@
+package kubenat
+
+import (
+ "context"
+ "flag"
+ "net"
+ "testing"
+)
+
+func TestResolvePod(t *testing.T) {
+ t.Skip("needs containerd running on host and unhardcoded test data")
+ flag.Set("logtostderr", "true")
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ r, err := NewResolver(ctx, "/tmp/conntrack", "/tmp/containerd.sock")
+ if err != nil {
+ t.Fatalf("NewResolver: %v", err)
+ }
+
+ pi, err := r.ResolvePod(ctx, &Tuple4{
+ RemoteIP: net.IPv4(185, 191, 225, 10),
+ RemotePort: 6697,
+ LocalIP: net.IPv4(185, 236, 240, 36),
+ LocalPort: 53449,
+ })
+ if err != nil {
+ t.Fatalf("ResolvePod: %v", err)
+ }
+ if want, got := net.IPv4(10, 10, 26, 23), pi.PodIP; !want.Equal(got) {
+ t.Errorf("Wanted pod IP %v, got %v", want, got)
+ }
+ if want, got := uint16(54782), pi.PodTranslatedPort; want != got {
+ t.Errorf("Wanted pod port %d, got %d", want, got)
+ }
+ if want, got := "matrix", pi.KubernetesNamespace; want != got {
+ t.Errorf("Wanted pod namespace %q, got %q", want, got)
+ }
+ if want, got := "appservice-irc-freenode-68977cdd5f-kfzl6", pi.Name; want != got {
+ t.Errorf("Wanted pod name %q, got %q", want, got)
+ }
+}
diff --git a/cluster/identd/kubenat/pods.go b/cluster/identd/kubenat/pods.go
new file mode 100644
index 0000000..eab0e22
--- /dev/null
+++ b/cluster/identd/kubenat/pods.go
@@ -0,0 +1,176 @@
+package kubenat
+
+import (
+ "context"
+ "fmt"
+ "net"
+
+ "github.com/golang/glog"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "code.hackerspace.pl/hscloud/cluster/identd/cri"
+)
+
+// podInfoReq is a request passed to the podWorker.
+type podInfoReq struct {
+ // local is the pod IP address to look up.
+ local net.IP
+ // res is the channel on which the worker sends its response; a nil
+ // response means that no pod was found.
+ res chan *podInfoResp
+}
+
+// podInfoResp is a response from a podWorker, sent over the res channel in a
+// podInfoReq.
+type podInfoResp struct {
+ // name is the pod name, copied from the CRI pod sandbox metadata.
+ name string
+ // namespace is the pod namespace, copied from the CRI pod sandbox metadata.
+ namespace string
+}
+
+// reply sends a reply to the given podInfoReq based on a CRI PodSandboxStatus,
+// sending nil if the status is nil.
+func (r *podInfoReq) reply(s *cri.PodSandboxStatus) {
+ if s == nil {
+ r.res <- nil
+ return
+ }
+ r.res <- &podInfoResp{
+ name: s.Metadata.Name,
+ namespace: s.Metadata.Namespace,
+ }
+}
+
+// getPodInfo performs a podInfoReq/podInfoResp exchange under a context that
+// can be used to time out the query.
+//
+// It returns the worker's response (nil if no pod runs under the given IP), or
+// ctx.Err() if the context is done before the request could be handed to the
+// worker or before the worker replied.
+func (r *Resolver) getPodInfo(ctx context.Context, local net.IP) (*podInfoResp, error) {
+	// Buffered, so that the worker can always deliver its reply without
+	// blocking, even if we have already given up on the request.
+	resC := make(chan *podInfoResp, 1)
+	req := &podInfoReq{
+		local: local,
+		res:   resC,
+	}
+
+	// podInfoC is unbuffered and the pod worker might not be receiving (eg.
+	// while it is being restarted), so the send must honor ctx as well.
+	select {
+	case r.podInfoC <- req:
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case res := <-resC:
+		return res, nil
+	}
+}
+
+// podStatus is a cache of data retrieved from CRI. It is filled and refreshed
+// by update, and queried with findByPodIP.
+type podStatus struct {
+ // info is a map from pod sandbox ID to PodSandboxStatus as retrieved from
+ // CRI.
+ info map[string]*cri.PodSandboxStatus
+ // byIP is a map from pod IP (as string) to pod sandbox ID. It is rebuilt
+ // from info on every update.
+ byIP map[string]string
+}
+
+// update performs an update of the podStatus from CRI. It only retrieves
+// information about pods that it doesn't yet have, and ensures that pods which
+// do not exist in CRI are also removed from podStatus.
+//
+// An error is returned if listing the sandboxes fails, or if retrieving the
+// status of a sandbox fails for any reason other than the sandbox not being
+// found. In that case byIP is left as it was before the call.
+// TODO(q3k): make sure we don't cache PodSandboxStatus too early, eg. when
+// it's not yet fully running?
+func (p *podStatus) update(ctx context.Context, client cri.RuntimeServiceClient) error {
+	res, err := client.ListPodSandbox(ctx, &cri.ListPodSandboxRequest{})
+	if err != nil {
+		return fmt.Errorf("ListPodSandbox: %w", err)
+	}
+
+	// want is the set of all pod sandbox IDs in CRI.
+	want := make(map[string]bool)
+	// missing is the set of pod sandbox IDs in CRI that are not in podStatus.
+	missing := make(map[string]bool)
+	for _, item := range res.Items {
+		want[item.Id] = true
+		if _, ok := p.info[item.Id]; !ok {
+			missing[item.Id] = true
+		}
+	}
+
+	// Get information about missing pod IDs into podStatus.
+	for id := range missing {
+		res, err := client.PodSandboxStatus(ctx, &cri.PodSandboxStatusRequest{
+			PodSandboxId: id,
+		})
+		if err != nil {
+			// A sandbox that is gone by the time we ask about it is not an
+			// error, it just has nothing to contribute.
+			if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+				continue
+			}
+			return fmt.Errorf("while getting sandbox %s: %w", id, err)
+		}
+		// Do not cache an empty status: it would be dereferenced below, and
+		// would never be retried as the ID would no longer count as missing.
+		if res.Status == nil {
+			continue
+		}
+		p.info[id] = res.Status
+	}
+
+	// byIP is fully repopulated on each update.
+	p.byIP = make(map[string]string)
+
+	// Drop stale pod sandbox IDs and populate p.byIP in a single pass.
+	// Deleting map entries while ranging over the map is safe in Go.
+	for id, info := range p.info {
+		if !want[id] {
+			delete(p.info, id)
+			continue
+		}
+		// Sandboxes without a (known) IP address cannot be looked up by IP.
+		if info == nil || info.Network == nil || info.Network.Ip == "" {
+			continue
+		}
+		p.byIP[info.Network.Ip] = id
+	}
+	return nil
+}
+
+// findByPodIP looks up the cached PodSandboxStatus of the pod running under
+// the given pod IP address. It returns nil if no such pod is cached.
+func (p *podStatus) findByPodIP(ip net.IP) *cri.PodSandboxStatus {
+	if sandboxID, known := p.byIP[ip.String()]; known {
+		return p.info[sandboxID]
+	}
+	return nil
+}
+
+// runPodWorker runs the CRI cache 'pod worker'. It responds to requests over
+// podInfoC until ctx is canceled.
+//
+// It returns an error if the CRI socket cannot be dialed or if the initial
+// cache fill fails, and ctx.Err() once ctx is canceled. Every request received
+// is answered exactly once; a nil answer means that no pod could be found.
+func (r *Resolver) runPodWorker(ctx context.Context) error {
+	conn, err := grpc.Dial(fmt.Sprintf("unix://%s", r.criPath), grpc.WithInsecure())
+	if err != nil {
+		return fmt.Errorf("Dial: %w", err)
+	}
+	defer conn.Close()
+	client := cri.NewRuntimeServiceClient(conn)
+
+	ps := &podStatus{
+		info: make(map[string]*cri.PodSandboxStatus),
+	}
+	if err := ps.update(ctx, client); err != nil {
+		return fmt.Errorf("initial pod update: %w", err)
+	}
+
+	for {
+		select {
+		case req := <-r.podInfoC:
+			// Serve from the cache if possible.
+			if info := ps.findByPodIP(req.local); info != nil {
+				req.reply(info)
+				continue
+			}
+			// Cache miss: refresh from CRI and look again.
+			if err := ps.update(ctx, client); err != nil {
+				glog.Errorf("Updating pods failed: %v", err)
+				// The requester must still get an answer, otherwise it
+				// would wait until its context expires.
+				req.reply(nil)
+				continue
+			}
+			req.reply(ps.findByPodIP(req.local))
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
diff --git a/cluster/identd/kubenat/pods_test.go b/cluster/identd/kubenat/pods_test.go
new file mode 100644
index 0000000..3fd26a3
--- /dev/null
+++ b/cluster/identd/kubenat/pods_test.go
@@ -0,0 +1,42 @@
+package kubenat
+
+import (
+ "context"
+ "flag"
+ "net"
+ "testing"
+
+ "github.com/golang/glog"
+)
+
+// TestPodWorker runs a pod worker against a containerd socket expected at
+// /tmp/containerd.sock and looks up a hardcoded pod IP. It is skipped
+// unconditionally, as it depends on host state and hardcoded production data.
+func TestPodWorker(t *testing.T) {
+	t.Skip("needs containerd running on host and unhardcoded test data")
+	flag.Set("logtostderr", "true")
+
+	r := &Resolver{
+		criPath:  "/tmp/containerd.sock",
+		podInfoC: make(chan *podInfoReq),
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	go func() {
+		err := r.runPodWorker(ctx)
+		// The worker returning the context error is the expected way out.
+		if err != nil && err != ctx.Err() {
+			glog.Errorf("runPodWorker: %v", err)
+		}
+	}()
+
+	res, err := r.getPodInfo(ctx, net.IPv4(10, 10, 26, 23))
+	if err != nil {
+		t.Fatalf("got err: %v", err)
+	}
+	if res == nil {
+		t.Fatalf("got nil pod response")
+	}
+
+	if want, got := "matrix", res.namespace; want != got {
+		t.Errorf("namespace: wanted %q, got %q", want, got)
+	}
+}
diff --git a/cluster/identd/kubenat/translation.go b/cluster/identd/kubenat/translation.go
new file mode 100644
index 0000000..ffb6c94
--- /dev/null
+++ b/cluster/identd/kubenat/translation.go
@@ -0,0 +1,341 @@
+package kubenat
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "fmt"
+ "io/ioutil"
+ "net"
+ "strconv"
+ "strings"
+
+ "github.com/golang/glog"
+)
+
+// translationReq is a request passed to the translationWorker.
+type translationReq struct {
+ // t is the 4-tuple, as seen from the outside of the NAT, to translate.
+ t *Tuple4
+ // res is the channel on which the worker sends its response; a nil
+ // response means that no translation was found.
+ res chan *translationResp
+}
+
+// translationResp is a response from the translationWorker, sent over the res
+// channel in a translationReq.
+type translationResp struct {
+ // localIP is the source address of the request direction of the matching
+ // conntrack entry (its "src" field).
+ localIP net.IP
+ // localPort is the source port of the request direction of the matching
+ // conntrack entry (its "sport" field).
+ localPort uint16
+}
+
+// reply sends a reply to the given translationReq based on a conntrackEntry,
+// sending nil if the entry is nil.
+func (r *translationReq) reply(e *conntrackEntry) {
+ if e == nil {
+ r.res <- nil
+ return
+ }
+ localPort, err := strconv.ParseUint(e.request["sport"], 10, 16)
+ if err != nil {
+ r.res <- nil
+ return
+ }
+ r.res <- &translationResp{
+ localIP: net.ParseIP(e.request["src"]),
+ localPort: uint16(localPort),
+ }
+}
+
+// translate performs a translationReq/translationResp exchange under a context
+// that can be used to time out the query.
+//
+// It returns the worker's response (nil if no translation was found), or
+// ctx.Err() if the context is done before the request could be handed to the
+// worker or before the worker replied.
+func (r *Resolver) translate(ctx context.Context, t *Tuple4) (*translationResp, error) {
+	// Buffered, so that the worker can always deliver its reply without
+	// blocking, even if we have already given up on the request.
+	resC := make(chan *translationResp, 1)
+	req := &translationReq{
+		t:   t,
+		res: resC,
+	}
+
+	// translationC is unbuffered and the worker might not be receiving (eg.
+	// after it has exited), so the send must honor ctx as well.
+	select {
+	case r.translationC <- req:
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case res := <-resC:
+		return res, nil
+	}
+}
+
+// conntrackEntry is an entry parsed from /proc/net/nf_conntrack. The format is
+// not well documented, and the best resource I could find is:
+// https://stackoverflow.com/questions/16034698/details-of-proc-net-ip-conntrack-and-proc-net-nf-conntrack
+type conntrackEntry struct {
+ // networkProtocol is currently always "ipv4".
+ networkProtocol string
+ // transmissionProtocol is currently "tcp" or "udp".
+ transmissionProtocol string
+ // invalidateTimeout is the number found in the fifth column of the entry.
+ // NOTE(review): presumably the time in seconds until the kernel drops the
+ // entry - confirm against kernel documentation.
+ invalidateTimeout int64
+
+ // state is the connection state column (eg. "ESTABLISHED"). It is only
+ // set for tcp entries, and empty otherwise.
+ state string
+
+ // request key-value pairs. For NAT, these are entries relating to the
+ // connection as seen as the 'inside' of the NAT, eg. the pod-originated
+ // connection.
+ request map[string]string
+ // response key-value pairs. For NAT, these are entries relating to the
+ // connection as seen by the 'outside' of the NAT, eg. the internet.
+ response map[string]string
+ // tags is the set of bracketed flags of the entry (eg. [ASSURED]), stored
+ // without the surrounding brackets.
+ tags map[string]bool
+}
+
+// conntrackParseEntry parses a line from /proc/net/nf_conntrack into a conntrackEntry.
+func conntrackParseEntry(line string) (*conntrackEntry, error) {
+ entry := conntrackEntry{
+ request: make(map[string]string),
+ response: make(map[string]string),
+ tags: make(map[string]bool),
+ }
+
+ fields := strings.Fields(line)
+ if len(fields) < 5 {
+ // This should never happen unless the file format drastically
+ // changed. Don't bother to parse the rest, error early, and let
+ // someone debug this.
+ return nil, fmt.Errorf("invalid field count: %v", fields)
+ }
+ switch fields[0] {
+ case "ipv4":
+ if fields[1] != "2" {
+ return nil, fmt.Errorf("ipv4 with proto number %q, wanted 2", fields[1])
+ }
+ // TODO(q3k): support IPv6 when we get it on prod.
+ default:
+ return nil, nil
+ }
+ entry.networkProtocol = fields[0]
+
+ rest := fields[5:]
+ switch fields[2] {
+ case "tcp":
+ if fields[3] != "6" {
+ return nil, fmt.Errorf("tcp with proto number %q, wanted 6", fields[3])
+ }
+ if len(fields) < 6 {
+ return nil, fmt.Errorf("tcp with missing state field")
+ }
+ entry.state = fields[5]
+ rest = fields[6:]
+ case "udp":
+ if fields[3] != "17" {
+ return nil, fmt.Errorf("udp with proto number %q, wanted 17", fields[3])
+ }
+ default:
+ return nil, nil
+ }
+ entry.transmissionProtocol = fields[2]
+
+ invalidateTimeout, err := strconv.ParseInt(fields[4], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("unparseable timeout %q", fields[4])
+ }
+ entry.invalidateTimeout = invalidateTimeout
+
+ for _, el := range rest {
+ parts := strings.Split(el, "=")
+ switch len(parts) {
+ case 1:
+ // This is a tag.
+ tag := parts[0]
+ // Ensure the tag starts and ends with [] (eg. [ASSURED].
+ if !strings.HasPrefix(tag, "[") || !strings.HasSuffix(tag, "]") {
+ continue
+ }
+ // Strip [ and ].
+ tag = tag[1:]
+ tag = tag[:len(tag)-1]
+ if _, ok := entry.tags[tag]; ok {
+ return nil, fmt.Errorf("repeated tag %q", tag)
+ }
+ entry.tags[tag] = true
+ case 2:
+ // This is a k/v field.
+ k := parts[0]
+ v := parts[1]
+ if _, ok := entry.request[k]; ok {
+ if _, ok := entry.response[k]; ok {
+ return nil, fmt.Errorf("field %q encountered more than twice", k)
+ } else {
+ entry.response[k] = v
+ }
+ } else {
+ entry.request[k] = v
+ }
+ default:
+ return nil, fmt.Errorf("unparseable column %q", el)
+ }
+ }
+ return &entry, nil
+
+}
+
+// conntrackParse parses the contents of a /proc/net/nf_conntrack file into
+// multiple entries. Lines that fail to parse are logged and skipped; entries
+// that conntrackParseEntry skips (nil entry) are not part of the result.
+//
+// An error is returned if the data cannot be scanned into lines, or if there
+// were parse errors and they were at least as numerous as the successfully
+// parsed entries.
+func conntrackParse(data []byte) ([]conntrackEntry, error) {
+	scanner := bufio.NewScanner(bytes.NewReader(data))
+	var res []conntrackEntry
+	// parseErrs collects per-line parse errors (named so as to not shadow the
+	// standard library errors package).
+	var parseErrs []error
+	for scanner.Scan() {
+		line := strings.TrimSpace(scanner.Text())
+		if line == "" {
+			continue
+		}
+
+		entry, err := conntrackParseEntry(line)
+		if err != nil {
+			glog.Errorf("Error while parsing %q: %v", line, err)
+			parseErrs = append(parseErrs, err)
+		} else if entry != nil {
+			res = append(res, *entry)
+		}
+	}
+	// Scan returning false can also mean that scanning failed (eg. on an
+	// overlong line); do not pass off a truncated table as a complete one.
+	if err := scanner.Err(); err != nil {
+		return nil, fmt.Errorf("scanning conntrack data: %w", err)
+	}
+
+	if len(parseErrs) > 0 && len(parseErrs) >= len(res) {
+		return nil, fmt.Errorf("encountered too many errors during conntrack parse, check logs; first error: %w", parseErrs[0])
+	}
+	return res, nil
+}
+
+// conntrackIndex is an index into a list of conntrackEntries. It allows lookup
+// by request/response k/v pairs.
+type conntrackIndex struct {
+ // entries is the list of entries that the maps below index into.
+ entries []conntrackEntry
+ // byRequest is a map from key to value to list of indices into entries.
+ byRequest map[string]map[string][]int
+ // byResponse is a map from key to value to list of indices into entries.
+ byResponse map[string]map[string][]int
+}
+
+// buildIndex creates a conntrackIndex over the given entries, indexing every
+// request and response k/v pair of every entry.
+func buildIndex(entries []conntrackEntry) *conntrackIndex {
+	index := &conntrackIndex{
+		entries:    entries,
+		byRequest:  make(map[string]map[string][]int),
+		byResponse: make(map[string]map[string][]int),
+	}
+
+	// record notes, for each k/v pair in pairs, that the entry at position
+	// pos carries it.
+	record := func(into map[string]map[string][]int, pairs map[string]string, pos int) {
+		for key, value := range pairs {
+			byValue := into[key]
+			if byValue == nil {
+				byValue = make(map[string][]int)
+				into[key] = byValue
+			}
+			byValue[value] = append(byValue[value], pos)
+		}
+	}
+
+	for pos := range index.entries {
+		record(index.byRequest, index.entries[pos].request, pos)
+		record(index.byResponse, index.entries[pos].response, pos)
+	}
+	return index
+}
+
+// getByRequest looks up the entries whose request fields contain the given
+// k/v pair. The result points into the index's entries; it is nil if nothing
+// matches.
+func (c *conntrackIndex) getByRequest(k, v string) []*conntrackEntry {
+	// Indexing a missing (nil) inner map is fine and simply finds nothing.
+	positions, found := c.byRequest[k][v]
+	if !found {
+		return nil
+	}
+	matches := make([]*conntrackEntry, 0, len(positions))
+	for _, pos := range positions {
+		matches = append(matches, &c.entries[pos])
+	}
+	return matches
+}
+
+// getByResponse looks up the entries whose response fields contain the given
+// k/v pair. The result points into the index's entries; it is nil if nothing
+// matches.
+func (c *conntrackIndex) getByResponse(k, v string) []*conntrackEntry {
+	// Indexing a missing (nil) inner map is fine and simply finds nothing.
+	positions, found := c.byResponse[k][v]
+	if !found {
+		return nil
+	}
+	matches := make([]*conntrackEntry, 0, len(positions))
+	for _, pos := range positions {
+		matches = append(matches, &c.entries[pos])
+	}
+	return matches
+}
+
+// find looks for the TCP conntrackEntry whose response direction matches the
+// given 4-tuple, ie. the connection as seen on the 'outside' of the NAT. The
+// first match is returned, or nil if there is none or if the tuple is not
+// IPv4.
+func (c *conntrackIndex) find(t *Tuple4) *conntrackEntry {
+	// TODO(q3k): support IPv6
+	if t.RemoteIP.To4() == nil || t.LocalIP.To4() == nil {
+		return nil
+	}
+
+	// Conntrack fields are kept as strings, so compare in string form.
+	remotePort := strconv.Itoa(int(t.RemotePort))
+	localIP := t.LocalIP.String()
+	localPort := strconv.Itoa(int(t.LocalPort))
+
+	for _, candidate := range c.getByResponse("src", t.RemoteIP.String()) {
+		resp := candidate.response
+		if candidate.transmissionProtocol == "tcp" &&
+			resp["sport"] == remotePort &&
+			resp["dst"] == localIP &&
+			resp["dport"] == localPort {
+			return candidate
+		}
+	}
+	return nil
+}
+
+// runTranslationWorker is the conntrack 'translation worker' loop. It answers
+// requests received over translationC from an index of the conntrack file,
+// re-reading the file whenever a request cannot be answered from the current
+// index. It returns when ctx is canceled.
+func (r *Resolver) runTranslationWorker(ctx context.Context) {
+	var index *conntrackIndex
+	// reload replaces index with one built from the current conntrack file.
+	// If the file cannot be read or parsed, the new index is empty.
+	reload := func() {
+		var entries []conntrackEntry
+		if data, err := ioutil.ReadFile(r.conntrackPath); err != nil {
+			glog.Errorf("Failed to read conntrack file: %v", err)
+		} else if entries, err = conntrackParse(data); err != nil {
+			glog.Errorf("failed to parse conntrack entries: %v", err)
+		}
+		index = buildIndex(entries)
+	}
+	reload()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case req := <-r.translationC:
+			entry := index.find(req.t)
+			if entry == nil {
+				// Not in the cached table: refresh it and try once more.
+				reload()
+				entry = index.find(req.t)
+			}
+			// A nil entry is replied to with a nil response.
+			req.reply(entry)
+		}
+	}
+}
diff --git a/cluster/identd/kubenat/translation_test.go b/cluster/identd/kubenat/translation_test.go
new file mode 100644
index 0000000..353d291
--- /dev/null
+++ b/cluster/identd/kubenat/translation_test.go
@@ -0,0 +1,130 @@
+package kubenat
+
+import (
+ "context"
+ "flag"
+ "io/ioutil"
+ "net"
+ "os"
+ "testing"
+
+ "github.com/go-test/deep"
+)
+
+// testConntrack is the anonymized conntrack table of a production host.
+// The first entry is an appservice-irc connection from a pod to an IRC server.
+// The second entry is a UDP connection between two pods.
+// The third entry is not a NAT entry, but an incoming external connection.
+// The fourth entry is mangled/incomplete.
+const testConntrack = `
+ipv4 2 tcp 6 86384 ESTABLISHED src=10.10.26.23 dst=192.0.2.180 sport=51336 dport=6697 src=192.0.2.180 dst=185.236.240.36 sport=6697 dport=28706 [ASSURED] mark=0 zone=0 use=2
+ipv4 2 udp 17 35 src=10.10.24.162 dst=10.10.26.108 sport=49347 dport=53 src=10.10.26.108 dst=10.10.24.162 sport=53 dport=49347 [ASSURED] mark=0 zone=0 use=2
+ipv4 2 tcp 6 2 SYN_SENT src=198.51.100.67 dst=185.236.240.56 sport=51053 dport=3359 [UNREPLIED] src=185.236.240.56 dst=198.51.100.67 sport=3359 dport=51053 mark=0 zone=0 use=2
+ipv4 2 tcp 6 2
+`
+
+// TestConntrackParse exercises the conntrack parser for all entries in
+// testConntrack, and then the index lookups on top of the parsed entries.
+func TestConntrackParse(t *testing.T) {
+	// The last line of testConntrack is truncated: it fails to parse and must
+	// not show up in the result.
+	got, err := conntrackParse([]byte(testConntrack))
+	if err != nil {
+		t.Fatalf("conntrackParse: %v", err)
+	}
+	want := []conntrackEntry{
+		{
+			"ipv4", "tcp", 86384, "ESTABLISHED",
+			map[string]string{
+				"src": "10.10.26.23", "dst": "192.0.2.180", "sport": "51336", "dport": "6697",
+				"mark": "0", "zone": "0", "use": "2",
+			},
+			map[string]string{
+				"src": "192.0.2.180", "dst": "185.236.240.36", "sport": "6697", "dport": "28706",
+			},
+			map[string]bool{
+				"ASSURED": true,
+			},
+		},
+		{
+			"ipv4", "udp", 35, "",
+			map[string]string{
+				"src": "10.10.24.162", "dst": "10.10.26.108", "sport": "49347", "dport": "53",
+				"mark": "0", "zone": "0", "use": "2",
+			},
+			map[string]string{
+				"src": "10.10.26.108", "dst": "10.10.24.162", "sport": "53", "dport": "49347",
+			},
+			map[string]bool{
+				"ASSURED": true,
+			},
+		},
+		{
+			"ipv4", "tcp", 2, "SYN_SENT",
+			map[string]string{
+				"src": "198.51.100.67", "dst": "185.236.240.56", "sport": "51053", "dport": "3359",
+				"mark": "0", "zone": "0", "use": "2",
+			},
+			map[string]string{
+				"src": "185.236.240.56", "dst": "198.51.100.67", "sport": "3359", "dport": "51053",
+			},
+			map[string]bool{
+				"UNREPLIED": true,
+			},
+		},
+	}
+	// All conntrackEntry fields are unexported, and deep skips those unless
+	// told otherwise - which would make the comparison below vacuous.
+	defer func(old bool) { deep.CompareUnexportedFields = old }(deep.CompareUnexportedFields)
+	deep.CompareUnexportedFields = true
+	if diff := deep.Equal(want, got); diff != nil {
+		t.Error(diff)
+	}
+
+	ix := buildIndex(got)
+	if want, got := 0, len(ix.getByRequest("src", "1.2.3.4")); want != got {
+		t.Errorf("by request, src, 1.2.3.4: wanted %d results, got %d", want, got)
+	}
+	bySrc := ix.getByRequest("src", "10.10.26.23")
+	if want, got := 1, len(bySrc); want != got {
+		// Fatal, as the element access below would panic otherwise.
+		t.Fatalf("by request, src, 10.10.26.23: wanted %d results, got %d", want, got)
+	}
+	if want, got := "10.10.26.23", bySrc[0].request["src"]; want != got {
+		t.Errorf("by request, wanted src %q, got %q", want, got)
+	}
+	if want, got := 3, len(ix.getByRequest("mark", "0")); want != got {
+		t.Errorf("by request, mark, 0: wanted %d results, got %d", want, got)
+	}
+}
+
+// TestTranslationWorker exercises a translation worker with a
+// testConntrack-backed conntrack file: it writes testConntrack to a temporary
+// file, runs a worker on it and translates the 'outside' 4-tuple of the first
+// entry back into the pod-side address and port.
+func TestTranslationWorker(t *testing.T) {
+	flag.Set("logtostderr", "true")
+	tmpfile, err := ioutil.TempFile("", "conntack")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.Remove(tmpfile.Name())
+	// Always close the file, whether or not the write succeeded, so that the
+	// descriptor is not leaked and the data is there before the worker reads.
+	_, writeErr := tmpfile.Write([]byte(testConntrack))
+	closeErr := tmpfile.Close()
+	if writeErr != nil {
+		t.Fatal(writeErr)
+	}
+	if closeErr != nil {
+		t.Fatal(closeErr)
+	}
+
+	r := &Resolver{
+		conntrackPath: tmpfile.Name(),
+		translationC:  make(chan *translationReq),
+	}
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	go r.runTranslationWorker(ctx)
+
+	res, err := r.translate(ctx, &Tuple4{
+		RemoteIP:   net.ParseIP("192.0.2.180"),
+		RemotePort: 6697,
+		LocalIP:    net.ParseIP("185.236.240.36"),
+		LocalPort:  28706,
+	})
+	if err != nil {
+		t.Fatalf("translate: %v", err)
+	}
+	// A nil response means 'not found'; fail instead of panicking below.
+	if res == nil {
+		t.Fatalf("translate: got nil response")
+	}
+	if want, got := net.ParseIP("10.10.26.23"), res.localIP; !want.Equal(got) {
+		t.Errorf("local ip: wanted %v, got %v", want, got)
+	}
+	if want, got := uint16(51336), res.localPort; want != got {
+		t.Errorf("local port: wanted %d, got %d", want, got)
+	}
+}