blob: f4c45eb0e3051d070c153660914827c4ea6d4744 [file] [log] [blame]
package mirko
import (
"context"
"flag"
"fmt"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"sort"
"strings"
"time"
"github.com/golang/glog"
"golang.org/x/net/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"k8s.io/client-go/kubernetes"
"code.hackerspace.pl/hscloud/go/pki"
"code.hackerspace.pl/hscloud/go/statusz"
)
var (
flagListenAddress string
flagDebugAddress string
flagDebugAllowAll bool
)
func init() {
flag.StringVar(&flagListenAddress, "listen_address", "127.0.0.1:4200", "gRPC listen address")
flag.StringVar(&flagDebugAddress, "debug_address", "127.0.0.1:4201", "HTTP debug/status listen address")
flag.BoolVar(&flagDebugAllowAll, "debug_allow_all", false, "HTTP debug/status available to everyone")
flag.Set("logtostderr", "true")
}
type Mirko struct {
grpcListen net.Listener
grpcServer *grpc.Server
httpListen net.Listener
httpServer *http.Server
httpMux *http.ServeMux
kubernetesCS *kubernetes.Clientset
ctx context.Context
cancel context.CancelFunc
}
func New() *Mirko {
ctx, cancel := context.WithCancel(context.Background())
return &Mirko{
ctx: ctx,
cancel: cancel,
}
}
func authRequest(req *http.Request) (any, sensitive bool) {
host, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
host = req.RemoteAddr
}
if flagDebugAllowAll {
return true, true
}
switch host {
case "localhost", "127.0.0.1", "::1":
return true, true
default:
return false, false
}
}
func (m *Mirko) Listen() error {
grpc.EnableTracing = true
trace.AuthRequest = authRequest
grpcLis, err := net.Listen("tcp", flagListenAddress)
if err != nil {
return fmt.Errorf("net.Listen: %v", err)
}
m.grpcListen = grpcLis
m.grpcServer = grpc.NewServer(pki.WithServerHSPKI()...)
reflection.Register(m.grpcServer)
httpLis, err := net.Listen("tcp", flagDebugAddress)
if err != nil {
return fmt.Errorf("net.Listen: %v", err)
}
m.httpMux = http.NewServeMux()
// Canonical URLs
m.httpMux.HandleFunc("/debug/status", func(w http.ResponseWriter, r *http.Request) {
any, _ := authRequest(r)
if !any {
http.Error(w, "not allowed", http.StatusUnauthorized)
return
}
statusz.StatusHandler(w, r)
})
m.httpMux.HandleFunc("/debug/requests", trace.Traces)
// pprof endpoints
m.httpMux.HandleFunc("/debug/pprof/", pprof.Index)
m.httpMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
m.httpMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
m.httpMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
m.httpMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
// -z legacy URLs
m.httpMux.HandleFunc("/statusz", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/debug/status", http.StatusSeeOther)
})
m.httpMux.HandleFunc("/rpcz", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/debug/requests", http.StatusSeeOther)
})
m.httpMux.HandleFunc("/requestz", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/debug/requests", http.StatusSeeOther)
})
m.httpMux.HandleFunc("/profilez", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/debug/pprof", http.StatusSeeOther)
})
// root redirect
m.httpMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/debug/status", http.StatusSeeOther)
})
m.kubernetesConnect()
debugParts := strings.Split(flagDebugAddress, ":")
debugPort := debugParts[len(debugParts)-1]
statusz.PublicAddress = fmt.Sprintf("http://%s:%s/", m.Address().String(), debugPort)
m.httpListen = httpLis
m.httpServer = &http.Server{
Addr: flagDebugAddress,
Handler: m.httpMux,
}
return nil
}
// Trace logs debug information to either a context trace (if present)
// or stderr (if not)
func Trace(ctx context.Context, f string, args ...interface{}) {
tr, ok := trace.FromContext(ctx)
if !ok {
fmtd := fmt.Sprintf(f, args...)
glog.Warningf("No trace in %v: %s", ctx, fmtd)
return
}
tr.LazyPrintf(f, args...)
}
// GRPC returns the microservice's grpc.Server object
func (m *Mirko) GRPC() *grpc.Server {
if m.grpcServer == nil {
panic("GRPC() called before Listen()")
}
return m.grpcServer
}
// HTTPMux returns the microservice's debug HTTP mux
func (m *Mirko) HTTPMux() *http.ServeMux {
if m.httpMux == nil {
panic("HTTPMux() called before Listen()")
}
return m.httpMux
}
// Context returns a background microservice context that will be canceled
// when the service is shut down
func (m *Mirko) Context() context.Context {
return m.ctx
}
// Done() returns a channel that will emit a value when the service is
// shut down. This should be used in the main() function instead of a select{}
// call, to allow the background context to be canceled fully.
func (m *Mirko) Done() <-chan struct{} {
return m.Context().Done()
}
// Serve starts serving HTTP and gRPC requests
func (m *Mirko) Serve() error {
errs := make(chan error, 1)
go func() {
if err := m.grpcServer.Serve(m.grpcListen); err != nil {
errs <- err
}
}()
go func() {
if err := m.httpServer.Serve(m.httpListen); err != nil {
errs <- err
}
}()
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
go func() {
select {
case <-signalCh:
m.cancel()
}
}()
ticker := time.NewTicker(1 * time.Second)
select {
case <-ticker.C:
glog.Infof("gRPC listening on %s", flagListenAddress)
glog.Infof("HTTP listening on %s", flagDebugAddress)
return nil
case err := <-errs:
return err
}
}
// Address returns a linkable address where this service is running, sans port.
// If running within kubernetes, this will return the pod IP.
// Otherwise, this will guess the main, 'external' IP address of the machine it's running on.
// On failures, returns loopback address.
func (m *Mirko) Address() net.IP {
// If we're not running in Kubernetes and binding to 127.0.0.1, return loopback.
if m.kubernetesCS == nil && strings.HasPrefix(flagListenAddress, "127.0.0.1:") {
return net.ParseIP("127.0.0.1")
}
ifaces, err := net.Interfaces()
if err != nil {
glog.Errorf("net.Interface(): %v", err)
return net.ParseIP("127.0.0.1")
}
addrmap := make(map[string]net.IP)
for _, iface := range ifaces {
addrs, err := iface.Addrs()
if err != nil {
glog.Errorf("iface(%q).Addrs(): %v", iface.Name, err)
continue
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
default:
continue
}
if strings.HasPrefix(ip.String(), "fe80:") {
continue
}
addrmap[iface.Name] = ip
}
}
if m.kubernetesCS != nil {
addr, ok := addrmap["eth0"]
if !ok {
glog.Errorf("Running on Kubernetes but no eth0! Available interfaces: %v", addrmap)
return net.ParseIP("127.0.0.1")
}
return addr
}
if len(addrmap) == 0 {
glog.Errorf("No interfaces found!")
return net.ParseIP("127.0.0.1")
}
// Heuristics ahoy!
prioritized := []*ifaceWithPriority{}
for iface, addr := range addrmap {
prio := &ifaceWithPriority{
iface: iface,
addr: addr,
}
switch {
case strings.HasPrefix(iface, "lo"):
prio.priority = -10
case strings.HasPrefix(iface, "tap"):
prio.priority = -5
case strings.HasPrefix(iface, "tun"):
prio.priority = -5
case strings.HasPrefix(iface, "veth"):
prio.priority = 5
case strings.HasPrefix(iface, "wl"):
prio.priority = 5
case strings.HasPrefix(iface, "enp"):
prio.priority = 10
case strings.HasPrefix(iface, "eth"):
prio.priority = 10
}
prioritized = append(prioritized, prio)
}
sort.Slice(prioritized, func(i, j int) bool { return prioritized[i].priority > prioritized[j].priority })
return prioritized[0].addr
}
type ifaceWithPriority struct {
iface string
addr net.IP
priority int
}