blob: d035c2ee270764abd98927eff0af76f79686ed4c [file] [log] [blame]
Serge Bazanskibe538db2020-11-12 00:22:42 +01001package kubecfg
2
3import (
4 "context"
5 "fmt"
6 "sort"
7 "time"
8
9 jsonpatch "github.com/evanphx/json-patch"
10 log "github.com/sirupsen/logrus"
11 apiext_v1b1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
12 apiequality "k8s.io/apimachinery/pkg/api/equality"
13 "k8s.io/apimachinery/pkg/api/errors"
14 "k8s.io/apimachinery/pkg/api/meta"
15 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
17 "k8s.io/apimachinery/pkg/runtime"
18 "k8s.io/apimachinery/pkg/runtime/schema"
19 "k8s.io/apimachinery/pkg/util/diff"
20 "k8s.io/apimachinery/pkg/util/jsonmergepatch"
21 "k8s.io/apimachinery/pkg/util/sets"
22 "k8s.io/apimachinery/pkg/util/strategicpatch"
23 "k8s.io/apimachinery/pkg/util/wait"
24 "k8s.io/client-go/discovery"
25 "k8s.io/client-go/dynamic"
26 "k8s.io/client-go/util/retry"
27 "k8s.io/kube-openapi/pkg/util/proto"
28 "k8s.io/kubectl/pkg/util/openapi"
29
30 "code.hackerspace.pl/hscloud/cluster/tools/kartongips/utils"
31)
32
const (
	// AnnotationOrigObject annotation records the resource as it
	// was most recently specified by kubecfg (serialised to
	// JSON). This is used as input to the strategic-merge-patch
	// 3-way merge when performing updates.
	AnnotationOrigObject = "kubecfg.ksonnet.io/last-applied-configuration"

	// AnnotationGcTag annotation that triggers
	// garbage collection. Objects with value equal to
	// command-line flag that are *not* in config will be deleted.
	// It uses the same key string as LabelGcTag.
	//
	// NB: this is in phase1 of a migration to use a label instead.
	// At this stage, both label+annotation are written, but the
	// annotation (only) is still used to trigger GC. [gctag-migration]
	AnnotationGcTag = LabelGcTag

	// LabelGcTag label that triggers garbage collection. Objects
	// with value equal to command-line flag that are *not* in
	// config will be deleted.
	//
	// NB: this is in phase1 of a migration from an annotation.
	// At this stage, both label+annotation are written, but the
	// annotation (only) is still used to trigger GC. [gctag-migration]
	LabelGcTag = "kubecfg.ksonnet.io/garbage-collect-tag"

	// AnnotationGcStrategy controls gc logic. Current values:
	// `auto` (default if absent) - do garbage collection
	// `ignore` - never garbage collect this object
	AnnotationGcStrategy = "kubecfg.ksonnet.io/garbage-collect-strategy"

	// GcStrategyAuto is the default automatic gc logic
	GcStrategyAuto = "auto"
	// GcStrategyIgnore means this object should be ignored by garbage collection
	GcStrategyIgnore = "ignore"
)
68
var (
	// gkCRD is the group/kind of CustomResourceDefinition objects. It is
	// compared against an object's own group/kind to decide whether the
	// object is a CRD.
	gkCRD = schema.GroupKind{Group: "apiextensions.k8s.io", Kind: "CustomResourceDefinition"}
)
72
// UpdateCmd represents the update subcommand
type UpdateCmd struct {
	// Client is the dynamic client used to build per-resource clients and
	// to list and delete objects during garbage collection.
	Client dynamic.Interface
	// Mapper resolves objects to REST resources when building clients and
	// human-readable descriptions.
	Mapper meta.RESTMapper
	// Discovery provides the OpenAPI schema, server version and resource
	// lists consulted by Run.
	Discovery discovery.DiscoveryInterface
	// DefaultNamespace is passed to utils.ClientForResource when building
	// the client for each object being updated.
	DefaultNamespace string

	// Create allows objects that do not exist yet to be created; when
	// false, a missing object is reported as an error.
	Create bool
	// GcTag, when non-empty, is written to each object as both the
	// AnnotationGcTag annotation and the LabelGcTag label, and enables the
	// garbage collection pass unless SkipGc is set.
	GcTag string
	// SkipGc disables the garbage collection pass even if GcTag is set.
	SkipGc bool
	// DryRun logs the actions that would be taken without creating,
	// updating or deleting anything on the server.
	DryRun bool
}
85
86func isValidKindSchema(schema proto.Schema) bool {
87 if schema == nil {
88 return false
89 }
90 patchMeta := strategicpatch.NewPatchMetaFromOpenAPI(schema)
91 _, _, err := patchMeta.LookupPatchMetadataForStruct("metadata")
92 if err != nil {
93 log.Debugf("Rejecting schema due to missing 'metadata' property (encountered %q)", err)
94 }
95 return err == nil
96}
97
98func patch(existing, new *unstructured.Unstructured, schema proto.Schema) (*unstructured.Unstructured, error) {
99 annos := existing.GetAnnotations()
100 var origData []byte
101 if data := annos[AnnotationOrigObject]; data != "" {
102 tmp := unstructured.Unstructured{}
103 err := utils.CompactDecodeObject(data, &tmp)
104 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000105 return nil, fmt.Errorf("CompactDecodeObject original object: %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100106 }
107 origData, err = tmp.MarshalJSON()
108 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000109 return nil, fmt.Errorf("MarshalJSON original object: %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100110 }
111 }
112
113 log.Debugf("origData: %s", origData)
114
115 new = new.DeepCopy()
116 utils.DeleteMetaDataAnnotation(new, AnnotationOrigObject)
117 data, err := utils.CompactEncodeObject(new)
118 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000119 return nil, fmt.Errorf("CompactEncodeObject: %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100120 }
121 utils.SetMetaDataAnnotation(new, AnnotationOrigObject, data)
122
123 // Note origData may be empty if last-applied annotation didn't exist
124
125 newData, err := new.MarshalJSON()
126 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000127 return nil, fmt.Errorf("MarshalJSON new: %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100128 }
129
130 existingData, err := existing.MarshalJSON()
131 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000132 return nil, fmt.Errorf("MarshalJSON new: %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100133 }
134
Serge Bazanski6579e842021-09-11 14:58:46 +0000135 schemaless := func() ([]byte, error) {
Serge Bazanskibe538db2020-11-12 00:22:42 +0100136 patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(origData, newData, existingData)
137 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000138 return nil, fmt.Errorf("CreateThreeWayJSONMergePatch (schemaless): %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100139 }
Serge Bazanski6579e842021-09-11 14:58:46 +0000140 resData, err := jsonpatch.MergePatch(existingData, patch)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100141 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000142 return nil, fmt.Errorf("MergePatch (schemaless): %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100143 }
Serge Bazanski6579e842021-09-11 14:58:46 +0000144 return resData, nil
145 }
146 schemaful := func() ([]byte, error) {
Serge Bazanskibe538db2020-11-12 00:22:42 +0100147 patchMeta := strategicpatch.NewPatchMetaFromOpenAPI(schema)
148
149 patch, err := strategicpatch.CreateThreeWayMergePatch(origData, newData, existingData, patchMeta, true)
150 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000151 return nil, fmt.Errorf("CreateThreeWayMergePatch (schemaful): %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100152 }
Serge Bazanski6579e842021-09-11 14:58:46 +0000153 resData, err := strategicpatch.StrategicMergePatchUsingLookupPatchMeta(existingData, patch, patchMeta)
154 if err != nil {
155 return nil, fmt.Errorf("StrategicMergePatch (schemaful): %w", err)
156 }
157 return resData, nil
158 }
159
160 var resData []byte
161 if schema == nil {
162 resData, err = schemaless()
Serge Bazanskibe538db2020-11-12 00:22:42 +0100163 if err != nil {
164 return nil, err
165 }
Serge Bazanski6579e842021-09-11 14:58:46 +0000166 } else {
167 resData, err = schemaful()
168 if err != nil {
169 log.Warningf("Schemaful/Three-way merge failed (%v), attempting schemaless/JSON merge...", err)
170 resData, err = schemaless()
171 if err != nil {
172 return nil, err
173 }
174 }
Serge Bazanskibe538db2020-11-12 00:22:42 +0100175 }
176
177 result, _, err := unstructured.UnstructuredJSONScheme.Decode(resData, nil, nil)
178 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000179 return nil, fmt.Errorf("Decode: %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100180 }
181
182 return result.(*unstructured.Unstructured), nil
183}
184
185func createOrUpdate(ctx context.Context, rc dynamic.ResourceInterface, obj *unstructured.Unstructured, create bool, dryRun bool, schema proto.Schema, desc, dryRunText string) (*unstructured.Unstructured, error) {
186 existing, err := rc.Get(ctx, obj.GetName(), metav1.GetOptions{})
187 if create && errors.IsNotFound(err) {
188 log.Info("Creating ", desc, dryRunText)
189
190 data, err := utils.CompactEncodeObject(obj)
191 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000192 return nil, fmt.Errorf("CompactEncodeObject: %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100193 }
194 utils.SetMetaDataAnnotation(obj, AnnotationOrigObject, data)
195
196 if dryRun {
197 return obj, nil
198 }
199 newobj, err := rc.Create(ctx, obj, metav1.CreateOptions{})
200 log.Debugf("Create(%s) returned (%v, %v)", obj.GetName(), newobj, err)
201 return newobj, err
202 }
203 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000204 return nil, fmt.Errorf("Get: %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100205 }
206
207 mergedObj, err := patch(existing, obj, schema)
208 if err != nil {
Serge Bazanski6579e842021-09-11 14:58:46 +0000209 return nil, fmt.Errorf("patch: %w", err)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100210 }
211
212 // Kubernetes is a bit odd when/how it reports
213 // metadata.creationTimestamp. Here, patch() gets confused by
214 // the explicit creationTimestamp=null (it's not omitEmpty).
215 // It's easiest here to just nuke any existing timestamp,
216 // since we don't care.
217 if ts := mergedObj.GetCreationTimestamp(); ts.IsZero() {
218 existing.SetCreationTimestamp(metav1.Time{})
219 }
220 if apiequality.Semantic.DeepEqual(existing, mergedObj) {
221 log.Debugf("Not updating %s - unchanged", desc)
222 return mergedObj, nil
223 }
224
225 log.Debug("About to make change: ", diff.ObjectDiff(existing, mergedObj))
226 log.Info("Updating ", desc, dryRunText)
227 if dryRun {
Serge Bazanski6579e842021-09-11 14:58:46 +0000228 return mergedObj, fmt.Errorf("ObjectDiff: %v", nil)
Serge Bazanskibe538db2020-11-12 00:22:42 +0100229 }
230 newobj, err := rc.Update(ctx, mergedObj, metav1.UpdateOptions{})
231 log.Debugf("Update(%s) returned (%v, %v)", mergedObj.GetName(), newobj, err)
232 if err != nil {
233 log.Debug("Updated object: ", diff.ObjectDiff(existing, newobj))
234 }
235 return newobj, err
236}
237
238// CustomResourceDefinitions modify the discovery metadata, so need
239// some extra help. NB: This is also true of other things like
240// APIService registrations - we don't handle those automatically yet
241// (and perhaps never will in the full general case).
242func isSchemaEstablished(obj *unstructured.Unstructured) bool {
243 if obj.GroupVersionKind().GroupKind() != gkCRD {
244 // Not a CRD
245 return true
246 }
247
248 crd := apiext_v1b1.CustomResourceDefinition{}
249 converter := runtime.DefaultUnstructuredConverter
250 if err := converter.FromUnstructured(obj.UnstructuredContent(), &crd); err != nil {
251 log.Warnf("failed to parse CustomResourceDefinition: %v", err)
252 return false // retry
253 }
254
255 for _, cond := range crd.Status.Conditions {
256 if cond.Type == apiext_v1b1.Established && cond.Status == apiext_v1b1.ConditionTrue {
257 return true
258 }
259 }
260 return false
261}
262
263func waitForSchemaChange(ctx context.Context, disco discovery.DiscoveryInterface, rc dynamic.ResourceInterface, obj *unstructured.Unstructured) {
264 if isSchemaEstablished(obj) {
265 return
266 }
267 log.Debugf("Waiting for schema change from %v to become established", obj.GetName())
268 err := wait.Poll(100*time.Millisecond, 30*time.Minute, func() (bool, error) {
269 // Re-fetch discovery metadata
270 utils.MaybeMarkStale(disco)
271
272 var err error
273 obj, err = rc.Get(ctx, obj.GetName(), metav1.GetOptions{})
274 if err != nil {
275 if errors.IsNotFound(err) {
276 // continue polling
277 return false, nil
278 }
279 return false, err
280 }
281
282 return isSchemaEstablished(obj), nil
283 })
284 if err != nil {
285 log.Warnf("Encountered an error while waiting for new schema change to propagate (%v). Ignoring and continuing, which may lead to further errors.", err)
286 }
287}
288
289// Run executes the update command
290func (c UpdateCmd) Run(ctx context.Context, apiObjects []*unstructured.Unstructured) error {
291 dryRunText := ""
292 if c.DryRun {
293 dryRunText = " (dry-run)"
294 }
295
296 log.Infof("Fetching schemas for %d resources", len(apiObjects))
297 depOrder, err := utils.DependencyOrder(c.Discovery, c.Mapper, apiObjects)
298 if err != nil {
299 return err
300 }
301 sort.Sort(depOrder)
302
303 seenUids := sets.NewString()
304
305 schemaDoc, err := c.Discovery.OpenAPISchema()
306 if err != nil {
307 return err
308 }
309 schemaResources, err := openapi.NewOpenAPIData(schemaDoc)
310 if err != nil {
311 return err
312 }
313
314 for _, obj := range apiObjects {
315 log.Debugf("Starting update of %s", utils.FqName(obj))
316
317 if c.GcTag != "" {
318 // [gctag-migration]: Remove annotation in phase2
319 utils.SetMetaDataAnnotation(obj, AnnotationGcTag, c.GcTag)
320 utils.SetMetaDataLabel(obj, LabelGcTag, c.GcTag)
321 }
322
323 desc := fmt.Sprintf("%s %s", utils.ResourceNameFor(c.Mapper, obj), utils.FqName(obj))
324
325 rc, err := utils.ClientForResource(c.Client, c.Mapper, obj, c.DefaultNamespace)
326 if err != nil {
327 return err
328 }
329
330 schema := schemaResources.LookupResource(obj.GroupVersionKind())
331 if !isValidKindSchema(schema) {
332 // Invalid schema (eg: custom resource without
333 // schema returns trivial type:object with k8s >=1.15)
334 log.Debugf("Ignoring invalid schema for %s", obj.GroupVersionKind())
335 schema = nil
336 }
337
338 var newobj *unstructured.Unstructured
339 err = retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
340 newobj, err = createOrUpdate(ctx, rc, obj, c.Create, c.DryRun, schema, desc, dryRunText)
341 return
342 })
343 if err != nil {
344 return fmt.Errorf("Error updating %s: %s", desc, err)
345 }
346
347 // Some objects appear under multiple kinds
348 // (eg: Deployment is both extensions/v1beta1
349 // and apps/v1beta1). UID is the only stable
350 // identifier that links these two views of
351 // the same object.
352 seenUids.Insert(string(newobj.GetUID()))
353
354 // Don't wait for CRDs to settle schema under DryRun
355 if !c.DryRun {
356 waitForSchemaChange(ctx, c.Discovery, rc, newobj)
357 }
358 }
359
360 if c.GcTag != "" && !c.SkipGc {
361 version, err := utils.FetchVersion(c.Discovery)
362 if err != nil {
363 version = utils.GetDefaultVersion()
364 log.Warnf("Unable to parse server version. Received %v. Using default %s", err, version.String())
365 }
366
367 // [gctag-migration]: Add LabelGcTag==c.GcTag to ListOptions.LabelSelector in phase2
368 err = walkObjects(ctx, c.Client, c.Discovery, metav1.ListOptions{}, func(o runtime.Object) error {
369 meta, err := meta.Accessor(o)
370 if err != nil {
371 return err
372 }
373 gvk := o.GetObjectKind().GroupVersionKind()
374 desc := fmt.Sprintf("%s %s (%s)", utils.ResourceNameFor(c.Mapper, o), utils.FqName(meta), gvk.GroupVersion())
375 log.Debugf("Considering %v for gc", desc)
376 if eligibleForGc(meta, c.GcTag) && !seenUids.Has(string(meta.GetUID())) {
377 log.Info("Garbage collecting ", desc, dryRunText)
378 if !c.DryRun {
379 err := gcDelete(ctx, c.Client, c.Mapper, &version, o)
380 if err != nil {
381 return err
382 }
383 }
384 }
385 return nil
386 })
387 if err != nil {
388 return err
389 }
390 }
391
392 return nil
393}
394
// stringListContains reports whether value is an element of list.
// A nil or empty list contains nothing.
func stringListContains(list []string, value string) bool {
	found := false
	for idx := 0; idx < len(list) && !found; idx++ {
		found = list[idx] == value
	}
	return found
}
403
404func gcDelete(ctx context.Context, client dynamic.Interface, mapper meta.RESTMapper, version *utils.ServerVersion, o runtime.Object) error {
405 obj, err := meta.Accessor(o)
406 if err != nil {
407 return fmt.Errorf("Unexpected object type: %s", err)
408 }
409
410 uid := obj.GetUID()
411 desc := fmt.Sprintf("%s %s", utils.ResourceNameFor(mapper, o), utils.FqName(obj))
412
413 deleteOpts := metav1.DeleteOptions{
414 Preconditions: &metav1.Preconditions{UID: &uid},
415 }
416 if version.Compare(1, 6) < 0 {
417 // 1.5.x option
418 boolFalse := false
419 deleteOpts.OrphanDependents = &boolFalse
420 } else {
421 // 1.6.x option (NB: Background is broken)
422 fg := metav1.DeletePropagationForeground
423 deleteOpts.PropagationPolicy = &fg
424 }
425
426 c, err := utils.ClientForResource(client, mapper, o, metav1.NamespaceNone)
427 if err != nil {
428 return err
429 }
430
431 err = c.Delete(ctx, obj.GetName(), deleteOpts)
432 if err != nil && (errors.IsNotFound(err) || errors.IsConflict(err)) {
433 // We lost a race with something else changing the object
434 log.Debugf("Ignoring error while deleting %s: %s", desc, err)
435 err = nil
436 }
437 if err != nil {
438 return fmt.Errorf("Error deleting %s: %s", desc, err)
439 }
440
441 return nil
442}
443
444func walkObjects(ctx context.Context, client dynamic.Interface, disco discovery.DiscoveryInterface, listopts metav1.ListOptions, callback func(runtime.Object) error) error {
445 rsrclists, err := disco.ServerResources()
446 if err != nil {
447 return err
448 }
449 for _, rsrclist := range rsrclists {
450 gv, err := schema.ParseGroupVersion(rsrclist.GroupVersion)
451 if err != nil {
452 return err
453 }
454
455 for _, rsrc := range rsrclist.APIResources {
456 if !stringListContains(rsrc.Verbs, "list") {
457 log.Debugf("Don't know how to list %#v, skipping", rsrc)
458 continue
459 }
460
461 gvr := gv.WithResource(rsrc.Name)
462 if rsrc.Group != "" {
463 gvr.Group = rsrc.Group
464 }
465 if rsrc.Version != "" {
466 gvr.Version = rsrc.Version
467 }
468
469 var rc dynamic.ResourceInterface
470 if rsrc.Namespaced {
471 rc = client.Resource(gvr).Namespace(metav1.NamespaceAll)
472 } else {
473 rc = client.Resource(gvr)
474 }
475
476 log.Debugf("Listing %s", gvr)
477 obj, err := rc.List(ctx, listopts)
478 if err != nil {
479 return err
480 }
481 if err = meta.EachListItem(obj, callback); err != nil {
482 return err
483 }
484 }
485 }
486 return nil
487}
488
489func eligibleForGc(obj metav1.Object, gcTag string) bool {
490 for _, ref := range obj.GetOwnerReferences() {
491 if ref.Controller != nil && *ref.Controller {
492 // Has a controller ref
493 return false
494 }
495 }
496
497 a := obj.GetAnnotations()
498
499 strategy, ok := a[AnnotationGcStrategy]
500 if !ok {
501 strategy = GcStrategyAuto
502 }
503
504 // [gctag-migration]: Check *label* == tag instead in phase2
505 return a[AnnotationGcTag] == gcTag &&
506 strategy == GcStrategyAuto
507}