cluster/identd/kubenat: implement
This is a library to find pod information for a given TCP 4-tuple.
Change-Id: I254983e579e3aaa04c0c5491851f4af94a3f4249
diff --git a/cluster/identd/kubenat/pods.go b/cluster/identd/kubenat/pods.go
new file mode 100644
index 0000000..eab0e22
--- /dev/null
+++ b/cluster/identd/kubenat/pods.go
@@ -0,0 +1,176 @@
+package kubenat
+
+import (
+ "context"
+ "fmt"
+ "net"
+
+ "github.com/golang/glog"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "code.hackerspace.pl/hscloud/cluster/identd/cri"
+)
+
+// podInfoReq is a lookup request handed to the pod worker over podInfoC.
+type podInfoReq struct {
+ local net.IP // IP address used as the lookup key (passed to findByPodIP)
+ res chan *podInfoResp // channel on which the worker sends its reply
+}
+
+// podInfoResp is a response from a podWorker, sent over the res channel in a
+// podInfoReq. A nil *podInfoResp is sent instead when no pod status was found.
+type podInfoResp struct {
+ name string // pod name, copied from the sandbox status metadata
+ namespace string // pod namespace, copied from the sandbox status metadata
+}
+
+// reply sends a reply to the given podInfoReq based on a CRI PodSandboxStatus.
+// It sends nil if the status is nil or carries no metadata, since either way
+// there is no pod name/namespace to report.
+func (r *podInfoReq) reply(s *cri.PodSandboxStatus) {
+	// Metadata is a pointer; dereferencing a nil one would panic the worker.
+	if s == nil || s.Metadata == nil {
+		r.res <- nil
+		return
+	}
+	meta := s.Metadata
+	r.res <- &podInfoResp{name: meta.Name, namespace: meta.Namespace}
+}
+
+// getPodInfo runs a podInfoReq/podInfoResp exchange; ctx bounds both the send and the wait for the reply.
+func (r *Resolver) getPodInfo(ctx context.Context, local net.IP) (*podInfoResp, error) {
+	resC := make(chan *podInfoResp, 1) // buffered so a reply arriving after we gave up never blocks the worker
+	select {
+	case r.podInfoC <- &podInfoReq{local: local, res: resC}:
+	case <-ctx.Done(): // worker busy or gone: give up instead of blocking on the send
+		return nil, ctx.Err()
+	}
+	select {
+	case res := <-resC: // nil when the worker found no pod for local
+		return res, nil
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
+// podStatus is a cache of data retrieved from CRI, refreshed by update.
+type podStatus struct {
+ // info is a map from pod sandbox ID to PodSandboxStatus as retrieved from
+ // CRI. update adds newly listed sandboxes and drops ones CRI no longer lists.
+ info map[string]*cri.PodSandboxStatus
+ // byIP is a map from pod IP (as string) to pod sandbox ID; rebuilt by update.
+ byIP map[string]string
+}
+
+// update performs an update of the podStatus from CRI. It only retrieves
+// information about pods that it doesn't yet have, and ensures that pods which
+// do not exist in CRI are also removed from podStatus. byIP is rebuilt from
+// scratch on every call.
+//
+// Sandboxes that disappear between ListPodSandbox and PodSandboxStatus (CRI
+// returns NotFound) are skipped. Any other CRI failure aborts the update and
+// is returned wrapped, so callers can inspect it with errors.Is/errors.As;
+// sandboxes fetched before the failure stay cached.
+//
+// TODO(q3k): make sure we don't cache PodSandboxStatus too early, eg. when
+// it's not yet fully running?
+func (p *podStatus) update(ctx context.Context, client cri.RuntimeServiceClient) error {
+	list, err := client.ListPodSandbox(ctx, &cri.ListPodSandboxRequest{})
+	if err != nil {
+		return fmt.Errorf("ListPodSandbox: %w", err)
+	}
+
+	// want is the set of all pod sandbox IDs currently known to CRI.
+	want := make(map[string]bool, len(list.Items))
+	for _, item := range list.Items {
+		want[item.Id] = true
+	}
+
+	// Fetch status for sandboxes that CRI lists but that are not cached yet.
+	for id := range want {
+		if _, ok := p.info[id]; ok {
+			continue
+		}
+		res, err := client.PodSandboxStatus(ctx, &cri.PodSandboxStatusRequest{
+			PodSandboxId: id,
+		})
+		if err != nil {
+			// The sandbox went away since it was listed; nothing to cache.
+			if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+				continue
+			}
+			return fmt.Errorf("while getting sandbox %s: %w", id, err)
+		}
+		// Never cache a nil status: the pass below dereferences every entry.
+		if res.Status == nil {
+			continue
+		}
+		p.info[id] = res.Status
+	}
+
+	// byIP is fully repopulated on each update.
+	p.byIP = make(map[string]string)
+
+	// Drop stale sandboxes and populate p.byIP in a single pass. Deleting the
+	// current key while ranging over a map is safe in Go.
+	for id, info := range p.info {
+		if !want[id] {
+			delete(p.info, id)
+			continue
+		}
+		if info.Network == nil || info.Network.Ip == "" {
+			continue
+		}
+		p.byIP[info.Network.Ip] = id
+	}
+	return nil
+}
+
+// findByPodIP looks up the cached PodSandboxStatus of the pod recorded under
+// the given pod IP address. It returns nil when no pod is known for it.
+func (p *podStatus) findByPodIP(ip net.IP) *cri.PodSandboxStatus {
+	// byIP is keyed by the textual form of the address.
+	if sandboxID, found := p.byIP[ip.String()]; found {
+		return p.info[sandboxID]
+	}
+	return nil
+}
+
+// runPodWorker runs the CRI cache 'pod worker'. It responds to requests over
+// podInfoC until ctx is canceled, refreshing the cache from CRI on a miss.
+// Every request taken off podInfoC gets exactly one reply, nil if unresolved.
+func (r *Resolver) runPodWorker(ctx context.Context) error {
+	conn, err := grpc.Dial(fmt.Sprintf("unix://%s", r.criPath), grpc.WithInsecure())
+	if err != nil {
+		return fmt.Errorf("Dial: %w", err)
+	}
+	defer conn.Close()
+	client := cri.NewRuntimeServiceClient(conn)
+
+	ps := &podStatus{info: make(map[string]*cri.PodSandboxStatus)}
+	if err := ps.update(ctx, client); err != nil {
+		return fmt.Errorf("initial pod update: %w", err)
+	}
+
+	for {
+		select {
+		case req := <-r.podInfoC:
+			info := ps.findByPodIP(req.local)
+			if info == nil {
+				// Cache miss: the pod may be newer than the cache, so refresh once.
+				if err := ps.update(ctx, client); err != nil {
+					glog.Errorf("Updating pods failed: %v", err)
+				} else {
+					info = ps.findByPodIP(req.local)
+				}
+			}
+			// Reply even after a failed refresh, so the requester is not left
+			// waiting for its context to expire.
+			req.reply(info)
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}