diff --git a/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml b/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml index 78b1ba4d..59224836 100644 --- a/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml +++ b/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml @@ -780,6 +780,19 @@ spec: resource: description: Resource is the name of the related resource (for example "secrets"). type: string + syncStatus: + description: |- + SyncStatus enables synchronization of the status subresource in the same direction as + the spec (from the origin side to the destination side). When enabled, the agent will + use the status subresource endpoint to update the destination object's status. + This requires the related resource to have a status subresource configured in its CRD. + + - origin: kcp -> status is synced from kcp to the service cluster + - origin: service -> status is synced from the service cluster to kcp + + For origin: service, Watch must also be configured so that changes to the related + resource's status trigger reconciliation and ensure the informer cache is populated. + type: boolean version: description: |- Version is the API version of the related resource. This can be left blank to automatically @@ -863,6 +876,8 @@ spec: rule: '!has(self.group) || (has(self.resource) && has(self.version))' - message: identity hashes can only be used with GVRs rule: '!has(self.identityHash) || (has(self.group) && has(self.version) && has(self.resource))' + - message: watch must be configured when origin is service and syncStatus is true + rule: '!(self.origin == ''service'' && has(self.syncStatus) && self.syncStatus) || has(self.watch)' type: array resource: description: |- diff --git a/hack/tools.checksums b/hack/tools.checksums index 109523eb..9448bc7b 100644 --- a/hack/tools.checksums +++ b/hack/tools.checksums @@ -11,6 +11,7 @@ golangci-lint|GOARCH=arm64;GOOS=linux|2ed9cf2ad070dabc7947ba34cdc5142910be830306 kube-apiserver|GOARCH=amd64;GOOS=linux|ca822082ec39e54a25836a4011ddb66e482e317a7a4f1a1f73882bbd2cf5a2a1 kube-apiserver|GOARCH=arm64;GOOS=linux|6ade6c2646e2c01fde1095407452afc2b65e89d6da16da29ee39f6223ccaf63b kubectl|GOARCH=amd64;GOOS=linux|9591f3d75e1581f3f7392e6ad119aab2f28ae7d6c6e083dc5d22469667f27253 +kubectl|GOARCH=arm64;GOOS=darwin|8f38d3a38ae317b00ebf90254dc274dd28d8c6eea4a4b30c5cb12d3d27017b6d kubectl|GOARCH=arm64;GOOS=linux|95df604e914941f3172a93fa8feeb1a1a50f4011dfbe0c01e01b660afc8f9b85 yq|GOARCH=amd64;GOOS=linux|0c2b24e645b57d8e7c0566d18643a6d4f5580feeea3878127354a46f2a1e4598 yq|GOARCH=arm64;GOOS=darwin|164e10e5f7df62990e4f3823205e7ea42ba5660523a428df07c7386c0b62e3d9 diff --git a/internal/sync/object_syncer.go b/internal/sync/object_syncer.go index 54aa942e..a362e635 100644 --- a/internal/sync/object_syncer.go +++ b/internal/sync/object_syncer.go @@ -49,8 +49,10 @@ type objectSyncer struct { destCreator objectCreatorFunc // list of subresources in the resource type subresources []string - // whether to enable status subresource back-syncing + // whether to enable status subresource back-syncing (dest -> source) syncStatusBack bool + // whether to sync status in the forward direction (source -> dest) + syncStatusForward bool // whether or not to add/expect a finalizer on the source blockSourceDeletion bool // whether or not to place sync-related metadata on the destination object @@ -197,10 +199,20 @@ func (s *objectSyncer) applyMutations(source, dest syncSide) (syncSide, syncSide func (s *objectSyncer) syncObjectContents(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) (requeue bool, err error) { // Sync the spec (or more generally, the desired state) from source to dest. requeue, err = s.syncObjectSpec(ctx, log, source, dest) - if requeue || err != nil { + if err != nil { + return false, err + } + + // Always attempt forward status sync regardless of whether spec was just updated, + // so that a simultaneous spec+status change doesn't defer the status by one cycle. + if err = s.syncObjectStatusForward(ctx, log, source, dest); err != nil { return requeue, err } + if requeue { + return true, nil + } + // Sync the status back in the opposite direction, from dest to source. return s.syncObjectStatus(ctx, log, source, dest) } @@ -323,6 +335,32 @@ func (s *objectSyncer) syncObjectStatus(ctx context.Context, log *zap.SugaredLog return false, nil } +func (s *objectSyncer) syncObjectStatusForward(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) error { + if !s.syncStatusForward || dest.object == nil { + return nil + } + + sourceContent := source.object.UnstructuredContent() + destContent := dest.object.UnstructuredContent() + + if !equality.Semantic.DeepEqual(sourceContent["status"], destContent["status"]) { + destContent["status"] = sourceContent["status"] + + log.Debug("Updating destination object status…") + if err := dest.client.Status().Update(ctx, dest.object); err != nil { + if apierrors.IsNotFound(err) { + // The /status subresource does not exist on the destination CRD. + // Retrying will not help; emit a warning so the user knows what to fix. + s.recordEvent(ctx, source, dest, corev1.EventTypeWarning, "StatusSubresourceMissing", "Cannot sync status: the destination resource does not have a status subresource. Set syncStatus: false or add subresources.status to the CRD.") + return nil + } + return fmt.Errorf("failed to update destination object status: %w", err) + } + } + + return nil +} + func (s *objectSyncer) ensureDestinationObject(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) error { // create a copy of the source with GVK projected and renaming rules applied destObj, err := s.destCreator(source.object) diff --git a/internal/sync/object_syncer_test.go b/internal/sync/object_syncer_test.go new file mode 100644 index 00000000..3fdb40d3 --- /dev/null +++ b/internal/sync/object_syncer_test.go @@ -0,0 +1,223 @@ +/* +Copyright 2026 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sync + +import ( + "context" + "testing" + + "go.uber.org/zap" + + "github.com/kcp-dev/logicalcluster/v3" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/tools/record" +) + +// fakeStateStore is a minimal ObjectStateStore for unit tests. +type fakeStateStore struct { + lastKnown *unstructured.Unstructured +} + +func (f *fakeStateStore) Get(_ context.Context, _ syncSide) (*unstructured.Unstructured, error) { + return f.lastKnown, nil +} + +func (f *fakeStateStore) Put(_ context.Context, _ *unstructured.Unstructured, _ logicalcluster.Name, _ []string) error { + return nil +} + +func makeUnstructuredWithStatus(name, namespace string, status map[string]interface{}) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetAPIVersion("test.example.com/v1") + obj.SetKind("Widget") + obj.SetName(name) + obj.SetNamespace(namespace) + if status != nil { + if err := unstructured.SetNestedMap(obj.Object, status, "status"); err != nil { + panic(err) + } + } + return obj +} + +func TestSyncObjectStatusForward(t *testing.T) { + log := zap.NewNop().Sugar() + + t.Run("no-op when syncStatusForward is false", func(t *testing.T) { + source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"}) + dest := makeUnstructuredWithStatus("dst", "default", nil) + destClient := buildFakeClientWithStatus(dest) + + syncer := &objectSyncer{syncStatusForward: false} + ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) + + err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + got := dest.UnstructuredContent()["status"] + if got != nil { + t.Errorf("expected dest status to remain nil, got %v", got) + } + }) + + t.Run("no-op when dest object is nil", func(t *testing.T) { + source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"}) + + syncer := &objectSyncer{syncStatusForward: true} + ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) + + err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: nil}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("syncs status from source to dest when they differ", func(t *testing.T) { + source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"}) + dest := makeUnstructuredWithStatus("dst", "default", nil) + destClient := buildFakeClientWithStatus(dest) + + syncer := &objectSyncer{syncStatusForward: true, eventObjSide: syncSideSource} + ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) + + err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + phase, _, _ := unstructured.NestedString(dest.Object, "status", "phase") + if phase != "ready" { + t.Errorf("expected dest status.phase=ready, got %q", phase) + } + }) + + t.Run("no-op when status is already equal", func(t *testing.T) { + status := map[string]interface{}{"phase": "ready"} + source := makeUnstructuredWithStatus("src", "default", status) + dest := makeUnstructuredWithStatus("dst", "default", status) + destClient := buildFakeClientWithStatus(dest) + + syncer := &objectSyncer{syncStatusForward: true} + ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) + + err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("emits warning and returns nil when dest has no status subresource (404)", func(t *testing.T) { + source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"}) + dest := makeUnstructuredWithStatus("dst", "default", nil) + + // Build a client that does NOT register the status subresource, so Status().Update() → 404. + destClient := buildFakeClient(dest) + + syncer := &objectSyncer{syncStatusForward: true, eventObjSide: syncSideSource} + recorder := record.NewFakeRecorder(10) + ctx := WithEventRecorder(t.Context(), recorder) + + err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + if err != nil { + t.Fatalf("expected no error on 404, got: %v", err) + } + + select { + case event := <-recorder.Events: + if event == "" { + t.Error("expected a Warning event to be emitted") + } + default: + t.Error("expected a Warning event to be emitted but channel was empty") + } + }) +} + +// TestSyncObjectContentsForwardStatusRunsEvenOnSpecRequeue verifies the key invariant: +// syncObjectStatusForward is called unconditionally in syncObjectContents, even when +// syncObjectSpec already returned requeue=true due to a pending spec change. +// A future refactor that re-introduces an early return after spec sync would silently +// break status sync for simultaneous spec+status changes. +func TestSyncObjectContentsForwardStatusRunsEvenOnSpecRequeue(t *testing.T) { + log := zap.NewNop().Sugar() + + // Source: new spec + status set. + source := &unstructured.Unstructured{} + source.SetAPIVersion("test.example.com/v1") + source.SetKind("Widget") + source.SetName("my-widget") + source.SetNamespace("default") + if err := unstructured.SetNestedField(source.Object, "new-value", "spec", "username"); err != nil { + t.Fatal(err) + } + if err := unstructured.SetNestedField(source.Object, "ready", "status", "phase"); err != nil { + t.Fatal(err) + } + + // Dest: old spec, no status. + dest := &unstructured.Unstructured{} + dest.SetAPIVersion("test.example.com/v1") + dest.SetKind("Widget") + dest.SetName("my-widget") + dest.SetNamespace("default") + if err := unstructured.SetNestedField(dest.Object, "old-value", "spec", "username"); err != nil { + t.Fatal(err) + } + + destClient := buildFakeClientWithStatus(dest) + + // Last known source state has old spec — so syncObjectSpec sees a diff and patches, returning requeue=true. + lastKnown := &unstructured.Unstructured{} + lastKnown.SetAPIVersion("test.example.com/v1") + lastKnown.SetKind("Widget") + lastKnown.SetName("my-widget") + if err := unstructured.SetNestedField(lastKnown.Object, "old-value", "spec", "username"); err != nil { + t.Fatal(err) + } + + syncer := &objectSyncer{ + stateStore: &fakeStateStore{lastKnown: lastKnown}, + syncStatusForward: true, + subresources: []string{"status"}, + destCreator: func(u *unstructured.Unstructured) (*unstructured.Unstructured, error) { return u.DeepCopy(), nil }, + eventObjSide: syncSideSource, + } + + recorder := record.NewFakeRecorder(10) + ctx := WithEventRecorder(t.Context(), recorder) + ctx = WithClusterName(ctx, logicalcluster.Name("testcluster")) + + sourceSide := syncSide{object: source, clusterName: "testcluster"} + destSide := syncSide{object: dest, client: destClient} + + requeue, err := syncer.syncObjectContents(ctx, log, sourceSide, destSide) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !requeue { + t.Error("expected requeue=true because spec changed, but got false") + } + + // Even though spec sync returned requeue=true, the dest status must have been updated. + phase, _, _ := unstructured.NestedString(dest.Object, "status", "phase") + if phase != "ready" { + t.Errorf("expected dest status.phase=ready after syncObjectContents, got %q — forward status sync did not run on spec requeue", phase) + } +} diff --git a/internal/sync/syncer_related.go b/internal/sync/syncer_related.go index 736cea7c..f8281ab6 100644 --- a/internal/sync/syncer_related.go +++ b/internal/sync/syncer_related.go @@ -133,6 +133,14 @@ func (s *ResourceSyncer) processRelatedResource(ctx context.Context, log *zap.Su // into the cleanup procedure to ensure that copies of the related object are removed. forceDelete := primaryDeleting && (relRes.Cleanup || relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp) + // When status sync is enabled, include "status" in subresources so it is stripped from + // the spec patch (avoiding a no-op write on resources that have a status subresource). + // The status is then separately written via the status subresource endpoint by syncStatusForward. + relatedSubresources := []string(nil) + if relRes.SyncStatus { + relatedSubresources = []string{"status"} + } + syncer := objectSyncer{ // Related objects within kcp are not labelled with the agent name because it's unnecessary. // agentName: "", @@ -149,17 +157,9 @@ func (s *ResourceSyncer) processRelatedResource(ctx context.Context, log *zap.Su return dest, nil }, - // Originally related resources were only ConfigMaps and Secrets, which do not have subresources; - // nowadays we support arbitrary APIs for related resources, but for simplicity do not [yet?] - // support syncing the subresources back. For this we would first need to figure out which - // subresources even exist. - subresources: nil, - // Theoretically we would only want to sync the status back if the related resource - // originates in kcp, because it would be weird if the service provider relied on status - // information provided to them by the consumer. - // However since we do not know anything about subresources, we currently cannot enable this - // feature at all. - syncStatusBack: false, + subresources: relatedSubresources, + syncStatusBack: false, + syncStatusForward: relRes.SyncStatus, // if the origin is on the remote side, we want to add a finalizer to make // sure we can clean up properly; when forceDelete is enabled, we are in deletion mode and // want to force the deletion, so we ignore the related object's origin (it was taken into diff --git a/sdk/apis/syncagent/v1alpha1/published_resource.go b/sdk/apis/syncagent/v1alpha1/published_resource.go index c2e97993..77737290 100644 --- a/sdk/apis/syncagent/v1alpha1/published_resource.go +++ b/sdk/apis/syncagent/v1alpha1/published_resource.go @@ -210,6 +210,7 @@ const ( // +kubebuilder:validation:XValidation:rule="!has(self.group) || (has(self.resource) && has(self.version))",message="configuring a group also requires a version and resource" // group is included here because when an identityHash is used, core/v1 cannot possible be targetted // +kubebuilder:validation:XValidation:rule="!has(self.identityHash) || (has(self.group) && has(self.version) && has(self.resource))",message="identity hashes can only be used with GVRs" +// +kubebuilder:validation:XValidation:rule="!(self.origin == 'service' && has(self.syncStatus) && self.syncStatus) || has(self.watch)",message="watch must be configured when origin is service and syncStatus is true" type RelatedResourceSpec struct { // Identifier is a unique name for this related resource. The name must be unique within one // PublishedResource and is the key by which consumers (end users) can identify and consume the @@ -268,6 +269,20 @@ type RelatedResourceSpec struct { // resource type and uses the configured rule to enqueue the correct primary object. // Without this field, changes to origin:kcp related resources do not trigger reconciliation. Watch *RelatedResourceWatch `json:"watch,omitempty"` + + // SyncStatus enables synchronization of the status subresource in the same direction as + // the spec (from the origin side to the destination side). When enabled, the agent will + // use the status subresource endpoint to update the destination object's status. + // This requires the related resource to have a status subresource configured in its CRD. + // + // - origin: kcp -> status is synced from kcp to the service cluster + // - origin: service -> status is synced from the service cluster to kcp + // + // For origin: service, Watch must also be configured so that changes to the related + // resource's status trigger reconciliation and ensure the informer cache is populated. + // + // +optional + SyncStatus bool `json:"syncStatus,omitempty"` } // RelatedResourceWatch configures how the watch handler maps a changed related resource diff --git a/sdk/applyconfiguration/syncagent/v1alpha1/relatedresourcespec.go b/sdk/applyconfiguration/syncagent/v1alpha1/relatedresourcespec.go index 3028b00a..5517ac2b 100644 --- a/sdk/applyconfiguration/syncagent/v1alpha1/relatedresourcespec.go +++ b/sdk/applyconfiguration/syncagent/v1alpha1/relatedresourcespec.go @@ -37,6 +37,7 @@ type RelatedResourceSpecApplyConfiguration struct { Object *RelatedResourceObjectApplyConfiguration `json:"object,omitempty"` Mutation *ResourceMutationSpecApplyConfiguration `json:"mutation,omitempty"` Watch *RelatedResourceWatchApplyConfiguration `json:"watch,omitempty"` + SyncStatus *bool `json:"syncStatus,omitempty"` } // RelatedResourceSpecApplyConfiguration constructs a declarative configuration of the RelatedResourceSpec type for use with @@ -140,3 +141,11 @@ func (b *RelatedResourceSpecApplyConfiguration) WithWatch(value *RelatedResource b.Watch = value return b } + +// WithSyncStatus sets the SyncStatus field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the SyncStatus field is set to the value of the last call. +func (b *RelatedResourceSpecApplyConfiguration) WithSyncStatus(value bool) *RelatedResourceSpecApplyConfiguration { + b.SyncStatus = &value + return b +} diff --git a/test/crds/crontabswithstatus.yaml b/test/crds/crontabswithstatus.yaml new file mode 100644 index 00000000..4f5ea0b9 --- /dev/null +++ b/test/crds/crontabswithstatus.yaml @@ -0,0 +1,27 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: crontabswithstatus.example.com +spec: + group: example.com + scope: Namespaced + names: + plural: crontabswithstatus + singular: crontabwithstatus + kind: CronTabWithStatus + versions: + - name: v1 + served: true + storage: true + subresources: + status: {} + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + x-kubernetes-preserve-unknown-fields: true + status: + type: object + x-kubernetes-preserve-unknown-fields: true diff --git a/test/e2e/sdk/cel_validations_test.go b/test/e2e/sdk/cel_validations_test.go index 3fa3c36d..96da0369 100644 --- a/test/e2e/sdk/cel_validations_test.go +++ b/test/e2e/sdk/cel_validations_test.go @@ -176,6 +176,39 @@ func TestValidateRelatedResourceSpec(t *testing.T) { IdentityHash: "abc123", }, }, + { + name: "syncStatus with origin service requires watch", + valid: false, + spec: syncagentv1alpha1.RelatedResourceSpec{ + Origin: syncagentv1alpha1.RelatedResourceOriginService, + Resource: "things", + Version: "v1", + SyncStatus: true, + }, + }, + { + name: "syncStatus with origin service and watch is valid", + valid: true, + spec: syncagentv1alpha1.RelatedResourceSpec{ + Origin: syncagentv1alpha1.RelatedResourceOriginService, + Resource: "things", + Version: "v1", + SyncStatus: true, + Watch: &syncagentv1alpha1.RelatedResourceWatch{ + ByOwner: &syncagentv1alpha1.RelatedResourceWatchByOwner{}, + }, + }, + }, + { + name: "syncStatus with origin kcp does not require watch", + valid: true, + spec: syncagentv1alpha1.RelatedResourceSpec{ + Origin: syncagentv1alpha1.RelatedResourceOriginKcp, + Resource: "things", + Version: "v1", + SyncStatus: true, + }, + }, } alphaNum := regexp.MustCompile(`[^a-z0-9]`) diff --git a/test/e2e/sync/related_status_test.go b/test/e2e/sync/related_status_test.go new file mode 100644 index 00000000..0cf8aa78 --- /dev/null +++ b/test/e2e/sync/related_status_test.go @@ -0,0 +1,528 @@ +//go:build e2e + +/* +Copyright 2026 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sync + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/logr" + + syncagentv1alpha1 "github.com/kcp-dev/api-syncagent/sdk/apis/syncagent/v1alpha1" + crds "github.com/kcp-dev/api-syncagent/test/crds/example/v1" + "github.com/kcp-dev/api-syncagent/test/utils" + + "github.com/kcp-dev/logicalcluster/v3" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + ctrlruntime "sigs.k8s.io/controller-runtime" +) + +const crontabWithStatusAPIVersion = "example.com/v1" +const crontabWithStatusKind = "CronTabWithStatus" + +func makeCronTabWithStatus(name, namespace string) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetAPIVersion(crontabWithStatusAPIVersion) + obj.SetKind(crontabWithStatusKind) + obj.SetName(name) + obj.SetNamespace(namespace) + return obj +} + +// TestSyncRelatedObjectStatusFromKcp verifies that when syncStatus: true is set on a related +// resource with origin: kcp, the status is propagated from kcp to the service cluster. +func TestSyncRelatedObjectStatusFromKcp(t *testing.T) { + const apiExportName = "kcp.example.com" + + ctrlruntime.SetLogger(logr.Discard()) + + ctx := t.Context() + + orgKubconfig := utils.CreateOrganization(t, ctx, "related-status-from-kcp", apiExportName) + + envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ + "test/crds/crontab.yaml", + "test/crds/crontabswithstatus.yaml", + }) + + // Publish CronTabsWithStatus (with sync disabled) so the schema is available in kcp. + prCronTabsWithStatus := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabswithstatus"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTabWithStatus", + }, + Projection: &syncagentv1alpha1.ResourceProjection{ + Group: "kcp.example.com", + }, + Synchronization: &syncagentv1alpha1.SynchronizationSpec{Enabled: false}, + }, + } + if err := envtestClient.Create(ctx, prCronTabsWithStatus); err != nil { + t.Fatalf("Failed to create CronTabWithStatus PublishedResource: %v", err) + } + + // Publish CronTabs with a related CronTabWithStatus (origin: kcp, syncStatus: true). + prCrontabs := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabs"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTab", + }, + Naming: &syncagentv1alpha1.ResourceNaming{ + Name: "{{ .Object.metadata.name }}", + Namespace: "synced-{{ .Object.metadata.namespace }}", + }, + Projection: &syncagentv1alpha1.ResourceProjection{Group: "kcp.example.com"}, + Related: []syncagentv1alpha1.RelatedResourceSpec{ + { + Identifier: "crontabwithstatus", + Origin: syncagentv1alpha1.RelatedResourceOriginKcp, + Group: "kcp.example.com", + Version: "v1", + Resource: "crontabswithstatus", + Projection: &syncagentv1alpha1.RelatedResourceProjection{ + Group: "example.com", + }, + SyncStatus: true, + Object: syncagentv1alpha1.RelatedResourceObject{ + RelatedResourceObjectSpec: syncagentv1alpha1.RelatedResourceObjectSpec{ + Template: &syncagentv1alpha1.TemplateExpression{ + Template: "my-related", + }, + }, + }, + }, + }, + }, + } + if err := envtestClient.Create(ctx, prCrontabs); err != nil { + t.Fatalf("Failed to create CronTab PublishedResource: %v", err) + } + + utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName, "") + + kcpClusterClient := utils.GetKcpAdminClusterClient(t) + teamClusterPath := logicalcluster.NewPath("root").Join("related-status-from-kcp").Join("team-1") + teamClient := kcpClusterClient.Cluster(teamClusterPath) + + utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ + Group: apiExportName, + Version: "v1", + Kind: "CronTab", + }) + utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ + Group: apiExportName, + Version: "v1", + Kind: "CronTabWithStatus", + }) + + // Create the related object in kcp first (origin: kcp means the user creates it there) and set + // its status before creating the CronTab. This ensures the status is already present when the + // agent's second reconciliation cycle (after creating the service cluster copy) runs, so + // syncObjectStatusForward can see it without needing a Watch trigger. + kcpObj := makeCronTabWithStatus("my-related", "default") + kcpObj.SetAPIVersion("kcp.example.com/v1") + kcpObj.SetKind("CronTabWithStatus") + if err := teamClient.Create(ctx, kcpObj); err != nil { + t.Fatalf("Failed to create CronTabWithStatus in kcp: %v", err) + } + + // Simulate a controller in kcp setting the object's status. + if err := unstructured.SetNestedField(kcpObj.Object, "id-kcp-12345", "status", "id"); err != nil { + t.Fatalf("Failed to set status.id: %v", err) + } + if err := teamClient.Status().Update(ctx, kcpObj); err != nil { + t.Fatalf("Failed to update CronTabWithStatus status in kcp: %v", err) + } + + // Create the primary CronTab in kcp after the related object is ready. + crontab := utils.ToUnstructured(t, &crds.Crontab{ + TypeMeta: metav1.TypeMeta{APIVersion: "kcp.example.com/v1", Kind: "CronTab"}, + ObjectMeta: metav1.ObjectMeta{Name: "my-crontab", Namespace: "default"}, + Spec: crds.CrontabSpec{CronSpec: "* * *", Image: "ubuntu:latest"}, + }) + if err := teamClient.Create(ctx, crontab); err != nil { + t.Fatalf("Failed to create CronTab: %v", err) + } + + // Wait for the service cluster copy to appear. + t.Log("Waiting for CronTabWithStatus copy to appear in the service cluster…") + svcObj := makeCronTabWithStatus("my-related", "synced-default") + svcObj.SetAPIVersion(crontabWithStatusAPIVersion) + svcObj.SetKind(crontabWithStatusKind) + + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { + return envtestClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "synced-default"}, svcObj) == nil, nil + }) + if err != nil { + t.Fatalf("CronTabWithStatus copy never appeared in service cluster: %v", err) + } + + // Verify the status was forwarded from kcp to the service cluster. + t.Log("Waiting for CronTabWithStatus status to be synced to service cluster…") + err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { + if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "synced-default"}, svcObj); err != nil { + return false, nil + } + + id, _, _ := unstructured.NestedString(svcObj.Object, "status", "id") + return id == "id-kcp-12345", nil + }) + if err != nil { + id, _, _ := unstructured.NestedString(svcObj.Object, "status", "id") + t.Fatalf("CronTabWithStatus status was not synced to service cluster (got status.id=%q, want %q)", id, "id-kcp-12345") + } + + // Verify that updating status on the service cluster does NOT propagate back to kcp + // (there is no reverse sync for origin:kcp + syncStatus). + svcObj2 := makeCronTabWithStatus("my-related", "synced-default") + svcObj2.SetAPIVersion(crontabWithStatusAPIVersion) + svcObj2.SetKind(crontabWithStatusKind) + if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "synced-default"}, svcObj2); err != nil { + t.Fatalf("Failed to re-fetch service cluster CronTabWithStatus: %v", err) + } + if err := unstructured.SetNestedField(svcObj2.Object, "id-local-override", "status", "id"); err != nil { + t.Fatalf("Failed to set status.id: %v", err) + } + if err := envtestClient.Status().Update(ctx, svcObj2); err != nil { + t.Fatalf("Failed to update service cluster CronTabWithStatus status: %v", err) + } + + // Briefly poll kcp to confirm its status was not overwritten. + kcpObjCheck := makeCronTabWithStatus("my-related", "default") + kcpObjCheck.SetAPIVersion("kcp.example.com/v1") + kcpObjCheck.SetKind("CronTabWithStatus") + if err := teamClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "default"}, kcpObjCheck); err != nil { + t.Fatalf("Failed to get kcp CronTabWithStatus: %v", err) + } + kcpID, _, _ := unstructured.NestedString(kcpObjCheck.Object, "status", "id") + if kcpID != "id-kcp-12345" { + t.Fatalf("kcp CronTabWithStatus status was unexpectedly modified (got %q, want %q)", kcpID, "id-kcp-12345") + } +} + +// TestSyncRelatedObjectStatusFromService verifies that when syncStatus: true is set on a related +// resource with origin: service, the status is propagated from the service cluster to kcp. +func TestSyncRelatedObjectStatusFromService(t *testing.T) { + const apiExportName = "kcp.example.com" + + ctrlruntime.SetLogger(logr.Discard()) + + ctx := t.Context() + + orgKubconfig := utils.CreateOrganization(t, ctx, "related-status-from-service", apiExportName) + + envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ + "test/crds/crontab.yaml", + "test/crds/crontabswithstatus.yaml", + }) + + // Publish CronTabsWithStatus (with sync disabled) so the schema is available in kcp + // and the agent can create kcp-side copies when syncing service-origin related resources. + prCronTabsWithStatus := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabswithstatus"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTabWithStatus", + }, + Projection: &syncagentv1alpha1.ResourceProjection{ + Group: "kcp.example.com", + }, + Synchronization: &syncagentv1alpha1.SynchronizationSpec{Enabled: false}, + }, + } + if err := envtestClient.Create(ctx, prCronTabsWithStatus); err != nil { + t.Fatalf("Failed to create CronTabWithStatus PublishedResource: %v", err) + } + + // Publish CronTabs with a related CronTabWithStatus (origin: service, syncStatus: true). + prCrontabs := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabs"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTab", + }, + Naming: &syncagentv1alpha1.ResourceNaming{ + Name: "{{ .Object.metadata.name }}", + Namespace: "synced-{{ .Object.metadata.namespace }}", + }, + Projection: &syncagentv1alpha1.ResourceProjection{Group: "kcp.example.com"}, + Related: []syncagentv1alpha1.RelatedResourceSpec{ + { + Identifier: "crontabwithstatus", + Origin: syncagentv1alpha1.RelatedResourceOriginService, + Group: "example.com", + Version: "v1", + Resource: "crontabswithstatus", + Projection: &syncagentv1alpha1.RelatedResourceProjection{ + Group: "kcp.example.com", + }, + SyncStatus: true, + // Watch triggers CronTab reconciliation whenever a service cluster + // CronTabWithStatus changes. This also registers an informer so that + // syncObjectStatusForward can read the up-to-date status from the cache. + Watch: &syncagentv1alpha1.RelatedResourceWatch{ + BySelector: &metav1.LabelSelector{}, + }, + Object: syncagentv1alpha1.RelatedResourceObject{ + RelatedResourceObjectSpec: syncagentv1alpha1.RelatedResourceObjectSpec{ + Template: &syncagentv1alpha1.TemplateExpression{ + Template: "my-related", + }, + }, + }, + }, + }, + }, + } + if err := envtestClient.Create(ctx, prCrontabs); err != nil { + t.Fatalf("Failed to create CronTab PublishedResource: %v", err) + } + + utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName, "") + + kcpClusterClient := utils.GetKcpAdminClusterClient(t) + teamClusterPath := logicalcluster.NewPath("root").Join("related-status-from-service").Join("team-1") + teamClient := kcpClusterClient.Cluster(teamClusterPath) + + utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ + Group: apiExportName, + Version: "v1", + Kind: "CronTab", + }) + + // Create the primary CronTab in kcp. + crontab := utils.ToUnstructured(t, &crds.Crontab{ + TypeMeta: metav1.TypeMeta{APIVersion: "kcp.example.com/v1", Kind: "CronTab"}, + ObjectMeta: metav1.ObjectMeta{Name: "my-crontab", Namespace: "default"}, + Spec: crds.CrontabSpec{CronSpec: "* * *", Image: "ubuntu:latest"}, + }) + if err := teamClient.Create(ctx, crontab); err != nil { + t.Fatalf("Failed to create CronTab: %v", err) + } + + // Create the related object on the service cluster and set its status. + ensureNamespace(t, ctx, envtestClient, "synced-default") + + svcObj := makeCronTabWithStatus("my-related", "synced-default") + if err := envtestClient.Create(ctx, svcObj); err != nil { + t.Fatalf("Failed to create CronTabWithStatus in service cluster: %v", err) + } + + if err := unstructured.SetNestedField(svcObj.Object, "id-svc-99999", "status", "id"); err != nil { + t.Fatalf("Failed to set status.id: %v", err) + } + if err := envtestClient.Status().Update(ctx, svcObj); err != nil { + t.Fatalf("Failed to update CronTabWithStatus status: %v", err) + } + + // Wait for the kcp copy to appear. + t.Log("Waiting for CronTabWithStatus copy to appear in kcp…") + kcpObj := &unstructured.Unstructured{} + kcpObj.SetAPIVersion("kcp.example.com/v1") + kcpObj.SetKind("CronTabWithStatus") + + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { + return teamClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "default"}, kcpObj) == nil, nil + }) + if err != nil { + t.Fatalf("CronTabWithStatus copy never appeared in kcp: %v", err) + } + + // Wait for the status to be synced to kcp. + t.Log("Waiting for CronTabWithStatus status to be synced to kcp…") + err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { + if err := teamClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "default"}, kcpObj); err != nil { + return false, nil + } + + id, _, _ := unstructured.NestedString(kcpObj.Object, "status", "id") + return id == "id-svc-99999", nil + }) + if err != nil { + id, _, _ := unstructured.NestedString(kcpObj.Object, "status", "id") + t.Fatalf("CronTabWithStatus status was not synced to kcp (got status.id=%q, want %q)", id, "id-svc-99999") + } +} + +// TestSyncRelatedObjectStatusNotSyncedByDefault verifies that when syncStatus is not set (the +// default), status changes on the origin side do not appear on the destination copy. +func TestSyncRelatedObjectStatusNotSyncedByDefault(t *testing.T) { + const apiExportName = "kcp.example.com" + + ctrlruntime.SetLogger(logr.Discard()) + + ctx := t.Context() + + orgKubconfig := utils.CreateOrganization(t, ctx, "related-status-not-synced", apiExportName) + + envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ + "test/crds/crontab.yaml", + "test/crds/crontabswithstatus.yaml", + }) + + prCronTabsWithStatus := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabswithstatus"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTabWithStatus", + }, + Projection: &syncagentv1alpha1.ResourceProjection{ + Group: "kcp.example.com", + }, + Synchronization: &syncagentv1alpha1.SynchronizationSpec{Enabled: false}, + }, + } + if err := envtestClient.Create(ctx, prCronTabsWithStatus); err != nil { + t.Fatalf("Failed to create CronTabWithStatus PublishedResource: %v", err) + } + + // SyncStatus is intentionally NOT set here. + prCrontabs := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabs"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTab", + }, + Naming: &syncagentv1alpha1.ResourceNaming{ + Name: "{{ .Object.metadata.name }}", + Namespace: "synced-{{ .Object.metadata.namespace }}", + }, + Projection: &syncagentv1alpha1.ResourceProjection{Group: "kcp.example.com"}, + Related: []syncagentv1alpha1.RelatedResourceSpec{ + { + Identifier: "crontabwithstatus", + Origin: syncagentv1alpha1.RelatedResourceOriginKcp, + Group: "kcp.example.com", + Version: "v1", + Resource: "crontabswithstatus", + Projection: &syncagentv1alpha1.RelatedResourceProjection{ + Group: "example.com", + }, + // SyncStatus: false (default) + Object: syncagentv1alpha1.RelatedResourceObject{ + RelatedResourceObjectSpec: syncagentv1alpha1.RelatedResourceObjectSpec{ + Template: &syncagentv1alpha1.TemplateExpression{ + Template: "my-related", + }, + }, + }, + }, + }, + }, + } + if err := envtestClient.Create(ctx, prCrontabs); err != nil { + t.Fatalf("Failed to create CronTab PublishedResource: %v", err) + } + + utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName, "") + + kcpClusterClient := utils.GetKcpAdminClusterClient(t) + teamClusterPath := logicalcluster.NewPath("root").Join("related-status-not-synced").Join("team-1") + teamClient := kcpClusterClient.Cluster(teamClusterPath) + + utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ + Group: apiExportName, + Version: "v1", + Kind: "CronTab", + }) + utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ + Group: apiExportName, + Version: "v1", + Kind: "CronTabWithStatus", + }) + + // Create primary CronTab in kcp. + crontab := utils.ToUnstructured(t, &crds.Crontab{ + TypeMeta: metav1.TypeMeta{APIVersion: "kcp.example.com/v1", Kind: "CronTab"}, + ObjectMeta: metav1.ObjectMeta{Name: "my-crontab", Namespace: "default"}, + Spec: crds.CrontabSpec{CronSpec: "* * *", Image: "ubuntu:latest"}, + }) + if err := teamClient.Create(ctx, crontab); err != nil { + t.Fatalf("Failed to create CronTab: %v", err) + } + + // Create the related object in kcp and set its status. + kcpObj := makeCronTabWithStatus("my-related", "default") + kcpObj.SetAPIVersion("kcp.example.com/v1") + kcpObj.SetKind("CronTabWithStatus") + if err := teamClient.Create(ctx, kcpObj); err != nil { + t.Fatalf("Failed to create CronTabWithStatus in kcp: %v", err) + } + if err := unstructured.SetNestedField(kcpObj.Object, "id-should-not-appear", "status", "id"); err != nil { + t.Fatalf("Failed to set status.id: %v", err) + } + if err := teamClient.Status().Update(ctx, kcpObj); err != nil { + t.Fatalf("Failed to update CronTabWithStatus status in kcp: %v", err) + } + + // Wait for the service cluster copy to appear. + t.Log("Waiting for CronTabWithStatus copy to appear in service cluster…") + svcObj := makeCronTabWithStatus("my-related", "synced-default") + svcObj.SetAPIVersion(crontabWithStatusAPIVersion) + svcObj.SetKind(crontabWithStatusKind) + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { + return envtestClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "synced-default"}, svcObj) == nil, nil + }) + if err != nil { + t.Fatalf("CronTabWithStatus copy never appeared in service cluster: %v", err) + } + + // Poll briefly to confirm no status appears on the service cluster copy. + var finalSvcObj unstructured.Unstructured + finalSvcObj.SetAPIVersion(crontabWithStatusAPIVersion) + finalSvcObj.SetKind(crontabWithStatusKind) + + statusSeen := false + _ = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) { + if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "synced-default"}, &finalSvcObj); err != nil { + return false, nil + } + id, exists, _ := unstructured.NestedString(finalSvcObj.Object, "status", "id") + if exists && id != "" { + statusSeen = true + return true, nil + } + return false, nil + }) + + if statusSeen { + id, _, _ := unstructured.NestedString(finalSvcObj.Object, "status", "id") + t.Fatalf("CronTabWithStatus status was unexpectedly synced to service cluster (got status.id=%q)", id) + } +}