blob: 07cf254d5765d699dd1ecb71389dad9598e2a837 [file] [log] [blame]
Serge Bazanskibe538db2020-11-12 00:22:42 +01001// Copyright 2017 The kubecfg authors
2//
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16// Changes:
17// * Merged updates from https://github.com/kubernetes/client-go/blob/kubernetes-1.18.1/discovery/cached/memory/memcache.go
18// --jjo, 2020-04-09
19
20package utils
21
22import (
23 "errors"
24 "fmt"
25 "net"
26 "net/url"
27 "sync"
28 "syscall"
29
30 openapi_v2 "github.com/googleapis/gnostic/openapiv2"
31
32 log "github.com/sirupsen/logrus"
33 errorsutil "k8s.io/apimachinery/pkg/api/errors"
34 "k8s.io/apimachinery/pkg/api/meta"
35 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36 "k8s.io/apimachinery/pkg/runtime"
37 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
38 "k8s.io/apimachinery/pkg/version"
39 "k8s.io/client-go/discovery"
40 "k8s.io/client-go/dynamic"
41 restclient "k8s.io/client-go/rest"
42)
43
44type cacheEntry struct {
45 resourceList *metav1.APIResourceList
46 err error
47}
48
49// memcachedDiscoveryClient can Invalidate() to stay up-to-date with discovery
50// information.
51//
52// TODO: Switch to a watch interface. Right now it will poll after each
53// Invalidate() call.
54type memcachedDiscoveryClient struct {
55 delegate discovery.DiscoveryInterface
56
57 lock sync.RWMutex
58 groupToServerResources map[string]*cacheEntry
59 groupList *metav1.APIGroupList
60 cacheValid bool
61}
62
// ErrCacheNotFound is returned when the requested group/version has no entry
// in the discovery cache.
var ErrCacheNotFound = errors.New("not found")
67
68var _ discovery.CachedDiscoveryInterface = &memcachedDiscoveryClient{}
69
// isTransientConnectionError reports whether err is, or wraps, a
// "connection refused" or "connection reset" errno, which usually means that
// the apiserver is temporarily unavailable.
//
// The error chain is searched with errors.As rather than with a fixed
// sequence of type assertions: the net package wraps the errno in an
// *os.SyscallError inside *net.OpError, so asserting that OpError.Err is a
// bare syscall.Errno misses real connection failures. A nil err yields false.
func isTransientConnectionError(err error) bool {
	var errno syscall.Errno
	if !errors.As(err, &errno) {
		return false
	}
	return errno == syscall.ECONNREFUSED || errno == syscall.ECONNRESET
}
88
89func isTransientError(err error) bool {
90 if isTransientConnectionError(err) {
91 return true
92 }
93
94 if t, ok := err.(errorsutil.APIStatus); ok && t.Status().Code >= 500 {
95 return true
96 }
97
98 return errorsutil.IsTooManyRequests(err)
99}
100
101// ServerResourcesForGroupVersion returns the supported resources for a group and version.
102func (d *memcachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
103 d.lock.Lock()
104 defer d.lock.Unlock()
105 if !d.cacheValid {
106 if err := d.refreshLocked(); err != nil {
107 return nil, err
108 }
109 }
110 cachedVal, ok := d.groupToServerResources[groupVersion]
111 if !ok {
112 return nil, ErrCacheNotFound
113 }
114
115 if cachedVal.err != nil && isTransientError(cachedVal.err) {
116 r, err := d.serverResourcesForGroupVersion(groupVersion)
117 if err != nil {
118 utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", groupVersion, err))
119 }
120 cachedVal = &cacheEntry{r, err}
121 d.groupToServerResources[groupVersion] = cachedVal
122 }
123
124 return cachedVal.resourceList, cachedVal.err
125}
126
127// ServerResources returns the supported resources for all groups and versions.
128// Deprecated: use ServerGroupsAndResources instead.
129func (d *memcachedDiscoveryClient) ServerResources() ([]*metav1.APIResourceList, error) {
130 return discovery.ServerResources(d)
131}
132
133// ServerGroupsAndResources returns the groups and supported resources for all groups and versions.
134func (d *memcachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
135 return discovery.ServerGroupsAndResources(d)
136}
137
138func (d *memcachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
139 d.lock.Lock()
140 defer d.lock.Unlock()
141 if !d.cacheValid {
142 if err := d.refreshLocked(); err != nil {
143 return nil, err
144 }
145 }
146 return d.groupList, nil
147}
148
149func (d *memcachedDiscoveryClient) RESTClient() restclient.Interface {
150 return d.delegate.RESTClient()
151}
152
153func (d *memcachedDiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
154 return discovery.ServerPreferredResources(d)
155}
156
157func (d *memcachedDiscoveryClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
158 return discovery.ServerPreferredNamespacedResources(d)
159}
160
161func (d *memcachedDiscoveryClient) ServerVersion() (*version.Info, error) {
162 return d.delegate.ServerVersion()
163}
164
165func (d *memcachedDiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
166 return d.delegate.OpenAPISchema()
167}
168
169func (d *memcachedDiscoveryClient) Fresh() bool {
170 d.lock.RLock()
171 defer d.lock.RUnlock()
172 // Return whether the cache is populated at all. It is still possible that
173 // a single entry is missing due to transient errors and the attempt to read
174 // that entry will trigger retry.
175 return d.cacheValid
176}
177
178// Invalidate enforces that no cached data that is older than the current time
179// is used.
180func (d *memcachedDiscoveryClient) Invalidate() {
181 d.lock.Lock()
182 defer d.lock.Unlock()
183 d.cacheValid = false
184 d.groupToServerResources = nil
185 d.groupList = nil
186}
187
188// refreshLocked refreshes the state of cache. The caller must hold d.lock for
189// writing.
190func (d *memcachedDiscoveryClient) refreshLocked() error {
191 // TODO: Could this multiplicative set of calls be replaced by a single call
192 // to ServerResources? If it's possible for more than one resulting
193 // APIResourceList to have the same GroupVersion, the lists would need merged.
194 gl, err := d.delegate.ServerGroups()
195 if err != nil || len(gl.Groups) == 0 {
196 utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list: %v", err))
197 return err
198 }
199
200 wg := &sync.WaitGroup{}
201 resultLock := &sync.Mutex{}
202 rl := map[string]*cacheEntry{}
203 for _, g := range gl.Groups {
204 for _, v := range g.Versions {
205 gv := v.GroupVersion
206 wg.Add(1)
207 go func() {
208 defer wg.Done()
209 defer utilruntime.HandleCrash()
210
211 r, err := d.serverResourcesForGroupVersion(gv)
212 if err != nil {
213 utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", gv, err))
214 }
215
216 resultLock.Lock()
217 defer resultLock.Unlock()
218 rl[gv] = &cacheEntry{r, err}
219 }()
220 }
221 }
222 wg.Wait()
223
224 d.groupToServerResources, d.groupList = rl, gl
225 d.cacheValid = true
226 return nil
227}
228
229func (d *memcachedDiscoveryClient) serverResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
230 r, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
231 if err != nil {
232 return r, err
233 }
234 if len(r.APIResources) == 0 {
235 return r, fmt.Errorf("Got empty response for: %v", groupVersion)
236 }
237 return r, nil
238}
239
240var _ discovery.CachedDiscoveryInterface = &memcachedDiscoveryClient{}
241
242// MaybeMarkStale calls MarkStale on the discovery client, if the
243// client is a memcachedClient.
244func MaybeMarkStale(d discovery.DiscoveryInterface) {
245 if c, ok := d.(*memcachedDiscoveryClient); ok {
246 c.Invalidate()
247 }
248}
249
250func (c *memcachedDiscoveryClient) MarkStale() {
251 c.lock.Lock()
252 defer c.lock.Unlock()
253
254 log.Debug("Marking cached discovery info (potentially) stale")
255 c.cacheValid = false
256}
257
258// ClientForResource returns the ResourceClient for a given object
259func ClientForResource(client dynamic.Interface, mapper meta.RESTMapper, obj runtime.Object, defNs string) (dynamic.ResourceInterface, error) {
260 gvk := obj.GetObjectKind().GroupVersionKind()
261
262 mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
263 if err != nil {
264 return nil, err
265 }
266
267 rc := client.Resource(mapping.Resource)
268
269 switch mapping.Scope.Name() {
270 case meta.RESTScopeNameRoot:
271 return rc, nil
272 case meta.RESTScopeNameNamespace:
273 meta, err := meta.Accessor(obj)
274 if err != nil {
275 return nil, err
276 }
277 namespace := meta.GetNamespace()
278 if namespace == "" {
279 namespace = defNs
280 }
281 return rc.Namespace(namespace), nil
282 default:
283 return nil, fmt.Errorf("unexpected resource scope %q", mapping.Scope)
284 }
285}
286
287// NewmemcachedDiscoveryClient creates a new CachedDiscoveryInterface which caches
288// discovery information in memory and will stay up-to-date if Invalidate is
289// called with regularity.
290//
291// NOTE: The client will NOT resort to live lookups on cache misses.
292func NewMemcachedDiscoveryClient(delegate discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface {
293 return &memcachedDiscoveryClient{
294 delegate: delegate,
295 groupToServerResources: map[string]*cacheEntry{},
296 }
297}