blob: df9bdb830af215103792b9dc5b588e1cd39ab8a8 [file] [log] [blame]
Serge Bazanskibe538db2020-11-12 00:22:42 +01001// Copyright 2017 The kubecfg authors
2//
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16// Changes:
17// * Merged updates from https://github.com/kubernetes/client-go/blob/kubernetes-1.18.1/discovery/cached/memory/memcache.go
18// --jjo, 2020-04-09
19
20package utils
21
22import (
23 "errors"
24 "fmt"
25 "net"
26 "net/url"
27 "sync"
28 "syscall"
29
Serge Bazanski97b5cd72023-07-28 17:14:50 +000030 openapi_v2 "github.com/google/gnostic/openapiv2"
Serge Bazanskibe538db2020-11-12 00:22:42 +010031
32 log "github.com/sirupsen/logrus"
33 errorsutil "k8s.io/apimachinery/pkg/api/errors"
34 "k8s.io/apimachinery/pkg/api/meta"
35 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36 "k8s.io/apimachinery/pkg/runtime"
37 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
38 "k8s.io/apimachinery/pkg/version"
39 "k8s.io/client-go/discovery"
40 "k8s.io/client-go/dynamic"
Serge Bazanski97b5cd72023-07-28 17:14:50 +000041 openapi_v3 "k8s.io/client-go/openapi"
42 cachedopenapi_v3 "k8s.io/client-go/openapi/cached"
Serge Bazanskibe538db2020-11-12 00:22:42 +010043 restclient "k8s.io/client-go/rest"
44)
45
// cacheEntry is the cached outcome of one group/version resource lookup: the
// resource list that was returned together with the error (if any) produced
// by that lookup. Both values are stored so a failed lookup is remembered too.
type cacheEntry struct {
	resourceList *metav1.APIResourceList
	err          error
}
50
// memcachedDiscoveryClient can Invalidate() to stay up-to-date with discovery
// information.
//
// TODO: Switch to a watch interface. Right now it will poll after each
// Invalidate() call.
type memcachedDiscoveryClient struct {
	// delegate is the underlying discovery client that performs the actual
	// lookups.
	delegate discovery.DiscoveryInterface

	// lock is held by the methods of this type while they read or update the
	// cached fields below.
	lock sync.RWMutex
	// groupToServerResources maps a group/version string to the cached result
	// of looking up its resources.
	groupToServerResources map[string]*cacheEntry
	// groupList is the API group list obtained by the most recent refresh.
	groupList *metav1.APIGroupList
	// openAPISchema is the OpenAPI v2 document, fetched on first request.
	openAPISchema *openapi_v2.Document
	// cacheValid is set once a refresh has populated the cache and cleared
	// again by Invalidate/MarkStale.
	cacheValid bool
	// openapiV3Client is the caching OpenAPI v3 client, created on first
	// request.
	openapiV3Client openapi_v3.Client
}
66
// Error Constants
var (
	// ErrCacheNotFound is returned when the requested group/version has no
	// entry in the cache.
	ErrCacheNotFound = errors.New("not found")
)
71
72var _ discovery.CachedDiscoveryInterface = &memcachedDiscoveryClient{}
73
// isTransientConnectionError checks whether given error is "Connection refused" or
// "Connection reset" error which usually means that apiserver is temporarily
// unavailable.
//
// The expected shape is a *url.Error containing a *net.OpError containing a
// syscall.Errno. Each layer is located with errors.As rather than a direct
// type assertion, because the errno is commonly wrapped (for example in an
// *os.SyscallError) and a plain assertion would then never match.
func isTransientConnectionError(err error) bool {
	var urlErr *url.Error
	if !errors.As(err, &urlErr) {
		return false
	}
	var opErr *net.OpError
	if !errors.As(urlErr.Err, &opErr) {
		return false
	}
	var errno syscall.Errno
	if !errors.As(opErr.Err, &errno) {
		return false
	}
	return errno == syscall.ECONNREFUSED || errno == syscall.ECONNRESET
}
92
93func isTransientError(err error) bool {
94 if isTransientConnectionError(err) {
95 return true
96 }
97
98 if t, ok := err.(errorsutil.APIStatus); ok && t.Status().Code >= 500 {
99 return true
100 }
101
102 return errorsutil.IsTooManyRequests(err)
103}
104
105// ServerResourcesForGroupVersion returns the supported resources for a group and version.
106func (d *memcachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
107 d.lock.Lock()
108 defer d.lock.Unlock()
109 if !d.cacheValid {
110 if err := d.refreshLocked(); err != nil {
111 return nil, err
112 }
113 }
114 cachedVal, ok := d.groupToServerResources[groupVersion]
115 if !ok {
116 return nil, ErrCacheNotFound
117 }
118
119 if cachedVal.err != nil && isTransientError(cachedVal.err) {
120 r, err := d.serverResourcesForGroupVersion(groupVersion)
121 if err != nil {
122 utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", groupVersion, err))
123 }
124 cachedVal = &cacheEntry{r, err}
125 d.groupToServerResources[groupVersion] = cachedVal
126 }
127
128 return cachedVal.resourceList, cachedVal.err
129}
130
Serge Bazanskibe538db2020-11-12 00:22:42 +0100131// ServerGroupsAndResources returns the groups and supported resources for all groups and versions.
132func (d *memcachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
133 return discovery.ServerGroupsAndResources(d)
134}
135
136func (d *memcachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
137 d.lock.Lock()
138 defer d.lock.Unlock()
139 if !d.cacheValid {
140 if err := d.refreshLocked(); err != nil {
141 return nil, err
142 }
143 }
144 return d.groupList, nil
145}
146
147func (d *memcachedDiscoveryClient) RESTClient() restclient.Interface {
148 return d.delegate.RESTClient()
149}
150
151func (d *memcachedDiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
152 return discovery.ServerPreferredResources(d)
153}
154
155func (d *memcachedDiscoveryClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
156 return discovery.ServerPreferredNamespacedResources(d)
157}
158
159func (d *memcachedDiscoveryClient) ServerVersion() (*version.Info, error) {
160 return d.delegate.ServerVersion()
161}
162
163func (d *memcachedDiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
Serge Bazanski97b5cd72023-07-28 17:14:50 +0000164 d.lock.Lock()
165 defer d.lock.Unlock()
166
167 if d.openAPISchema == nil {
168 schema, err := d.delegate.OpenAPISchema()
169 if err != nil {
170 return nil, err
171 }
172 d.openAPISchema = schema
173 }
174
175 return d.openAPISchema, nil
Serge Bazanskibe538db2020-11-12 00:22:42 +0100176}
177
178func (d *memcachedDiscoveryClient) Fresh() bool {
179 d.lock.RLock()
180 defer d.lock.RUnlock()
181 // Return whether the cache is populated at all. It is still possible that
182 // a single entry is missing due to transient errors and the attempt to read
183 // that entry will trigger retry.
184 return d.cacheValid
185}
186
187// Invalidate enforces that no cached data that is older than the current time
188// is used.
189func (d *memcachedDiscoveryClient) Invalidate() {
190 d.lock.Lock()
191 defer d.lock.Unlock()
192 d.cacheValid = false
193 d.groupToServerResources = nil
194 d.groupList = nil
Serge Bazanski97b5cd72023-07-28 17:14:50 +0000195 d.openAPISchema = nil
196}
197
198// OpenAPIV3 retrieves and parses the OpenAPIV3 specs exposed by the server
199func (d *memcachedDiscoveryClient) OpenAPIV3() openapi_v3.Client {
200 d.lock.Lock()
201 defer d.lock.Unlock()
202
203 if d.openapiV3Client == nil {
204 // Delegate is discovery client created with special HTTP client which
205 // respects E-Tag cache responses to serve cache from disk.
206 d.openapiV3Client = cachedopenapi_v3.NewClient(d.delegate.OpenAPIV3())
207 }
208
209 return d.openapiV3Client
210}
211
// taken from: https://github.com/kubernetes/client-go/commit/3ac73ea2c834b1268732024766f1e55a5d0327d2#diff-46edd694bf30a54d9f6e202e010134bedfce438de77f57830155b0762eda7bf6R280-R285
// WithLegacy returns current cached discovery client;
// current client does not support legacy-only discovery.
// The receiver itself is returned unchanged, so the result shares this
// client's cache.
func (d *memcachedDiscoveryClient) WithLegacy() discovery.DiscoveryInterface {
	return d
}
218
219// refreshLocked refreshes the state of cache. The caller must hold d.lock for
220// writing.
221func (d *memcachedDiscoveryClient) refreshLocked() error {
222 // TODO: Could this multiplicative set of calls be replaced by a single call
223 // to ServerResources? If it's possible for more than one resulting
224 // APIResourceList to have the same GroupVersion, the lists would need merged.
225 gl, err := d.delegate.ServerGroups()
226 if err != nil || len(gl.Groups) == 0 {
227 utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list: %v", err))
228 return err
229 }
230
231 wg := &sync.WaitGroup{}
232 resultLock := &sync.Mutex{}
233 rl := map[string]*cacheEntry{}
234 for _, g := range gl.Groups {
235 for _, v := range g.Versions {
236 gv := v.GroupVersion
237 wg.Add(1)
238 go func() {
239 defer wg.Done()
240 defer utilruntime.HandleCrash()
241
242 r, err := d.serverResourcesForGroupVersion(gv)
243 if err != nil {
244 utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", gv, err))
245 }
246
247 resultLock.Lock()
248 defer resultLock.Unlock()
249 rl[gv] = &cacheEntry{r, err}
250 }()
251 }
252 }
253 wg.Wait()
254
255 d.groupToServerResources, d.groupList = rl, gl
256 d.cacheValid = true
257 return nil
258}
259
260func (d *memcachedDiscoveryClient) serverResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
261 r, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
262 if err != nil {
263 return r, err
264 }
265 if len(r.APIResources) == 0 {
266 return r, fmt.Errorf("Got empty response for: %v", groupVersion)
267 }
268 return r, nil
269}
270
271var _ discovery.CachedDiscoveryInterface = &memcachedDiscoveryClient{}
272
273// MaybeMarkStale calls MarkStale on the discovery client, if the
274// client is a memcachedClient.
275func MaybeMarkStale(d discovery.DiscoveryInterface) {
276 if c, ok := d.(*memcachedDiscoveryClient); ok {
277 c.Invalidate()
278 }
279}
280
281func (c *memcachedDiscoveryClient) MarkStale() {
282 c.lock.Lock()
283 defer c.lock.Unlock()
284
285 log.Debug("Marking cached discovery info (potentially) stale")
286 c.cacheValid = false
287}
288
289// ClientForResource returns the ResourceClient for a given object
290func ClientForResource(client dynamic.Interface, mapper meta.RESTMapper, obj runtime.Object, defNs string) (dynamic.ResourceInterface, error) {
291 gvk := obj.GetObjectKind().GroupVersionKind()
292
293 mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
294 if err != nil {
295 return nil, err
296 }
297
298 rc := client.Resource(mapping.Resource)
299
300 switch mapping.Scope.Name() {
301 case meta.RESTScopeNameRoot:
302 return rc, nil
303 case meta.RESTScopeNameNamespace:
304 meta, err := meta.Accessor(obj)
305 if err != nil {
306 return nil, err
307 }
308 namespace := meta.GetNamespace()
309 if namespace == "" {
310 namespace = defNs
311 }
312 return rc.Namespace(namespace), nil
313 default:
314 return nil, fmt.Errorf("unexpected resource scope %q", mapping.Scope)
315 }
316}
317
318// NewmemcachedDiscoveryClient creates a new CachedDiscoveryInterface which caches
319// discovery information in memory and will stay up-to-date if Invalidate is
320// called with regularity.
321//
322// NOTE: The client will NOT resort to live lookups on cache misses.
323func NewMemcachedDiscoveryClient(delegate discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface {
324 return &memcachedDiscoveryClient{
325 delegate: delegate,
326 groupToServerResources: map[string]*cacheEntry{},
327 }
328}