| package mirko |
| |
| import ( |
| "context" |
| "flag" |
| "fmt" |
| "net" |
| "net/http" |
| "net/http/pprof" |
| "os" |
| "os/signal" |
| "sort" |
| "strings" |
| "time" |
| |
| "github.com/golang/glog" |
| "golang.org/x/net/trace" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/reflection" |
| "k8s.io/client-go/kubernetes" |
| |
| "code.hackerspace.pl/hscloud/go/pki" |
| "code.hackerspace.pl/hscloud/go/statusz" |
| ) |
| |
| var ( |
| flagListenAddress string |
| flagDebugAddress string |
| flagDebugAllowAll bool |
| ) |
| |
| func init() { |
| flag.StringVar(&flagListenAddress, "listen_address", "127.0.0.1:4200", "gRPC listen address") |
| flag.StringVar(&flagDebugAddress, "debug_address", "127.0.0.1:4201", "HTTP debug/status listen address") |
| flag.BoolVar(&flagDebugAllowAll, "debug_allow_all", false, "HTTP debug/status available to everyone") |
| flag.Set("logtostderr", "true") |
| } |
| |
| type Mirko struct { |
| grpcListen net.Listener |
| grpcServer *grpc.Server |
| httpListen net.Listener |
| httpServer *http.Server |
| httpMux *http.ServeMux |
| |
| kubernetesCS *kubernetes.Clientset |
| |
| ctx context.Context |
| cancel context.CancelFunc |
| } |
| |
| func New() *Mirko { |
| ctx, cancel := context.WithCancel(context.Background()) |
| return &Mirko{ |
| ctx: ctx, |
| cancel: cancel, |
| } |
| } |
| |
| func authRequest(req *http.Request) (any, sensitive bool) { |
| host, _, err := net.SplitHostPort(req.RemoteAddr) |
| if err != nil { |
| host = req.RemoteAddr |
| } |
| |
| if flagDebugAllowAll { |
| return true, true |
| } |
| |
| switch host { |
| case "localhost", "127.0.0.1", "::1": |
| return true, true |
| default: |
| return false, false |
| } |
| } |
| |
| func (m *Mirko) Listen() error { |
| grpc.EnableTracing = true |
| trace.AuthRequest = authRequest |
| |
| grpcLis, err := net.Listen("tcp", flagListenAddress) |
| if err != nil { |
| return fmt.Errorf("net.Listen: %v", err) |
| } |
| m.grpcListen = grpcLis |
| m.grpcServer = grpc.NewServer(pki.WithServerHSPKI()...) |
| reflection.Register(m.grpcServer) |
| |
| httpLis, err := net.Listen("tcp", flagDebugAddress) |
| if err != nil { |
| return fmt.Errorf("net.Listen: %v", err) |
| } |
| |
| m.httpMux = http.NewServeMux() |
| // Canonical URLs |
| m.httpMux.HandleFunc("/debug/status", func(w http.ResponseWriter, r *http.Request) { |
| any, _ := authRequest(r) |
| if !any { |
| http.Error(w, "not allowed", http.StatusUnauthorized) |
| return |
| } |
| statusz.StatusHandler(w, r) |
| }) |
| m.httpMux.HandleFunc("/debug/requests", trace.Traces) |
| |
| // pprof endpoints |
| m.httpMux.HandleFunc("/debug/pprof/", pprof.Index) |
| m.httpMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) |
| m.httpMux.HandleFunc("/debug/pprof/profile", pprof.Profile) |
| m.httpMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) |
| m.httpMux.HandleFunc("/debug/pprof/trace", pprof.Trace) |
| |
| // -z legacy URLs |
| m.httpMux.HandleFunc("/statusz", func(w http.ResponseWriter, r *http.Request) { |
| http.Redirect(w, r, "/debug/status", http.StatusSeeOther) |
| }) |
| m.httpMux.HandleFunc("/rpcz", func(w http.ResponseWriter, r *http.Request) { |
| http.Redirect(w, r, "/debug/requests", http.StatusSeeOther) |
| }) |
| m.httpMux.HandleFunc("/requestz", func(w http.ResponseWriter, r *http.Request) { |
| http.Redirect(w, r, "/debug/requests", http.StatusSeeOther) |
| }) |
| m.httpMux.HandleFunc("/profilez", func(w http.ResponseWriter, r *http.Request) { |
| http.Redirect(w, r, "/debug/pprof", http.StatusSeeOther) |
| }) |
| |
| // root redirect |
| m.httpMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { |
| http.Redirect(w, r, "/debug/status", http.StatusSeeOther) |
| }) |
| |
| m.kubernetesConnect() |
| |
| debugParts := strings.Split(flagDebugAddress, ":") |
| debugPort := debugParts[len(debugParts)-1] |
| statusz.PublicAddress = fmt.Sprintf("http://%s:%s/", m.Address().String(), debugPort) |
| |
| m.httpListen = httpLis |
| m.httpServer = &http.Server{ |
| Addr: flagDebugAddress, |
| Handler: m.httpMux, |
| } |
| |
| return nil |
| } |
| |
| // Trace logs debug information to either a context trace (if present) |
| // or stderr (if not) |
| func Trace(ctx context.Context, f string, args ...interface{}) { |
| tr, ok := trace.FromContext(ctx) |
| if !ok { |
| fmtd := fmt.Sprintf(f, args...) |
| glog.Warningf("No trace in %v: %s", ctx, fmtd) |
| return |
| } |
| tr.LazyPrintf(f, args...) |
| } |
| |
| // GRPC returns the microservice's grpc.Server object |
| func (m *Mirko) GRPC() *grpc.Server { |
| if m.grpcServer == nil { |
| panic("GRPC() called before Listen()") |
| } |
| return m.grpcServer |
| } |
| |
| // HTTPMux returns the microservice's debug HTTP mux |
| func (m *Mirko) HTTPMux() *http.ServeMux { |
| if m.httpMux == nil { |
| panic("HTTPMux() called before Listen()") |
| } |
| return m.httpMux |
| } |
| |
| // Context returns a background microservice context that will be canceled |
| // when the service is shut down |
| func (m *Mirko) Context() context.Context { |
| return m.ctx |
| } |
| |
| // Done() returns a channel that will emit a value when the service is |
| // shut down. This should be used in the main() function instead of a select{} |
| // call, to allow the background context to be canceled fully. |
| func (m *Mirko) Done() <-chan struct{} { |
| return m.Context().Done() |
| } |
| |
| // Serve starts serving HTTP and gRPC requests |
| func (m *Mirko) Serve() error { |
| errs := make(chan error, 1) |
| go func() { |
| if err := m.grpcServer.Serve(m.grpcListen); err != nil { |
| errs <- err |
| } |
| }() |
| go func() { |
| if err := m.httpServer.Serve(m.httpListen); err != nil { |
| errs <- err |
| } |
| }() |
| |
| signalCh := make(chan os.Signal, 1) |
| signal.Notify(signalCh, os.Interrupt) |
| go func() { |
| select { |
| case <-signalCh: |
| m.cancel() |
| } |
| }() |
| |
| ticker := time.NewTicker(1 * time.Second) |
| select { |
| case <-ticker.C: |
| glog.Infof("gRPC listening on %s", flagListenAddress) |
| glog.Infof("HTTP listening on %s", flagDebugAddress) |
| return nil |
| case err := <-errs: |
| return err |
| } |
| } |
| |
| // Address returns a linkable address where this service is running, sans port. |
| // If running within kubernetes, this will return the pod IP. |
| // Otherwise, this will guess the main, 'external' IP address of the machine it's running on. |
| // On failures, returns loopback address. |
| func (m *Mirko) Address() net.IP { |
| // If we're not running in Kubernetes and binding to 127.0.0.1, return loopback. |
| if m.kubernetesCS == nil && strings.HasPrefix(flagListenAddress, "127.0.0.1:") { |
| return net.ParseIP("127.0.0.1") |
| } |
| |
| ifaces, err := net.Interfaces() |
| if err != nil { |
| glog.Errorf("net.Interface(): %v", err) |
| return net.ParseIP("127.0.0.1") |
| } |
| |
| addrmap := make(map[string]net.IP) |
| |
| for _, iface := range ifaces { |
| addrs, err := iface.Addrs() |
| if err != nil { |
| glog.Errorf("iface(%q).Addrs(): %v", iface.Name, err) |
| continue |
| } |
| |
| for _, addr := range addrs { |
| var ip net.IP |
| switch v := addr.(type) { |
| case *net.IPNet: |
| ip = v.IP |
| case *net.IPAddr: |
| ip = v.IP |
| default: |
| continue |
| } |
| |
| if strings.HasPrefix(ip.String(), "fe80:") { |
| continue |
| } |
| addrmap[iface.Name] = ip |
| } |
| } |
| |
| if m.kubernetesCS != nil { |
| addr, ok := addrmap["eth0"] |
| if !ok { |
| glog.Errorf("Running on Kubernetes but no eth0! Available interfaces: %v", addrmap) |
| return net.ParseIP("127.0.0.1") |
| } |
| |
| return addr |
| } |
| |
| if len(addrmap) == 0 { |
| glog.Errorf("No interfaces found!") |
| return net.ParseIP("127.0.0.1") |
| } |
| |
| // Heuristics ahoy! |
| prioritized := []*ifaceWithPriority{} |
| for iface, addr := range addrmap { |
| prio := &ifaceWithPriority{ |
| iface: iface, |
| addr: addr, |
| } |
| switch { |
| case strings.HasPrefix(iface, "lo"): |
| prio.priority = -10 |
| case strings.HasPrefix(iface, "tap"): |
| prio.priority = -5 |
| case strings.HasPrefix(iface, "tun"): |
| prio.priority = -5 |
| case strings.HasPrefix(iface, "veth"): |
| prio.priority = 5 |
| case strings.HasPrefix(iface, "wl"): |
| prio.priority = 5 |
| case strings.HasPrefix(iface, "enp"): |
| prio.priority = 10 |
| case strings.HasPrefix(iface, "eth"): |
| prio.priority = 10 |
| } |
| |
| prioritized = append(prioritized, prio) |
| } |
| |
| sort.Slice(prioritized, func(i, j int) bool { return prioritized[i].priority > prioritized[j].priority }) |
| return prioritized[0].addr |
| } |
| |
| type ifaceWithPriority struct { |
| iface string |
| addr net.IP |
| priority int |
| } |