Serge Bazanski | 5de0b32 | 2021-03-20 12:04:26 +0000 | [diff] [blame] | 1 | package main |
| 2 | |
| 3 | import ( |
| 4 | "bytes" |
| 5 | "crypto/sha256" |
| 6 | "encoding/hex" |
| 7 | "fmt" |
| 8 | "io" |
| 9 | "io/ioutil" |
| 10 | "net/http" |
| 11 | "strings" |
| 12 | |
| 13 | "github.com/golang/glog" |
| 14 | "github.com/minio/minio-go/v7" |
| 15 | ) |
| 16 | |
| 17 | type service struct { |
| 18 | objectClient *minio.Client |
| 19 | objectBucket string |
| 20 | objectPrefix string |
| 21 | publicHandler http.Handler |
| 22 | } |
| 23 | |
| 24 | func newService(objectClient *minio.Client, objectBucket, objectPrefix string) *service { |
| 25 | s := &service{ |
| 26 | objectClient: objectClient, |
| 27 | objectBucket: objectBucket, |
| 28 | objectPrefix: objectPrefix, |
| 29 | } |
| 30 | mux := http.NewServeMux() |
| 31 | mux.HandleFunc("/", s.handlePublic) |
| 32 | s.publicHandler = mux |
| 33 | return s |
| 34 | } |
| 35 | |
| 36 | func (s *service) handlePublic(w http.ResponseWriter, r *http.Request) { |
| 37 | ctx := r.Context() |
| 38 | switch r.Method { |
| 39 | case "GET": |
| 40 | // Always allow GET access to cache. |
| 41 | case "PUT": |
| 42 | // Require authentication for cache writes. |
| 43 | // TODO(q3k): implement |
| 44 | default: |
| 45 | http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) |
| 46 | return |
| 47 | } |
| 48 | |
| 49 | parts := strings.Split(strings.TrimPrefix(r.URL.Path, "/"), "/") |
| 50 | if len(parts) != 2 { |
| 51 | http.NotFound(w, r) |
| 52 | return |
| 53 | } |
| 54 | switch parts[0] { |
| 55 | case "ac": |
| 56 | case "cas": |
| 57 | default: |
| 58 | http.NotFound(w, r) |
| 59 | return |
| 60 | } |
| 61 | |
| 62 | if len(parts[1]) != 64 { |
| 63 | http.NotFound(w, r) |
| 64 | return |
| 65 | } |
| 66 | |
| 67 | cacheKey := fmt.Sprintf("%s%s/%s", s.objectPrefix, parts[0], parts[1]) |
| 68 | glog.Infof("%s %s %s", r.RemoteAddr, r.Method, cacheKey) |
| 69 | |
| 70 | if r.Method == "GET" { |
| 71 | obj, err := s.objectClient.GetObject(ctx, s.objectBucket, cacheKey, minio.GetObjectOptions{}) |
| 72 | if err != nil { |
| 73 | glog.Errorf("GetObject(%s, %s): %v", s.objectBucket, cacheKey, err) |
| 74 | http.Error(w, "could not contact object store", http.StatusInternalServerError) |
| 75 | return |
| 76 | } |
| 77 | |
| 78 | _, err = obj.Stat() |
| 79 | // Minio-go doesn't seem to let us do this in any nicer way :/ |
| 80 | if err != nil && err.Error() == "The specified key does not exist." { |
| 81 | http.NotFound(w, r) |
| 82 | return |
| 83 | } else if err != nil { |
| 84 | glog.Errorf("Stat(%s, %s): %v", s.objectBucket, cacheKey, err) |
| 85 | http.Error(w, "could not contact object store", http.StatusInternalServerError) |
| 86 | return |
| 87 | } |
| 88 | |
| 89 | // Stream object to client. |
| 90 | io.Copy(w, obj) |
| 91 | } |
| 92 | if r.Method == "PUT" { |
| 93 | // Buffer the file, as we need to check its sha256. |
| 94 | // TODO(q3k): check and limit body size. |
| 95 | data, err := ioutil.ReadAll(r.Body) |
| 96 | if err != nil { |
| 97 | glog.Errorf("ReadAll: %v", err) |
| 98 | return |
| 99 | } |
| 100 | hashBytes := sha256.Sum256(data) |
| 101 | hash := hex.EncodeToString(hashBytes[:]) |
| 102 | // Bazel cache uploads always seem to use lowercase sha256 |
| 103 | // representations. |
| 104 | if parts[0] == "cas" && hash != parts[1] { |
| 105 | glog.Warningf("%s: sent PUT for %s with invalid hash %s", r.RemoteAddr, cacheKey, hash) |
| 106 | // Don't tell the user anything - Bazel won't care, anyway, and us |
| 107 | // logging this is probably good enough for debugging purposes. |
| 108 | return |
| 109 | } |
| 110 | // If the file already exists in the cache, ignore it. S3 doesn't seem |
| 111 | // to give us an upload-if-missing functionality? |
| 112 | _, err = s.objectClient.StatObject(ctx, s.objectBucket, cacheKey, minio.StatObjectOptions{}) |
| 113 | if err == nil { |
| 114 | // File already exists, return early. |
| 115 | // This might not fire in case we fail to retrieve the object for |
| 116 | // some reason other than its nonexistence, but an error will be |
| 117 | // served for this at PutObject later on. |
| 118 | return |
| 119 | } |
| 120 | |
| 121 | buffer := bytes.NewBuffer(data) |
| 122 | _, err = s.objectClient.PutObject(ctx, s.objectBucket, cacheKey, buffer, int64(len(data)), minio.PutObjectOptions{ |
| 123 | UserMetadata: map[string]string{ |
| 124 | "remote-cache-origin": r.RemoteAddr, |
| 125 | }, |
| 126 | }) |
| 127 | if err != nil { |
| 128 | // Swallow the error. Can't do much for the bazel writer, anyway. |
| 129 | // Retrying here isn't easy, as we don't want to become a |
| 130 | // qeueue/buffer unless really needed. |
| 131 | glog.Errorf("%s: PUT %s failed: %v", r.RemoteAddr, cacheKey, err) |
| 132 | return |
| 133 | } |
| 134 | } |
| 135 | } |