cluster/identd: implement
This implements the main identd service that will run on our production
hosts. It's comparatively small, as most of the functionality is
implemented in //cluster/identd/ident and //cluster/identd/kubenat.
Change-Id: I1861fe7c93d105faa19a2bafbe9c85fe36502f73
diff --git a/cluster/identd/main.go b/cluster/identd/main.go
new file mode 100644
index 0000000..b58f442
--- /dev/null
+++ b/cluster/identd/main.go
@@ -0,0 +1,195 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "net"
+ "os"
+ "os/signal"
+ "strings"
+
+ "github.com/golang/glog"
+ v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ "code.hackerspace.pl/hscloud/cluster/identd/ident"
+ "code.hackerspace.pl/hscloud/cluster/identd/kubenat"
+ "code.hackerspace.pl/hscloud/go/mirko"
+)
+
+func init() { // Runs before main, so the default below is in place before flag.Parse().
+ flag.Set("logtostderr", "true") // presumably glog's stderr switch (confirm); the error return is discarded
+}
+
+var ( // Flag-backed settings; the values here are the defaults registered in main.
+ flagIdentdListen = "127.0.0.1:8113" // -identd_listen
+ flagContainerdSocket = "/var/run/containerd/containerd.sock" // -identd_containerd_socket
+ flagConntrackProc = "/proc/net/nf_conntrack" // -identd_conntrack_proc
+ flagPodName = "" // -identd_pod_name
+ flagPodNamespace = "" // -identd_pod_namespace
+)
+
+// main parses flags, starts the kubenat resolver, picks the local IP used in
+// lookups (from -identd_listen, else this pod's HostIP fetched from the k8s
+// API) and serves ident requests until an interrupt signal cancels the context.
+func main() {
+	flag.StringVar(&flagIdentdListen, "identd_listen", flagIdentdListen, "Address at which to listen for incoming ident protocol connections")
+	flag.StringVar(&flagContainerdSocket, "identd_containerd_socket", flagContainerdSocket, "Containerd gRPC socket path")
+	flag.StringVar(&flagConntrackProc, "identd_conntrack_proc", flagConntrackProc, "Conntrack procfs file")
+	flag.StringVar(&flagPodName, "identd_pod_name", flagPodName, "Name of this pod, if on k8s. Needed for public IP resolution.")
+	flag.StringVar(&flagPodNamespace, "identd_pod_namespace", flagPodNamespace, "Namespace where this pod is running, if on k8s. Needed for public IP resolution.")
+	flag.Parse()
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	resolver, err := kubenat.NewResolver(ctx, flagConntrackProc, flagContainerdSocket)
+	if err != nil {
+		glog.Exitf("Could not start kubenat resolver: %v", err)
+	}
+
+	// Use the -identd_listen host only if it is a global unicast IP; the default
+	// 127.0.0.1 is loopback and therefore falls through to the k8s lookup below.
+	var localIP net.IP
+	localIPStr, _, err := net.SplitHostPort(flagIdentdListen)
+	if err != nil {
+		glog.Warningf("Could not parse identd listen flag %q: %v", flagIdentdListen, err)
+	} else {
+		localIP = net.ParseIP(localIPStr)
+		if localIP == nil || !localIP.IsGlobalUnicast() {
+			glog.Warningf("Could not parse unicast IP from identd flag %q", localIPStr)
+			localIP = nil
+		}
+	}
+
+	if localIP == nil {
+		glog.Infof("Could not figure out public IP address for identd, attempting to retrieve from k8s...")
+		cs := mirko.KubernetesClient()
+		if cs == nil {
+			glog.Exitf("Not in k8s and identd_listen set to invalid public IP address - exiting.")
+		}
+		if flagPodName == "" {
+			glog.Exitf("identd_pod_name must be set")
+		}
+		if flagPodNamespace == "" {
+			glog.Exitf("identd_pod_namespace must be set")
+		}
+		pod, err := cs.CoreV1().Pods(flagPodNamespace).Get(ctx, flagPodName, v1.GetOptions{})
+		if err != nil {
+			glog.Exitf("Could not find pod %q in namespace %q: %v", flagPodName, flagPodNamespace, err)
+		}
+		ipStr := pod.Status.HostIP
+		if ipStr == "" {
+			glog.Exitf("HostIP in status of pod %q is empty", flagPodName)
+		}
+		glog.Infof("Resolved k8s node IP to %s", ipStr)
+		localIP = net.ParseIP(ipStr)
+		if localIP == nil {
+			glog.Exitf("HostIP %q in status of pod %q is unparseable", ipStr, flagPodName)
+		}
+	}
+
+	glog.Infof("Will respond to identd queries on %s...", localIP)
+	s := &service{resolver: resolver, localIP: localIP}
+
+	lis, err := net.Listen("tcp", flagIdentdListen)
+	if err != nil {
+		glog.Exitf("Could not listen for identd: %v", err)
+	}
+	isrv := ident.NewServer()
+	isrv.HandleFunc(s.handleIdent)
+	go func() {
+		glog.Infof("Starting identd on %s...", flagIdentdListen)
+		// NOTE(review): if Serve returns non-nil after Stop(), shutdown exits here - confirm.
+		if err := isrv.Serve(lis); err != nil {
+			glog.Exitf("identd Serve: %v", err)
+		}
+	}()
+
+	// Cancel ctx on interrupt. NOTE(review): SIGTERM is not registered here.
+	signalChan := make(chan os.Signal, 1)
+	signal.Notify(signalChan, os.Interrupt)
+	go func() {
+		<-signalChan
+		ctxC()
+	}()
+
+	<-ctx.Done()
+	glog.Infof("Stopping identd...")
+	isrv.Stop()
+	lis.Close()
+}
+
+type service struct { // service carries the state that handleIdent needs to answer queries.
+ resolver *kubenat.Resolver // used by handleIdent to resolve a connection 4-tuple to a pod
+ localIP net.IP // placed in Tuple4.LocalIP for every lookup; chosen once in main
+}
+
+// handleIdent answers one ident query: it resolves the queried connection to a
+// pod through the kubenat resolver and replies with a user ID derived from the
+// pod's namespace, or relays the query to appservice-irc pods in "matrix".
+func (s *service) handleIdent(ctx context.Context, w ident.ResponseWriter, r *ident.Request) {
+	clientIPStr, _, err := net.SplitHostPort(r.ClientAddress.String())
+	if err != nil {
+		glog.Errorf("Unparseable ClientAddress %q: %v", r.ClientAddress, err)
+		w.SendError(ident.UnknownError)
+		return
+	}
+	clientIP := net.ParseIP(clientIPStr)
+	if clientIP == nil {
+		glog.Errorf("Unparseable ClientAddress IP %q", r.ClientAddress)
+		w.SendError(ident.UnknownError)
+		return
+	}
+
+	t4 := kubenat.Tuple4{
+		RemoteIP:   clientIP,
+		RemotePort: r.ClientPort,
+		LocalIP:    s.localIP,
+		LocalPort:  r.ServerPort,
+	}
+	glog.Infof("Running query for %s...", t4.String())
+	info, err := s.resolver.ResolvePod(ctx, &t4)
+	if err != nil {
+		glog.Errorf("ResolvePod(%q): %v", t4.String(), err)
+		w.SendError(ident.NoUser)
+		return
+	}
+
+	ns := info.KubernetesNamespace
+	pod := info.Name
+	// appservice-irc pods in "matrix" are asked directly via ident on port 1113.
+	if ns == "matrix" && strings.HasPrefix(pod, "appservice-irc-") {
+		target := net.JoinHostPort(info.PodIP.String(), "1113")
+		clientPort := r.ClientPort
+		serverPort := info.PodTranslatedPort
+		glog.Infof("Forwarding to appservice-irc at %q, clientPort: %d, serverPort: %d", target, clientPort, serverPort)
+		res, err := ident.Query(ctx, target, clientPort, serverPort)
+		if err != nil {
+			var identErr *ident.IdentError
+			if errors.As(err, &identErr) {
+				glog.Infof("appservice-irc: %s", identErr.Inner)
+				w.SendError(identErr.Inner)
+			} else {
+				glog.Infof("appservice-irc: error: %v", err)
+				w.SendError(ident.UnknownError)
+			}
+		} else {
+			glog.Infof("Response from appservice-irc: %q", res.UserID)
+			w.SendIdent(&ident.IdentResponse{UserID: res.UserID})
+		}
+		return
+	}
+	var user string
+	switch {
+	case ns == "q3k":
+		// q3k's old personal namespace.
+		user = "q3k"
+	case strings.HasPrefix(ns, "personal-"):
+		// personal-* namespaces map to the bare suffix.
+		user = strings.TrimPrefix(ns, "personal-")
+	default:
+		user = fmt.Sprintf("kns-%s", ns)
+	}
+	glog.Infof("Returning %q (from %q) for %q", user, ns, t4.String())
+	w.SendIdent(&ident.IdentResponse{UserID: user})
+}