blob: 928104bdac9b78c5c00152c52928dbc70eed3d7b [file] [log] [blame]
Serge Bazanskibe538db2020-11-12 00:22:42 +01001package kubecfg
2
3import (
4 "context"
5 "fmt"
6 "sort"
7 "time"
8
9 jsonpatch "github.com/evanphx/json-patch"
10 log "github.com/sirupsen/logrus"
11 apiext_v1b1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
12 apiequality "k8s.io/apimachinery/pkg/api/equality"
13 "k8s.io/apimachinery/pkg/api/errors"
14 "k8s.io/apimachinery/pkg/api/meta"
15 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
17 "k8s.io/apimachinery/pkg/runtime"
18 "k8s.io/apimachinery/pkg/runtime/schema"
19 "k8s.io/apimachinery/pkg/util/diff"
20 "k8s.io/apimachinery/pkg/util/jsonmergepatch"
21 "k8s.io/apimachinery/pkg/util/sets"
22 "k8s.io/apimachinery/pkg/util/strategicpatch"
23 "k8s.io/apimachinery/pkg/util/wait"
24 "k8s.io/client-go/discovery"
25 "k8s.io/client-go/dynamic"
26 "k8s.io/client-go/util/retry"
27 "k8s.io/kube-openapi/pkg/util/proto"
28 "k8s.io/kubectl/pkg/util/openapi"
29
30 "code.hackerspace.pl/hscloud/cluster/tools/kartongips/utils"
31)
32
33const (
34 // AnnotationOrigObject annotation records the resource as it
35 // was most recently specified by kubecfg (serialised to
36 // JSON). This is used as input to the strategic-merge-patch
37 // 3-way merge when performing updates.
38 AnnotationOrigObject = "kubecfg.ksonnet.io/last-applied-configuration"
39
40 // AnnotationGcTag annotation that triggers
41 // garbage collection. Objects with value equal to
42 // command-line flag that are *not* in config will be deleted.
43 //
44 // NB: this is in phase1 of a migration to use a label instead.
45 // At this stage, both label+migration are written, but the
46 // annotation (only) is still used to trigger GC. [gctag-migration]
47 AnnotationGcTag = LabelGcTag
48
49 // LabelGcTag label that triggers garbage collection. Objects
50 // with value equal to command-line flag that are *not* in
51 // config will be deleted.
52 //
53 // NB: this is in phase1 of a migration from an annotation.
54 // At this stage, both label+migration are written, but the
55 // annotation (only) is still used to trigger GC. [gctag-migration]
56 LabelGcTag = "kubecfg.ksonnet.io/garbage-collect-tag"
57
58 // AnnotationGcStrategy controls gc logic. Current values:
59 // `auto` (default if absent) - do garbage collection
60 // `ignore` - never garbage collect this object
61 AnnotationGcStrategy = "kubecfg.ksonnet.io/garbage-collect-strategy"
62
63 // GcStrategyAuto is the default automatic gc logic
64 GcStrategyAuto = "auto"
65 // GcStrategyIgnore means this object should be ignored by garbage collection
66 GcStrategyIgnore = "ignore"
67)
68
69var (
70 gkCRD = schema.GroupKind{Group: "apiextensions.k8s.io", Kind: "CustomResourceDefinition"}
71)
72
73// UpdateCmd represents the update subcommand
74type UpdateCmd struct {
75 Client dynamic.Interface
76 Mapper meta.RESTMapper
77 Discovery discovery.DiscoveryInterface
78 DefaultNamespace string
79
80 Create bool
81 GcTag string
82 SkipGc bool
83 DryRun bool
84}
85
86func isValidKindSchema(schema proto.Schema) bool {
87 if schema == nil {
88 return false
89 }
90 patchMeta := strategicpatch.NewPatchMetaFromOpenAPI(schema)
91 _, _, err := patchMeta.LookupPatchMetadataForStruct("metadata")
92 if err != nil {
93 log.Debugf("Rejecting schema due to missing 'metadata' property (encountered %q)", err)
94 }
95 return err == nil
96}
97
98func patch(existing, new *unstructured.Unstructured, schema proto.Schema) (*unstructured.Unstructured, error) {
99 annos := existing.GetAnnotations()
100 var origData []byte
101 if data := annos[AnnotationOrigObject]; data != "" {
102 tmp := unstructured.Unstructured{}
103 err := utils.CompactDecodeObject(data, &tmp)
104 if err != nil {
105 return nil, err
106 }
107 origData, err = tmp.MarshalJSON()
108 if err != nil {
109 return nil, err
110 }
111 }
112
113 log.Debugf("origData: %s", origData)
114
115 new = new.DeepCopy()
116 utils.DeleteMetaDataAnnotation(new, AnnotationOrigObject)
117 data, err := utils.CompactEncodeObject(new)
118 if err != nil {
119 return nil, err
120 }
121 utils.SetMetaDataAnnotation(new, AnnotationOrigObject, data)
122
123 // Note origData may be empty if last-applied annotation didn't exist
124
125 newData, err := new.MarshalJSON()
126 if err != nil {
127 return nil, err
128 }
129
130 existingData, err := existing.MarshalJSON()
131 if err != nil {
132 return nil, err
133 }
134
135 var resData []byte
136 if schema == nil {
137 // No schema information - fallback to JSON merge patch
138 patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(origData, newData, existingData)
139 if err != nil {
140 return nil, err
141 }
142 resData, err = jsonpatch.MergePatch(existingData, patch)
143 if err != nil {
144 return nil, err
145 }
146 } else {
147 patchMeta := strategicpatch.NewPatchMetaFromOpenAPI(schema)
148
149 patch, err := strategicpatch.CreateThreeWayMergePatch(origData, newData, existingData, patchMeta, true)
150 if err != nil {
151 return nil, err
152 }
153 resData, err = strategicpatch.StrategicMergePatchUsingLookupPatchMeta(existingData, patch, patchMeta)
154 if err != nil {
155 return nil, err
156 }
157 }
158
159 result, _, err := unstructured.UnstructuredJSONScheme.Decode(resData, nil, nil)
160 if err != nil {
161 return nil, err
162 }
163
164 return result.(*unstructured.Unstructured), nil
165}
166
167func createOrUpdate(ctx context.Context, rc dynamic.ResourceInterface, obj *unstructured.Unstructured, create bool, dryRun bool, schema proto.Schema, desc, dryRunText string) (*unstructured.Unstructured, error) {
168 existing, err := rc.Get(ctx, obj.GetName(), metav1.GetOptions{})
169 if create && errors.IsNotFound(err) {
170 log.Info("Creating ", desc, dryRunText)
171
172 data, err := utils.CompactEncodeObject(obj)
173 if err != nil {
174 return nil, err
175 }
176 utils.SetMetaDataAnnotation(obj, AnnotationOrigObject, data)
177
178 if dryRun {
179 return obj, nil
180 }
181 newobj, err := rc.Create(ctx, obj, metav1.CreateOptions{})
182 log.Debugf("Create(%s) returned (%v, %v)", obj.GetName(), newobj, err)
183 return newobj, err
184 }
185 if err != nil {
186 return nil, err
187 }
188
189 mergedObj, err := patch(existing, obj, schema)
190 if err != nil {
191 return nil, err
192 }
193
194 // Kubernetes is a bit odd when/how it reports
195 // metadata.creationTimestamp. Here, patch() gets confused by
196 // the explicit creationTimestamp=null (it's not omitEmpty).
197 // It's easiest here to just nuke any existing timestamp,
198 // since we don't care.
199 if ts := mergedObj.GetCreationTimestamp(); ts.IsZero() {
200 existing.SetCreationTimestamp(metav1.Time{})
201 }
202 if apiequality.Semantic.DeepEqual(existing, mergedObj) {
203 log.Debugf("Not updating %s - unchanged", desc)
204 return mergedObj, nil
205 }
206
207 log.Debug("About to make change: ", diff.ObjectDiff(existing, mergedObj))
208 log.Info("Updating ", desc, dryRunText)
209 if dryRun {
210 return mergedObj, nil
211 }
212 newobj, err := rc.Update(ctx, mergedObj, metav1.UpdateOptions{})
213 log.Debugf("Update(%s) returned (%v, %v)", mergedObj.GetName(), newobj, err)
214 if err != nil {
215 log.Debug("Updated object: ", diff.ObjectDiff(existing, newobj))
216 }
217 return newobj, err
218}
219
220// CustomResourceDefinitions modify the discovery metadata, so need
221// some extra help. NB: This is also true of other things like
222// APIService registrations - we don't handle those automatically yet
223// (and perhaps never will in the full general case).
224func isSchemaEstablished(obj *unstructured.Unstructured) bool {
225 if obj.GroupVersionKind().GroupKind() != gkCRD {
226 // Not a CRD
227 return true
228 }
229
230 crd := apiext_v1b1.CustomResourceDefinition{}
231 converter := runtime.DefaultUnstructuredConverter
232 if err := converter.FromUnstructured(obj.UnstructuredContent(), &crd); err != nil {
233 log.Warnf("failed to parse CustomResourceDefinition: %v", err)
234 return false // retry
235 }
236
237 for _, cond := range crd.Status.Conditions {
238 if cond.Type == apiext_v1b1.Established && cond.Status == apiext_v1b1.ConditionTrue {
239 return true
240 }
241 }
242 return false
243}
244
245func waitForSchemaChange(ctx context.Context, disco discovery.DiscoveryInterface, rc dynamic.ResourceInterface, obj *unstructured.Unstructured) {
246 if isSchemaEstablished(obj) {
247 return
248 }
249 log.Debugf("Waiting for schema change from %v to become established", obj.GetName())
250 err := wait.Poll(100*time.Millisecond, 30*time.Minute, func() (bool, error) {
251 // Re-fetch discovery metadata
252 utils.MaybeMarkStale(disco)
253
254 var err error
255 obj, err = rc.Get(ctx, obj.GetName(), metav1.GetOptions{})
256 if err != nil {
257 if errors.IsNotFound(err) {
258 // continue polling
259 return false, nil
260 }
261 return false, err
262 }
263
264 return isSchemaEstablished(obj), nil
265 })
266 if err != nil {
267 log.Warnf("Encountered an error while waiting for new schema change to propagate (%v). Ignoring and continuing, which may lead to further errors.", err)
268 }
269}
270
271// Run executes the update command
272func (c UpdateCmd) Run(ctx context.Context, apiObjects []*unstructured.Unstructured) error {
273 dryRunText := ""
274 if c.DryRun {
275 dryRunText = " (dry-run)"
276 }
277
278 log.Infof("Fetching schemas for %d resources", len(apiObjects))
279 depOrder, err := utils.DependencyOrder(c.Discovery, c.Mapper, apiObjects)
280 if err != nil {
281 return err
282 }
283 sort.Sort(depOrder)
284
285 seenUids := sets.NewString()
286
287 schemaDoc, err := c.Discovery.OpenAPISchema()
288 if err != nil {
289 return err
290 }
291 schemaResources, err := openapi.NewOpenAPIData(schemaDoc)
292 if err != nil {
293 return err
294 }
295
296 for _, obj := range apiObjects {
297 log.Debugf("Starting update of %s", utils.FqName(obj))
298
299 if c.GcTag != "" {
300 // [gctag-migration]: Remove annotation in phase2
301 utils.SetMetaDataAnnotation(obj, AnnotationGcTag, c.GcTag)
302 utils.SetMetaDataLabel(obj, LabelGcTag, c.GcTag)
303 }
304
305 desc := fmt.Sprintf("%s %s", utils.ResourceNameFor(c.Mapper, obj), utils.FqName(obj))
306
307 rc, err := utils.ClientForResource(c.Client, c.Mapper, obj, c.DefaultNamespace)
308 if err != nil {
309 return err
310 }
311
312 schema := schemaResources.LookupResource(obj.GroupVersionKind())
313 if !isValidKindSchema(schema) {
314 // Invalid schema (eg: custom resource without
315 // schema returns trivial type:object with k8s >=1.15)
316 log.Debugf("Ignoring invalid schema for %s", obj.GroupVersionKind())
317 schema = nil
318 }
319
320 var newobj *unstructured.Unstructured
321 err = retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
322 newobj, err = createOrUpdate(ctx, rc, obj, c.Create, c.DryRun, schema, desc, dryRunText)
323 return
324 })
325 if err != nil {
326 return fmt.Errorf("Error updating %s: %s", desc, err)
327 }
328
329 // Some objects appear under multiple kinds
330 // (eg: Deployment is both extensions/v1beta1
331 // and apps/v1beta1). UID is the only stable
332 // identifier that links these two views of
333 // the same object.
334 seenUids.Insert(string(newobj.GetUID()))
335
336 // Don't wait for CRDs to settle schema under DryRun
337 if !c.DryRun {
338 waitForSchemaChange(ctx, c.Discovery, rc, newobj)
339 }
340 }
341
342 if c.GcTag != "" && !c.SkipGc {
343 version, err := utils.FetchVersion(c.Discovery)
344 if err != nil {
345 version = utils.GetDefaultVersion()
346 log.Warnf("Unable to parse server version. Received %v. Using default %s", err, version.String())
347 }
348
349 // [gctag-migration]: Add LabelGcTag==c.GcTag to ListOptions.LabelSelector in phase2
350 err = walkObjects(ctx, c.Client, c.Discovery, metav1.ListOptions{}, func(o runtime.Object) error {
351 meta, err := meta.Accessor(o)
352 if err != nil {
353 return err
354 }
355 gvk := o.GetObjectKind().GroupVersionKind()
356 desc := fmt.Sprintf("%s %s (%s)", utils.ResourceNameFor(c.Mapper, o), utils.FqName(meta), gvk.GroupVersion())
357 log.Debugf("Considering %v for gc", desc)
358 if eligibleForGc(meta, c.GcTag) && !seenUids.Has(string(meta.GetUID())) {
359 log.Info("Garbage collecting ", desc, dryRunText)
360 if !c.DryRun {
361 err := gcDelete(ctx, c.Client, c.Mapper, &version, o)
362 if err != nil {
363 return err
364 }
365 }
366 }
367 return nil
368 })
369 if err != nil {
370 return err
371 }
372 }
373
374 return nil
375}
376
377func stringListContains(list []string, value string) bool {
378 for _, item := range list {
379 if item == value {
380 return true
381 }
382 }
383 return false
384}
385
386func gcDelete(ctx context.Context, client dynamic.Interface, mapper meta.RESTMapper, version *utils.ServerVersion, o runtime.Object) error {
387 obj, err := meta.Accessor(o)
388 if err != nil {
389 return fmt.Errorf("Unexpected object type: %s", err)
390 }
391
392 uid := obj.GetUID()
393 desc := fmt.Sprintf("%s %s", utils.ResourceNameFor(mapper, o), utils.FqName(obj))
394
395 deleteOpts := metav1.DeleteOptions{
396 Preconditions: &metav1.Preconditions{UID: &uid},
397 }
398 if version.Compare(1, 6) < 0 {
399 // 1.5.x option
400 boolFalse := false
401 deleteOpts.OrphanDependents = &boolFalse
402 } else {
403 // 1.6.x option (NB: Background is broken)
404 fg := metav1.DeletePropagationForeground
405 deleteOpts.PropagationPolicy = &fg
406 }
407
408 c, err := utils.ClientForResource(client, mapper, o, metav1.NamespaceNone)
409 if err != nil {
410 return err
411 }
412
413 err = c.Delete(ctx, obj.GetName(), deleteOpts)
414 if err != nil && (errors.IsNotFound(err) || errors.IsConflict(err)) {
415 // We lost a race with something else changing the object
416 log.Debugf("Ignoring error while deleting %s: %s", desc, err)
417 err = nil
418 }
419 if err != nil {
420 return fmt.Errorf("Error deleting %s: %s", desc, err)
421 }
422
423 return nil
424}
425
426func walkObjects(ctx context.Context, client dynamic.Interface, disco discovery.DiscoveryInterface, listopts metav1.ListOptions, callback func(runtime.Object) error) error {
427 rsrclists, err := disco.ServerResources()
428 if err != nil {
429 return err
430 }
431 for _, rsrclist := range rsrclists {
432 gv, err := schema.ParseGroupVersion(rsrclist.GroupVersion)
433 if err != nil {
434 return err
435 }
436
437 for _, rsrc := range rsrclist.APIResources {
438 if !stringListContains(rsrc.Verbs, "list") {
439 log.Debugf("Don't know how to list %#v, skipping", rsrc)
440 continue
441 }
442
443 gvr := gv.WithResource(rsrc.Name)
444 if rsrc.Group != "" {
445 gvr.Group = rsrc.Group
446 }
447 if rsrc.Version != "" {
448 gvr.Version = rsrc.Version
449 }
450
451 var rc dynamic.ResourceInterface
452 if rsrc.Namespaced {
453 rc = client.Resource(gvr).Namespace(metav1.NamespaceAll)
454 } else {
455 rc = client.Resource(gvr)
456 }
457
458 log.Debugf("Listing %s", gvr)
459 obj, err := rc.List(ctx, listopts)
460 if err != nil {
461 return err
462 }
463 if err = meta.EachListItem(obj, callback); err != nil {
464 return err
465 }
466 }
467 }
468 return nil
469}
470
471func eligibleForGc(obj metav1.Object, gcTag string) bool {
472 for _, ref := range obj.GetOwnerReferences() {
473 if ref.Controller != nil && *ref.Controller {
474 // Has a controller ref
475 return false
476 }
477 }
478
479 a := obj.GetAnnotations()
480
481 strategy, ok := a[AnnotationGcStrategy]
482 if !ok {
483 strategy = GcStrategyAuto
484 }
485
486 // [gctag-migration]: Check *label* == tag instead in phase2
487 return a[AnnotationGcTag] == gcTag &&
488 strategy == GcStrategyAuto
489}