blob: eab0e22a3c5b8ca4eb0bf1e090710eed72f569a6 [file] [log] [blame]
package kubenat
import (
"context"
"fmt"
"net"
"github.com/golang/glog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"code.hackerspace.pl/hscloud/cluster/identd/cri"
)
// podInfoReq is a request passed to the podWorker. It asks for information
// about the pod that owns a given IP address.
type podInfoReq struct {
// local is the pod IP address to look up.
local net.IP
// res is the channel on which the answer is delivered. A nil value is sent
// when there is no pod information to report.
res chan *podInfoResp
}
// podInfoResp is a response from a podWorker, sent over the res channel in a
// podInfoReq.
type podInfoResp struct {
// name is taken from the Metadata.Name of the pod's CRI PodSandboxStatus.
name string
// namespace is taken from the Metadata.Namespace of the pod's CRI
// PodSandboxStatus.
namespace string
}
// reply sends a reply to the given podInfoReq based on a CRI PodSandboxStatus.
// It sends nil if the status is nil or carries no metadata, as there is then
// no pod name or namespace to report.
//
// The send blocks until the res channel can accept the value.
func (r *podInfoReq) reply(s *cri.PodSandboxStatus) {
	// A status without metadata must not be dereferenced: doing so would
	// panic in the goroutine that is answering requests.
	if s == nil || s.Metadata == nil {
		r.res <- nil
		return
	}
	r.res <- &podInfoResp{
		name:      s.Metadata.Name,
		namespace: s.Metadata.Namespace,
	}
}
// getPodInfo performs a podInfoReq/podInfoResp exchange under a context that
// can be used to time out the query.
//
// Both halves of the exchange honour ctx: handing the request to the pod
// worker over podInfoC, and waiting for the worker's answer. If ctx ends
// first, ctx.Err() is returned. Otherwise the worker's response is returned
// as-is, which may be nil.
func (r *Resolver) getPodInfo(ctx context.Context, local net.IP) (*podInfoResp, error) {
	// Buffered with capacity one so that the worker's single reply never
	// blocks, even if this call has already returned because ctx ended.
	resC := make(chan *podInfoResp, 1)
	req := &podInfoReq{
		local: local,
		res:   resC,
	}

	// Submit the request, giving up if the worker does not pick it up before
	// ctx ends (e.g. because it is busy or no longer running).
	select {
	case r.podInfoC <- req:
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// Wait for the answer.
	select {
	case res := <-resC:
		return res, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// podStatus is a cache of data retrieved from CRI. It is filled in by update
// and queried with findByPodIP.
type podStatus struct {
// info is a map from pod sandbox ID to PodSandboxStatus as retrieved from
// CRI.
info map[string]*cri.PodSandboxStatus
// byIP is a map from pod IP (as string) to pod sandbox ID. It is rebuilt
// from info by update.
byIP map[string]string
}
// update performs an update of the podStatus from CRI. It only retrieves
// information about pods that it doesn't yet have, and ensures that pods which
// do not exist in CRI are also removed from podStatus.
//
// Sandboxes for which CRI reports NotFound when their status is requested, or
// for which it returns no status at all, are skipped and not cached. Any other
// CRI error aborts the update and is returned wrapped; statuses fetched before
// the failure stay in info, while byIP is left untouched.
//
// TODO(q3k): make sure we don't cache PodSandboxStatus too early, eg. when
// it's not yet fully running?
func (p *podStatus) update(ctx context.Context, client cri.RuntimeServiceClient) error {
	res, err := client.ListPodSandbox(ctx, &cri.ListPodSandboxRequest{})
	if err != nil {
		return fmt.Errorf("ListPodSandbox: %w", err)
	}

	// Allow update to be called on a zero-value podStatus.
	if p.info == nil {
		p.info = make(map[string]*cri.PodSandboxStatus)
	}

	// want is the set of all pod sandbox IDs in CRI.
	want := make(map[string]bool, len(res.Items))
	// missing is the set of pod sandbox IDs in CRI that are not in podStatus.
	missing := make(map[string]bool)
	for _, item := range res.Items {
		want[item.Id] = true
		if _, ok := p.info[item.Id]; !ok {
			missing[item.Id] = true
		}
	}

	// Get information about missing pod IDs into podStatus.
	for id := range missing {
		resp, err := client.PodSandboxStatus(ctx, &cri.PodSandboxStatusRequest{
			PodSandboxId: id,
		})
		if err != nil {
			// The sandbox was removed after it was listed; nothing to cache.
			if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
				continue
			}
			return fmt.Errorf("while getting sandbox %s: %w", id, err)
		}
		// Never cache a nil status: it would be dereferenced below and by
		// consumers of findByPodIP.
		if resp.Status == nil {
			continue
		}
		p.info[id] = resp.Status
	}

	// byIP is fully repopulated on each update. Stale sandboxes are dropped
	// from info in the same pass; deleting the entry currently being visited
	// while ranging over a map is permitted in Go.
	byIP := make(map[string]string, len(p.info))
	for id, info := range p.info {
		if !want[id] {
			delete(p.info, id)
			continue
		}
		// Sandboxes without a known IP address cannot be indexed.
		if info == nil || info.Network == nil || info.Network.Ip == "" {
			continue
		}
		byIP[info.Network.Ip] = id
	}
	p.byIP = byIP
	return nil
}
// findByPodIP looks up the cached PodSandboxStatus of the pod that runs under
// the given pod IP address. It returns nil if no such pod is known.
func (p *podStatus) findByPodIP(ip net.IP) *cri.PodSandboxStatus {
	// byIP is keyed by the textual form of the address.
	if sandboxID, found := p.byIP[ip.String()]; found {
		return p.info[sandboxID]
	}
	return nil
}
// runPodWorker runs the CRI cache 'pod worker'. It connects to the CRI socket
// at r.criPath, primes the pod cache, and then answers requests arriving over
// podInfoC until ctx is canceled, at which point it returns ctx.Err().
//
// It returns early with an error if dialing CRI or the initial cache update
// fails. A request whose IP is not in the cache triggers a cache update before
// it is answered; if that update fails the error is logged and the request is
// left unanswered.
func (r *Resolver) runPodWorker(ctx context.Context) error {
	target := fmt.Sprintf("unix://%s", r.criPath)
	conn, err := grpc.Dial(target, grpc.WithInsecure())
	if err != nil {
		return fmt.Errorf("Dial: %w", err)
	}
	defer conn.Close()

	criClient := cri.NewRuntimeServiceClient(conn)
	cache := &podStatus{
		info: make(map[string]*cri.PodSandboxStatus),
	}
	if err := cache.update(ctx, criClient); err != nil {
		return fmt.Errorf("initial pod update: %w", err)
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case req := <-r.podInfoC:
			found := cache.findByPodIP(req.local)
			if found == nil {
				// Cache miss: refresh from CRI and look the IP up once more.
				if err := cache.update(ctx, criClient); err != nil {
					glog.Errorf("Updating pods failed: %v", err)
					continue
				}
				found = cache.findByPodIP(req.local)
			}
			req.reply(found)
		}
	}
}