cluster/identd/ident: add basic ident protocol server
This adds an ident protocol server and tests for it.
Change-Id: I830f85faa7dce4220bd7001635b20e88b4a8b417
diff --git a/cluster/identd/ident/server.go b/cluster/identd/ident/server.go
new file mode 100644
index 0000000..ac985d9
--- /dev/null
+++ b/cluster/identd/ident/server.go
@@ -0,0 +1,282 @@
+package ident
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/golang/glog"
+)
+
+// NewServer creates a Server whose handler is the unsetHandler placeholder.
+// Callers are expected to replace it with their own HandlerFunc by calling
+// Server.HandleFunc before serving.
+func NewServer() *Server {
+	srv := new(Server)
+	srv.handler = unsetHandler
+	return srv
+}
+
+// Server is an ident protocol server (per RFC1413). It must be configured with
+// a HandlerFunc (via HandleFunc) before Serve is called.
+// Serve and Stop synchronize on mu and may be invoked from different goroutines,
+// but the Server can only Serve one listener at a time.
+type Server struct {
+ handler HandlerFunc // NOTE(review): read and written without holding mu.
+ // mu guards stopC and stop.
+ mu sync.Mutex
+ // stopC is non-nil while Serve() is running. Stop() closes it to make
+ // Serve() return, and Serve() sets it back to nil on exit.
+ stopC chan struct{}
+ // stop is set to true by Stop() when Serve() is not yet running (eg. if
+ // Serve() is run in a goroutine which hasn't been scheduled yet). The next
+ // Serve() that sees it set resets it to false and returns nil right away
+ // without accepting any connection.
+ stop bool
+}
+
+// ResponseWriter is passed to HandlerFuncs and is used to signal to the Server
+// that the HandlerFunc wants to respond to the incoming Request in a certain
+// way. At most one response, error or ident, can be sent for each Request.
+// Only the goroutine that the HandlerFunc has been started in may invoke
+// methods on the ResponseWriter.
+type ResponseWriter interface {
+ // SendError sends an ident ErrorResponse to the ident client. This can
+ // only be called once, and cannot be called after SendIdent.
+ SendError(ErrorResponse) error
+ // SendIdent sends an ident IdentResponse to the ident client. This can
+ // only be called once, and cannot be called after SendError.
+ SendIdent(*IdentResponse) error
+}
+
+// HandlerFunc is a function that will be called to serve a given ident
+// Request. Users of the Server must implement this and configure a Server to
+// use it by invoking Server.HandleFunc.
+// The HandlerFunc is called on the goroutine that serves the client connection
+// (one per connection). When it returns, the Server will attempt to serve more
+// incoming requests from the same ident client.
+// The Server does not limit the amount of concurrent ident connections that it
+// serves. If the Server user wishes to limit concurrency, they must do it
+// themselves, eg. by using a semaphore. The Server will continue accepting new
+// connections and starting new HandlerFuncs; if the user code needs to push
+// back it should return as early as possible. There currently is no way to
+// make the Server refuse connections above some concurrency limit.
+// The Server does not impose any execution timeout on handlers. If the Server
+// user wishes to impose an execution timeout, they must do it themselves, eg.
+// using context.WithTimeout or time.After.
+// The passed Context is canceled when Serve returns (Stop or accept error).
+// NOTE(review): Serve does not appear to cancel it on client disconnect.
+// The HandlerFunc must return as soon as it detects that the context is done.
+type HandlerFunc func(ctx context.Context, w ResponseWriter, r *Request)
+
+// responseWriter implements ResponseWriter for one Request served by a Server.
+type responseWriter struct {
+ conn net.Conn // connection that the encoded response is written to
+ req *Request // request being answered; supplies the client/server port pair
+ responded bool // set as soon as sendResponse has been attempted once
+}
+
+// sendResponse encodes an already fully populated Response and writes it to
+// the ident client. Only the first call is acted upon; every later call fails
+// without touching the connection.
+func (w *responseWriter) sendResponse(r *Response) error {
+	if w.responded {
+		return fmt.Errorf("handler already sent a response")
+	}
+	// The writer counts as used from here on, even if encoding or writing
+	// fails below.
+	w.responded = true
+
+	payload := r.encode()
+	if payload == nil {
+		return fmt.Errorf("failed to encode response")
+	}
+	glog.V(3).Infof(" -> %q", payload)
+
+	if _, err := w.conn.Write(payload); err != nil {
+		return fmt.Errorf("writing response failed: %w", err)
+	}
+	return nil
+}
+
+// SendError implements ResponseWriter. It refuses values for which IsError
+// reports false; otherwise it replies with the given error for the port pair
+// of the request being served.
+func (w *responseWriter) SendError(e ErrorResponse) error {
+	if e.IsError() {
+		resp := Response{
+			ClientPort: w.req.ClientPort,
+			ServerPort: w.req.ServerPort,
+			Error:      e,
+		}
+		return w.sendResponse(&resp)
+	}
+	return fmt.Errorf("error response must contain a valid error")
+}
+
+// SendIdent implements ResponseWriter. It replies with the given ident for the
+// port pair of the request being served.
+//
+// An empty OperatingSystem defaults to "UNIX". An error is returned, and
+// nothing is sent, if i is nil or if its UserID is empty.
+func (w *responseWriter) SendIdent(i *IdentResponse) error {
+	if i == nil {
+		return fmt.Errorf("ident response must not be nil")
+	}
+	// Work on a copy so that defaulting never modifies the caller's value.
+	ir := *i
+	// TODO(q3k): enforce RFC1413 limits.
+	if ir.OperatingSystem == "" {
+		ir.OperatingSystem = "UNIX"
+	}
+	if ir.UserID == "" {
+		return fmt.Errorf("ident response must have UserID set")
+	}
+	return w.sendResponse(&Response{
+		ClientPort: w.req.ClientPort,
+		ServerPort: w.req.ServerPort,
+		Ident:      &ir,
+	})
+}
+
+var (
+	// unsetHandlerErrorOnce limits the misconfiguration error logged by
+	// unsetHandler to a single occurrence.
+	unsetHandlerErrorOnce sync.Once
+)
+
+// unsetHandler is the default handler that is configured for a Server. It
+// returns UNKNOWN-ERROR to the ident client and logs an error once if it's
+// called (telling the user about a misconfiguration / programming error).
+// A failure to send the UNKNOWN-ERROR response is logged instead of being
+// silently dropped.
+func unsetHandler(ctx context.Context, w ResponseWriter, r *Request) {
+	unsetHandlerErrorOnce.Do(func() {
+		glog.Errorf("Server with no handler configured - will always return UNKNOWN-ERROR")
+	})
+	if err := w.SendError(UnknownError); err != nil {
+		glog.V(1).Infof("Sending UNKNOWN-ERROR from unset handler failed: %v", err)
+	}
+}
+
+// HandleFunc sets the HandlerFunc that the server will call for every incoming
+// ident request. If a HandlerFunc is already set, it will be overwritten by
+// the given new function.
+//
+// Passing nil restores the default placeholder handler (which always answers
+// UNKNOWN-ERROR) instead of leaving the Server with a nil handler that would
+// panic on the first request.
+//
+// The handler is stored without locking, so it should be set before Serve is
+// started.
+func (s *Server) HandleFunc(fn HandlerFunc) {
+	if fn == nil {
+		fn = unsetHandler
+	}
+	s.handler = fn
+}
+
+// Serve runs the ident server, blocking until a transport-level error occurs
+// or Stop() is invoked. The returned error will be nil on Stop(), and will be
+// the error returned by the listener's Accept otherwise.
+//
+// Only one invocation of Serve() can run at a time, but Serve can be called
+// again after Stop() is called, and can be run on a different Listener - no
+// state is kept in between subsequent Serve() runs.
+//
+// Serve does not close the listener. After Stop(), one goroutine may remain
+// blocked in lis.Accept() until the listener accepts a connection or is
+// closed by its owner; a connection accepted that way is dropped.
+func (s *Server) Serve(lis net.Listener) error {
+	s.mu.Lock()
+	if s.stopC != nil {
+		s.mu.Unlock()
+		return fmt.Errorf("cannot Serve() an already serving server")
+	}
+	// Stop() has been invoked before Serve() started.
+	if s.stop {
+		s.stop = false
+		s.mu.Unlock()
+		return nil
+	}
+	// Set stopC to allow Stop() calls to stop this running Serve(). It will be
+	// set to nil on exit.
+	stopC := make(chan struct{})
+	s.stopC = stopC
+	s.mu.Unlock()
+
+	defer func() {
+		s.mu.Lock()
+		s.stopC = nil
+		s.mu.Unlock()
+	}()
+
+	// ctx is handed to every connection and is canceled on any return path.
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	// At most one accept goroutine is in flight per loop iteration, so the
+	// result channels can be shared across iterations.
+	lisConnC := make(chan net.Conn)
+	lisErrC := make(chan error)
+	for {
+		go func() {
+			conn, err := lis.Accept()
+			select {
+			case <-stopC:
+				// Server stopped, drop the accepted connection (if any)
+				// and return.
+				glog.V(2).Infof("Accept goroutine stopping...")
+				if err == nil {
+					conn.Close()
+				}
+				return
+			default:
+			}
+			if err != nil {
+				glog.V(5).Infof("Accept err: %v", err)
+				// Serve may have returned since the check above; never
+				// block on a channel nobody reads anymore.
+				select {
+				case lisErrC <- err:
+				case <-stopC:
+				}
+				return
+			}
+			glog.V(5).Infof("Accept ok: %v", conn.RemoteAddr())
+			select {
+			case lisConnC <- conn:
+			case <-stopC:
+				// Stopped between the check above and the hand-off: the
+				// connection will never be served, so close it here.
+				conn.Close()
+			}
+		}()
+
+		select {
+		case <-stopC:
+			// Server stopped, return.
+			return nil
+		case err := <-lisErrC:
+			// Accept() failed, return error.
+			return err
+		case conn := <-lisConnC:
+			// Accept() succeeded, serve request.
+			go s.serve(ctx, conn)
+		}
+	}
+}
+
+// Stop makes a running Serve() return nil. If Serve() is not running yet, the
+// request is remembered and the next Serve() call returns nil immediately.
+//
+// Stop is safe to call multiple times: repeated calls while Serve() is still
+// shutting down are no-ops instead of closing an already closed channel.
+func (s *Server) Stop() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if s.stopC == nil {
+		// Serve() is not running (yet); leave a marker for it.
+		s.stop = true
+		return
+	}
+	select {
+	case <-s.stopC:
+		// Already closed by an earlier Stop(); Serve() has not yet reset
+		// stopC to nil. Closing again would panic.
+	default:
+		close(s.stopC)
+	}
+}
+
+// serve handles a single client connection: it reads request lines one by
+// one, decodes each into a Request and passes it to the configured handler.
+// If the handler returns without responding, UNKNOWN-ERROR is sent on its
+// behalf. serve returns, closing the connection, on EOF, on a receive
+// timeout, on any other read error (including an over-long request line), or
+// when a request cannot be decoded.
+func (s *Server) serve(ctx context.Context, conn net.Conn) {
+	glog.V(2).Infof("Serving connection %v", conn.RemoteAddr())
+	// The connection is owned by this function; release it on every exit
+	// path, including a clean EOF.
+	defer conn.Close()
+
+	scanner := bufio.NewScanner(conn)
+	// The RFC does not place a limit on the request line length, only on
+	// response length. We set an arbitrary limit to 1024 bytes.
+	scanner.Buffer(nil, 1024)
+
+	for {
+		// Implement an arbitrary timeout for receiving data from client. The
+		// deadline is re-armed before each request so that it only bounds
+		// the wait for the next request line, not the connection lifetime.
+		// TODO(q3k): make this configurable
+		if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
+			glog.V(1).Infof("Connection %v: could not set receive deadline: %v", conn.RemoteAddr(), err)
+			return
+		}
+		if !scanner.Scan() {
+			err := scanner.Err()
+			if err == nil {
+				// EOF, just return.
+				return
+			}
+			if ne, ok := err.(net.Error); ok && ne.Timeout() {
+				glog.V(1).Infof("Connection %v: terminating on receive timeout", conn.RemoteAddr())
+				return
+			}
+			// Some other transport level error occurred, or the request line
+			// was too long. We can only log this and be done.
+			glog.V(1).Infof("Connection %v: scan failed: %v", conn.RemoteAddr(), err)
+			return
+		}
+		data := scanner.Bytes()
+		glog.V(3).Infof(" <- %q", data)
+		req, err := decodeRequest(data)
+		if err != nil {
+			glog.V(1).Infof("Connection %v: could not decode request: %v", conn.RemoteAddr(), err)
+			return
+		}
+		req.ClientAddress = conn.RemoteAddr()
+		rw := responseWriter{
+			req:  req,
+			conn: conn,
+		}
+		s.handler(ctx, &rw, req)
+		if rw.responded {
+			continue
+		}
+		glog.Warningf("Connection %v: handler did not send response, sending UNKNOWN-ERROR", conn.RemoteAddr())
+		if err := rw.SendError(UnknownError); err != nil {
+			glog.V(1).Infof("Connection %v: sending UNKNOWN-ERROR failed: %v", conn.RemoteAddr(), err)
+		}
+	}
+}