// kubenat implements a data source for undoing NAT on hosts running
// Kubernetes/containerd workloads.
//
// It parses the kernel conntrack NAT translation table to figure out the IP
// address of the pod that was making the connection.
//
// It then uses the containerd API to figure out what pod runs under what IP
// address.
//
// Both conntrack and containerd state is cached and only updated when needed.
// This means that as long as a TCP connection is open, identd will be able to
// respond about its information without having to perform any OS/containerd
// queries.
//
// Unfortunately, there is very little in terms of development/test harnesses
// for kubenat. You will have to have a locally running containerd, or do some
// mounts/forwards from a remote host.
package kubenat

import (
	"context"
	"errors"
	"fmt"
	"net"
	"time"

	"github.com/cenkalti/backoff"
	"github.com/golang/glog"
)

// Resolver is the main interface for kubenat. It runs background processing to
// update conntrack/containerd state, and resolves Tuple4s into PodInfo.
//
// A Resolver is built by NewResolver, which populates every field and starts
// the background workers; lookups are performed through ResolvePod.
type Resolver struct {
	// conntrackPath and criPath are stored verbatim from the arguments given
	// to NewResolver: the conntrack table path and the CRI gRPC domain socket
	// path, respectively.
	conntrackPath string
	criPath       string

	// translationC and podInfoC are unbuffered request channels created by
	// NewResolver. NOTE(review): presumably they are drained by the
	// translation worker and the pod worker respectively - confirm against
	// the worker implementations, which live outside this part of the file.
	translationC chan *translationReq
	podInfoC     chan *podInfoReq
}

// Tuple4 identifies a TCP connection by its two endpoints. "Local" always
// means the machine running this code and "Remote" its peer, regardless of
// which side listened and which side connected.
type Tuple4 struct {
	RemoteIP   net.IP
	RemotePort uint16
	LocalIP    net.IP
	LocalPort  uint16
}

// String renders the tuple as "L: <local host:port> R: <remote host:port>".
// Endpoints are joined with net.JoinHostPort, so IPv6 addresses come out
// bracketed.
func (t *Tuple4) String() string {
	// endpoint formats a single address/port pair.
	endpoint := func(ip net.IP, port uint16) string {
		return net.JoinHostPort(ip.String(), fmt.Sprint(port))
	}
	return "L: " + endpoint(t.LocalIP, t.LocalPort) + " R: " + endpoint(t.RemoteIP, t.RemotePort)
}

// PodInfo describes a Kubernetes pod which terminates a given Tuple4 connection.
// It is the result type of Resolver.ResolvePod.
type PodInfo struct {
	// PodIP is the IP address of the pod within the pod network. ResolvePod
	// fills it from the local IP of the NAT translation found for the Tuple4.
	PodIP net.IP
	// PodTranslatedPort is the port on the PodIP corresponding to the Tuple4
	// that this PodInfo was requested for. ResolvePod fills it from the local
	// port of the same NAT translation.
	PodTranslatedPort uint16
	// KubernetesNamespace is the kubernetes namespace in which this pod is
	// running.
	KubernetesNamespace string
	// Name is the name of the pod, as seen by kubernetes.
	Name string
}

// NewResolver starts a resolver with a given path to /proc/net/nf_conntrack
// and a CRI gRPC domain socket.
//
// Two background goroutines are started and handed ctx: the translation
// worker, and a supervisor for the pod worker. The supervisor restarts the pod
// worker with exponential backoff whenever it fails, and stops for good once
// the worker returns nil, returns the context's error, or ctx is cancelled
// while waiting for the next restart.
//
// The returned error is currently always nil.
func NewResolver(ctx context.Context, conntrackPath, criPath string) (*Resolver, error) {
	r := Resolver{
		conntrackPath: conntrackPath,
		criPath:       criPath,

		translationC: make(chan *translationReq),
		podInfoC:     make(chan *podInfoReq),
	}
	// TODO(q3k): bubble up errors from the translation worker into here?
	go r.runTranslationWorker(ctx)
	// The pod worker might fail on CRI connectivity issues, so we attempt to
	// restart it with a backoff if needed.
	go func() {
		bo := backoff.NewExponentialBackOff()
		// A zero MaxElapsedTime means the backoff never gives up: we keep
		// retrying for as long as ctx is alive.
		bo.MaxElapsedTime = 0
		bo.Reset()
		for {
			err := r.runPodWorker(ctx)
			if err == nil || errors.Is(err, ctx.Err()) {
				glog.Infof("podWorker exiting")
				return
			}
			glog.Errorf("podWorker failed: %v", err)
			wait := bo.NextBackOff()
			glog.Errorf("restarting podWorker in %v", wait)

			// Wait out the backoff, but give up immediately if ctx is
			// cancelled in the meantime. A plain time.Sleep would keep this
			// goroutine alive for the whole interval and then pointlessly
			// start the worker once more on a dead context.
			timer := time.NewTimer(wait)
			select {
			case <-ctx.Done():
				timer.Stop()
				glog.Infof("podWorker exiting")
				return
			case <-timer.C:
			}
		}
	}()

	return &r, nil
}

// ResolvePod looks up the pod behind a TCP 4-tuple. It first maps the tuple to
// a pod-side address via the NAT translation table, then asks for the pod that
// owns that address. A failure or miss at either step is reported as an error;
// on success the returned PodInfo is never nil.
func (r *Resolver) ResolvePod(ctx context.Context, t *Tuple4) (*PodInfo, error) {
	// TODO(q3k): expose translation/pod not found errors as package-level
	// vars, or use gRPC statuses?

	// Step 1: undo the NAT to find the pod-side endpoint.
	translated, err := r.translate(ctx, t)
	switch {
	case err != nil:
		return nil, fmt.Errorf("translate: %w", err)
	case translated == nil:
		return nil, fmt.Errorf("translation not found")
	}

	// Step 2: find out which pod owns that endpoint's IP address.
	pod, err := r.getPodInfo(ctx, translated.localIP)
	switch {
	case err != nil:
		return nil, fmt.Errorf("getPodInfo: %w", err)
	case pod == nil:
		return nil, fmt.Errorf("pod not found")
	}

	result := PodInfo{
		PodIP:               translated.localIP,
		PodTranslatedPort:   translated.localPort,
		KubernetesNamespace: pod.namespace,
		Name:                pod.name,
	}
	return &result, nil
}
