blob: df9bdb830af215103792b9dc5b588e1cd39ab8a8 [file] [log] [blame]
// Copyright 2017 The kubecfg authors
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Changes:
// * Merged updates from https://github.com/kubernetes/client-go/blob/kubernetes-1.18.1/discovery/cached/memory/memcache.go
// --jjo, 2020-04-09
package utils
import (
"errors"
"fmt"
"net"
"net/url"
"sync"
"syscall"
openapi_v2 "github.com/google/gnostic/openapiv2"
log "github.com/sirupsen/logrus"
errorsutil "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
openapi_v3 "k8s.io/client-go/openapi"
cachedopenapi_v3 "k8s.io/client-go/openapi/cached"
restclient "k8s.io/client-go/rest"
)
type cacheEntry struct {
resourceList *metav1.APIResourceList
err error
}
// memcachedDiscoveryClient can Invalidate() to stay up-to-date with discovery
// information.
//
// TODO: Switch to a watch interface. Right now it will poll after each
// Invalidate() call.
type memcachedDiscoveryClient struct {
delegate discovery.DiscoveryInterface
lock sync.RWMutex
groupToServerResources map[string]*cacheEntry
groupList *metav1.APIGroupList
openAPISchema *openapi_v2.Document
cacheValid bool
openapiV3Client openapi_v3.Client
}
// Error Constants
var (
ErrCacheNotFound = errors.New("not found")
)
var _ discovery.CachedDiscoveryInterface = &memcachedDiscoveryClient{}
// isTransientConnectionError checks whether given error is "Connection refused" or
// "Connection reset" error which usually means that apiserver is temporarily
// unavailable.
func isTransientConnectionError(err error) bool {
urlError, ok := err.(*url.Error)
if !ok {
return false
}
opError, ok := urlError.Err.(*net.OpError)
if !ok {
return false
}
errno, ok := opError.Err.(syscall.Errno)
if !ok {
return false
}
return errno == syscall.ECONNREFUSED || errno == syscall.ECONNRESET
}
func isTransientError(err error) bool {
if isTransientConnectionError(err) {
return true
}
if t, ok := err.(errorsutil.APIStatus); ok && t.Status().Code >= 500 {
return true
}
return errorsutil.IsTooManyRequests(err)
}
// ServerResourcesForGroupVersion returns the supported resources for a group and version.
func (d *memcachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
d.lock.Lock()
defer d.lock.Unlock()
if !d.cacheValid {
if err := d.refreshLocked(); err != nil {
return nil, err
}
}
cachedVal, ok := d.groupToServerResources[groupVersion]
if !ok {
return nil, ErrCacheNotFound
}
if cachedVal.err != nil && isTransientError(cachedVal.err) {
r, err := d.serverResourcesForGroupVersion(groupVersion)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", groupVersion, err))
}
cachedVal = &cacheEntry{r, err}
d.groupToServerResources[groupVersion] = cachedVal
}
return cachedVal.resourceList, cachedVal.err
}
// ServerGroupsAndResources returns the groups and supported resources for all groups and versions.
func (d *memcachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
return discovery.ServerGroupsAndResources(d)
}
func (d *memcachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
d.lock.Lock()
defer d.lock.Unlock()
if !d.cacheValid {
if err := d.refreshLocked(); err != nil {
return nil, err
}
}
return d.groupList, nil
}
func (d *memcachedDiscoveryClient) RESTClient() restclient.Interface {
return d.delegate.RESTClient()
}
func (d *memcachedDiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
return discovery.ServerPreferredResources(d)
}
func (d *memcachedDiscoveryClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
return discovery.ServerPreferredNamespacedResources(d)
}
func (d *memcachedDiscoveryClient) ServerVersion() (*version.Info, error) {
return d.delegate.ServerVersion()
}
func (d *memcachedDiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
d.lock.Lock()
defer d.lock.Unlock()
if d.openAPISchema == nil {
schema, err := d.delegate.OpenAPISchema()
if err != nil {
return nil, err
}
d.openAPISchema = schema
}
return d.openAPISchema, nil
}
func (d *memcachedDiscoveryClient) Fresh() bool {
d.lock.RLock()
defer d.lock.RUnlock()
// Return whether the cache is populated at all. It is still possible that
// a single entry is missing due to transient errors and the attempt to read
// that entry will trigger retry.
return d.cacheValid
}
// Invalidate enforces that no cached data that is older than the current time
// is used.
func (d *memcachedDiscoveryClient) Invalidate() {
d.lock.Lock()
defer d.lock.Unlock()
d.cacheValid = false
d.groupToServerResources = nil
d.groupList = nil
d.openAPISchema = nil
}
// OpenAPIV3 retrieves and parses the OpenAPIV3 specs exposed by the server
func (d *memcachedDiscoveryClient) OpenAPIV3() openapi_v3.Client {
d.lock.Lock()
defer d.lock.Unlock()
if d.openapiV3Client == nil {
// Delegate is discovery client created with special HTTP client which
// respects E-Tag cache responses to serve cache from disk.
d.openapiV3Client = cachedopenapi_v3.NewClient(d.delegate.OpenAPIV3())
}
return d.openapiV3Client
}
// taken from: https://github.com/kubernetes/client-go/commit/3ac73ea2c834b1268732024766f1e55a5d0327d2#diff-46edd694bf30a54d9f6e202e010134bedfce438de77f57830155b0762eda7bf6R280-R285
// WithLegacy returns current cached discovery client;
// current client does not support legacy-only discovery.
func (d *memcachedDiscoveryClient) WithLegacy() discovery.DiscoveryInterface {
return d
}
// refreshLocked refreshes the state of cache. The caller must hold d.lock for
// writing.
func (d *memcachedDiscoveryClient) refreshLocked() error {
// TODO: Could this multiplicative set of calls be replaced by a single call
// to ServerResources? If it's possible for more than one resulting
// APIResourceList to have the same GroupVersion, the lists would need merged.
gl, err := d.delegate.ServerGroups()
if err != nil || len(gl.Groups) == 0 {
utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list: %v", err))
return err
}
wg := &sync.WaitGroup{}
resultLock := &sync.Mutex{}
rl := map[string]*cacheEntry{}
for _, g := range gl.Groups {
for _, v := range g.Versions {
gv := v.GroupVersion
wg.Add(1)
go func() {
defer wg.Done()
defer utilruntime.HandleCrash()
r, err := d.serverResourcesForGroupVersion(gv)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", gv, err))
}
resultLock.Lock()
defer resultLock.Unlock()
rl[gv] = &cacheEntry{r, err}
}()
}
}
wg.Wait()
d.groupToServerResources, d.groupList = rl, gl
d.cacheValid = true
return nil
}
func (d *memcachedDiscoveryClient) serverResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
r, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
if err != nil {
return r, err
}
if len(r.APIResources) == 0 {
return r, fmt.Errorf("Got empty response for: %v", groupVersion)
}
return r, nil
}
var _ discovery.CachedDiscoveryInterface = &memcachedDiscoveryClient{}
// MaybeMarkStale calls MarkStale on the discovery client, if the
// client is a memcachedClient.
func MaybeMarkStale(d discovery.DiscoveryInterface) {
if c, ok := d.(*memcachedDiscoveryClient); ok {
c.Invalidate()
}
}
func (c *memcachedDiscoveryClient) MarkStale() {
c.lock.Lock()
defer c.lock.Unlock()
log.Debug("Marking cached discovery info (potentially) stale")
c.cacheValid = false
}
// ClientForResource returns the ResourceClient for a given object
func ClientForResource(client dynamic.Interface, mapper meta.RESTMapper, obj runtime.Object, defNs string) (dynamic.ResourceInterface, error) {
gvk := obj.GetObjectKind().GroupVersionKind()
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
rc := client.Resource(mapping.Resource)
switch mapping.Scope.Name() {
case meta.RESTScopeNameRoot:
return rc, nil
case meta.RESTScopeNameNamespace:
meta, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
namespace := meta.GetNamespace()
if namespace == "" {
namespace = defNs
}
return rc.Namespace(namespace), nil
default:
return nil, fmt.Errorf("unexpected resource scope %q", mapping.Scope)
}
}
// NewmemcachedDiscoveryClient creates a new CachedDiscoveryInterface which caches
// discovery information in memory and will stay up-to-date if Invalidate is
// called with regularity.
//
// NOTE: The client will NOT resort to live lookups on cache misses.
func NewMemcachedDiscoveryClient(delegate discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface {
return &memcachedDiscoveryClient{
delegate: delegate,
groupToServerResources: map[string]*cacheEntry{},
}
}