cluster/identd/kubenat: implement

This is a library to find pod information for a given TCP 4-tuple.

Change-Id: I254983e579e3aaa04c0c5491851f4af94a3f4249
diff --git a/cluster/identd/kubenat/pods.go b/cluster/identd/kubenat/pods.go
new file mode 100644
index 0000000..eab0e22
--- /dev/null
+++ b/cluster/identd/kubenat/pods.go
@@ -0,0 +1,176 @@
+package kubenat
+
+import (
+	"context"
+	"fmt"
+	"net"
+
+	"github.com/golang/glog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	"code.hackerspace.pl/hscloud/cluster/identd/cri"
+)
+
+// podInfoReq is a request passed to the podWorker.
+type podInfoReq struct {
+	local net.IP            // IP address the worker looks up among cached pod IPs
+	res   chan *podInfoResp // carries the worker's reply; a nil reply means no pod found
+}
+
+// podInfoResp is a response from a podWorker, sent over the res channel in a
+// podInfoReq.
+type podInfoResp struct {
+	name      string // Metadata.Name of the matching CRI pod sandbox
+	namespace string // Metadata.Namespace of the matching CRI pod sandbox
+}
+
+// reply sends a reply to the given podInfoReq based on a CRI PodSandboxStatus,
+// sending nil if the status is nil.
+func (r *podInfoReq) reply(s *cri.PodSandboxStatus) {
+	if s == nil {
+		r.res <- nil
+		return
+	}
+	r.res <- &podInfoResp{
+		name:      s.Metadata.Name,
+		namespace: s.Metadata.Namespace,
+	}
+}
+
+// getPodInfo asks the pod worker which pod owns local; ctx bounds the exchange.
+func (r *Resolver) getPodInfo(ctx context.Context, local net.IP) (*podInfoResp, error) {
+	req := &podInfoReq{local: local, res: make(chan *podInfoResp, 1)}
+	select { // Hand the request over, giving up if ctx ends before it is received.
+	case r.podInfoC <- req:
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+	select { // Wait for the reply; a nil response with a nil error means "not found".
+	case res := <-req.res:
+		return res, nil
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
+// podStatus is a cache of data retrieved from CRI, refreshed by update.
+type podStatus struct {
+	// info is a map from pod sandbox ID to PodSandboxStatus as retrieved from
+	// CRI. It must be non-nil before update is called, as update writes to it.
+	info map[string]*cri.PodSandboxStatus
+	// byIP is a map from pod IP (as string) to pod sandbox ID; update rebuilds it.
+	byIP map[string]string
+}
+
+// update performs an update of the podStatus from CRI. It only retrieves
+// information about pods that it doesn't yet have, and ensures that pods which
+// do not exist in CRI are also removed from podStatus.
+//
+// Sandboxes that disappear between being listed and being queried (NotFound)
+// are skipped. Any other CRI error aborts the update and is returned wrapped
+// with %w, leaving byIP as it was. p.info must be non-nil on entry.
+// TODO(q3k): make sure we don't cache PodSandboxStatus too early, eg. when
+// it's not yet fully running?
+func (p *podStatus) update(ctx context.Context, client cri.RuntimeServiceClient) error {
+	res, err := client.ListPodSandbox(ctx, &cri.ListPodSandboxRequest{})
+	if err != nil {
+		return fmt.Errorf("ListPodSandbox: %w", err)
+	}
+
+	// set of all pod sandbox IDs in CRI.
+	want := make(map[string]bool, len(res.Items))
+	// set of pod sandbox IDs in CRI that are not in podStatus.
+	missing := make(map[string]bool)
+	for _, item := range res.Items {
+		want[item.Id] = true
+		if _, ok := p.info[item.Id]; !ok {
+			missing[item.Id] = true
+		}
+	}
+
+	// Get information about missing pod IDs into podStatus.
+	for id := range missing {
+		resp, err := client.PodSandboxStatus(ctx, &cri.PodSandboxStatusRequest{
+			PodSandboxId: id,
+		})
+		if err != nil {
+			// The sandbox went away after it was listed: nothing to cache.
+			if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+				continue
+			}
+			return fmt.Errorf("while getting sandbox %s: %w", id, err)
+		}
+		// Never cache a nil status: the pass below dereferences every entry,
+		// so a nil one would cause a nil pointer panic.
+		if resp.Status == nil {
+			continue
+		}
+		p.info[id] = resp.Status
+	}
+
+	// byIP is fully repopulated on each update.
+	p.byIP = make(map[string]string)
+
+	// Drop stale pod sandbox IDs and populate p.byIP in a single pass. Deleting
+	// entries from a map while ranging over it is well-defined in Go.
+	for id, info := range p.info {
+		if !want[id] {
+			delete(p.info, id)
+			continue
+		}
+		if info.Network == nil || info.Network.Ip == "" {
+			continue
+		}
+		p.byIP[info.Network.Ip] = id
+	}
+	return nil
+}
+
+// findByPodIP returns the cached PodSandboxStatus for the pod running under a
+// given pod IP address, or nil if not found. The IP is resolved to a sandbox
+// ID through byIP, which is then looked up in info.
+func (p *podStatus) findByPodIP(ip net.IP) *cri.PodSandboxStatus {
+	if id, ok := p.byIP[ip.String()]; ok {
+		return p.info[id]
+	}
+	return nil
+}
+
+// runPodWorker runs the CRI cache 'pod worker'. It creates a gRPC client for
+// the CRI socket at criPath, fills the cache once, and then answers requests
+// from podInfoC until ctx is canceled. A cache miss triggers one CRI refresh.
+func (r *Resolver) runPodWorker(ctx context.Context) error {
+	conn, err := grpc.Dial(fmt.Sprintf("unix://%s", r.criPath), grpc.WithInsecure())
+	if err != nil {
+		return fmt.Errorf("Dial: %w", err)
+	}
+	defer conn.Close()
+	client := cri.NewRuntimeServiceClient(conn)
+
+	cache := &podStatus{info: make(map[string]*cri.PodSandboxStatus)}
+	if err := cache.update(ctx, client); err != nil {
+		return fmt.Errorf("initial pod update: %w", err)
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case req := <-r.podInfoC:
+			// Fast path: the pod is already cached.
+			if hit := cache.findByPodIP(req.local); hit != nil {
+				req.reply(hit)
+				continue
+			}
+			// Miss: refresh from CRI and look once more.
+			if err := cache.update(ctx, client); err != nil {
+				// No reply is sent on failure; the request goes unanswered.
+				glog.Errorf("Updating pods failed: %v", err)
+				continue
+			}
+			req.reply(cache.findByPodIP(req.local))
+		}
+	}
+}