| package kubecfg |
| |
| import ( |
| "context" |
| "fmt" |
| "sort" |
| "time" |
| |
| jsonpatch "github.com/evanphx/json-patch" |
| log "github.com/sirupsen/logrus" |
| apiext_v1b1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" |
| apiequality "k8s.io/apimachinery/pkg/api/equality" |
| "k8s.io/apimachinery/pkg/api/errors" |
| "k8s.io/apimachinery/pkg/api/meta" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/runtime/schema" |
| "k8s.io/apimachinery/pkg/util/diff" |
| "k8s.io/apimachinery/pkg/util/jsonmergepatch" |
| "k8s.io/apimachinery/pkg/util/sets" |
| "k8s.io/apimachinery/pkg/util/strategicpatch" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/client-go/discovery" |
| "k8s.io/client-go/dynamic" |
| "k8s.io/client-go/util/retry" |
| "k8s.io/kube-openapi/pkg/util/proto" |
| "k8s.io/kubectl/pkg/util/openapi" |
| |
| "code.hackerspace.pl/hscloud/cluster/tools/kartongips/utils" |
| ) |
| |
| const ( |
| // AnnotationOrigObject annotation records the resource as it |
| // was most recently specified by kubecfg (serialised to |
| // JSON). This is used as input to the strategic-merge-patch |
| // 3-way merge when performing updates. |
| AnnotationOrigObject = "kubecfg.ksonnet.io/last-applied-configuration" |
| |
| // AnnotationGcTag annotation that triggers |
| // garbage collection. Objects with value equal to |
| // command-line flag that are *not* in config will be deleted. |
| // |
| // NB: this is in phase1 of a migration to use a label instead. |
| // At this stage, both label+migration are written, but the |
| // annotation (only) is still used to trigger GC. [gctag-migration] |
| AnnotationGcTag = LabelGcTag |
| |
| // LabelGcTag label that triggers garbage collection. Objects |
| // with value equal to command-line flag that are *not* in |
| // config will be deleted. |
| // |
| // NB: this is in phase1 of a migration from an annotation. |
| // At this stage, both label+migration are written, but the |
| // annotation (only) is still used to trigger GC. [gctag-migration] |
| LabelGcTag = "kubecfg.ksonnet.io/garbage-collect-tag" |
| |
| // AnnotationGcStrategy controls gc logic. Current values: |
| // `auto` (default if absent) - do garbage collection |
| // `ignore` - never garbage collect this object |
| AnnotationGcStrategy = "kubecfg.ksonnet.io/garbage-collect-strategy" |
| |
| // GcStrategyAuto is the default automatic gc logic |
| GcStrategyAuto = "auto" |
| // GcStrategyIgnore means this object should be ignored by garbage collection |
| GcStrategyIgnore = "ignore" |
| ) |
| |
| var ( |
| gkCRD = schema.GroupKind{Group: "apiextensions.k8s.io", Kind: "CustomResourceDefinition"} |
| ) |
| |
| // UpdateCmd represents the update subcommand |
| type UpdateCmd struct { |
| Client dynamic.Interface |
| Mapper meta.RESTMapper |
| Discovery discovery.DiscoveryInterface |
| DefaultNamespace string |
| |
| Create bool |
| GcTag string |
| SkipGc bool |
| DryRun bool |
| } |
| |
| func isValidKindSchema(schema proto.Schema) bool { |
| if schema == nil { |
| return false |
| } |
| patchMeta := strategicpatch.NewPatchMetaFromOpenAPI(schema) |
| _, _, err := patchMeta.LookupPatchMetadataForStruct("metadata") |
| if err != nil { |
| log.Debugf("Rejecting schema due to missing 'metadata' property (encountered %q)", err) |
| } |
| return err == nil |
| } |
| |
| func patch(existing, new *unstructured.Unstructured, schema proto.Schema) (*unstructured.Unstructured, error) { |
| annos := existing.GetAnnotations() |
| var origData []byte |
| if data := annos[AnnotationOrigObject]; data != "" { |
| tmp := unstructured.Unstructured{} |
| err := utils.CompactDecodeObject(data, &tmp) |
| if err != nil { |
| return nil, err |
| } |
| origData, err = tmp.MarshalJSON() |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| log.Debugf("origData: %s", origData) |
| |
| new = new.DeepCopy() |
| utils.DeleteMetaDataAnnotation(new, AnnotationOrigObject) |
| data, err := utils.CompactEncodeObject(new) |
| if err != nil { |
| return nil, err |
| } |
| utils.SetMetaDataAnnotation(new, AnnotationOrigObject, data) |
| |
| // Note origData may be empty if last-applied annotation didn't exist |
| |
| newData, err := new.MarshalJSON() |
| if err != nil { |
| return nil, err |
| } |
| |
| existingData, err := existing.MarshalJSON() |
| if err != nil { |
| return nil, err |
| } |
| |
| var resData []byte |
| if schema == nil { |
| // No schema information - fallback to JSON merge patch |
| patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(origData, newData, existingData) |
| if err != nil { |
| return nil, err |
| } |
| resData, err = jsonpatch.MergePatch(existingData, patch) |
| if err != nil { |
| return nil, err |
| } |
| } else { |
| patchMeta := strategicpatch.NewPatchMetaFromOpenAPI(schema) |
| |
| patch, err := strategicpatch.CreateThreeWayMergePatch(origData, newData, existingData, patchMeta, true) |
| if err != nil { |
| return nil, err |
| } |
| resData, err = strategicpatch.StrategicMergePatchUsingLookupPatchMeta(existingData, patch, patchMeta) |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| result, _, err := unstructured.UnstructuredJSONScheme.Decode(resData, nil, nil) |
| if err != nil { |
| return nil, err |
| } |
| |
| return result.(*unstructured.Unstructured), nil |
| } |
| |
| func createOrUpdate(ctx context.Context, rc dynamic.ResourceInterface, obj *unstructured.Unstructured, create bool, dryRun bool, schema proto.Schema, desc, dryRunText string) (*unstructured.Unstructured, error) { |
| existing, err := rc.Get(ctx, obj.GetName(), metav1.GetOptions{}) |
| if create && errors.IsNotFound(err) { |
| log.Info("Creating ", desc, dryRunText) |
| |
| data, err := utils.CompactEncodeObject(obj) |
| if err != nil { |
| return nil, err |
| } |
| utils.SetMetaDataAnnotation(obj, AnnotationOrigObject, data) |
| |
| if dryRun { |
| return obj, nil |
| } |
| newobj, err := rc.Create(ctx, obj, metav1.CreateOptions{}) |
| log.Debugf("Create(%s) returned (%v, %v)", obj.GetName(), newobj, err) |
| return newobj, err |
| } |
| if err != nil { |
| return nil, err |
| } |
| |
| mergedObj, err := patch(existing, obj, schema) |
| if err != nil { |
| return nil, err |
| } |
| |
| // Kubernetes is a bit odd when/how it reports |
| // metadata.creationTimestamp. Here, patch() gets confused by |
| // the explicit creationTimestamp=null (it's not omitEmpty). |
| // It's easiest here to just nuke any existing timestamp, |
| // since we don't care. |
| if ts := mergedObj.GetCreationTimestamp(); ts.IsZero() { |
| existing.SetCreationTimestamp(metav1.Time{}) |
| } |
| if apiequality.Semantic.DeepEqual(existing, mergedObj) { |
| log.Debugf("Not updating %s - unchanged", desc) |
| return mergedObj, nil |
| } |
| |
| log.Debug("About to make change: ", diff.ObjectDiff(existing, mergedObj)) |
| log.Info("Updating ", desc, dryRunText) |
| if dryRun { |
| return mergedObj, nil |
| } |
| newobj, err := rc.Update(ctx, mergedObj, metav1.UpdateOptions{}) |
| log.Debugf("Update(%s) returned (%v, %v)", mergedObj.GetName(), newobj, err) |
| if err != nil { |
| log.Debug("Updated object: ", diff.ObjectDiff(existing, newobj)) |
| } |
| return newobj, err |
| } |
| |
| // CustomResourceDefinitions modify the discovery metadata, so need |
| // some extra help. NB: This is also true of other things like |
| // APIService registrations - we don't handle those automatically yet |
| // (and perhaps never will in the full general case). |
| func isSchemaEstablished(obj *unstructured.Unstructured) bool { |
| if obj.GroupVersionKind().GroupKind() != gkCRD { |
| // Not a CRD |
| return true |
| } |
| |
| crd := apiext_v1b1.CustomResourceDefinition{} |
| converter := runtime.DefaultUnstructuredConverter |
| if err := converter.FromUnstructured(obj.UnstructuredContent(), &crd); err != nil { |
| log.Warnf("failed to parse CustomResourceDefinition: %v", err) |
| return false // retry |
| } |
| |
| for _, cond := range crd.Status.Conditions { |
| if cond.Type == apiext_v1b1.Established && cond.Status == apiext_v1b1.ConditionTrue { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func waitForSchemaChange(ctx context.Context, disco discovery.DiscoveryInterface, rc dynamic.ResourceInterface, obj *unstructured.Unstructured) { |
| if isSchemaEstablished(obj) { |
| return |
| } |
| log.Debugf("Waiting for schema change from %v to become established", obj.GetName()) |
| err := wait.Poll(100*time.Millisecond, 30*time.Minute, func() (bool, error) { |
| // Re-fetch discovery metadata |
| utils.MaybeMarkStale(disco) |
| |
| var err error |
| obj, err = rc.Get(ctx, obj.GetName(), metav1.GetOptions{}) |
| if err != nil { |
| if errors.IsNotFound(err) { |
| // continue polling |
| return false, nil |
| } |
| return false, err |
| } |
| |
| return isSchemaEstablished(obj), nil |
| }) |
| if err != nil { |
| log.Warnf("Encountered an error while waiting for new schema change to propagate (%v). Ignoring and continuing, which may lead to further errors.", err) |
| } |
| } |
| |
| // Run executes the update command |
| func (c UpdateCmd) Run(ctx context.Context, apiObjects []*unstructured.Unstructured) error { |
| dryRunText := "" |
| if c.DryRun { |
| dryRunText = " (dry-run)" |
| } |
| |
| log.Infof("Fetching schemas for %d resources", len(apiObjects)) |
| depOrder, err := utils.DependencyOrder(c.Discovery, c.Mapper, apiObjects) |
| if err != nil { |
| return err |
| } |
| sort.Sort(depOrder) |
| |
| seenUids := sets.NewString() |
| |
| schemaDoc, err := c.Discovery.OpenAPISchema() |
| if err != nil { |
| return err |
| } |
| schemaResources, err := openapi.NewOpenAPIData(schemaDoc) |
| if err != nil { |
| return err |
| } |
| |
| for _, obj := range apiObjects { |
| log.Debugf("Starting update of %s", utils.FqName(obj)) |
| |
| if c.GcTag != "" { |
| // [gctag-migration]: Remove annotation in phase2 |
| utils.SetMetaDataAnnotation(obj, AnnotationGcTag, c.GcTag) |
| utils.SetMetaDataLabel(obj, LabelGcTag, c.GcTag) |
| } |
| |
| desc := fmt.Sprintf("%s %s", utils.ResourceNameFor(c.Mapper, obj), utils.FqName(obj)) |
| |
| rc, err := utils.ClientForResource(c.Client, c.Mapper, obj, c.DefaultNamespace) |
| if err != nil { |
| return err |
| } |
| |
| schema := schemaResources.LookupResource(obj.GroupVersionKind()) |
| if !isValidKindSchema(schema) { |
| // Invalid schema (eg: custom resource without |
| // schema returns trivial type:object with k8s >=1.15) |
| log.Debugf("Ignoring invalid schema for %s", obj.GroupVersionKind()) |
| schema = nil |
| } |
| |
| var newobj *unstructured.Unstructured |
| err = retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { |
| newobj, err = createOrUpdate(ctx, rc, obj, c.Create, c.DryRun, schema, desc, dryRunText) |
| return |
| }) |
| if err != nil { |
| return fmt.Errorf("Error updating %s: %s", desc, err) |
| } |
| |
| // Some objects appear under multiple kinds |
| // (eg: Deployment is both extensions/v1beta1 |
| // and apps/v1beta1). UID is the only stable |
| // identifier that links these two views of |
| // the same object. |
| seenUids.Insert(string(newobj.GetUID())) |
| |
| // Don't wait for CRDs to settle schema under DryRun |
| if !c.DryRun { |
| waitForSchemaChange(ctx, c.Discovery, rc, newobj) |
| } |
| } |
| |
| if c.GcTag != "" && !c.SkipGc { |
| version, err := utils.FetchVersion(c.Discovery) |
| if err != nil { |
| version = utils.GetDefaultVersion() |
| log.Warnf("Unable to parse server version. Received %v. Using default %s", err, version.String()) |
| } |
| |
| // [gctag-migration]: Add LabelGcTag==c.GcTag to ListOptions.LabelSelector in phase2 |
| err = walkObjects(ctx, c.Client, c.Discovery, metav1.ListOptions{}, func(o runtime.Object) error { |
| meta, err := meta.Accessor(o) |
| if err != nil { |
| return err |
| } |
| gvk := o.GetObjectKind().GroupVersionKind() |
| desc := fmt.Sprintf("%s %s (%s)", utils.ResourceNameFor(c.Mapper, o), utils.FqName(meta), gvk.GroupVersion()) |
| log.Debugf("Considering %v for gc", desc) |
| if eligibleForGc(meta, c.GcTag) && !seenUids.Has(string(meta.GetUID())) { |
| log.Info("Garbage collecting ", desc, dryRunText) |
| if !c.DryRun { |
| err := gcDelete(ctx, c.Client, c.Mapper, &version, o) |
| if err != nil { |
| return err |
| } |
| } |
| } |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| func stringListContains(list []string, value string) bool { |
| for _, item := range list { |
| if item == value { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func gcDelete(ctx context.Context, client dynamic.Interface, mapper meta.RESTMapper, version *utils.ServerVersion, o runtime.Object) error { |
| obj, err := meta.Accessor(o) |
| if err != nil { |
| return fmt.Errorf("Unexpected object type: %s", err) |
| } |
| |
| uid := obj.GetUID() |
| desc := fmt.Sprintf("%s %s", utils.ResourceNameFor(mapper, o), utils.FqName(obj)) |
| |
| deleteOpts := metav1.DeleteOptions{ |
| Preconditions: &metav1.Preconditions{UID: &uid}, |
| } |
| if version.Compare(1, 6) < 0 { |
| // 1.5.x option |
| boolFalse := false |
| deleteOpts.OrphanDependents = &boolFalse |
| } else { |
| // 1.6.x option (NB: Background is broken) |
| fg := metav1.DeletePropagationForeground |
| deleteOpts.PropagationPolicy = &fg |
| } |
| |
| c, err := utils.ClientForResource(client, mapper, o, metav1.NamespaceNone) |
| if err != nil { |
| return err |
| } |
| |
| err = c.Delete(ctx, obj.GetName(), deleteOpts) |
| if err != nil && (errors.IsNotFound(err) || errors.IsConflict(err)) { |
| // We lost a race with something else changing the object |
| log.Debugf("Ignoring error while deleting %s: %s", desc, err) |
| err = nil |
| } |
| if err != nil { |
| return fmt.Errorf("Error deleting %s: %s", desc, err) |
| } |
| |
| return nil |
| } |
| |
| func walkObjects(ctx context.Context, client dynamic.Interface, disco discovery.DiscoveryInterface, listopts metav1.ListOptions, callback func(runtime.Object) error) error { |
| rsrclists, err := disco.ServerResources() |
| if err != nil { |
| return err |
| } |
| for _, rsrclist := range rsrclists { |
| gv, err := schema.ParseGroupVersion(rsrclist.GroupVersion) |
| if err != nil { |
| return err |
| } |
| |
| for _, rsrc := range rsrclist.APIResources { |
| if !stringListContains(rsrc.Verbs, "list") { |
| log.Debugf("Don't know how to list %#v, skipping", rsrc) |
| continue |
| } |
| |
| gvr := gv.WithResource(rsrc.Name) |
| if rsrc.Group != "" { |
| gvr.Group = rsrc.Group |
| } |
| if rsrc.Version != "" { |
| gvr.Version = rsrc.Version |
| } |
| |
| var rc dynamic.ResourceInterface |
| if rsrc.Namespaced { |
| rc = client.Resource(gvr).Namespace(metav1.NamespaceAll) |
| } else { |
| rc = client.Resource(gvr) |
| } |
| |
| log.Debugf("Listing %s", gvr) |
| obj, err := rc.List(ctx, listopts) |
| if err != nil { |
| return err |
| } |
| if err = meta.EachListItem(obj, callback); err != nil { |
| return err |
| } |
| } |
| } |
| return nil |
| } |
| |
| func eligibleForGc(obj metav1.Object, gcTag string) bool { |
| for _, ref := range obj.GetOwnerReferences() { |
| if ref.Controller != nil && *ref.Controller { |
| // Has a controller ref |
| return false |
| } |
| } |
| |
| a := obj.GetAnnotations() |
| |
| strategy, ok := a[AnnotationGcStrategy] |
| if !ok { |
| strategy = GcStrategyAuto |
| } |
| |
| // [gctag-migration]: Check *label* == tag instead in phase2 |
| return a[AnnotationGcTag] == gcTag && |
| strategy == GcStrategyAuto |
| } |