blob: eab0e22a3c5b8ca4eb0bf1e090710eed72f569a6 [file] [log] [blame]
Serge Bazanski6b649f82021-05-24 15:09:25 +02001package kubenat
2
3import (
4 "context"
5 "fmt"
6 "net"
7
8 "github.com/golang/glog"
9 "google.golang.org/grpc"
10 "google.golang.org/grpc/codes"
11 "google.golang.org/grpc/status"
12
13 "code.hackerspace.pl/hscloud/cluster/identd/cri"
14)
15
16// podInfoReq is a request passed to the podWorker.
17type podInfoReq struct {
18 local net.IP
19 res chan *podInfoResp
20}
21
// podInfoResp is the answer to a podInfoReq, delivered by the pod worker over
// the request's res channel.
type podInfoResp struct {
	// name and namespace identify the pod, as copied from the metadata of
	// its CRI pod sandbox status.
	name, namespace string
}
28
29// reply sends a reply to the given podInfoReq based on a CRI PodSandboxStatus,
30// sending nil if the status is nil.
31func (r *podInfoReq) reply(s *cri.PodSandboxStatus) {
32 if s == nil {
33 r.res <- nil
34 return
35 }
36 r.res <- &podInfoResp{
37 name: s.Metadata.Name,
38 namespace: s.Metadata.Namespace,
39 }
40}
41
42// getPodInfo performs a podInfoReq/podInfoResp exchange under a context that
43// can be used to time out the query.
44func (r *Resolver) getPodInfo(ctx context.Context, local net.IP) (*podInfoResp, error) {
45 resC := make(chan *podInfoResp, 1)
46 r.podInfoC <- &podInfoReq{
47 local: local,
48 res: resC,
49 }
50 select {
51 case <-ctx.Done():
52 return nil, ctx.Err()
53 case res := <-resC:
54 return res, nil
55 }
56}
57
58// podStatus is a cache of data retrieved from CRI.
59type podStatus struct {
60 // info is a map from pod sandbox ID to PodSandboxStatus as retrieved from
61 // CRI.
62 info map[string]*cri.PodSandboxStatus
63 // byIP is a map from pod IP (as string) to pod sandbox ID.
64 byIP map[string]string
65}
66
67// update performs an update of the podStatus from CRI. It only retrieves
68// information about pods that it doesn't yet have, and ensures that pods which
69// do not exist in CRI are also removed from podStatus.
70// TODO(q3k): make sure we don't cache PodSandboxStatus too early, eg. when
71// it's not yet fully running?
72func (p *podStatus) update(ctx context.Context, client cri.RuntimeServiceClient) error {
73 res, err := client.ListPodSandbox(ctx, &cri.ListPodSandboxRequest{})
74 if err != nil {
75 return fmt.Errorf("ListPodSandbox: %w", err)
76 }
77
78 // set of all pod sandbox IDs in CRI.
79 want := make(map[string]bool)
80 // set of pod sandbox IDs in CRI that are not in podStatus.
81 missing := make(map[string]bool)
82 for _, item := range res.Items {
83 want[item.Id] = true
84 if _, ok := p.info[item.Id]; ok {
85 continue
86 }
87 missing[item.Id] = true
88 }
89
90 // Get information about missing pod IDs into podStatus.
91 for id, _ := range missing {
92 res, err := client.PodSandboxStatus(ctx, &cri.PodSandboxStatusRequest{
93 PodSandboxId: id,
94 })
95 if err != nil {
96 if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
97 continue
98 } else {
99 return fmt.Errorf("while getting sandbox %s: %v", id, err)
100 }
101 }
102 p.info[id] = res.Status
103 }
104
105 // byIP is fully repopulated on each update.
106 p.byIP = make(map[string]string)
107
108 // remove is the set of pods sandbox IDs that should be removed from podStatus.
109 remove := make(map[string]bool)
110 // Populate remove and p.byId in a single pass.
111 for id, info := range p.info {
112 if _, ok := want[id]; !ok {
113 remove[id] = true
114 continue
115 }
116 if info.Network == nil {
117 continue
118 }
119 if info.Network.Ip == "" {
120 continue
121 }
122 p.byIP[info.Network.Ip] = id
123 }
124 // Remove stale pod sandbox IDs from podStatus.
125 for id, _ := range remove {
126 delete(p.info, id)
127 }
128 return nil
129}
130
131// findByPodID returns a PodSandboxStatus for the pod running under a given pod
132// IP address, or nil if not found.
133func (p *podStatus) findByPodIP(ip net.IP) *cri.PodSandboxStatus {
134 id, ok := p.byIP[ip.String()]
135 if !ok {
136 return nil
137 }
138 return p.info[id]
139}
140
141// runPodWorker runs the CRI cache 'pod worker'. It responds to requests over
142// podInfoC until ctx is canceled.
143func (r *Resolver) runPodWorker(ctx context.Context) error {
144 conn, err := grpc.Dial(fmt.Sprintf("unix://%s", r.criPath), grpc.WithInsecure())
145 if err != nil {
146 return fmt.Errorf("Dial: %w", err)
147 }
148 defer conn.Close()
149 client := cri.NewRuntimeServiceClient(conn)
150
151 ps := &podStatus{
152 info: make(map[string]*cri.PodSandboxStatus),
153 }
154 if err := ps.update(ctx, client); err != nil {
155 return fmt.Errorf("initial pod update: %w", err)
156 }
157
158 for {
159 select {
160 case req := <-r.podInfoC:
161 info := ps.findByPodIP(req.local)
162 if info != nil {
163 req.reply(info)
164 continue
165 }
166 err := ps.update(ctx, client)
167 if err != nil {
168 glog.Errorf("Updating pods failed: %v", err)
169 continue
170 }
171 req.reply(ps.findByPodIP(req.local))
172 case <-ctx.Done():
173 return ctx.Err()
174 }
175 }
176}