| package kubenat |
| |
| import ( |
| "context" |
| "fmt" |
| "net" |
| |
| "github.com/golang/glog" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| |
| "code.hackerspace.pl/hscloud/cluster/identd/cri" |
| ) |
| |
| // podInfoReq is a request passed to the podWorker. |
| type podInfoReq struct { |
| local net.IP |
| res chan *podInfoResp |
| } |
| |
// podInfoResp is the pod worker's answer to a podInfoReq, delivered over that
// request's res channel. It identifies a pod by its name and namespace.
type podInfoResp struct {
	name, namespace string
}
| |
| // reply sends a reply to the given podInfoReq based on a CRI PodSandboxStatus, |
| // sending nil if the status is nil. |
| func (r *podInfoReq) reply(s *cri.PodSandboxStatus) { |
| if s == nil { |
| r.res <- nil |
| return |
| } |
| r.res <- &podInfoResp{ |
| name: s.Metadata.Name, |
| namespace: s.Metadata.Namespace, |
| } |
| } |
| |
| // getPodInfo performs a podInfoReq/podInfoResp exchange under a context that |
| // can be used to time out the query. |
| func (r *Resolver) getPodInfo(ctx context.Context, local net.IP) (*podInfoResp, error) { |
| resC := make(chan *podInfoResp, 1) |
| r.podInfoC <- &podInfoReq{ |
| local: local, |
| res: resC, |
| } |
| select { |
| case <-ctx.Done(): |
| return nil, ctx.Err() |
| case res := <-resC: |
| return res, nil |
| } |
| } |
| |
| // podStatus is a cache of data retrieved from CRI. |
| type podStatus struct { |
| // info is a map from pod sandbox ID to PodSandboxStatus as retrieved from |
| // CRI. |
| info map[string]*cri.PodSandboxStatus |
| // byIP is a map from pod IP (as string) to pod sandbox ID. |
| byIP map[string]string |
| } |
| |
| // update performs an update of the podStatus from CRI. It only retrieves |
| // information about pods that it doesn't yet have, and ensures that pods which |
| // do not exist in CRI are also removed from podStatus. |
| // TODO(q3k): make sure we don't cache PodSandboxStatus too early, eg. when |
| // it's not yet fully running? |
| func (p *podStatus) update(ctx context.Context, client cri.RuntimeServiceClient) error { |
| res, err := client.ListPodSandbox(ctx, &cri.ListPodSandboxRequest{}) |
| if err != nil { |
| return fmt.Errorf("ListPodSandbox: %w", err) |
| } |
| |
| // set of all pod sandbox IDs in CRI. |
| want := make(map[string]bool) |
| // set of pod sandbox IDs in CRI that are not in podStatus. |
| missing := make(map[string]bool) |
| for _, item := range res.Items { |
| want[item.Id] = true |
| if _, ok := p.info[item.Id]; ok { |
| continue |
| } |
| missing[item.Id] = true |
| } |
| |
| // Get information about missing pod IDs into podStatus. |
| for id, _ := range missing { |
| res, err := client.PodSandboxStatus(ctx, &cri.PodSandboxStatusRequest{ |
| PodSandboxId: id, |
| }) |
| if err != nil { |
| if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { |
| continue |
| } else { |
| return fmt.Errorf("while getting sandbox %s: %v", id, err) |
| } |
| } |
| p.info[id] = res.Status |
| } |
| |
| // byIP is fully repopulated on each update. |
| p.byIP = make(map[string]string) |
| |
| // remove is the set of pods sandbox IDs that should be removed from podStatus. |
| remove := make(map[string]bool) |
| // Populate remove and p.byId in a single pass. |
| for id, info := range p.info { |
| if _, ok := want[id]; !ok { |
| remove[id] = true |
| continue |
| } |
| if info.Network == nil { |
| continue |
| } |
| if info.Network.Ip == "" { |
| continue |
| } |
| p.byIP[info.Network.Ip] = id |
| } |
| // Remove stale pod sandbox IDs from podStatus. |
| for id, _ := range remove { |
| delete(p.info, id) |
| } |
| return nil |
| } |
| |
| // findByPodID returns a PodSandboxStatus for the pod running under a given pod |
| // IP address, or nil if not found. |
| func (p *podStatus) findByPodIP(ip net.IP) *cri.PodSandboxStatus { |
| id, ok := p.byIP[ip.String()] |
| if !ok { |
| return nil |
| } |
| return p.info[id] |
| } |
| |
| // runPodWorker runs the CRI cache 'pod worker'. It responds to requests over |
| // podInfoC until ctx is canceled. |
| func (r *Resolver) runPodWorker(ctx context.Context) error { |
| conn, err := grpc.Dial(fmt.Sprintf("unix://%s", r.criPath), grpc.WithInsecure()) |
| if err != nil { |
| return fmt.Errorf("Dial: %w", err) |
| } |
| defer conn.Close() |
| client := cri.NewRuntimeServiceClient(conn) |
| |
| ps := &podStatus{ |
| info: make(map[string]*cri.PodSandboxStatus), |
| } |
| if err := ps.update(ctx, client); err != nil { |
| return fmt.Errorf("initial pod update: %w", err) |
| } |
| |
| for { |
| select { |
| case req := <-r.podInfoC: |
| info := ps.findByPodIP(req.local) |
| if info != nil { |
| req.reply(info) |
| continue |
| } |
| err := ps.update(ctx, client) |
| if err != nil { |
| glog.Errorf("Updating pods failed: %v", err) |
| continue |
| } |
| req.reply(ps.findByPodIP(req.local)) |
| case <-ctx.Done(): |
| return ctx.Err() |
| } |
| } |
| } |