From 2e90f2a641091a3402fa73e7eca7965417a4ce9c Mon Sep 17 00:00:00 2001 From: Iskren Date: Tue, 30 Jun 2026 01:40:51 +0300 Subject: [PATCH 1/3] Add SyncStatus field to RelatedResourceSpec Signed-off-by: Iskren some codeverify fixes sdk changes Signed-off-by: Iskren --- .../syncagent.kcp.io_publishedresources.yaml | 10 + hack/tools.checksums | 1 + internal/sync/object_syncer.go | 38 +- internal/sync/syncer_related.go | 22 +- .../syncagent/v1alpha1/published_resource.go | 11 + .../syncagent/v1alpha1/relatedresourcespec.go | 9 + test/crds/subnet.yaml | 27 + test/e2e/sync/related_status_test.go | 531 ++++++++++++++++++ 8 files changed, 636 insertions(+), 13 deletions(-) create mode 100644 test/crds/subnet.yaml create mode 100644 test/e2e/sync/related_status_test.go diff --git a/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml b/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml index 78b1ba4d..a20d522e 100644 --- a/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml +++ b/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml @@ -780,6 +780,16 @@ spec: resource: description: Resource is the name of the related resource (for example "secrets"). type: string + syncStatus: + description: |- + SyncStatus enables synchronization of the status subresource in the same direction as + the spec (from the origin side to the destination side). When enabled, the agent will + use the status subresource endpoint to update the destination object's status. + This requires the related resource to have a status subresource configured in its CRD. + + - origin: kcp -> status is synced from kcp to the service cluster + - origin: service -> status is synced from the service cluster to kcp + type: boolean version: description: |- Version is the API version of the related resource. This can be left blank to automatically diff --git a/hack/tools.checksums b/hack/tools.checksums index 109523eb..9448bc7b 100644 --- a/hack/tools.checksums +++ b/hack/tools.checksums @@ -11,6 +11,7 @@ golangci-lint|GOARCH=arm64;GOOS=linux|2ed9cf2ad070dabc7947ba34cdc5142910be830306 kube-apiserver|GOARCH=amd64;GOOS=linux|ca822082ec39e54a25836a4011ddb66e482e317a7a4f1a1f73882bbd2cf5a2a1 kube-apiserver|GOARCH=arm64;GOOS=linux|6ade6c2646e2c01fde1095407452afc2b65e89d6da16da29ee39f6223ccaf63b kubectl|GOARCH=amd64;GOOS=linux|9591f3d75e1581f3f7392e6ad119aab2f28ae7d6c6e083dc5d22469667f27253 +kubectl|GOARCH=arm64;GOOS=darwin|8f38d3a38ae317b00ebf90254dc274dd28d8c6eea4a4b30c5cb12d3d27017b6d kubectl|GOARCH=arm64;GOOS=linux|95df604e914941f3172a93fa8feeb1a1a50f4011dfbe0c01e01b660afc8f9b85 yq|GOARCH=amd64;GOOS=linux|0c2b24e645b57d8e7c0566d18643a6d4f5580feeea3878127354a46f2a1e4598 yq|GOARCH=arm64;GOOS=darwin|164e10e5f7df62990e4f3823205e7ea42ba5660523a428df07c7386c0b62e3d9 diff --git a/internal/sync/object_syncer.go b/internal/sync/object_syncer.go index 54aa942e..6e43393b 100644 --- a/internal/sync/object_syncer.go +++ b/internal/sync/object_syncer.go @@ -49,8 +49,10 @@ type objectSyncer struct { destCreator objectCreatorFunc // list of subresources in the resource type subresources []string - // whether to enable status subresource back-syncing + // whether to enable status subresource back-syncing (dest -> source) syncStatusBack bool + // whether to sync status in the forward direction (source -> dest) + syncStatusForward bool // whether or not to add/expect a finalizer on the source blockSourceDeletion bool // whether or not to place sync-related metadata on the destination object @@ -197,10 +199,20 @@ func (s *objectSyncer) applyMutations(source, dest syncSide) (syncSide, syncSide func (s *objectSyncer) syncObjectContents(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) (requeue bool, err error) { // Sync the spec (or more generally, the desired state) from source to dest. requeue, err = s.syncObjectSpec(ctx, log, source, dest) - if requeue || err != nil { + if err != nil { + return false, err + } + + // Always attempt forward status sync regardless of whether spec was just updated, + // so that a simultaneous spec+status change doesn't defer the status by one cycle. + if _, err = s.syncObjectStatusForward(ctx, log, source, dest); err != nil { return requeue, err } + if requeue { + return true, nil + } + // Sync the status back in the opposite direction, from dest to source. return s.syncObjectStatus(ctx, log, source, dest) } @@ -323,6 +335,28 @@ func (s *objectSyncer) syncObjectStatus(ctx context.Context, log *zap.SugaredLog return false, nil } +func (s *objectSyncer) syncObjectStatusForward(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) (requeue bool, err error) { + if !s.syncStatusForward || dest.object == nil { + return false, nil + } + + sourceContent := source.object.UnstructuredContent() + destContent := dest.object.UnstructuredContent() + + if !equality.Semantic.DeepEqual(sourceContent["status"], destContent["status"]) { + destContent["status"] = sourceContent["status"] + + log.Debug("Updating destination object status…") + if err := dest.client.Status().Update(ctx, dest.object); err != nil { + return false, fmt.Errorf("failed to update destination object status: %w", err) + } + + s.recordEvent(ctx, source, dest, corev1.EventTypeNormal, "ObjectStatusSynced", "The current object status has been synchronized to the destination.") + } + + return false, nil +} + func (s *objectSyncer) ensureDestinationObject(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) error { // create a copy of the source with GVK projected and renaming rules applied destObj, err := s.destCreator(source.object) diff --git a/internal/sync/syncer_related.go b/internal/sync/syncer_related.go index 736cea7c..f8281ab6 100644 --- a/internal/sync/syncer_related.go +++ b/internal/sync/syncer_related.go @@ -133,6 +133,14 @@ func (s *ResourceSyncer) processRelatedResource(ctx context.Context, log *zap.Su // into the cleanup procedure to ensure that copies of the related object are removed. forceDelete := primaryDeleting && (relRes.Cleanup || relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp) + // When status sync is enabled, include "status" in subresources so it is stripped from + // the spec patch (avoiding a no-op write on resources that have a status subresource). + // The status is then separately written via the status subresource endpoint by syncStatusForward. + relatedSubresources := []string(nil) + if relRes.SyncStatus { + relatedSubresources = []string{"status"} + } + syncer := objectSyncer{ // Related objects within kcp are not labelled with the agent name because it's unnecessary. // agentName: "", @@ -149,17 +157,9 @@ func (s *ResourceSyncer) processRelatedResource(ctx context.Context, log *zap.Su return dest, nil }, - // Originally related resources were only ConfigMaps and Secrets, which do not have subresources; - // nowadays we support arbitrary APIs for related resources, but for simplicity do not [yet?] - // support syncing the subresources back. For this we would first need to figure out which - // subresources even exist. - subresources: nil, - // Theoretically we would only want to sync the status back if the related resource - // originates in kcp, because it would be weird if the service provider relied on status - // information provided to them by the consumer. - // However since we do not know anything about subresources, we currently cannot enable this - // feature at all. - syncStatusBack: false, + subresources: relatedSubresources, + syncStatusBack: false, + syncStatusForward: relRes.SyncStatus, // if the origin is on the remote side, we want to add a finalizer to make // sure we can clean up properly; when forceDelete is enabled, we are in deletion mode and // want to force the deletion, so we ignore the related object's origin (it was taken into diff --git a/sdk/apis/syncagent/v1alpha1/published_resource.go b/sdk/apis/syncagent/v1alpha1/published_resource.go index c2e97993..6f0cec04 100644 --- a/sdk/apis/syncagent/v1alpha1/published_resource.go +++ b/sdk/apis/syncagent/v1alpha1/published_resource.go @@ -268,6 +268,17 @@ type RelatedResourceSpec struct { // resource type and uses the configured rule to enqueue the correct primary object. // Without this field, changes to origin:kcp related resources do not trigger reconciliation. Watch *RelatedResourceWatch `json:"watch,omitempty"` + + // SyncStatus enables synchronization of the status subresource in the same direction as + // the spec (from the origin side to the destination side). When enabled, the agent will + // use the status subresource endpoint to update the destination object's status. + // This requires the related resource to have a status subresource configured in its CRD. + // + // - origin: kcp -> status is synced from kcp to the service cluster + // - origin: service -> status is synced from the service cluster to kcp + // + // +optional + SyncStatus bool `json:"syncStatus,omitempty"` } // RelatedResourceWatch configures how the watch handler maps a changed related resource diff --git a/sdk/applyconfiguration/syncagent/v1alpha1/relatedresourcespec.go b/sdk/applyconfiguration/syncagent/v1alpha1/relatedresourcespec.go index 3028b00a..5517ac2b 100644 --- a/sdk/applyconfiguration/syncagent/v1alpha1/relatedresourcespec.go +++ b/sdk/applyconfiguration/syncagent/v1alpha1/relatedresourcespec.go @@ -37,6 +37,7 @@ type RelatedResourceSpecApplyConfiguration struct { Object *RelatedResourceObjectApplyConfiguration `json:"object,omitempty"` Mutation *ResourceMutationSpecApplyConfiguration `json:"mutation,omitempty"` Watch *RelatedResourceWatchApplyConfiguration `json:"watch,omitempty"` + SyncStatus *bool `json:"syncStatus,omitempty"` } // RelatedResourceSpecApplyConfiguration constructs a declarative configuration of the RelatedResourceSpec type for use with @@ -140,3 +141,11 @@ func (b *RelatedResourceSpecApplyConfiguration) WithWatch(value *RelatedResource b.Watch = value return b } + +// WithSyncStatus sets the SyncStatus field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the SyncStatus field is set to the value of the last call. +func (b *RelatedResourceSpecApplyConfiguration) WithSyncStatus(value bool) *RelatedResourceSpecApplyConfiguration { + b.SyncStatus = &value + return b +} diff --git a/test/crds/subnet.yaml b/test/crds/subnet.yaml new file mode 100644 index 00000000..1602e018 --- /dev/null +++ b/test/crds/subnet.yaml @@ -0,0 +1,27 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: subnets.aws.example.com +spec: + group: aws.example.com + scope: Namespaced + names: + plural: subnets + singular: subnet + kind: Subnet + versions: + - name: v1 + served: true + storage: true + subresources: + status: {} + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + x-kubernetes-preserve-unknown-fields: true + status: + type: object + x-kubernetes-preserve-unknown-fields: true diff --git a/test/e2e/sync/related_status_test.go b/test/e2e/sync/related_status_test.go new file mode 100644 index 00000000..3c639c78 --- /dev/null +++ b/test/e2e/sync/related_status_test.go @@ -0,0 +1,531 @@ +//go:build e2e + +/* +Copyright 2026 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sync + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/logr" + + syncagentv1alpha1 "github.com/kcp-dev/api-syncagent/sdk/apis/syncagent/v1alpha1" + crds "github.com/kcp-dev/api-syncagent/test/crds/example/v1" + "github.com/kcp-dev/api-syncagent/test/utils" + + "github.com/kcp-dev/logicalcluster/v3" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + ctrlruntime "sigs.k8s.io/controller-runtime" +) + +const subnetAPIVersion = "aws.example.com/v1" +const subnetKind = "Subnet" + +func makeSubnet(name, namespace string) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetAPIVersion(subnetAPIVersion) + obj.SetKind(subnetKind) + obj.SetName(name) + obj.SetNamespace(namespace) + return obj +} + +// TestSyncRelatedObjectStatusFromKcp verifies that when syncStatus: true is set on a related +// resource with origin: kcp, the status is propagated from kcp to the service cluster. +func TestSyncRelatedObjectStatusFromKcp(t *testing.T) { + const apiExportName = "kcp.example.com" + + ctrlruntime.SetLogger(logr.Discard()) + + ctx := t.Context() + + orgKubconfig := utils.CreateOrganization(t, ctx, "related-status-from-kcp", apiExportName) + + envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ + "test/crds/crontab.yaml", + "test/crds/subnet.yaml", + }) + + // Publish Subnets (with sync disabled) so the Subnet schema is available in kcp. + prSubnets := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-subnets"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "aws.example.com", + Version: "v1", + Kind: "Subnet", + }, + Projection: &syncagentv1alpha1.ResourceProjection{ + Group: "kcp.example.com", + }, + Synchronization: &syncagentv1alpha1.SynchronizationSpec{Enabled: false}, + }, + } + if err := envtestClient.Create(ctx, prSubnets); err != nil { + t.Fatalf("Failed to create Subnet PublishedResource: %v", err) + } + + // Publish CronTabs with a related Subnet (origin: kcp, syncStatus: true). + prCrontabs := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabs"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTab", + }, + Naming: &syncagentv1alpha1.ResourceNaming{ + Name: "{{ .Object.metadata.name }}", + Namespace: "synced-{{ .Object.metadata.namespace }}", + }, + Projection: &syncagentv1alpha1.ResourceProjection{Group: "kcp.example.com"}, + Related: []syncagentv1alpha1.RelatedResourceSpec{ + { + Identifier: "subnet", + Origin: syncagentv1alpha1.RelatedResourceOriginKcp, + Group: "kcp.example.com", + Version: "v1", + Resource: "subnets", + Projection: &syncagentv1alpha1.RelatedResourceProjection{ + Group: "aws.example.com", + }, + SyncStatus: true, + Object: syncagentv1alpha1.RelatedResourceObject{ + RelatedResourceObjectSpec: syncagentv1alpha1.RelatedResourceObjectSpec{ + Template: &syncagentv1alpha1.TemplateExpression{ + Template: "my-subnet", + }, + }, + }, + }, + }, + }, + } + if err := envtestClient.Create(ctx, prCrontabs); err != nil { + t.Fatalf("Failed to create CronTab PublishedResource: %v", err) + } + + utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName, "") + + kcpClusterClient := utils.GetKcpAdminClusterClient(t) + teamClusterPath := logicalcluster.NewPath("root").Join("related-status-from-kcp").Join("team-1") + teamClient := kcpClusterClient.Cluster(teamClusterPath) + + utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ + Group: apiExportName, + Version: "v1", + Kind: "CronTab", + }) + utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ + Group: apiExportName, + Version: "v1", + Kind: "Subnet", + }) + + // Create the Subnet in kcp first (origin: kcp means the user creates it there) and set its + // status before creating the CronTab. This ensures the status is already present when the + // agent's second reconciliation cycle (after creating the service cluster copy) runs, so + // syncObjectStatusForward can see it without needing a Watch trigger. + kcpSubnet := makeSubnet("my-subnet", "default") + kcpSubnet.SetAPIVersion("kcp.example.com/v1") + kcpSubnet.SetKind("Subnet") + if err := teamClient.Create(ctx, kcpSubnet); err != nil { + t.Fatalf("Failed to create Subnet in kcp: %v", err) + } + + // Simulate a controller in kcp setting the subnet's status (e.g. the provisioned subnet ID). + if err := unstructured.SetNestedField(kcpSubnet.Object, "subnet-kcp-12345", "status", "id"); err != nil { + t.Fatalf("Failed to set status.id: %v", err) + } + if err := teamClient.Status().Update(ctx, kcpSubnet); err != nil { + t.Fatalf("Failed to update Subnet status in kcp: %v", err) + } + + // Create the primary CronTab in kcp after the Subnet is ready. + crontab := utils.ToUnstructured(t, &crds.Crontab{ + TypeMeta: metav1.TypeMeta{APIVersion: "kcp.example.com/v1", Kind: "CronTab"}, + ObjectMeta: metav1.ObjectMeta{Name: "my-crontab", Namespace: "default"}, + Spec: crds.CrontabSpec{CronSpec: "* * *", Image: "ubuntu:latest"}, + }) + if err := teamClient.Create(ctx, crontab); err != nil { + t.Fatalf("Failed to create CronTab: %v", err) + } + + // Wait for the Subnet copy to appear in the service cluster. + t.Log("Waiting for Subnet copy to appear in the service cluster…") + serviceSubnet := makeSubnet("my-subnet", "synced-default") + serviceSubnet.SetAPIVersion(subnetAPIVersion) + serviceSubnet.SetKind(subnetKind) + + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { + return envtestClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "synced-default"}, serviceSubnet) == nil, nil + }) + if err != nil { + t.Fatalf("Subnet copy never appeared in service cluster: %v", err) + } + + // Verify the status was forwarded from kcp to the service cluster. + t.Log("Waiting for Subnet status to be synced to service cluster…") + err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { + if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "synced-default"}, serviceSubnet); err != nil { + return false, nil + } + + id, _, _ := unstructured.NestedString(serviceSubnet.Object, "status", "id") + return id == "subnet-kcp-12345", nil + }) + if err != nil { + id, _, _ := unstructured.NestedString(serviceSubnet.Object, "status", "id") + t.Fatalf("Subnet status was not synced to service cluster (got status.id=%q, want %q)", id, "subnet-kcp-12345") + } + + // Verify that updating status on the service cluster does NOT propagate back to kcp + // (there is no reverse sync for origin:kcp + syncStatus). + serviceSubnet2 := makeSubnet("my-subnet", "synced-default") + serviceSubnet2.SetAPIVersion(subnetAPIVersion) + serviceSubnet2.SetKind(subnetKind) + if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "synced-default"}, serviceSubnet2); err != nil { + t.Fatalf("Failed to re-fetch service cluster Subnet: %v", err) + } + if err := unstructured.SetNestedField(serviceSubnet2.Object, "subnet-local-override", "status", "id"); err != nil { + t.Fatalf("Failed to set status.id: %v", err) + } + if err := envtestClient.Status().Update(ctx, serviceSubnet2); err != nil { + t.Fatalf("Failed to update service cluster Subnet status: %v", err) + } + + // Briefly poll kcp to confirm its status was not overwritten. + kcpSubnetCheck := makeSubnet("my-subnet", "default") + kcpSubnetCheck.SetAPIVersion("kcp.example.com/v1") + kcpSubnetCheck.SetKind("Subnet") + if err := teamClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "default"}, kcpSubnetCheck); err != nil { + t.Fatalf("Failed to get kcp Subnet: %v", err) + } + kcpID, _, _ := unstructured.NestedString(kcpSubnetCheck.Object, "status", "id") + if kcpID != "subnet-kcp-12345" { + t.Fatalf("kcp Subnet status was unexpectedly modified (got %q, want %q)", kcpID, "subnet-kcp-12345") + } +} + +// TestSyncRelatedObjectStatusFromService verifies that when syncStatus: true is set on a related +// resource with origin: service, the status is propagated from the service cluster to kcp. +func TestSyncRelatedObjectStatusFromService(t *testing.T) { + const apiExportName = "kcp.example.com" + + ctrlruntime.SetLogger(logr.Discard()) + + ctx := t.Context() + + orgKubconfig := utils.CreateOrganization(t, ctx, "related-status-from-service", apiExportName) + + envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ + "test/crds/crontab.yaml", + "test/crds/subnet.yaml", + }) + + // Publish Subnets (with sync disabled) so the Subnet schema is available in kcp + // and the agent can create kcp-side copies when syncing service-origin related resources. + prSubnets := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-subnets"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "aws.example.com", + Version: "v1", + Kind: "Subnet", + }, + Projection: &syncagentv1alpha1.ResourceProjection{ + Group: "kcp.example.com", + }, + Synchronization: &syncagentv1alpha1.SynchronizationSpec{Enabled: false}, + }, + } + if err := envtestClient.Create(ctx, prSubnets); err != nil { + t.Fatalf("Failed to create Subnet PublishedResource: %v", err) + } + + // Publish CronTabs with a related Subnet (origin: service, syncStatus: true). + prCrontabs := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabs"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTab", + }, + Naming: &syncagentv1alpha1.ResourceNaming{ + Name: "{{ .Object.metadata.name }}", + Namespace: "synced-{{ .Object.metadata.namespace }}", + }, + Projection: &syncagentv1alpha1.ResourceProjection{Group: "kcp.example.com"}, + Related: []syncagentv1alpha1.RelatedResourceSpec{ + { + Identifier: "subnet", + Origin: syncagentv1alpha1.RelatedResourceOriginService, + Group: "aws.example.com", + Version: "v1", + Resource: "subnets", + Projection: &syncagentv1alpha1.RelatedResourceProjection{ + Group: "kcp.example.com", + }, + SyncStatus: true, + // Watch triggers CronTab reconciliation whenever a service cluster Subnet + // changes. This also registers a Subnet informer in the local cache, so + // syncObjectStatusForward can read the up-to-date status from the cache. + Watch: &syncagentv1alpha1.RelatedResourceWatch{ + BySelector: &metav1.LabelSelector{}, + }, + Object: syncagentv1alpha1.RelatedResourceObject{ + RelatedResourceObjectSpec: syncagentv1alpha1.RelatedResourceObjectSpec{ + Template: &syncagentv1alpha1.TemplateExpression{ + Template: "my-subnet", + }, + }, + }, + }, + }, + }, + } + if err := envtestClient.Create(ctx, prCrontabs); err != nil { + t.Fatalf("Failed to create CronTab PublishedResource: %v", err) + } + + utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName, "") + + kcpClusterClient := utils.GetKcpAdminClusterClient(t) + teamClusterPath := logicalcluster.NewPath("root").Join("related-status-from-service").Join("team-1") + teamClient := kcpClusterClient.Cluster(teamClusterPath) + + utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ + Group: apiExportName, + Version: "v1", + Kind: "CronTab", + }) + + // Create the primary CronTab in kcp. + crontab := utils.ToUnstructured(t, &crds.Crontab{ + TypeMeta: metav1.TypeMeta{APIVersion: "kcp.example.com/v1", Kind: "CronTab"}, + ObjectMeta: metav1.ObjectMeta{Name: "my-crontab", Namespace: "default"}, + Spec: crds.CrontabSpec{CronSpec: "* * *", Image: "ubuntu:latest"}, + }) + if err := teamClient.Create(ctx, crontab); err != nil { + t.Fatalf("Failed to create CronTab: %v", err) + } + + // Simulate a service-cluster operator creating the Subnet and setting its status. + ensureNamespace(t, ctx, envtestClient, "synced-default") + + serviceSubnet := makeSubnet("my-subnet", "synced-default") + if err := envtestClient.Create(ctx, serviceSubnet); err != nil { + t.Fatalf("Failed to create Subnet in service cluster: %v", err) + } + + // Set the status (e.g. the real AWS subnet ID returned after provisioning). + if err := unstructured.SetNestedField(serviceSubnet.Object, "subnet-svc-99999", "status", "id"); err != nil { + t.Fatalf("Failed to set status.id: %v", err) + } + if err := envtestClient.Status().Update(ctx, serviceSubnet); err != nil { + t.Fatalf("Failed to update Subnet status: %v", err) + } + + // Wait for the kcp copy to appear. + t.Log("Waiting for Subnet copy to appear in kcp…") + kcpSubnet := &unstructured.Unstructured{} + kcpSubnet.SetAPIVersion("kcp.example.com/v1") + kcpSubnet.SetKind("Subnet") + + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { + return teamClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "default"}, kcpSubnet) == nil, nil + }) + if err != nil { + t.Fatalf("Subnet copy never appeared in kcp: %v", err) + } + + // Wait for the status to be synced to kcp. + t.Log("Waiting for Subnet status to be synced to kcp…") + err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { + if err := teamClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "default"}, kcpSubnet); err != nil { + return false, nil + } + + id, _, _ := unstructured.NestedString(kcpSubnet.Object, "status", "id") + return id == "subnet-svc-99999", nil + }) + if err != nil { + id, _, _ := unstructured.NestedString(kcpSubnet.Object, "status", "id") + t.Fatalf("Subnet status was not synced to kcp (got status.id=%q, want %q)", id, "subnet-svc-99999") + } +} + +// TestSyncRelatedObjectStatusNotSyncedByDefault verifies that when syncStatus is not set (the +// default), status changes on the origin side do not appear on the destination copy. +func TestSyncRelatedObjectStatusNotSyncedByDefault(t *testing.T) { + const apiExportName = "kcp.example.com" + + ctrlruntime.SetLogger(logr.Discard()) + + ctx := t.Context() + + orgKubconfig := utils.CreateOrganization(t, ctx, "related-status-not-synced", apiExportName) + + envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ + "test/crds/crontab.yaml", + "test/crds/subnet.yaml", + }) + + prSubnets := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-subnets"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "aws.example.com", + Version: "v1", + Kind: "Subnet", + }, + Projection: &syncagentv1alpha1.ResourceProjection{ + Group: "kcp.example.com", + }, + Synchronization: &syncagentv1alpha1.SynchronizationSpec{Enabled: false}, + }, + } + if err := envtestClient.Create(ctx, prSubnets); err != nil { + t.Fatalf("Failed to create Subnet PublishedResource: %v", err) + } + + // SyncStatus is intentionally NOT set here. + prCrontabs := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabs"}, + Spec: syncagentv1alpha1.PublishedResourceSpec{ + Resource: syncagentv1alpha1.SourceResourceDescriptor{ + APIGroup: "example.com", + Version: "v1", + Kind: "CronTab", + }, + Naming: &syncagentv1alpha1.ResourceNaming{ + Name: "{{ .Object.metadata.name }}", + Namespace: "synced-{{ .Object.metadata.namespace }}", + }, + Projection: &syncagentv1alpha1.ResourceProjection{Group: "kcp.example.com"}, + Related: []syncagentv1alpha1.RelatedResourceSpec{ + { + Identifier: "subnet", + Origin: syncagentv1alpha1.RelatedResourceOriginKcp, + Group: "kcp.example.com", + Version: "v1", + Resource: "subnets", + Projection: &syncagentv1alpha1.RelatedResourceProjection{ + Group: "aws.example.com", + }, + // SyncStatus: false (default) + Object: syncagentv1alpha1.RelatedResourceObject{ + RelatedResourceObjectSpec: syncagentv1alpha1.RelatedResourceObjectSpec{ + Template: &syncagentv1alpha1.TemplateExpression{ + Template: "my-subnet", + }, + }, + }, + }, + }, + }, + } + if err := envtestClient.Create(ctx, prCrontabs); err != nil { + t.Fatalf("Failed to create CronTab PublishedResource: %v", err) + } + + utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName, "") + + kcpClusterClient := utils.GetKcpAdminClusterClient(t) + teamClusterPath := logicalcluster.NewPath("root").Join("related-status-not-synced").Join("team-1") + teamClient := kcpClusterClient.Cluster(teamClusterPath) + + utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ + Group: apiExportName, + Version: "v1", + Kind: "CronTab", + }) + utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ + Group: apiExportName, + Version: "v1", + Kind: "Subnet", + }) + + // Create primary CronTab in kcp. + crontab := utils.ToUnstructured(t, &crds.Crontab{ + TypeMeta: metav1.TypeMeta{APIVersion: "kcp.example.com/v1", Kind: "CronTab"}, + ObjectMeta: metav1.ObjectMeta{Name: "my-crontab", Namespace: "default"}, + Spec: crds.CrontabSpec{CronSpec: "* * *", Image: "ubuntu:latest"}, + }) + if err := teamClient.Create(ctx, crontab); err != nil { + t.Fatalf("Failed to create CronTab: %v", err) + } + + // Create the Subnet in kcp and set its status. + kcpSubnet := makeSubnet("my-subnet", "default") + kcpSubnet.SetAPIVersion("kcp.example.com/v1") + kcpSubnet.SetKind("Subnet") + if err := teamClient.Create(ctx, kcpSubnet); err != nil { + t.Fatalf("Failed to create Subnet in kcp: %v", err) + } + if err := unstructured.SetNestedField(kcpSubnet.Object, "subnet-should-not-appear", "status", "id"); err != nil { + t.Fatalf("Failed to set status.id: %v", err) + } + if err := teamClient.Status().Update(ctx, kcpSubnet); err != nil { + t.Fatalf("Failed to update Subnet status in kcp: %v", err) + } + + // Wait for the service cluster copy to appear. + t.Log("Waiting for Subnet copy to appear in service cluster…") + serviceSubnet := makeSubnet("my-subnet", "synced-default") + serviceSubnet.SetAPIVersion(subnetAPIVersion) + serviceSubnet.SetKind(subnetKind) + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { + return envtestClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "synced-default"}, serviceSubnet) == nil, nil + }) + if err != nil { + t.Fatalf("Subnet copy never appeared in service cluster: %v", err) + } + + // Give the agent a moment to settle and confirm status is not present. + // We wait a short time for any potential erroneous status sync to occur. + var finalServiceSubnet unstructured.Unstructured + finalServiceSubnet.SetAPIVersion(subnetAPIVersion) + finalServiceSubnet.SetKind(subnetKind) + + // Poll briefly to check that no status appears. + statusSeen := false + _ = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) { + if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "synced-default"}, &finalServiceSubnet); err != nil { + return false, nil + } + id, exists, _ := unstructured.NestedString(finalServiceSubnet.Object, "status", "id") + if exists && id != "" { + statusSeen = true + return true, nil + } + return false, nil + }) + + if statusSeen { + id, _, _ := unstructured.NestedString(finalServiceSubnet.Object, "status", "id") + t.Fatalf("Subnet status was unexpectedly synced to service cluster (got status.id=%q)", id) + } +} From 79025d5b2d00ee0bbab835da6f121bfd680f9c16 Mon Sep 17 00:00:00 2001 From: Iskren Date: Tue, 30 Jun 2026 12:06:20 +0300 Subject: [PATCH 2/3] handle missing status subresource and add unit tests for syncObjectStatusForward Signed-off-by: Iskren --- .../syncagent.kcp.io_publishedresources.yaml | 5 + internal/sync/object_syncer.go | 6 + internal/sync/object_syncer_test.go | 226 ++++++++++++++++++ .../syncagent/v1alpha1/published_resource.go | 5 + 4 files changed, 242 insertions(+) create mode 100644 internal/sync/object_syncer_test.go diff --git a/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml b/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml index a20d522e..0fb9f1f3 100644 --- a/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml +++ b/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml @@ -789,6 +789,11 @@ spec: - origin: kcp -> status is synced from kcp to the service cluster - origin: service -> status is synced from the service cluster to kcp + + For origin: service, it is strongly recommended to also configure Watch so that + changes to the related resource's status trigger reconciliation and ensure the + informer cache is populated. Without Watch, status is only synced as a side effect + of the primary object being reconciled, which may be infrequent. type: boolean version: description: |- diff --git a/internal/sync/object_syncer.go b/internal/sync/object_syncer.go index 6e43393b..b8f6d892 100644 --- a/internal/sync/object_syncer.go +++ b/internal/sync/object_syncer.go @@ -348,6 +348,12 @@ func (s *objectSyncer) syncObjectStatusForward(ctx context.Context, log *zap.Sug log.Debug("Updating destination object status…") if err := dest.client.Status().Update(ctx, dest.object); err != nil { + if apierrors.IsNotFound(err) { + // The /status subresource does not exist on the destination CRD. + // Retrying will not help; emit a warning so the user knows what to fix. + s.recordEvent(ctx, source, dest, corev1.EventTypeWarning, "StatusSubresourceMissing", "Cannot sync status: the destination resource does not have a status subresource. Set syncStatus: false or add subresources.status to the CRD.") + return false, nil + } return false, fmt.Errorf("failed to update destination object status: %w", err) } diff --git a/internal/sync/object_syncer_test.go b/internal/sync/object_syncer_test.go new file mode 100644 index 00000000..eb81f046 --- /dev/null +++ b/internal/sync/object_syncer_test.go @@ -0,0 +1,226 @@ +/* +Copyright 2026 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sync + +import ( + "context" + "testing" + + "go.uber.org/zap" + + "github.com/kcp-dev/logicalcluster/v3" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/tools/record" +) + +// fakeStateStore is a minimal ObjectStateStore for unit tests. +type fakeStateStore struct { + lastKnown *unstructured.Unstructured +} + +func (f *fakeStateStore) Get(_ context.Context, _ syncSide) (*unstructured.Unstructured, error) { + return f.lastKnown, nil +} + +func (f *fakeStateStore) Put(_ context.Context, _ *unstructured.Unstructured, _ logicalcluster.Name, _ []string) error { + return nil +} + +func makeUnstructuredWithStatus(name, namespace string, status map[string]interface{}) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetAPIVersion("test.example.com/v1") + obj.SetKind("Widget") + obj.SetName(name) + obj.SetNamespace(namespace) + if status != nil { + if err := unstructured.SetNestedMap(obj.Object, status, "status"); err != nil { + panic(err) + } + } + return obj +} + +func TestSyncObjectStatusForward(t *testing.T) { + log := zap.NewNop().Sugar() + + t.Run("no-op when syncStatusForward is false", func(t *testing.T) { + source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"}) + dest := makeUnstructuredWithStatus("dst", "default", nil) + destClient := buildFakeClientWithStatus(dest) + + syncer := &objectSyncer{syncStatusForward: false} + ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) + + _, err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + got := dest.UnstructuredContent()["status"] + if got != nil { + t.Errorf("expected dest status to remain nil, got %v", got) + } + }) + + t.Run("no-op when dest object is nil", func(t *testing.T) { + source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"}) + + syncer := &objectSyncer{syncStatusForward: true} + ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) + + _, err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: nil}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("syncs status from source to dest when they differ", func(t *testing.T) { + source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"}) + dest := makeUnstructuredWithStatus("dst", "default", nil) + destClient := buildFakeClientWithStatus(dest) + + syncer := &objectSyncer{syncStatusForward: true, eventObjSide: syncSideSource} + ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) + + _, err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + phase, _, _ := unstructured.NestedString(dest.Object, "status", "phase") + if phase != "ready" { + t.Errorf("expected dest status.phase=ready, got %q", phase) + } + }) + + t.Run("no-op when status is already equal", func(t *testing.T) { + status := map[string]interface{}{"phase": "ready"} + source := makeUnstructuredWithStatus("src", "default", status) + dest := makeUnstructuredWithStatus("dst", "default", status) + destClient := buildFakeClientWithStatus(dest) + + syncer := &objectSyncer{syncStatusForward: true} + ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) + + _, err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("emits warning and returns nil when dest has no status subresource (404)", func(t *testing.T) { + source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"}) + dest := makeUnstructuredWithStatus("dst", "default", nil) + + // Build a client that does NOT register the status subresource, so Status().Update() → 404. + destClient := buildFakeClient(dest) + + syncer := &objectSyncer{syncStatusForward: true, eventObjSide: syncSideSource} + recorder := record.NewFakeRecorder(10) + ctx := WithEventRecorder(t.Context(), recorder) + + requeue, err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + if err != nil { + t.Fatalf("expected no error on 404, got: %v", err) + } + if requeue { + t.Error("expected requeue=false on 404") + } + + select { + case event := <-recorder.Events: + if event == "" { + t.Error("expected a Warning event to be emitted") + } + default: + t.Error("expected a Warning event to be emitted but channel was empty") + } + }) +} + +// TestSyncObjectContentsForwardStatusRunsEvenOnSpecRequeue verifies the key invariant: +// syncObjectStatusForward is called unconditionally in syncObjectContents, even when +// syncObjectSpec already returned requeue=true due to a pending spec change. +// A future refactor that re-introduces an early return after spec sync would silently +// break status sync for simultaneous spec+status changes. +func TestSyncObjectContentsForwardStatusRunsEvenOnSpecRequeue(t *testing.T) { + log := zap.NewNop().Sugar() + + // Source: new spec + status set. + source := &unstructured.Unstructured{} + source.SetAPIVersion("test.example.com/v1") + source.SetKind("Widget") + source.SetName("my-widget") + source.SetNamespace("default") + if err := unstructured.SetNestedField(source.Object, "new-value", "spec", "username"); err != nil { + t.Fatal(err) + } + if err := unstructured.SetNestedField(source.Object, "ready", "status", "phase"); err != nil { + t.Fatal(err) + } + + // Dest: old spec, no status. + dest := &unstructured.Unstructured{} + dest.SetAPIVersion("test.example.com/v1") + dest.SetKind("Widget") + dest.SetName("my-widget") + dest.SetNamespace("default") + if err := unstructured.SetNestedField(dest.Object, "old-value", "spec", "username"); err != nil { + t.Fatal(err) + } + + destClient := buildFakeClientWithStatus(dest) + + // Last known source state has old spec — so syncObjectSpec sees a diff and patches, returning requeue=true. + lastKnown := &unstructured.Unstructured{} + lastKnown.SetAPIVersion("test.example.com/v1") + lastKnown.SetKind("Widget") + lastKnown.SetName("my-widget") + if err := unstructured.SetNestedField(lastKnown.Object, "old-value", "spec", "username"); err != nil { + t.Fatal(err) + } + + syncer := &objectSyncer{ + stateStore: &fakeStateStore{lastKnown: lastKnown}, + syncStatusForward: true, + subresources: []string{"status"}, + destCreator: func(u *unstructured.Unstructured) (*unstructured.Unstructured, error) { return u.DeepCopy(), nil }, + eventObjSide: syncSideSource, + } + + recorder := record.NewFakeRecorder(10) + ctx := WithEventRecorder(t.Context(), recorder) + ctx = WithClusterName(ctx, logicalcluster.Name("testcluster")) + + sourceSide := syncSide{object: source, clusterName: "testcluster"} + destSide := syncSide{object: dest, client: destClient} + + requeue, err := syncer.syncObjectContents(ctx, log, sourceSide, destSide) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !requeue { + t.Error("expected requeue=true because spec changed, but got false") + } + + // Even though spec sync returned requeue=true, the dest status must have been updated. + phase, _, _ := unstructured.NestedString(dest.Object, "status", "phase") + if phase != "ready" { + t.Errorf("expected dest status.phase=ready after syncObjectContents, got %q — forward status sync did not run on spec requeue", phase) + } +} diff --git a/sdk/apis/syncagent/v1alpha1/published_resource.go b/sdk/apis/syncagent/v1alpha1/published_resource.go index 6f0cec04..f8d3a6cc 100644 --- a/sdk/apis/syncagent/v1alpha1/published_resource.go +++ b/sdk/apis/syncagent/v1alpha1/published_resource.go @@ -277,6 +277,11 @@ type RelatedResourceSpec struct { // - origin: kcp -> status is synced from kcp to the service cluster // - origin: service -> status is synced from the service cluster to kcp // + // For origin: service, it is strongly recommended to also configure Watch so that + // changes to the related resource's status trigger reconciliation and ensure the + // informer cache is populated. Without Watch, status is only synced as a side effect + // of the primary object being reconciled, which may be infrequent. + // // +optional SyncStatus bool `json:"syncStatus,omitempty"` } From 4a454632614bc02a937e668bd5bd5e30808f77b2 Mon Sep 17 00:00:00 2001 From: Iskren Date: Tue, 30 Jun 2026 17:15:20 +0300 Subject: [PATCH 3/3] enforce watch when syncStatus and origin: service; simplify syncObjectStatusForward Signed-off-by: Iskren resolved nitpick --- .../syncagent.kcp.io_publishedresources.yaml | 8 +- internal/sync/object_syncer.go | 14 +- internal/sync/object_syncer_test.go | 13 +- .../syncagent/v1alpha1/published_resource.go | 7 +- .../{subnet.yaml => crontabswithstatus.yaml} | 10 +- test/e2e/sdk/cel_validations_test.go | 33 +++ test/e2e/sync/related_status_test.go | 261 +++++++++--------- 7 files changed, 185 insertions(+), 161 deletions(-) rename test/crds/{subnet.yaml => crontabswithstatus.yaml} (77%) diff --git a/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml b/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml index 0fb9f1f3..59224836 100644 --- a/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml +++ b/deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml @@ -790,10 +790,8 @@ spec: - origin: kcp -> status is synced from kcp to the service cluster - origin: service -> status is synced from the service cluster to kcp - For origin: service, it is strongly recommended to also configure Watch so that - changes to the related resource's status trigger reconciliation and ensure the - informer cache is populated. Without Watch, status is only synced as a side effect - of the primary object being reconciled, which may be infrequent. + For origin: service, Watch must also be configured so that changes to the related + resource's status trigger reconciliation and ensure the informer cache is populated. type: boolean version: description: |- @@ -878,6 +876,8 @@ spec: rule: '!has(self.group) || (has(self.resource) && has(self.version))' - message: identity hashes can only be used with GVRs rule: '!has(self.identityHash) || (has(self.group) && has(self.version) && has(self.resource))' + - message: watch must be configured when origin is service and syncStatus is true + rule: '!(self.origin == ''service'' && has(self.syncStatus) && self.syncStatus) || has(self.watch)' type: array resource: description: |- diff --git a/internal/sync/object_syncer.go b/internal/sync/object_syncer.go index b8f6d892..a362e635 100644 --- a/internal/sync/object_syncer.go +++ b/internal/sync/object_syncer.go @@ -205,7 +205,7 @@ func (s *objectSyncer) syncObjectContents(ctx context.Context, log *zap.SugaredL // Always attempt forward status sync regardless of whether spec was just updated, // so that a simultaneous spec+status change doesn't defer the status by one cycle. - if _, err = s.syncObjectStatusForward(ctx, log, source, dest); err != nil { + if err = s.syncObjectStatusForward(ctx, log, source, dest); err != nil { return requeue, err } @@ -335,9 +335,9 @@ func (s *objectSyncer) syncObjectStatus(ctx context.Context, log *zap.SugaredLog return false, nil } -func (s *objectSyncer) syncObjectStatusForward(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) (requeue bool, err error) { +func (s *objectSyncer) syncObjectStatusForward(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) error { if !s.syncStatusForward || dest.object == nil { - return false, nil + return nil } sourceContent := source.object.UnstructuredContent() @@ -352,15 +352,13 @@ func (s *objectSyncer) syncObjectStatusForward(ctx context.Context, log *zap.Sug // The /status subresource does not exist on the destination CRD. // Retrying will not help; emit a warning so the user knows what to fix. s.recordEvent(ctx, source, dest, corev1.EventTypeWarning, "StatusSubresourceMissing", "Cannot sync status: the destination resource does not have a status subresource. Set syncStatus: false or add subresources.status to the CRD.") - return false, nil + return nil } - return false, fmt.Errorf("failed to update destination object status: %w", err) + return fmt.Errorf("failed to update destination object status: %w", err) } - - s.recordEvent(ctx, source, dest, corev1.EventTypeNormal, "ObjectStatusSynced", "The current object status has been synchronized to the destination.") } - return false, nil + return nil } func (s *objectSyncer) ensureDestinationObject(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) error { diff --git a/internal/sync/object_syncer_test.go b/internal/sync/object_syncer_test.go index eb81f046..3fdb40d3 100644 --- a/internal/sync/object_syncer_test.go +++ b/internal/sync/object_syncer_test.go @@ -66,7 +66,7 @@ func TestSyncObjectStatusForward(t *testing.T) { syncer := &objectSyncer{syncStatusForward: false} ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) - _, err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -83,7 +83,7 @@ func TestSyncObjectStatusForward(t *testing.T) { syncer := &objectSyncer{syncStatusForward: true} ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) - _, err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: nil}) + err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: nil}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -97,7 +97,7 @@ func TestSyncObjectStatusForward(t *testing.T) { syncer := &objectSyncer{syncStatusForward: true, eventObjSide: syncSideSource} ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) - _, err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -117,7 +117,7 @@ func TestSyncObjectStatusForward(t *testing.T) { syncer := &objectSyncer{syncStatusForward: true} ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10)) - _, err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -134,13 +134,10 @@ func TestSyncObjectStatusForward(t *testing.T) { recorder := record.NewFakeRecorder(10) ctx := WithEventRecorder(t.Context(), recorder) - requeue, err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) + err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient}) if err != nil { t.Fatalf("expected no error on 404, got: %v", err) } - if requeue { - t.Error("expected requeue=false on 404") - } select { case event := <-recorder.Events: diff --git a/sdk/apis/syncagent/v1alpha1/published_resource.go b/sdk/apis/syncagent/v1alpha1/published_resource.go index f8d3a6cc..77737290 100644 --- a/sdk/apis/syncagent/v1alpha1/published_resource.go +++ b/sdk/apis/syncagent/v1alpha1/published_resource.go @@ -210,6 +210,7 @@ const ( // +kubebuilder:validation:XValidation:rule="!has(self.group) || (has(self.resource) && has(self.version))",message="configuring a group also requires a version and resource" // group is included here because when an identityHash is used, core/v1 cannot possible be targetted // +kubebuilder:validation:XValidation:rule="!has(self.identityHash) || (has(self.group) && has(self.version) && has(self.resource))",message="identity hashes can only be used with GVRs" +// +kubebuilder:validation:XValidation:rule="!(self.origin == 'service' && has(self.syncStatus) && self.syncStatus) || has(self.watch)",message="watch must be configured when origin is service and syncStatus is true" type RelatedResourceSpec struct { // Identifier is a unique name for this related resource. The name must be unique within one // PublishedResource and is the key by which consumers (end users) can identify and consume the @@ -277,10 +278,8 @@ type RelatedResourceSpec struct { // - origin: kcp -> status is synced from kcp to the service cluster // - origin: service -> status is synced from the service cluster to kcp // - // For origin: service, it is strongly recommended to also configure Watch so that - // changes to the related resource's status trigger reconciliation and ensure the - // informer cache is populated. Without Watch, status is only synced as a side effect - // of the primary object being reconciled, which may be infrequent. + // For origin: service, Watch must also be configured so that changes to the related + // resource's status trigger reconciliation and ensure the informer cache is populated. // // +optional SyncStatus bool `json:"syncStatus,omitempty"` diff --git a/test/crds/subnet.yaml b/test/crds/crontabswithstatus.yaml similarity index 77% rename from test/crds/subnet.yaml rename to test/crds/crontabswithstatus.yaml index 1602e018..4f5ea0b9 100644 --- a/test/crds/subnet.yaml +++ b/test/crds/crontabswithstatus.yaml @@ -1,14 +1,14 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: - name: subnets.aws.example.com + name: crontabswithstatus.example.com spec: - group: aws.example.com + group: example.com scope: Namespaced names: - plural: subnets - singular: subnet - kind: Subnet + plural: crontabswithstatus + singular: crontabwithstatus + kind: CronTabWithStatus versions: - name: v1 served: true diff --git a/test/e2e/sdk/cel_validations_test.go b/test/e2e/sdk/cel_validations_test.go index 3fa3c36d..96da0369 100644 --- a/test/e2e/sdk/cel_validations_test.go +++ b/test/e2e/sdk/cel_validations_test.go @@ -176,6 +176,39 @@ func TestValidateRelatedResourceSpec(t *testing.T) { IdentityHash: "abc123", }, }, + { + name: "syncStatus with origin service requires watch", + valid: false, + spec: syncagentv1alpha1.RelatedResourceSpec{ + Origin: syncagentv1alpha1.RelatedResourceOriginService, + Resource: "things", + Version: "v1", + SyncStatus: true, + }, + }, + { + name: "syncStatus with origin service and watch is valid", + valid: true, + spec: syncagentv1alpha1.RelatedResourceSpec{ + Origin: syncagentv1alpha1.RelatedResourceOriginService, + Resource: "things", + Version: "v1", + SyncStatus: true, + Watch: &syncagentv1alpha1.RelatedResourceWatch{ + ByOwner: &syncagentv1alpha1.RelatedResourceWatchByOwner{}, + }, + }, + }, + { + name: "syncStatus with origin kcp does not require watch", + valid: true, + spec: syncagentv1alpha1.RelatedResourceSpec{ + Origin: syncagentv1alpha1.RelatedResourceOriginKcp, + Resource: "things", + Version: "v1", + SyncStatus: true, + }, + }, } alphaNum := regexp.MustCompile(`[^a-z0-9]`) diff --git a/test/e2e/sync/related_status_test.go b/test/e2e/sync/related_status_test.go index 3c639c78..0cf8aa78 100644 --- a/test/e2e/sync/related_status_test.go +++ b/test/e2e/sync/related_status_test.go @@ -39,13 +39,13 @@ import ( ctrlruntime "sigs.k8s.io/controller-runtime" ) -const subnetAPIVersion = "aws.example.com/v1" -const subnetKind = "Subnet" +const crontabWithStatusAPIVersion = "example.com/v1" +const crontabWithStatusKind = "CronTabWithStatus" -func makeSubnet(name, namespace string) *unstructured.Unstructured { +func makeCronTabWithStatus(name, namespace string) *unstructured.Unstructured { obj := &unstructured.Unstructured{} - obj.SetAPIVersion(subnetAPIVersion) - obj.SetKind(subnetKind) + obj.SetAPIVersion(crontabWithStatusAPIVersion) + obj.SetKind(crontabWithStatusKind) obj.SetName(name) obj.SetNamespace(namespace) return obj @@ -64,17 +64,17 @@ func TestSyncRelatedObjectStatusFromKcp(t *testing.T) { envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ "test/crds/crontab.yaml", - "test/crds/subnet.yaml", + "test/crds/crontabswithstatus.yaml", }) - // Publish Subnets (with sync disabled) so the Subnet schema is available in kcp. - prSubnets := &syncagentv1alpha1.PublishedResource{ - ObjectMeta: metav1.ObjectMeta{Name: "publish-subnets"}, + // Publish CronTabsWithStatus (with sync disabled) so the schema is available in kcp. + prCronTabsWithStatus := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabswithstatus"}, Spec: syncagentv1alpha1.PublishedResourceSpec{ Resource: syncagentv1alpha1.SourceResourceDescriptor{ - APIGroup: "aws.example.com", + APIGroup: "example.com", Version: "v1", - Kind: "Subnet", + Kind: "CronTabWithStatus", }, Projection: &syncagentv1alpha1.ResourceProjection{ Group: "kcp.example.com", @@ -82,11 +82,11 @@ func TestSyncRelatedObjectStatusFromKcp(t *testing.T) { Synchronization: &syncagentv1alpha1.SynchronizationSpec{Enabled: false}, }, } - if err := envtestClient.Create(ctx, prSubnets); err != nil { - t.Fatalf("Failed to create Subnet PublishedResource: %v", err) + if err := envtestClient.Create(ctx, prCronTabsWithStatus); err != nil { + t.Fatalf("Failed to create CronTabWithStatus PublishedResource: %v", err) } - // Publish CronTabs with a related Subnet (origin: kcp, syncStatus: true). + // Publish CronTabs with a related CronTabWithStatus (origin: kcp, syncStatus: true). prCrontabs := &syncagentv1alpha1.PublishedResource{ ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabs"}, Spec: syncagentv1alpha1.PublishedResourceSpec{ @@ -102,19 +102,19 @@ func TestSyncRelatedObjectStatusFromKcp(t *testing.T) { Projection: &syncagentv1alpha1.ResourceProjection{Group: "kcp.example.com"}, Related: []syncagentv1alpha1.RelatedResourceSpec{ { - Identifier: "subnet", + Identifier: "crontabwithstatus", Origin: syncagentv1alpha1.RelatedResourceOriginKcp, Group: "kcp.example.com", Version: "v1", - Resource: "subnets", + Resource: "crontabswithstatus", Projection: &syncagentv1alpha1.RelatedResourceProjection{ - Group: "aws.example.com", + Group: "example.com", }, SyncStatus: true, Object: syncagentv1alpha1.RelatedResourceObject{ RelatedResourceObjectSpec: syncagentv1alpha1.RelatedResourceObjectSpec{ Template: &syncagentv1alpha1.TemplateExpression{ - Template: "my-subnet", + Template: "my-related", }, }, }, @@ -140,29 +140,29 @@ func TestSyncRelatedObjectStatusFromKcp(t *testing.T) { utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ Group: apiExportName, Version: "v1", - Kind: "Subnet", + Kind: "CronTabWithStatus", }) - // Create the Subnet in kcp first (origin: kcp means the user creates it there) and set its - // status before creating the CronTab. This ensures the status is already present when the + // Create the related object in kcp first (origin: kcp means the user creates it there) and set + // its status before creating the CronTab. This ensures the status is already present when the // agent's second reconciliation cycle (after creating the service cluster copy) runs, so // syncObjectStatusForward can see it without needing a Watch trigger. - kcpSubnet := makeSubnet("my-subnet", "default") - kcpSubnet.SetAPIVersion("kcp.example.com/v1") - kcpSubnet.SetKind("Subnet") - if err := teamClient.Create(ctx, kcpSubnet); err != nil { - t.Fatalf("Failed to create Subnet in kcp: %v", err) + kcpObj := makeCronTabWithStatus("my-related", "default") + kcpObj.SetAPIVersion("kcp.example.com/v1") + kcpObj.SetKind("CronTabWithStatus") + if err := teamClient.Create(ctx, kcpObj); err != nil { + t.Fatalf("Failed to create CronTabWithStatus in kcp: %v", err) } - // Simulate a controller in kcp setting the subnet's status (e.g. the provisioned subnet ID). - if err := unstructured.SetNestedField(kcpSubnet.Object, "subnet-kcp-12345", "status", "id"); err != nil { + // Simulate a controller in kcp setting the object's status. + if err := unstructured.SetNestedField(kcpObj.Object, "id-kcp-12345", "status", "id"); err != nil { t.Fatalf("Failed to set status.id: %v", err) } - if err := teamClient.Status().Update(ctx, kcpSubnet); err != nil { - t.Fatalf("Failed to update Subnet status in kcp: %v", err) + if err := teamClient.Status().Update(ctx, kcpObj); err != nil { + t.Fatalf("Failed to update CronTabWithStatus status in kcp: %v", err) } - // Create the primary CronTab in kcp after the Subnet is ready. + // Create the primary CronTab in kcp after the related object is ready. crontab := utils.ToUnstructured(t, &crds.Crontab{ TypeMeta: metav1.TypeMeta{APIVersion: "kcp.example.com/v1", Kind: "CronTab"}, ObjectMeta: metav1.ObjectMeta{Name: "my-crontab", Namespace: "default"}, @@ -172,59 +172,59 @@ func TestSyncRelatedObjectStatusFromKcp(t *testing.T) { t.Fatalf("Failed to create CronTab: %v", err) } - // Wait for the Subnet copy to appear in the service cluster. - t.Log("Waiting for Subnet copy to appear in the service cluster…") - serviceSubnet := makeSubnet("my-subnet", "synced-default") - serviceSubnet.SetAPIVersion(subnetAPIVersion) - serviceSubnet.SetKind(subnetKind) + // Wait for the service cluster copy to appear. + t.Log("Waiting for CronTabWithStatus copy to appear in the service cluster…") + svcObj := makeCronTabWithStatus("my-related", "synced-default") + svcObj.SetAPIVersion(crontabWithStatusAPIVersion) + svcObj.SetKind(crontabWithStatusKind) err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { - return envtestClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "synced-default"}, serviceSubnet) == nil, nil + return envtestClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "synced-default"}, svcObj) == nil, nil }) if err != nil { - t.Fatalf("Subnet copy never appeared in service cluster: %v", err) + t.Fatalf("CronTabWithStatus copy never appeared in service cluster: %v", err) } // Verify the status was forwarded from kcp to the service cluster. - t.Log("Waiting for Subnet status to be synced to service cluster…") + t.Log("Waiting for CronTabWithStatus status to be synced to service cluster…") err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { - if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "synced-default"}, serviceSubnet); err != nil { + if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "synced-default"}, svcObj); err != nil { return false, nil } - id, _, _ := unstructured.NestedString(serviceSubnet.Object, "status", "id") - return id == "subnet-kcp-12345", nil + id, _, _ := unstructured.NestedString(svcObj.Object, "status", "id") + return id == "id-kcp-12345", nil }) if err != nil { - id, _, _ := unstructured.NestedString(serviceSubnet.Object, "status", "id") - t.Fatalf("Subnet status was not synced to service cluster (got status.id=%q, want %q)", id, "subnet-kcp-12345") + id, _, _ := unstructured.NestedString(svcObj.Object, "status", "id") + t.Fatalf("CronTabWithStatus status was not synced to service cluster (got status.id=%q, want %q)", id, "id-kcp-12345") } // Verify that updating status on the service cluster does NOT propagate back to kcp // (there is no reverse sync for origin:kcp + syncStatus). - serviceSubnet2 := makeSubnet("my-subnet", "synced-default") - serviceSubnet2.SetAPIVersion(subnetAPIVersion) - serviceSubnet2.SetKind(subnetKind) - if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "synced-default"}, serviceSubnet2); err != nil { - t.Fatalf("Failed to re-fetch service cluster Subnet: %v", err) + svcObj2 := makeCronTabWithStatus("my-related", "synced-default") + svcObj2.SetAPIVersion(crontabWithStatusAPIVersion) + svcObj2.SetKind(crontabWithStatusKind) + if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "synced-default"}, svcObj2); err != nil { + t.Fatalf("Failed to re-fetch service cluster CronTabWithStatus: %v", err) } - if err := unstructured.SetNestedField(serviceSubnet2.Object, "subnet-local-override", "status", "id"); err != nil { + if err := unstructured.SetNestedField(svcObj2.Object, "id-local-override", "status", "id"); err != nil { t.Fatalf("Failed to set status.id: %v", err) } - if err := envtestClient.Status().Update(ctx, serviceSubnet2); err != nil { - t.Fatalf("Failed to update service cluster Subnet status: %v", err) + if err := envtestClient.Status().Update(ctx, svcObj2); err != nil { + t.Fatalf("Failed to update service cluster CronTabWithStatus status: %v", err) } // Briefly poll kcp to confirm its status was not overwritten. - kcpSubnetCheck := makeSubnet("my-subnet", "default") - kcpSubnetCheck.SetAPIVersion("kcp.example.com/v1") - kcpSubnetCheck.SetKind("Subnet") - if err := teamClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "default"}, kcpSubnetCheck); err != nil { - t.Fatalf("Failed to get kcp Subnet: %v", err) + kcpObjCheck := makeCronTabWithStatus("my-related", "default") + kcpObjCheck.SetAPIVersion("kcp.example.com/v1") + kcpObjCheck.SetKind("CronTabWithStatus") + if err := teamClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "default"}, kcpObjCheck); err != nil { + t.Fatalf("Failed to get kcp CronTabWithStatus: %v", err) } - kcpID, _, _ := unstructured.NestedString(kcpSubnetCheck.Object, "status", "id") - if kcpID != "subnet-kcp-12345" { - t.Fatalf("kcp Subnet status was unexpectedly modified (got %q, want %q)", kcpID, "subnet-kcp-12345") + kcpID, _, _ := unstructured.NestedString(kcpObjCheck.Object, "status", "id") + if kcpID != "id-kcp-12345" { + t.Fatalf("kcp CronTabWithStatus status was unexpectedly modified (got %q, want %q)", kcpID, "id-kcp-12345") } } @@ -241,18 +241,18 @@ func TestSyncRelatedObjectStatusFromService(t *testing.T) { envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ "test/crds/crontab.yaml", - "test/crds/subnet.yaml", + "test/crds/crontabswithstatus.yaml", }) - // Publish Subnets (with sync disabled) so the Subnet schema is available in kcp + // Publish CronTabsWithStatus (with sync disabled) so the schema is available in kcp // and the agent can create kcp-side copies when syncing service-origin related resources. - prSubnets := &syncagentv1alpha1.PublishedResource{ - ObjectMeta: metav1.ObjectMeta{Name: "publish-subnets"}, + prCronTabsWithStatus := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabswithstatus"}, Spec: syncagentv1alpha1.PublishedResourceSpec{ Resource: syncagentv1alpha1.SourceResourceDescriptor{ - APIGroup: "aws.example.com", + APIGroup: "example.com", Version: "v1", - Kind: "Subnet", + Kind: "CronTabWithStatus", }, Projection: &syncagentv1alpha1.ResourceProjection{ Group: "kcp.example.com", @@ -260,11 +260,11 @@ func TestSyncRelatedObjectStatusFromService(t *testing.T) { Synchronization: &syncagentv1alpha1.SynchronizationSpec{Enabled: false}, }, } - if err := envtestClient.Create(ctx, prSubnets); err != nil { - t.Fatalf("Failed to create Subnet PublishedResource: %v", err) + if err := envtestClient.Create(ctx, prCronTabsWithStatus); err != nil { + t.Fatalf("Failed to create CronTabWithStatus PublishedResource: %v", err) } - // Publish CronTabs with a related Subnet (origin: service, syncStatus: true). + // Publish CronTabs with a related CronTabWithStatus (origin: service, syncStatus: true). prCrontabs := &syncagentv1alpha1.PublishedResource{ ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabs"}, Spec: syncagentv1alpha1.PublishedResourceSpec{ @@ -280,17 +280,17 @@ func TestSyncRelatedObjectStatusFromService(t *testing.T) { Projection: &syncagentv1alpha1.ResourceProjection{Group: "kcp.example.com"}, Related: []syncagentv1alpha1.RelatedResourceSpec{ { - Identifier: "subnet", + Identifier: "crontabwithstatus", Origin: syncagentv1alpha1.RelatedResourceOriginService, - Group: "aws.example.com", + Group: "example.com", Version: "v1", - Resource: "subnets", + Resource: "crontabswithstatus", Projection: &syncagentv1alpha1.RelatedResourceProjection{ Group: "kcp.example.com", }, SyncStatus: true, - // Watch triggers CronTab reconciliation whenever a service cluster Subnet - // changes. This also registers a Subnet informer in the local cache, so + // Watch triggers CronTab reconciliation whenever a service cluster + // CronTabWithStatus changes. This also registers an informer so that // syncObjectStatusForward can read the up-to-date status from the cache. Watch: &syncagentv1alpha1.RelatedResourceWatch{ BySelector: &metav1.LabelSelector{}, @@ -298,7 +298,7 @@ func TestSyncRelatedObjectStatusFromService(t *testing.T) { Object: syncagentv1alpha1.RelatedResourceObject{ RelatedResourceObjectSpec: syncagentv1alpha1.RelatedResourceObjectSpec{ Template: &syncagentv1alpha1.TemplateExpression{ - Template: "my-subnet", + Template: "my-related", }, }, }, @@ -332,48 +332,47 @@ func TestSyncRelatedObjectStatusFromService(t *testing.T) { t.Fatalf("Failed to create CronTab: %v", err) } - // Simulate a service-cluster operator creating the Subnet and setting its status. + // Create the related object on the service cluster and set its status. ensureNamespace(t, ctx, envtestClient, "synced-default") - serviceSubnet := makeSubnet("my-subnet", "synced-default") - if err := envtestClient.Create(ctx, serviceSubnet); err != nil { - t.Fatalf("Failed to create Subnet in service cluster: %v", err) + svcObj := makeCronTabWithStatus("my-related", "synced-default") + if err := envtestClient.Create(ctx, svcObj); err != nil { + t.Fatalf("Failed to create CronTabWithStatus in service cluster: %v", err) } - // Set the status (e.g. the real AWS subnet ID returned after provisioning). - if err := unstructured.SetNestedField(serviceSubnet.Object, "subnet-svc-99999", "status", "id"); err != nil { + if err := unstructured.SetNestedField(svcObj.Object, "id-svc-99999", "status", "id"); err != nil { t.Fatalf("Failed to set status.id: %v", err) } - if err := envtestClient.Status().Update(ctx, serviceSubnet); err != nil { - t.Fatalf("Failed to update Subnet status: %v", err) + if err := envtestClient.Status().Update(ctx, svcObj); err != nil { + t.Fatalf("Failed to update CronTabWithStatus status: %v", err) } // Wait for the kcp copy to appear. - t.Log("Waiting for Subnet copy to appear in kcp…") - kcpSubnet := &unstructured.Unstructured{} - kcpSubnet.SetAPIVersion("kcp.example.com/v1") - kcpSubnet.SetKind("Subnet") + t.Log("Waiting for CronTabWithStatus copy to appear in kcp…") + kcpObj := &unstructured.Unstructured{} + kcpObj.SetAPIVersion("kcp.example.com/v1") + kcpObj.SetKind("CronTabWithStatus") err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { - return teamClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "default"}, kcpSubnet) == nil, nil + return teamClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "default"}, kcpObj) == nil, nil }) if err != nil { - t.Fatalf("Subnet copy never appeared in kcp: %v", err) + t.Fatalf("CronTabWithStatus copy never appeared in kcp: %v", err) } // Wait for the status to be synced to kcp. - t.Log("Waiting for Subnet status to be synced to kcp…") + t.Log("Waiting for CronTabWithStatus status to be synced to kcp…") err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { - if err := teamClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "default"}, kcpSubnet); err != nil { + if err := teamClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "default"}, kcpObj); err != nil { return false, nil } - id, _, _ := unstructured.NestedString(kcpSubnet.Object, "status", "id") - return id == "subnet-svc-99999", nil + id, _, _ := unstructured.NestedString(kcpObj.Object, "status", "id") + return id == "id-svc-99999", nil }) if err != nil { - id, _, _ := unstructured.NestedString(kcpSubnet.Object, "status", "id") - t.Fatalf("Subnet status was not synced to kcp (got status.id=%q, want %q)", id, "subnet-svc-99999") + id, _, _ := unstructured.NestedString(kcpObj.Object, "status", "id") + t.Fatalf("CronTabWithStatus status was not synced to kcp (got status.id=%q, want %q)", id, "id-svc-99999") } } @@ -390,16 +389,16 @@ func TestSyncRelatedObjectStatusNotSyncedByDefault(t *testing.T) { envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{ "test/crds/crontab.yaml", - "test/crds/subnet.yaml", + "test/crds/crontabswithstatus.yaml", }) - prSubnets := &syncagentv1alpha1.PublishedResource{ - ObjectMeta: metav1.ObjectMeta{Name: "publish-subnets"}, + prCronTabsWithStatus := &syncagentv1alpha1.PublishedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "publish-crontabswithstatus"}, Spec: syncagentv1alpha1.PublishedResourceSpec{ Resource: syncagentv1alpha1.SourceResourceDescriptor{ - APIGroup: "aws.example.com", + APIGroup: "example.com", Version: "v1", - Kind: "Subnet", + Kind: "CronTabWithStatus", }, Projection: &syncagentv1alpha1.ResourceProjection{ Group: "kcp.example.com", @@ -407,8 +406,8 @@ func TestSyncRelatedObjectStatusNotSyncedByDefault(t *testing.T) { Synchronization: &syncagentv1alpha1.SynchronizationSpec{Enabled: false}, }, } - if err := envtestClient.Create(ctx, prSubnets); err != nil { - t.Fatalf("Failed to create Subnet PublishedResource: %v", err) + if err := envtestClient.Create(ctx, prCronTabsWithStatus); err != nil { + t.Fatalf("Failed to create CronTabWithStatus PublishedResource: %v", err) } // SyncStatus is intentionally NOT set here. @@ -427,19 +426,19 @@ func TestSyncRelatedObjectStatusNotSyncedByDefault(t *testing.T) { Projection: &syncagentv1alpha1.ResourceProjection{Group: "kcp.example.com"}, Related: []syncagentv1alpha1.RelatedResourceSpec{ { - Identifier: "subnet", + Identifier: "crontabwithstatus", Origin: syncagentv1alpha1.RelatedResourceOriginKcp, Group: "kcp.example.com", Version: "v1", - Resource: "subnets", + Resource: "crontabswithstatus", Projection: &syncagentv1alpha1.RelatedResourceProjection{ - Group: "aws.example.com", + Group: "example.com", }, // SyncStatus: false (default) Object: syncagentv1alpha1.RelatedResourceObject{ RelatedResourceObjectSpec: syncagentv1alpha1.RelatedResourceObjectSpec{ Template: &syncagentv1alpha1.TemplateExpression{ - Template: "my-subnet", + Template: "my-related", }, }, }, @@ -465,7 +464,7 @@ func TestSyncRelatedObjectStatusNotSyncedByDefault(t *testing.T) { utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionKind{ Group: apiExportName, Version: "v1", - Kind: "Subnet", + Kind: "CronTabWithStatus", }) // Create primary CronTab in kcp. @@ -478,45 +477,43 @@ func TestSyncRelatedObjectStatusNotSyncedByDefault(t *testing.T) { t.Fatalf("Failed to create CronTab: %v", err) } - // Create the Subnet in kcp and set its status. - kcpSubnet := makeSubnet("my-subnet", "default") - kcpSubnet.SetAPIVersion("kcp.example.com/v1") - kcpSubnet.SetKind("Subnet") - if err := teamClient.Create(ctx, kcpSubnet); err != nil { - t.Fatalf("Failed to create Subnet in kcp: %v", err) + // Create the related object in kcp and set its status. + kcpObj := makeCronTabWithStatus("my-related", "default") + kcpObj.SetAPIVersion("kcp.example.com/v1") + kcpObj.SetKind("CronTabWithStatus") + if err := teamClient.Create(ctx, kcpObj); err != nil { + t.Fatalf("Failed to create CronTabWithStatus in kcp: %v", err) } - if err := unstructured.SetNestedField(kcpSubnet.Object, "subnet-should-not-appear", "status", "id"); err != nil { + if err := unstructured.SetNestedField(kcpObj.Object, "id-should-not-appear", "status", "id"); err != nil { t.Fatalf("Failed to set status.id: %v", err) } - if err := teamClient.Status().Update(ctx, kcpSubnet); err != nil { - t.Fatalf("Failed to update Subnet status in kcp: %v", err) + if err := teamClient.Status().Update(ctx, kcpObj); err != nil { + t.Fatalf("Failed to update CronTabWithStatus status in kcp: %v", err) } // Wait for the service cluster copy to appear. - t.Log("Waiting for Subnet copy to appear in service cluster…") - serviceSubnet := makeSubnet("my-subnet", "synced-default") - serviceSubnet.SetAPIVersion(subnetAPIVersion) - serviceSubnet.SetKind(subnetKind) + t.Log("Waiting for CronTabWithStatus copy to appear in service cluster…") + svcObj := makeCronTabWithStatus("my-related", "synced-default") + svcObj.SetAPIVersion(crontabWithStatusAPIVersion) + svcObj.SetKind(crontabWithStatusKind) err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 3*time.Minute, false, func(ctx context.Context) (bool, error) { - return envtestClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "synced-default"}, serviceSubnet) == nil, nil + return envtestClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "synced-default"}, svcObj) == nil, nil }) if err != nil { - t.Fatalf("Subnet copy never appeared in service cluster: %v", err) + t.Fatalf("CronTabWithStatus copy never appeared in service cluster: %v", err) } - // Give the agent a moment to settle and confirm status is not present. - // We wait a short time for any potential erroneous status sync to occur. - var finalServiceSubnet unstructured.Unstructured - finalServiceSubnet.SetAPIVersion(subnetAPIVersion) - finalServiceSubnet.SetKind(subnetKind) + // Poll briefly to confirm no status appears on the service cluster copy. + var finalSvcObj unstructured.Unstructured + finalSvcObj.SetAPIVersion(crontabWithStatusAPIVersion) + finalSvcObj.SetKind(crontabWithStatusKind) - // Poll briefly to check that no status appears. statusSeen := false _ = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) { - if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-subnet", Namespace: "synced-default"}, &finalServiceSubnet); err != nil { + if err := envtestClient.Get(ctx, types.NamespacedName{Name: "my-related", Namespace: "synced-default"}, &finalSvcObj); err != nil { return false, nil } - id, exists, _ := unstructured.NestedString(finalServiceSubnet.Object, "status", "id") + id, exists, _ := unstructured.NestedString(finalSvcObj.Object, "status", "id") if exists && id != "" { statusSeen = true return true, nil @@ -525,7 +522,7 @@ func TestSyncRelatedObjectStatusNotSyncedByDefault(t *testing.T) { }) if statusSeen { - id, _, _ := unstructured.NestedString(finalServiceSubnet.Object, "status", "id") - t.Fatalf("Subnet status was unexpectedly synced to service cluster (got status.id=%q)", id) + id, _, _ := unstructured.NestedString(finalSvcObj.Object, "status", "id") + t.Fatalf("CronTabWithStatus status was unexpectedly synced to service cluster (got status.id=%q)", id) } }