Serge Bazanski | be538db | 2020-11-12 00:22:42 +0100 | [diff] [blame] | 1 | // Copyright 2017 The kubecfg authors |
| 2 | // |
| 3 | // |
| 4 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | // you may not use this file except in compliance with the License. |
| 6 | // You may obtain a copy of the License at |
| 7 | // |
| 8 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | // |
| 10 | // Unless required by applicable law or agreed to in writing, software |
| 11 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | // See the License for the specific language governing permissions and |
| 14 | // limitations under the License. |
| 15 | |
| 16 | // Changes: |
| 17 | // * Merged updates from https://github.com/kubernetes/client-go/blob/kubernetes-1.18.1/discovery/cached/memory/memcache.go |
| 18 | // --jjo, 2020-04-09 |
| 19 | |
| 20 | package utils |
| 21 | |
| 22 | import ( |
| 23 | "errors" |
| 24 | "fmt" |
| 25 | "net" |
| 26 | "net/url" |
| 27 | "sync" |
| 28 | "syscall" |
| 29 | |
| 30 | openapi_v2 "github.com/googleapis/gnostic/openapiv2" |
| 31 | |
| 32 | log "github.com/sirupsen/logrus" |
| 33 | errorsutil "k8s.io/apimachinery/pkg/api/errors" |
| 34 | "k8s.io/apimachinery/pkg/api/meta" |
| 35 | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| 36 | "k8s.io/apimachinery/pkg/runtime" |
| 37 | utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| 38 | "k8s.io/apimachinery/pkg/version" |
| 39 | "k8s.io/client-go/discovery" |
| 40 | "k8s.io/client-go/dynamic" |
| 41 | restclient "k8s.io/client-go/rest" |
| 42 | ) |
| 43 | |
| 44 | type cacheEntry struct { |
| 45 | resourceList *metav1.APIResourceList |
| 46 | err error |
| 47 | } |
| 48 | |
| 49 | // memcachedDiscoveryClient can Invalidate() to stay up-to-date with discovery |
| 50 | // information. |
| 51 | // |
| 52 | // TODO: Switch to a watch interface. Right now it will poll after each |
| 53 | // Invalidate() call. |
| 54 | type memcachedDiscoveryClient struct { |
| 55 | delegate discovery.DiscoveryInterface |
| 56 | |
| 57 | lock sync.RWMutex |
| 58 | groupToServerResources map[string]*cacheEntry |
| 59 | groupList *metav1.APIGroupList |
| 60 | cacheValid bool |
| 61 | } |
| 62 | |
// ErrCacheNotFound is returned when the requested group/version has no entry
// in the cache.
var ErrCacheNotFound = errors.New("not found")
| 67 | |
| 68 | var _ discovery.CachedDiscoveryInterface = &memcachedDiscoveryClient{} |
| 69 | |
// isTransientConnectionError reports whether err is a "connection refused" or
// "connection reset" failure, which usually means that the apiserver is
// temporarily unavailable.
//
// The error chain must contain a *url.Error that wraps a *net.OpError that in
// turn carries a syscall.Errno. The errno is looked up with errors.As rather
// than a direct type assertion because the net package reports failed system
// calls as *os.SyscallError values wrapping the errno; a direct assertion on
// OpError.Err never matches those and would miss real connection failures.
//
// A nil err, or an err without that shape, yields false.
func isTransientConnectionError(err error) bool {
	var urlErr *url.Error
	if !errors.As(err, &urlErr) {
		return false
	}
	var opErr *net.OpError
	if !errors.As(urlErr, &opErr) {
		return false
	}
	// Walks through *os.SyscallError (and any other wrapper) down to the errno.
	var errno syscall.Errno
	if !errors.As(opErr, &errno) {
		return false
	}
	return errno == syscall.ECONNREFUSED || errno == syscall.ECONNRESET
}
| 88 | |
| 89 | func isTransientError(err error) bool { |
| 90 | if isTransientConnectionError(err) { |
| 91 | return true |
| 92 | } |
| 93 | |
| 94 | if t, ok := err.(errorsutil.APIStatus); ok && t.Status().Code >= 500 { |
| 95 | return true |
| 96 | } |
| 97 | |
| 98 | return errorsutil.IsTooManyRequests(err) |
| 99 | } |
| 100 | |
| 101 | // ServerResourcesForGroupVersion returns the supported resources for a group and version. |
| 102 | func (d *memcachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { |
| 103 | d.lock.Lock() |
| 104 | defer d.lock.Unlock() |
| 105 | if !d.cacheValid { |
| 106 | if err := d.refreshLocked(); err != nil { |
| 107 | return nil, err |
| 108 | } |
| 109 | } |
| 110 | cachedVal, ok := d.groupToServerResources[groupVersion] |
| 111 | if !ok { |
| 112 | return nil, ErrCacheNotFound |
| 113 | } |
| 114 | |
| 115 | if cachedVal.err != nil && isTransientError(cachedVal.err) { |
| 116 | r, err := d.serverResourcesForGroupVersion(groupVersion) |
| 117 | if err != nil { |
| 118 | utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", groupVersion, err)) |
| 119 | } |
| 120 | cachedVal = &cacheEntry{r, err} |
| 121 | d.groupToServerResources[groupVersion] = cachedVal |
| 122 | } |
| 123 | |
| 124 | return cachedVal.resourceList, cachedVal.err |
| 125 | } |
| 126 | |
| 127 | // ServerResources returns the supported resources for all groups and versions. |
| 128 | // Deprecated: use ServerGroupsAndResources instead. |
| 129 | func (d *memcachedDiscoveryClient) ServerResources() ([]*metav1.APIResourceList, error) { |
| 130 | return discovery.ServerResources(d) |
| 131 | } |
| 132 | |
| 133 | // ServerGroupsAndResources returns the groups and supported resources for all groups and versions. |
| 134 | func (d *memcachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) { |
| 135 | return discovery.ServerGroupsAndResources(d) |
| 136 | } |
| 137 | |
| 138 | func (d *memcachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) { |
| 139 | d.lock.Lock() |
| 140 | defer d.lock.Unlock() |
| 141 | if !d.cacheValid { |
| 142 | if err := d.refreshLocked(); err != nil { |
| 143 | return nil, err |
| 144 | } |
| 145 | } |
| 146 | return d.groupList, nil |
| 147 | } |
| 148 | |
| 149 | func (d *memcachedDiscoveryClient) RESTClient() restclient.Interface { |
| 150 | return d.delegate.RESTClient() |
| 151 | } |
| 152 | |
| 153 | func (d *memcachedDiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) { |
| 154 | return discovery.ServerPreferredResources(d) |
| 155 | } |
| 156 | |
| 157 | func (d *memcachedDiscoveryClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) { |
| 158 | return discovery.ServerPreferredNamespacedResources(d) |
| 159 | } |
| 160 | |
| 161 | func (d *memcachedDiscoveryClient) ServerVersion() (*version.Info, error) { |
| 162 | return d.delegate.ServerVersion() |
| 163 | } |
| 164 | |
| 165 | func (d *memcachedDiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) { |
| 166 | return d.delegate.OpenAPISchema() |
| 167 | } |
| 168 | |
| 169 | func (d *memcachedDiscoveryClient) Fresh() bool { |
| 170 | d.lock.RLock() |
| 171 | defer d.lock.RUnlock() |
| 172 | // Return whether the cache is populated at all. It is still possible that |
| 173 | // a single entry is missing due to transient errors and the attempt to read |
| 174 | // that entry will trigger retry. |
| 175 | return d.cacheValid |
| 176 | } |
| 177 | |
| 178 | // Invalidate enforces that no cached data that is older than the current time |
| 179 | // is used. |
| 180 | func (d *memcachedDiscoveryClient) Invalidate() { |
| 181 | d.lock.Lock() |
| 182 | defer d.lock.Unlock() |
| 183 | d.cacheValid = false |
| 184 | d.groupToServerResources = nil |
| 185 | d.groupList = nil |
| 186 | } |
| 187 | |
| 188 | // refreshLocked refreshes the state of cache. The caller must hold d.lock for |
| 189 | // writing. |
| 190 | func (d *memcachedDiscoveryClient) refreshLocked() error { |
| 191 | // TODO: Could this multiplicative set of calls be replaced by a single call |
| 192 | // to ServerResources? If it's possible for more than one resulting |
| 193 | // APIResourceList to have the same GroupVersion, the lists would need merged. |
| 194 | gl, err := d.delegate.ServerGroups() |
| 195 | if err != nil || len(gl.Groups) == 0 { |
| 196 | utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list: %v", err)) |
| 197 | return err |
| 198 | } |
| 199 | |
| 200 | wg := &sync.WaitGroup{} |
| 201 | resultLock := &sync.Mutex{} |
| 202 | rl := map[string]*cacheEntry{} |
| 203 | for _, g := range gl.Groups { |
| 204 | for _, v := range g.Versions { |
| 205 | gv := v.GroupVersion |
| 206 | wg.Add(1) |
| 207 | go func() { |
| 208 | defer wg.Done() |
| 209 | defer utilruntime.HandleCrash() |
| 210 | |
| 211 | r, err := d.serverResourcesForGroupVersion(gv) |
| 212 | if err != nil { |
| 213 | utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", gv, err)) |
| 214 | } |
| 215 | |
| 216 | resultLock.Lock() |
| 217 | defer resultLock.Unlock() |
| 218 | rl[gv] = &cacheEntry{r, err} |
| 219 | }() |
| 220 | } |
| 221 | } |
| 222 | wg.Wait() |
| 223 | |
| 224 | d.groupToServerResources, d.groupList = rl, gl |
| 225 | d.cacheValid = true |
| 226 | return nil |
| 227 | } |
| 228 | |
| 229 | func (d *memcachedDiscoveryClient) serverResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { |
| 230 | r, err := d.delegate.ServerResourcesForGroupVersion(groupVersion) |
| 231 | if err != nil { |
| 232 | return r, err |
| 233 | } |
| 234 | if len(r.APIResources) == 0 { |
| 235 | return r, fmt.Errorf("Got empty response for: %v", groupVersion) |
| 236 | } |
| 237 | return r, nil |
| 238 | } |
| 239 | |
| 240 | var _ discovery.CachedDiscoveryInterface = &memcachedDiscoveryClient{} |
| 241 | |
| 242 | // MaybeMarkStale calls MarkStale on the discovery client, if the |
| 243 | // client is a memcachedClient. |
| 244 | func MaybeMarkStale(d discovery.DiscoveryInterface) { |
| 245 | if c, ok := d.(*memcachedDiscoveryClient); ok { |
| 246 | c.Invalidate() |
| 247 | } |
| 248 | } |
| 249 | |
| 250 | func (c *memcachedDiscoveryClient) MarkStale() { |
| 251 | c.lock.Lock() |
| 252 | defer c.lock.Unlock() |
| 253 | |
| 254 | log.Debug("Marking cached discovery info (potentially) stale") |
| 255 | c.cacheValid = false |
| 256 | } |
| 257 | |
| 258 | // ClientForResource returns the ResourceClient for a given object |
| 259 | func ClientForResource(client dynamic.Interface, mapper meta.RESTMapper, obj runtime.Object, defNs string) (dynamic.ResourceInterface, error) { |
| 260 | gvk := obj.GetObjectKind().GroupVersionKind() |
| 261 | |
| 262 | mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) |
| 263 | if err != nil { |
| 264 | return nil, err |
| 265 | } |
| 266 | |
| 267 | rc := client.Resource(mapping.Resource) |
| 268 | |
| 269 | switch mapping.Scope.Name() { |
| 270 | case meta.RESTScopeNameRoot: |
| 271 | return rc, nil |
| 272 | case meta.RESTScopeNameNamespace: |
| 273 | meta, err := meta.Accessor(obj) |
| 274 | if err != nil { |
| 275 | return nil, err |
| 276 | } |
| 277 | namespace := meta.GetNamespace() |
| 278 | if namespace == "" { |
| 279 | namespace = defNs |
| 280 | } |
| 281 | return rc.Namespace(namespace), nil |
| 282 | default: |
| 283 | return nil, fmt.Errorf("unexpected resource scope %q", mapping.Scope) |
| 284 | } |
| 285 | } |
| 286 | |
| 287 | // NewmemcachedDiscoveryClient creates a new CachedDiscoveryInterface which caches |
| 288 | // discovery information in memory and will stay up-to-date if Invalidate is |
| 289 | // called with regularity. |
| 290 | // |
| 291 | // NOTE: The client will NOT resort to live lookups on cache misses. |
| 292 | func NewMemcachedDiscoveryClient(delegate discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface { |
| 293 | return &memcachedDiscoveryClient{ |
| 294 | delegate: delegate, |
| 295 | groupToServerResources: map[string]*cacheEntry{}, |
| 296 | } |
| 297 | } |