Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,19 @@ spec:
resource:
description: Resource is the name of the related resource (for example "secrets").
type: string
syncStatus:
description: |-
SyncStatus enables synchronization of the status subresource in the same direction as
the spec (from the origin side to the destination side). When enabled, the agent will
use the status subresource endpoint to update the destination object's status.
This requires the related resource to have a status subresource configured in its CRD.

- origin: kcp -> status is synced from kcp to the service cluster
- origin: service -> status is synced from the service cluster to kcp

For origin: service, Watch must also be configured so that changes to the related
resource's status trigger reconciliation and ensure the informer cache is populated.
type: boolean
version:
description: |-
Version is the API version of the related resource. This can be left blank to automatically
Expand Down Expand Up @@ -863,6 +876,8 @@ spec:
rule: '!has(self.group) || (has(self.resource) && has(self.version))'
- message: identity hashes can only be used with GVRs
rule: '!has(self.identityHash) || (has(self.group) && has(self.version) && has(self.resource))'
- message: watch must be configured when origin is service and syncStatus is true
rule: '!(self.origin == ''service'' && has(self.syncStatus) && self.syncStatus) || has(self.watch)'
type: array
resource:
description: |-
Expand Down
1 change: 1 addition & 0 deletions hack/tools.checksums
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ golangci-lint|GOARCH=arm64;GOOS=linux|2ed9cf2ad070dabc7947ba34cdc5142910be830306
kube-apiserver|GOARCH=amd64;GOOS=linux|ca822082ec39e54a25836a4011ddb66e482e317a7a4f1a1f73882bbd2cf5a2a1
kube-apiserver|GOARCH=arm64;GOOS=linux|6ade6c2646e2c01fde1095407452afc2b65e89d6da16da29ee39f6223ccaf63b
kubectl|GOARCH=amd64;GOOS=linux|9591f3d75e1581f3f7392e6ad119aab2f28ae7d6c6e083dc5d22469667f27253
kubectl|GOARCH=arm64;GOOS=darwin|8f38d3a38ae317b00ebf90254dc274dd28d8c6eea4a4b30c5cb12d3d27017b6d
kubectl|GOARCH=arm64;GOOS=linux|95df604e914941f3172a93fa8feeb1a1a50f4011dfbe0c01e01b660afc8f9b85
yq|GOARCH=amd64;GOOS=linux|0c2b24e645b57d8e7c0566d18643a6d4f5580feeea3878127354a46f2a1e4598
yq|GOARCH=arm64;GOOS=darwin|164e10e5f7df62990e4f3823205e7ea42ba5660523a428df07c7386c0b62e3d9
Expand Down
42 changes: 40 additions & 2 deletions internal/sync/object_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ type objectSyncer struct {
destCreator objectCreatorFunc
// list of subresources in the resource type
subresources []string
// whether to enable status subresource back-syncing
// whether to enable status subresource back-syncing (dest -> source)
syncStatusBack bool
// whether to sync status in the forward direction (source -> dest)
syncStatusForward bool
// whether or not to add/expect a finalizer on the source
blockSourceDeletion bool
// whether or not to place sync-related metadata on the destination object
Expand Down Expand Up @@ -197,10 +199,20 @@ func (s *objectSyncer) applyMutations(source, dest syncSide) (syncSide, syncSide
func (s *objectSyncer) syncObjectContents(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) (requeue bool, err error) {
// Sync the spec (or more generally, the desired state) from source to dest.
requeue, err = s.syncObjectSpec(ctx, log, source, dest)
if requeue || err != nil {
if err != nil {
return false, err
}

// Always attempt forward status sync regardless of whether spec was just updated,
// so that a simultaneous spec+status change doesn't defer the status by one cycle.
if err = s.syncObjectStatusForward(ctx, log, source, dest); err != nil {
return requeue, err
}

if requeue {
return true, nil
}

// Sync the status back in the opposite direction, from dest to source.
return s.syncObjectStatus(ctx, log, source, dest)
}
Expand Down Expand Up @@ -323,6 +335,32 @@ func (s *objectSyncer) syncObjectStatus(ctx context.Context, log *zap.SugaredLog
return false, nil
}

func (s *objectSyncer) syncObjectStatusForward(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) error {
if !s.syncStatusForward || dest.object == nil {
return nil
}

sourceContent := source.object.UnstructuredContent()
destContent := dest.object.UnstructuredContent()

if !equality.Semantic.DeepEqual(sourceContent["status"], destContent["status"]) {
destContent["status"] = sourceContent["status"]

log.Debug("Updating destination object status…")
if err := dest.client.Status().Update(ctx, dest.object); err != nil {
if apierrors.IsNotFound(err) {
// The /status subresource does not exist on the destination CRD.
// Retrying will not help; emit a warning so the user knows what to fix.
s.recordEvent(ctx, source, dest, corev1.EventTypeWarning, "StatusSubresourceMissing", "Cannot sync status: the destination resource does not have a status subresource. Set syncStatus: false or add subresources.status to the CRD.")
return nil
}
return fmt.Errorf("failed to update destination object status: %w", err)
}
}

return nil
}

func (s *objectSyncer) ensureDestinationObject(ctx context.Context, log *zap.SugaredLogger, source, dest syncSide) error {
// create a copy of the source with GVK projected and renaming rules applied
destObj, err := s.destCreator(source.object)
Expand Down
223 changes: 223 additions & 0 deletions internal/sync/object_syncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
Copyright 2026 The KCP Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sync

import (
"context"
"testing"

"go.uber.org/zap"

"github.com/kcp-dev/logicalcluster/v3"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/record"
)

// fakeStateStore is a minimal ObjectStateStore for unit tests.
type fakeStateStore struct {
lastKnown *unstructured.Unstructured
}

func (f *fakeStateStore) Get(_ context.Context, _ syncSide) (*unstructured.Unstructured, error) {
return f.lastKnown, nil
}

func (f *fakeStateStore) Put(_ context.Context, _ *unstructured.Unstructured, _ logicalcluster.Name, _ []string) error {
return nil
}

func makeUnstructuredWithStatus(name, namespace string, status map[string]interface{}) *unstructured.Unstructured {
obj := &unstructured.Unstructured{}
obj.SetAPIVersion("test.example.com/v1")
obj.SetKind("Widget")
obj.SetName(name)
obj.SetNamespace(namespace)
if status != nil {
if err := unstructured.SetNestedMap(obj.Object, status, "status"); err != nil {
panic(err)
}
}
return obj
}

func TestSyncObjectStatusForward(t *testing.T) {
log := zap.NewNop().Sugar()

t.Run("no-op when syncStatusForward is false", func(t *testing.T) {
source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"})
dest := makeUnstructuredWithStatus("dst", "default", nil)
destClient := buildFakeClientWithStatus(dest)

syncer := &objectSyncer{syncStatusForward: false}
ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10))

err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

got := dest.UnstructuredContent()["status"]
if got != nil {
t.Errorf("expected dest status to remain nil, got %v", got)
}
})

t.Run("no-op when dest object is nil", func(t *testing.T) {
source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"})

syncer := &objectSyncer{syncStatusForward: true}
ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10))

err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: nil})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
})

t.Run("syncs status from source to dest when they differ", func(t *testing.T) {
source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"})
dest := makeUnstructuredWithStatus("dst", "default", nil)
destClient := buildFakeClientWithStatus(dest)

syncer := &objectSyncer{syncStatusForward: true, eventObjSide: syncSideSource}
ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10))

err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

phase, _, _ := unstructured.NestedString(dest.Object, "status", "phase")
if phase != "ready" {
t.Errorf("expected dest status.phase=ready, got %q", phase)
}
})

t.Run("no-op when status is already equal", func(t *testing.T) {
status := map[string]interface{}{"phase": "ready"}
source := makeUnstructuredWithStatus("src", "default", status)
dest := makeUnstructuredWithStatus("dst", "default", status)
destClient := buildFakeClientWithStatus(dest)

syncer := &objectSyncer{syncStatusForward: true}
ctx := WithEventRecorder(t.Context(), record.NewFakeRecorder(10))

err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
})

t.Run("emits warning and returns nil when dest has no status subresource (404)", func(t *testing.T) {
source := makeUnstructuredWithStatus("src", "default", map[string]interface{}{"phase": "ready"})
dest := makeUnstructuredWithStatus("dst", "default", nil)

// Build a client that does NOT register the status subresource, so Status().Update() → 404.
destClient := buildFakeClient(dest)

syncer := &objectSyncer{syncStatusForward: true, eventObjSide: syncSideSource}
recorder := record.NewFakeRecorder(10)
ctx := WithEventRecorder(t.Context(), recorder)

err := syncer.syncObjectStatusForward(ctx, log, syncSide{object: source}, syncSide{object: dest, client: destClient})
if err != nil {
t.Fatalf("expected no error on 404, got: %v", err)
}

select {
case event := <-recorder.Events:
if event == "" {
t.Error("expected a Warning event to be emitted")
}
default:
t.Error("expected a Warning event to be emitted but channel was empty")
}
})
}

// TestSyncObjectContentsForwardStatusRunsEvenOnSpecRequeue verifies the key invariant:
// syncObjectStatusForward is called unconditionally in syncObjectContents, even when
// syncObjectSpec already returned requeue=true due to a pending spec change.
// A future refactor that re-introduces an early return after spec sync would silently
// break status sync for simultaneous spec+status changes.
func TestSyncObjectContentsForwardStatusRunsEvenOnSpecRequeue(t *testing.T) {
log := zap.NewNop().Sugar()

// Source: new spec + status set.
source := &unstructured.Unstructured{}
source.SetAPIVersion("test.example.com/v1")
source.SetKind("Widget")
source.SetName("my-widget")
source.SetNamespace("default")
if err := unstructured.SetNestedField(source.Object, "new-value", "spec", "username"); err != nil {
t.Fatal(err)
}
if err := unstructured.SetNestedField(source.Object, "ready", "status", "phase"); err != nil {
t.Fatal(err)
}

// Dest: old spec, no status.
dest := &unstructured.Unstructured{}
dest.SetAPIVersion("test.example.com/v1")
dest.SetKind("Widget")
dest.SetName("my-widget")
dest.SetNamespace("default")
if err := unstructured.SetNestedField(dest.Object, "old-value", "spec", "username"); err != nil {
t.Fatal(err)
}

destClient := buildFakeClientWithStatus(dest)

// Last known source state has old spec — so syncObjectSpec sees a diff and patches, returning requeue=true.
lastKnown := &unstructured.Unstructured{}
lastKnown.SetAPIVersion("test.example.com/v1")
lastKnown.SetKind("Widget")
lastKnown.SetName("my-widget")
if err := unstructured.SetNestedField(lastKnown.Object, "old-value", "spec", "username"); err != nil {
t.Fatal(err)
}

syncer := &objectSyncer{
stateStore: &fakeStateStore{lastKnown: lastKnown},
syncStatusForward: true,
subresources: []string{"status"},
destCreator: func(u *unstructured.Unstructured) (*unstructured.Unstructured, error) { return u.DeepCopy(), nil },
eventObjSide: syncSideSource,
}

recorder := record.NewFakeRecorder(10)
ctx := WithEventRecorder(t.Context(), recorder)
ctx = WithClusterName(ctx, logicalcluster.Name("testcluster"))

sourceSide := syncSide{object: source, clusterName: "testcluster"}
destSide := syncSide{object: dest, client: destClient}

requeue, err := syncer.syncObjectContents(ctx, log, sourceSide, destSide)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !requeue {
t.Error("expected requeue=true because spec changed, but got false")
}

// Even though spec sync returned requeue=true, the dest status must have been updated.
phase, _, _ := unstructured.NestedString(dest.Object, "status", "phase")
if phase != "ready" {
t.Errorf("expected dest status.phase=ready after syncObjectContents, got %q — forward status sync did not run on spec requeue", phase)
}
}
22 changes: 11 additions & 11 deletions internal/sync/syncer_related.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ func (s *ResourceSyncer) processRelatedResource(ctx context.Context, log *zap.Su
// into the cleanup procedure to ensure that copies of the related object are removed.
forceDelete := primaryDeleting && (relRes.Cleanup || relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp)

// When status sync is enabled, include "status" in subresources so it is stripped from
// the spec patch (avoiding a no-op write on resources that have a status subresource).
// The status is then separately written via the status subresource endpoint by syncStatusForward.
relatedSubresources := []string(nil)
if relRes.SyncStatus {
relatedSubresources = []string{"status"}
}

syncer := objectSyncer{
// Related objects within kcp are not labelled with the agent name because it's unnecessary.
// agentName: "",
Expand All @@ -149,17 +157,9 @@ func (s *ResourceSyncer) processRelatedResource(ctx context.Context, log *zap.Su

return dest, nil
},
// Originally related resources were only ConfigMaps and Secrets, which do not have subresources;
// nowadays we support arbitrary APIs for related resources, but for simplicity do not [yet?]
// support syncing the subresources back. For this we would first need to figure out which
// subresources even exist.
subresources: nil,
// Theoretically we would only want to sync the status back if the related resource
// originates in kcp, because it would be weird if the service provider relied on status
// information provided to them by the consumer.
// However since we do not know anything about subresources, we currently cannot enable this
// feature at all.
syncStatusBack: false,
subresources: relatedSubresources,
syncStatusBack: false,
syncStatusForward: relRes.SyncStatus,
// if the origin is on the remote side, we want to add a finalizer to make
// sure we can clean up properly; when forceDelete is enabled, we are in deletion mode and
// want to force the deletion, so we ignore the related object's origin (it was taken into
Expand Down
15 changes: 15 additions & 0 deletions sdk/apis/syncagent/v1alpha1/published_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ const (
// +kubebuilder:validation:XValidation:rule="!has(self.group) || (has(self.resource) && has(self.version))",message="configuring a group also requires a version and resource"
// group is included here because when an identityHash is used, core/v1 cannot possible be targetted
// +kubebuilder:validation:XValidation:rule="!has(self.identityHash) || (has(self.group) && has(self.version) && has(self.resource))",message="identity hashes can only be used with GVRs"
// +kubebuilder:validation:XValidation:rule="!(self.origin == 'service' && has(self.syncStatus) && self.syncStatus) || has(self.watch)",message="watch must be configured when origin is service and syncStatus is true"
type RelatedResourceSpec struct {
// Identifier is a unique name for this related resource. The name must be unique within one
// PublishedResource and is the key by which consumers (end users) can identify and consume the
Expand Down Expand Up @@ -268,6 +269,20 @@ type RelatedResourceSpec struct {
// resource type and uses the configured rule to enqueue the correct primary object.
// Without this field, changes to origin:kcp related resources do not trigger reconciliation.
Watch *RelatedResourceWatch `json:"watch,omitempty"`

// SyncStatus enables synchronization of the status subresource in the same direction as
// the spec (from the origin side to the destination side). When enabled, the agent will
// use the status subresource endpoint to update the destination object's status.
// This requires the related resource to have a status subresource configured in its CRD.
//
// - origin: kcp -> status is synced from kcp to the service cluster
// - origin: service -> status is synced from the service cluster to kcp
//
// For origin: service, Watch must also be configured so that changes to the related
// resource's status trigger reconciliation and ensure the informer cache is populated.
//
// +optional
SyncStatus bool `json:"syncStatus,omitempty"`
}

// RelatedResourceWatch configures how the watch handler maps a changed related resource
Expand Down
Loading
Loading