From 2ce4111cdd5a939a0135ed5afe1bb9729f6560c8 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 25 Jun 2026 23:55:38 +0800 Subject: [PATCH 1/3] fix(connection): use MergeFrom patch for finalizer writes to avoid 409 conflicts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sub-resource reconcilers added and removed the resource finalizer with a full-object Update. The object is loaded once from the informer cache, so the Update carries a stale resourceVersion as an optimistic-lock precondition. When the server copy moves ahead — concurrent writers such as GitOps controllers, or informer-cache lag — the API server rejects the write with a 409 Conflict ("the object has been modified") and the finalizer is never removed. The object is then stuck Terminating while the connection requeues hot, and it only clears once the cache happens to catch up. Replace each finalizer Update with a client.MergeFrom patch, guarded by the boolean return of Add/RemoveFinalizer. The patch sends only the finalizer diff and carries no resourceVersion precondition, so it always applies to the live object regardless of cache lag or concurrent writers; the guard avoids an API call when the finalizer set is unchanged. Applied across all PulsarConnection sub-reconcilers (tenant, namespace, topic, source, sink, function, package, permission, geo-replication, ns-isolation-policy) and the PulsarConnection reconciler itself. Add a regression test that removes a PulsarSource finalizer from a stale-resourceVersion copy and asserts there is no conflict, that the write uses Patch rather than a full-object Update, and that the object is garbage-collected promptly. 
Co-Authored-By: Claude Opus 4.8 --- pkg/connection/reconcile_function.go | 20 +- pkg/connection/reconcile_geo_replication.go | 20 +- pkg/connection/reconcile_namespace.go | 20 +- pkg/connection/reconcile_nsisolationpolicy.go | 20 +- pkg/connection/reconcile_package.go | 20 +- pkg/connection/reconcile_permission.go | 20 +- pkg/connection/reconcile_sink.go | 20 +- pkg/connection/reconcile_source.go | 20 +- pkg/connection/reconcile_source_test.go | 180 ++++++++++++++++++ pkg/connection/reconcile_tenant.go | 20 +- pkg/connection/reconcile_topic.go | 20 +- pkg/connection/reconciler.go | 13 +- 12 files changed, 308 insertions(+), 85 deletions(-) create mode 100644 pkg/connection/reconcile_source_test.go diff --git a/pkg/connection/reconcile_function.go b/pkg/connection/reconcile_function.go index 6755f06c..78791e48 100644 --- a/pkg/connection/reconcile_function.go +++ b/pkg/connection/reconcile_function.go @@ -101,20 +101,24 @@ func (r *PulsarFunctionReconciler) ReconcileFunction(ctx context.Context, pulsar } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(instance, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, instance); err != nil { - log.Error(err, "Failed to remove finalizer") - return err + patch := client.MergeFrom(instance.DeepCopy()) + if controllerutil.RemoveFinalizer(instance, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, instance, patch); err != nil { + log.Error(err, "Failed to remove finalizer") + return err + } } return nil } if instance.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(instance, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, instance); err != nil { - log.Error(err, "Failed to add finalizer") - return err + patch := 
client.MergeFrom(instance.DeepCopy()) + if controllerutil.AddFinalizer(instance, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, instance, patch); err != nil { + log.Error(err, "Failed to add finalizer") + return err + } } } diff --git a/pkg/connection/reconcile_geo_replication.go b/pkg/connection/reconcile_geo_replication.go index 54063e5c..715dfe2d 100644 --- a/pkg/connection/reconcile_geo_replication.go +++ b/pkg/connection/reconcile_geo_replication.go @@ -168,17 +168,21 @@ func (r *PulsarGeoReplicationReconciler) ReconcileGeoReplication(ctx context.Con } } } - controllerutil.RemoveFinalizer(geoReplication, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, geoReplication); err != nil { - log.Error(err, "Failed to remove finalizer") - return err + patch := client.MergeFrom(geoReplication.DeepCopy()) + if controllerutil.RemoveFinalizer(geoReplication, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, geoReplication, patch); err != nil { + log.Error(err, "Failed to remove finalizer") + return err + } } } } - controllerutil.AddFinalizer(geoReplication, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, geoReplication); err != nil { - log.Error(err, "Failed to add finalizer") - return err + patch := client.MergeFrom(geoReplication.DeepCopy()) + if controllerutil.AddFinalizer(geoReplication, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, geoReplication, patch); err != nil { + log.Error(err, "Failed to add finalizer") + return err + } } secretUpdated, err := r.checkSecretRefUpdate(*destConnection, geoReplication.Spec.ClusterParamsOverride) diff --git a/pkg/connection/reconcile_namespace.go b/pkg/connection/reconcile_namespace.go index 27e76179..b5d21c7e 100644 --- a/pkg/connection/reconcile_namespace.go +++ b/pkg/connection/reconcile_namespace.go @@ -124,10 +124,12 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls } // TODO use 
otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(namespace, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, namespace); err != nil { - log.Error(err, "Failed to remove finalizer") - return err + patch := client.MergeFrom(namespace.DeepCopy()) + if controllerutil.RemoveFinalizer(namespace, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, namespace, patch); err != nil { + log.Error(err, "Failed to remove finalizer") + return err + } } return nil @@ -135,10 +137,12 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls if namespace.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(namespace, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, namespace); err != nil { - log.Error(err, "Failed to add finalizer") - return err + patch := client.MergeFrom(namespace.DeepCopy()) + if controllerutil.AddFinalizer(namespace, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, namespace, patch); err != nil { + log.Error(err, "Failed to add finalizer") + return err + } } } diff --git a/pkg/connection/reconcile_nsisolationpolicy.go b/pkg/connection/reconcile_nsisolationpolicy.go index 0c78f26a..4a80d8d0 100644 --- a/pkg/connection/reconcile_nsisolationpolicy.go +++ b/pkg/connection/reconcile_nsisolationpolicy.go @@ -105,20 +105,24 @@ func (r *PulsarNSIsolationPolicyReconciler) ReconcilePolicy(ctx context.Context, } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(policy, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, policy); err != nil { - log.Error(err, "Failed to remove finalizer") - return err + patch := client.MergeFrom(policy.DeepCopy()) + if 
controllerutil.RemoveFinalizer(policy, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, policy, patch); err != nil { + log.Error(err, "Failed to remove finalizer") + return err + } } return nil } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(policy, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, policy); err != nil { - log.Error(err, "Failed to add finalizer") - return err + patch := client.MergeFrom(policy.DeepCopy()) + if controllerutil.AddFinalizer(policy, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, policy, patch); err != nil { + log.Error(err, "Failed to add finalizer") + return err + } } if resourcev1alpha1.IsPulsarResourceReady(policy) && diff --git a/pkg/connection/reconcile_package.go b/pkg/connection/reconcile_package.go index cc6992bc..cba3e748 100644 --- a/pkg/connection/reconcile_package.go +++ b/pkg/connection/reconcile_package.go @@ -154,19 +154,23 @@ func (r *PulsarPackageReconciler) ReconcilePackage(ctx context.Context, pulsarAd } } - controllerutil.RemoveFinalizer(pkg, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, pkg); err != nil { - log.Error(err, "Failed to remove finalizer") - return err + patch := client.MergeFrom(pkg.DeepCopy()) + if controllerutil.RemoveFinalizer(pkg, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, pkg, patch); err != nil { + log.Error(err, "Failed to remove finalizer") + return err + } } return nil } if pkg.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { - controllerutil.AddFinalizer(pkg, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, pkg); err != nil { - log.Error(err, "Failed to add finalizer") - return err + patch := client.MergeFrom(pkg.DeepCopy()) + if controllerutil.AddFinalizer(pkg, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, pkg, patch); err != nil { + 
log.Error(err, "Failed to add finalizer") + return err + } } } diff --git a/pkg/connection/reconcile_permission.go b/pkg/connection/reconcile_permission.go index e2e9a74b..1d253999 100644 --- a/pkg/connection/reconcile_permission.go +++ b/pkg/connection/reconcile_permission.go @@ -102,20 +102,24 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu } } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(permission, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, permission); err != nil { - log.Error(err, "Failed to remove finalizer") - return err + patch := client.MergeFrom(permission.DeepCopy()) + if controllerutil.RemoveFinalizer(permission, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, permission, patch); err != nil { + log.Error(err, "Failed to remove finalizer") + return err + } } return nil } if permission.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(permission, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, permission); err != nil { - log.Error(err, "Failed to add finalizer") - return err + patch := client.MergeFrom(permission.DeepCopy()) + if controllerutil.AddFinalizer(permission, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, permission, patch); err != nil { + log.Error(err, "Failed to add finalizer") + return err + } } } diff --git a/pkg/connection/reconcile_sink.go b/pkg/connection/reconcile_sink.go index b9aff332..dffa1df5 100644 --- a/pkg/connection/reconcile_sink.go +++ b/pkg/connection/reconcile_sink.go @@ -99,20 +99,24 @@ func (r *PulsarSinkReconciler) ReconcileSink(ctx context.Context, pulsarAdmin ad } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - 
controllerutil.RemoveFinalizer(sink, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, sink); err != nil { - log.Error(err, "Failed to remove finalizer") - return err + patch := client.MergeFrom(sink.DeepCopy()) + if controllerutil.RemoveFinalizer(sink, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, sink, patch); err != nil { + log.Error(err, "Failed to remove finalizer") + return err + } } return nil } if sink.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(sink, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, sink); err != nil { - log.Error(err, "Failed to add finalizer") - return err + patch := client.MergeFrom(sink.DeepCopy()) + if controllerutil.AddFinalizer(sink, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, sink, patch); err != nil { + log.Error(err, "Failed to add finalizer") + return err + } } } diff --git a/pkg/connection/reconcile_source.go b/pkg/connection/reconcile_source.go index 23a687ac..3867b21e 100644 --- a/pkg/connection/reconcile_source.go +++ b/pkg/connection/reconcile_source.go @@ -100,20 +100,24 @@ func (r *PulsarSourceReconciler) ReconcileSource(ctx context.Context, pulsarAdmi } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(source, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, source); err != nil { - log.Error(err, "Failed to remove finalizer") - return err + patch := client.MergeFrom(source.DeepCopy()) + if controllerutil.RemoveFinalizer(source, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, source, patch); err != nil { + log.Error(err, "Failed to remove finalizer") + return err + } } return nil } if source.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use 
otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(source, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, source); err != nil { - log.Error(err, "Failed to add finalizer") - return err + patch := client.MergeFrom(source.DeepCopy()) + if controllerutil.AddFinalizer(source, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, source, patch); err != nil { + log.Error(err, "Failed to add finalizer") + return err + } } } diff --git a/pkg/connection/reconcile_source_test.go b/pkg/connection/reconcile_source_test.go new file mode 100644 index 00000000..7012f3f3 --- /dev/null +++ b/pkg/connection/reconcile_source_test.go @@ -0,0 +1,180 @@ +// Copyright 2026 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package connection + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/logr" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" + "github.com/streamnative/pulsar-resources-operator/pkg/admin" +) + +// finalizerCountingClient wraps a client.Client and counts full-object Update and Patch +// calls, so a test can assert that finalizer changes are persisted via a MergeFrom Patch +// (which carries no resourceVersion precondition) and never via a full-object Update. +type finalizerCountingClient struct { + client.Client + updateCalls int + patchCalls int +} + +func (c *finalizerCountingClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + c.updateCalls++ + return c.Client.Update(ctx, obj, opts...) +} + +func (c *finalizerCountingClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + c.patchCalls++ + return c.Client.Patch(ctx, obj, patch, opts...) +} + +func newSourceTestScheme(t *testing.T) *runtime.Scheme { + t.Helper() + scheme := runtime.NewScheme() + if err := resourcev1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("add resource scheme: %v", err) + } + return scheme +} + +// newDeletingTestSource returns a PulsarSource that is mid-deletion: it carries a +// deletionTimestamp and the operator finalizer. The fake client permits creating an object +// with a deletionTimestamp as long as it still has at least one finalizer. 
+func newDeletingTestSource() *resourcev1alpha1.PulsarSource { + deletionTime := metav1.NewTime(time.Unix(1700000000, 0)) + return &resourcev1alpha1.PulsarSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "source", + Namespace: "test-ns", + Generation: 1, + Finalizers: []string{resourcev1alpha1.FinalizerName}, + DeletionTimestamp: &deletionTime, + }, + Spec: resourcev1alpha1.PulsarSourceSpec{ + Tenant: "public", + Namespace: "default", + Name: "source", + LifecyclePolicy: resourcev1alpha1.CleanUpAfterDeletion, + }, + } +} + +// staleSourceCopyAfterConcurrentWrite returns the source as the reconciler would have +// cached it, then advances the server-side resourceVersion out of band (simulating a +// concurrent writer such as ArgoCD) so the returned copy is stale relative to the server. +// This is the exact condition under which a full-object Update fails with a 409 conflict. +func staleSourceCopyAfterConcurrentWrite(t *testing.T, k8sClient client.Client) *resourcev1alpha1.PulsarSource { + t.Helper() + cached := &resourcev1alpha1.PulsarSource{} + if err := k8sClient.Get(context.Background(), + types.NamespacedName{Namespace: "test-ns", Name: "source"}, cached); err != nil { + t.Fatalf("get cached source: %v", err) + } + + live := cached.DeepCopy() + live.Annotations = map[string]string{"out-of-band": "bump"} + if err := k8sClient.Update(context.Background(), live); err != nil { + t.Fatalf("out-of-band update to bump resourceVersion: %v", err) + } + return cached +} + +// TestReconcileSourceDeletionRemovesFinalizerWithStaleResourceVersion is the regression +// test for the finalizer-conflict fix. When a PulsarSource is being deleted, the operator +// must remove its finalizer via a MergeFrom Patch (no resourceVersion precondition) even +// when the in-memory copy is stale, so the object is garbage-collected promptly without a +// 409 conflict — regardless of informer-cache lag or concurrent writers (e.g. GitOps). 
+// +// Against the pre-fix implementation (a full-object Update) this test fails: the Update +// carries the stale resourceVersion and the fake API server rejects it with a 409. +func TestReconcileSourceDeletionRemovesFinalizerWithStaleResourceVersion(t *testing.T) { + scheme := newSourceTestScheme(t) + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&resourcev1alpha1.PulsarSource{}). + WithObjects(newDeletingTestSource()). + Build() + + // Capture the cached copy, then move the server resourceVersion ahead of it. + stale := staleSourceCopyAfterConcurrentWrite(t, fakeClient) + + countingClient := &finalizerCountingClient{Client: fakeClient} + r := &PulsarSourceReconciler{ + conn: &PulsarConnectionReconciler{ + connection: newReadyTestConnection(), + log: logr.Discard(), + client: countingClient, + }, + log: logr.Discard(), + } + + // DummyPulsarAdmin.DeletePulsarSource is a no-op; the assertion targets the finalizer path. + if err := r.ReconcileSource(context.Background(), &admin.DummyPulsarAdmin{}, stale); err != nil { + t.Fatalf("ReconcileSource on a deleting source with a stale resourceVersion returned an error "+ + "(want nil, no 409 conflict): %v", err) + } + + if countingClient.updateCalls != 0 { + t.Fatalf("finalizer removal must not use a full-object Update (it carries a resourceVersion "+ + "precondition); got %d Update call(s)", countingClient.updateCalls) + } + if countingClient.patchCalls == 0 { + t.Fatal("expected finalizer removal to persist via a MergeFrom Patch, got 0 Patch calls") + } + + got := &resourcev1alpha1.PulsarSource{} + err := fakeClient.Get(context.Background(), + types.NamespacedName{Namespace: "test-ns", Name: "source"}, got) + if err == nil { + t.Fatalf("expected source to be garbage-collected after finalizer removal, "+ + "but it still exists with finalizers %v", got.Finalizers) + } + if !apierrors.IsNotFound(err) { + t.Fatalf("expected NotFound after finalizer removal, got: %v", err) + } +} + 
+// TestSourceStaleFinalizerUpdateConflicts documents the pre-fix failure mode and proves the +// test harness genuinely surfaces it: removing the finalizer with a full-object Update from a +// stale-resourceVersion copy returns a 409 conflict. This is precisely the conflict that the +// MergeFrom Patch fix avoids, and it guards against any regression back to Update-based +// finalizer writes. +func TestSourceStaleFinalizerUpdateConflicts(t *testing.T) { + scheme := newSourceTestScheme(t) + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&resourcev1alpha1.PulsarSource{}). + WithObjects(newDeletingTestSource()). + Build() + + stale := staleSourceCopyAfterConcurrentWrite(t, fakeClient) + + controllerutil.RemoveFinalizer(stale, resourcev1alpha1.FinalizerName) + err := fakeClient.Update(context.Background(), stale) + if err == nil || !apierrors.IsConflict(err) { + t.Fatalf("expected a 409 conflict from a stale full-object Update, got: %v", err) + } +} diff --git a/pkg/connection/reconcile_tenant.go b/pkg/connection/reconcile_tenant.go index c866b623..f70e95f6 100644 --- a/pkg/connection/reconcile_tenant.go +++ b/pkg/connection/reconcile_tenant.go @@ -102,20 +102,24 @@ func (r *PulsarTenantReconciler) ReconcileTenant(ctx context.Context, pulsarAdmi } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(tenant, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, tenant); err != nil { - log.Error(err, "Failed to remove finalizer") - return err + patch := client.MergeFrom(tenant.DeepCopy()) + if controllerutil.RemoveFinalizer(tenant, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, tenant, patch); err != nil { + log.Error(err, "Failed to remove finalizer") + return err + } } return nil } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(tenant, 
resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, tenant); err != nil { - log.Error(err, "Failed to add finalizer") - return err + patch := client.MergeFrom(tenant.DeepCopy()) + if controllerutil.AddFinalizer(tenant, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, tenant, patch); err != nil { + log.Error(err, "Failed to add finalizer") + return err + } } if resourcev1alpha1.IsPulsarResourceReady(tenant) && diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index dff0de14..afa56c0b 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -137,20 +137,24 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(topic, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, topic); err != nil { - log.Error(err, "Failed to remove finalizer") - return err + patch := client.MergeFrom(topic.DeepCopy()) + if controllerutil.RemoveFinalizer(topic, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, topic, patch); err != nil { + log.Error(err, "Failed to remove finalizer") + return err + } } return nil } if topic.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(topic, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, topic); err != nil { - log.Error(err, "Failed to add finalizer") - return err + patch := client.MergeFrom(topic.DeepCopy()) + if controllerutil.AddFinalizer(topic, resourcev1alpha1.FinalizerName) { + if err := r.conn.client.Patch(ctx, topic, patch); err != nil { + log.Error(err, "Failed to add finalizer") + return err + } } } diff --git a/pkg/connection/reconciler.go b/pkg/connection/reconciler.go index 
ef2aeeb2..e9c4829d 100644 --- a/pkg/connection/reconciler.go +++ b/pkg/connection/reconciler.go @@ -123,9 +123,11 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error { // keep the connection until all resources has been removed // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(r.connection, resourcev1alpha1.FinalizerName) - if err := r.client.Update(ctx, r.connection); err != nil { - return err + patch := client.MergeFrom(r.connection.DeepCopy()) + if controllerutil.RemoveFinalizer(r.connection, resourcev1alpha1.FinalizerName) { + if err := r.client.Patch(ctx, r.connection, patch); err != nil { + return err + } } } else { r.log.Info("There are still remaining resources before deleting the connection", "tenants", len(r.tenants), "namespaces", @@ -147,17 +149,18 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error { log.Info("Reconciling pulsar resources", "resources", r.unreadyResources) connectionChanged := false + // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer + patch := client.MergeFrom(r.connection.DeepCopy()) if r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL != "" { r.connection.Spec.AdminServiceURL = r.connection.Spec.AdminServiceSecureURL connectionChanged = true } - // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer if controllerutil.AddFinalizer(r.connection, resourcev1alpha1.FinalizerName) { connectionChanged = true } if connectionChanged { - if err := r.client.Update(ctx, r.connection); err != nil { + if err := r.client.Patch(ctx, r.connection, patch); err != nil { return err } } From c07b0b14d1e8c8861eae29b2c29b1a15a2fe13b6 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 26 Jun 2026 10:51:24 +0800 Subject: [PATCH 2/3] fix(connection): preserve foreign finalizers and concurrent edits on 
finalizer writes The previous change persisted finalizers (and the connection's admin-URL default) with a client.MergeFrom patch. A JSON merge patch replaces list fields atomically, so removing the operator finalizer from a stale cached object emits a patch that clears the whole finalizers list. That clobbers a finalizer added concurrently by another controller (e.g. the Kubernetes foregroundDeletion finalizer, or a GitOps finalizer) and deletes the object prematurely while that finalizer is still pending. The connection's adminServiceURL default had the same hazard: a MergeFrom patch carries no resourceVersion precondition, so it could overwrite a concurrent spec edit. Read the live object under optimistic-lock retry instead. New helpers ensureFinalizer / removeFinalizer (finalizer_helpers.go) Get the latest object, mutate only the operator finalizer, and Update with retry.RetryOnConflict, so concurrently-added finalizers are preserved and a stale cache no longer wedges deletion. The PulsarConnection reconciler applies the same Get-mutate-Update-retry to its adminServiceURL default and finalizer. Tests: - PulsarSource deletion preserves a concurrently-added foreign finalizer and does not garbage-collect the object while that finalizer is still pending. - PulsarSource deletion still removes the operator finalizer and the object is collected promptly even when the cached resourceVersion is stale. - PulsarConnection reconcile does not overwrite a concurrent adminServiceURL edit when defaulting from the secure URL. 
Co-Authored-By: Claude Opus 4.8 --- pkg/connection/finalizer_helpers.go | 69 +++++++++ pkg/connection/reconcile_function.go | 19 +-- pkg/connection/reconcile_geo_replication.go | 19 +-- pkg/connection/reconcile_namespace.go | 19 +-- pkg/connection/reconcile_nsisolationpolicy.go | 19 +-- pkg/connection/reconcile_package.go | 19 +-- pkg/connection/reconcile_permission.go | 19 +-- pkg/connection/reconcile_sink.go | 19 +-- pkg/connection/reconcile_source.go | 19 +-- pkg/connection/reconcile_source_test.go | 132 ++++++++---------- pkg/connection/reconcile_tenant.go | 19 +-- pkg/connection/reconcile_topic.go | 19 +-- pkg/connection/reconciler.go | 43 +++--- pkg/connection/reconciler_test.go | 44 ++++++ 14 files changed, 260 insertions(+), 218 deletions(-) create mode 100644 pkg/connection/finalizer_helpers.go diff --git a/pkg/connection/finalizer_helpers.go b/pkg/connection/finalizer_helpers.go new file mode 100644 index 00000000..a3c279e8 --- /dev/null +++ b/pkg/connection/finalizer_helpers.go @@ -0,0 +1,69 @@ +// Copyright 2025 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "context" + + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +// ensureFinalizer adds finalizer to obj, persisting the change against the live API +// object and retrying on optimistic-lock conflicts. 
+// +// It re-reads obj from the API server before each attempt and mutates only this +// operator's finalizer, so finalizers added concurrently by other controllers are +// preserved. A full-object Update with optimistic concurrency is used (not a JSON +// merge patch, which replaces the whole finalizers list and could drop a concurrent +// finalizer when applied from a stale base). On success obj holds the server's latest +// state, including the refreshed resourceVersion, so callers may safely issue a +// follow-up Status().Update(ctx, obj). +func ensureFinalizer(ctx context.Context, c client.Client, obj client.Object, finalizer string) error { + key := client.ObjectKeyFromObject(obj) + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + if err := c.Get(ctx, key, obj); err != nil { + return err + } + if !controllerutil.AddFinalizer(obj, finalizer) { + // Already present; nothing to write. + return nil + } + return c.Update(ctx, obj) + }) +} + +// removeFinalizer removes finalizer from obj, persisting the change against the live +// API object and retrying on optimistic-lock conflicts. +// +// It re-reads obj before each attempt and removes only this operator's finalizer, so it +// never clobbers finalizers owned by other controllers and never deletes the object +// prematurely while a foreign finalizer is still pending. A NotFound result (the object +// was garbage-collected once its last finalizer was gone) is treated as success. +func removeFinalizer(ctx context.Context, c client.Client, obj client.Object, finalizer string) error { + key := client.ObjectKeyFromObject(obj) + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if err := c.Get(ctx, key, obj); err != nil { + return err + } + if !controllerutil.RemoveFinalizer(obj, finalizer) { + // Already absent; nothing to write. 
+ return nil + } + return c.Update(ctx, obj) + }) + return client.IgnoreNotFound(err) +} diff --git a/pkg/connection/reconcile_function.go b/pkg/connection/reconcile_function.go index 78791e48..14f3ea9d 100644 --- a/pkg/connection/reconcile_function.go +++ b/pkg/connection/reconcile_function.go @@ -25,7 +25,6 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/feature" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -101,24 +100,18 @@ func (r *PulsarFunctionReconciler) ReconcileFunction(ctx context.Context, pulsar } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(instance.DeepCopy()) - if controllerutil.RemoveFinalizer(instance, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, instance, patch); err != nil { - log.Error(err, "Failed to remove finalizer") - return err - } + if err := removeFinalizer(ctx, r.conn.client, instance, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to remove finalizer") + return err } return nil } if instance.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(instance.DeepCopy()) - if controllerutil.AddFinalizer(instance, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, instance, patch); err != nil { - log.Error(err, "Failed to add finalizer") - return err - } + if err := ensureFinalizer(ctx, r.conn.client, instance, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to add finalizer") + return err } } diff --git a/pkg/connection/reconcile_geo_replication.go 
b/pkg/connection/reconcile_geo_replication.go index 715dfe2d..05098e7b 100644 --- a/pkg/connection/reconcile_geo_replication.go +++ b/pkg/connection/reconcile_geo_replication.go @@ -27,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -168,21 +167,15 @@ func (r *PulsarGeoReplicationReconciler) ReconcileGeoReplication(ctx context.Con } } } - patch := client.MergeFrom(geoReplication.DeepCopy()) - if controllerutil.RemoveFinalizer(geoReplication, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, geoReplication, patch); err != nil { - log.Error(err, "Failed to remove finalizer") - return err - } + if err := removeFinalizer(ctx, r.conn.client, geoReplication, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to remove finalizer") + return err } } } - patch := client.MergeFrom(geoReplication.DeepCopy()) - if controllerutil.AddFinalizer(geoReplication, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, geoReplication, patch); err != nil { - log.Error(err, "Failed to add finalizer") - return err - } + if err := ensureFinalizer(ctx, r.conn.client, geoReplication, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to add finalizer") + return err } secretUpdated, err := r.checkSecretRefUpdate(*destConnection, geoReplication.Spec.ClusterParamsOverride) diff --git a/pkg/connection/reconcile_namespace.go b/pkg/connection/reconcile_namespace.go index b5d21c7e..5986c05d 100644 --- a/pkg/connection/reconcile_namespace.go +++ b/pkg/connection/reconcile_namespace.go @@ -25,7 +25,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" - 
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -124,12 +123,9 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(namespace.DeepCopy()) - if controllerutil.RemoveFinalizer(namespace, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, namespace, patch); err != nil { - log.Error(err, "Failed to remove finalizer") - return err - } + if err := removeFinalizer(ctx, r.conn.client, namespace, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to remove finalizer") + return err } return nil @@ -137,12 +133,9 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls if namespace.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(namespace.DeepCopy()) - if controllerutil.AddFinalizer(namespace, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, namespace, patch); err != nil { - log.Error(err, "Failed to add finalizer") - return err - } + if err := ensureFinalizer(ctx, r.conn.client, namespace, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to add finalizer") + return err } } diff --git a/pkg/connection/reconcile_nsisolationpolicy.go b/pkg/connection/reconcile_nsisolationpolicy.go index 4a80d8d0..b37c994a 100644 --- a/pkg/connection/reconcile_nsisolationpolicy.go +++ b/pkg/connection/reconcile_nsisolationpolicy.go @@ -26,7 +26,6 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/reconciler" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" - 
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) // PulsarNSIsolationPolicyReconciler reconciles a PulsarNSIsolationPolicy object @@ -105,24 +104,18 @@ func (r *PulsarNSIsolationPolicyReconciler) ReconcilePolicy(ctx context.Context, } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(policy.DeepCopy()) - if controllerutil.RemoveFinalizer(policy, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, policy, patch); err != nil { - log.Error(err, "Failed to remove finalizer") - return err - } + if err := removeFinalizer(ctx, r.conn.client, policy, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to remove finalizer") + return err } return nil } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(policy.DeepCopy()) - if controllerutil.AddFinalizer(policy, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, policy, patch); err != nil { - log.Error(err, "Failed to add finalizer") - return err - } + if err := ensureFinalizer(ctx, r.conn.client, policy, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to add finalizer") + return err } if resourcev1alpha1.IsPulsarResourceReady(policy) && diff --git a/pkg/connection/reconcile_package.go b/pkg/connection/reconcile_package.go index cba3e748..b7873f6b 100644 --- a/pkg/connection/reconcile_package.go +++ b/pkg/connection/reconcile_package.go @@ -27,7 +27,6 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/feature" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -154,23 +153,17 @@ func (r *PulsarPackageReconciler) 
ReconcilePackage(ctx context.Context, pulsarAd } } - patch := client.MergeFrom(pkg.DeepCopy()) - if controllerutil.RemoveFinalizer(pkg, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, pkg, patch); err != nil { - log.Error(err, "Failed to remove finalizer") - return err - } + if err := removeFinalizer(ctx, r.conn.client, pkg, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to remove finalizer") + return err } return nil } if pkg.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { - patch := client.MergeFrom(pkg.DeepCopy()) - if controllerutil.AddFinalizer(pkg, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, pkg, patch); err != nil { - log.Error(err, "Failed to add finalizer") - return err - } + if err := ensureFinalizer(ctx, r.conn.client, pkg, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to add finalizer") + return err } } diff --git a/pkg/connection/reconcile_permission.go b/pkg/connection/reconcile_permission.go index 1d253999..16b30c88 100644 --- a/pkg/connection/reconcile_permission.go +++ b/pkg/connection/reconcile_permission.go @@ -24,7 +24,6 @@ import ( "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -102,24 +101,18 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu } } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(permission.DeepCopy()) - if controllerutil.RemoveFinalizer(permission, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, permission, patch); err != nil { - log.Error(err, "Failed to remove finalizer") - return err - } + if err := removeFinalizer(ctx, 
r.conn.client, permission, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to remove finalizer") + return err } return nil } if permission.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(permission.DeepCopy()) - if controllerutil.AddFinalizer(permission, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, permission, patch); err != nil { - log.Error(err, "Failed to add finalizer") - return err - } + if err := ensureFinalizer(ctx, r.conn.client, permission, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to add finalizer") + return err } } diff --git a/pkg/connection/reconcile_sink.go b/pkg/connection/reconcile_sink.go index dffa1df5..7aa1c6e1 100644 --- a/pkg/connection/reconcile_sink.go +++ b/pkg/connection/reconcile_sink.go @@ -26,7 +26,6 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/reconciler" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) // PulsarSinkReconciler reconciles a PulsarSink object @@ -99,24 +98,18 @@ func (r *PulsarSinkReconciler) ReconcileSink(ctx context.Context, pulsarAdmin ad } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(sink.DeepCopy()) - if controllerutil.RemoveFinalizer(sink, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, sink, patch); err != nil { - log.Error(err, "Failed to remove finalizer") - return err - } + if err := removeFinalizer(ctx, r.conn.client, sink, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to remove finalizer") + return err } return nil } if sink.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade 
controller-runtime version to newer - patch := client.MergeFrom(sink.DeepCopy()) - if controllerutil.AddFinalizer(sink, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, sink, patch); err != nil { - log.Error(err, "Failed to add finalizer") - return err - } + if err := ensureFinalizer(ctx, r.conn.client, sink, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to add finalizer") + return err } } diff --git a/pkg/connection/reconcile_source.go b/pkg/connection/reconcile_source.go index 3867b21e..7dadfdb9 100644 --- a/pkg/connection/reconcile_source.go +++ b/pkg/connection/reconcile_source.go @@ -26,7 +26,6 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/reconciler" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) // PulsarSourceReconciler reconciles a PulsarSource object @@ -100,24 +99,18 @@ func (r *PulsarSourceReconciler) ReconcileSource(ctx context.Context, pulsarAdmi } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(source.DeepCopy()) - if controllerutil.RemoveFinalizer(source, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, source, patch); err != nil { - log.Error(err, "Failed to remove finalizer") - return err - } + if err := removeFinalizer(ctx, r.conn.client, source, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to remove finalizer") + return err } return nil } if source.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(source.DeepCopy()) - if controllerutil.AddFinalizer(source, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, source, patch); err != nil { - log.Error(err, "Failed to add finalizer") - return err - } + if err 
:= ensureFinalizer(ctx, r.conn.client, source, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to add finalizer") + return err } } diff --git a/pkg/connection/reconcile_source_test.go b/pkg/connection/reconcile_source_test.go index 7012f3f3..e0144590 100644 --- a/pkg/connection/reconcile_source_test.go +++ b/pkg/connection/reconcile_source_test.go @@ -32,24 +32,11 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/admin" ) -// finalizerCountingClient wraps a client.Client and counts full-object Update and Patch -// calls, so a test can assert that finalizer changes are persisted via a MergeFrom Patch -// (which carries no resourceVersion precondition) and never via a full-object Update. -type finalizerCountingClient struct { - client.Client - updateCalls int - patchCalls int -} - -func (c *finalizerCountingClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { - c.updateCalls++ - return c.Client.Update(ctx, obj, opts...) -} +// foreignFinalizer stands in for a finalizer owned by another controller (e.g. the +// Kubernetes foregroundDeletion finalizer, or a GitOps tool's finalizer). +const foreignFinalizer = "other.io/protect" -func (c *finalizerCountingClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { - c.patchCalls++ - return c.Client.Patch(ctx, obj, patch, opts...) 
-} +var sourceKey = types.NamespacedName{Namespace: "test-ns", Name: "source"} func newSourceTestScheme(t *testing.T) *runtime.Scheme { t.Helper() @@ -82,34 +69,42 @@ func newDeletingTestSource() *resourcev1alpha1.PulsarSource { } } -// staleSourceCopyAfterConcurrentWrite returns the source as the reconciler would have -// cached it, then advances the server-side resourceVersion out of band (simulating a +func newSourceReconciler(t *testing.T, k8sClient client.Client) *PulsarSourceReconciler { + t.Helper() + return &PulsarSourceReconciler{ + conn: &PulsarConnectionReconciler{ + connection: newReadyTestConnection(), + log: logr.Discard(), + client: k8sClient, + }, + log: logr.Discard(), + } +} + +// cachedSourceAfterConcurrentWrite returns the source as the reconciler would have cached +// it, then advances the server-side object out of band (simulating informer-cache lag or a // concurrent writer such as ArgoCD) so the returned copy is stale relative to the server. -// This is the exact condition under which a full-object Update fails with a 409 conflict. 
-func staleSourceCopyAfterConcurrentWrite(t *testing.T, k8sClient client.Client) *resourcev1alpha1.PulsarSource { +func cachedSourceAfterConcurrentWrite(t *testing.T, k8sClient client.Client, mutateLive func(*resourcev1alpha1.PulsarSource)) *resourcev1alpha1.PulsarSource { t.Helper() cached := &resourcev1alpha1.PulsarSource{} - if err := k8sClient.Get(context.Background(), - types.NamespacedName{Namespace: "test-ns", Name: "source"}, cached); err != nil { + if err := k8sClient.Get(context.Background(), sourceKey, cached); err != nil { t.Fatalf("get cached source: %v", err) } live := cached.DeepCopy() - live.Annotations = map[string]string{"out-of-band": "bump"} + mutateLive(live) if err := k8sClient.Update(context.Background(), live); err != nil { - t.Fatalf("out-of-band update to bump resourceVersion: %v", err) + t.Fatalf("out-of-band update to advance server state: %v", err) } return cached } // TestReconcileSourceDeletionRemovesFinalizerWithStaleResourceVersion is the regression -// test for the finalizer-conflict fix. When a PulsarSource is being deleted, the operator -// must remove its finalizer via a MergeFrom Patch (no resourceVersion precondition) even -// when the in-memory copy is stale, so the object is garbage-collected promptly without a -// 409 conflict — regardless of informer-cache lag or concurrent writers (e.g. GitOps). -// -// Against the pre-fix implementation (a full-object Update) this test fails: the Update -// carries the stale resourceVersion and the fake API server rejects it with a 409. +// test for the original stuck-Terminating bug: deleting a PulsarSource must remove the +// operator finalizer and let the object be garbage-collected promptly, even when the +// reconciler's in-memory copy is stale relative to the server (informer-cache lag or a +// concurrent writer). The reconcile reads the live object under conflict-retry, so it does +// not get wedged on a 409. 
func TestReconcileSourceDeletionRemovesFinalizerWithStaleResourceVersion(t *testing.T) { scheme := newSourceTestScheme(t) fakeClient := fake.NewClientBuilder(). @@ -118,51 +113,32 @@ func TestReconcileSourceDeletionRemovesFinalizerWithStaleResourceVersion(t *test WithObjects(newDeletingTestSource()). Build() - // Capture the cached copy, then move the server resourceVersion ahead of it. - stale := staleSourceCopyAfterConcurrentWrite(t, fakeClient) - - countingClient := &finalizerCountingClient{Client: fakeClient} - r := &PulsarSourceReconciler{ - conn: &PulsarConnectionReconciler{ - connection: newReadyTestConnection(), - log: logr.Discard(), - client: countingClient, - }, - log: logr.Discard(), - } + // Capture the cached copy, then advance the live object so the cached copy is stale. + stale := cachedSourceAfterConcurrentWrite(t, fakeClient, func(live *resourcev1alpha1.PulsarSource) { + live.Annotations = map[string]string{"out-of-band": "bump"} + }) + r := newSourceReconciler(t, fakeClient) // DummyPulsarAdmin.DeletePulsarSource is a no-op; the assertion targets the finalizer path. 
if err := r.ReconcileSource(context.Background(), &admin.DummyPulsarAdmin{}, stale); err != nil { t.Fatalf("ReconcileSource on a deleting source with a stale resourceVersion returned an error "+ "(want nil, no 409 conflict): %v", err) } - if countingClient.updateCalls != 0 { - t.Fatalf("finalizer removal must not use a full-object Update (it carries a resourceVersion "+ - "precondition); got %d Update call(s)", countingClient.updateCalls) - } - if countingClient.patchCalls == 0 { - t.Fatal("expected finalizer removal to persist via a MergeFrom Patch, got 0 Patch calls") - } - - got := &resourcev1alpha1.PulsarSource{} - err := fakeClient.Get(context.Background(), - types.NamespacedName{Namespace: "test-ns", Name: "source"}, got) - if err == nil { - t.Fatalf("expected source to be garbage-collected after finalizer removal, "+ - "but it still exists with finalizers %v", got.Finalizers) - } + err := fakeClient.Get(context.Background(), sourceKey, &resourcev1alpha1.PulsarSource{}) if !apierrors.IsNotFound(err) { - t.Fatalf("expected NotFound after finalizer removal, got: %v", err) + t.Fatalf("expected source to be garbage-collected after finalizer removal, got err: %v", err) } } -// TestSourceStaleFinalizerUpdateConflicts documents the pre-fix failure mode and proves the -// test harness genuinely surfaces it: removing the finalizer with a full-object Update from a -// stale-resourceVersion copy returns a 409 conflict. This is precisely the conflict that the -// MergeFrom Patch fix avoids, and it guards against any regression back to Update-based -// finalizer writes. -func TestSourceStaleFinalizerUpdateConflicts(t *testing.T) { +// TestReconcileSourceDeletionPreservesForeignFinalizer is the regression test for the +// finalizer-clobbering hazard. The reconciler's cached copy only knows about the operator +// finalizer, while another controller has concurrently added its own finalizer to the live +// object. 
Removing the operator finalizer must remove ONLY the operator finalizer — it must +// not clobber the foreign finalizer (a JSON merge patch from a stale base would replace the +// whole list and drop it) and must not delete the object while the foreign finalizer is +// still pending. +func TestReconcileSourceDeletionPreservesForeignFinalizer(t *testing.T) { scheme := newSourceTestScheme(t) fakeClient := fake.NewClientBuilder(). WithScheme(scheme). @@ -170,11 +146,25 @@ func TestSourceStaleFinalizerUpdateConflicts(t *testing.T) { WithObjects(newDeletingTestSource()). Build() - stale := staleSourceCopyAfterConcurrentWrite(t, fakeClient) + // The reconciler's cached copy has only the operator finalizer; a concurrent controller + // adds a foreign finalizer to the live object, so the cached copy is now stale. + stale := cachedSourceAfterConcurrentWrite(t, fakeClient, func(live *resourcev1alpha1.PulsarSource) { + controllerutil.AddFinalizer(live, foreignFinalizer) + }) + + r := newSourceReconciler(t, fakeClient) + if err := r.ReconcileSource(context.Background(), &admin.DummyPulsarAdmin{}, stale); err != nil { + t.Fatalf("ReconcileSource on a deleting source returned an error: %v", err) + } - controllerutil.RemoveFinalizer(stale, resourcev1alpha1.FinalizerName) - err := fakeClient.Update(context.Background(), stale) - if err == nil || !apierrors.IsConflict(err) { - t.Fatalf("expected a 409 conflict from a stale full-object Update, got: %v", err) + got := &resourcev1alpha1.PulsarSource{} + if err := fakeClient.Get(context.Background(), sourceKey, got); err != nil { + t.Fatalf("source must survive while the foreign finalizer is pending, but Get failed: %v", err) + } + if containsString(got.Finalizers, resourcev1alpha1.FinalizerName) { + t.Fatalf("operator finalizer should have been removed, got %v", got.Finalizers) + } + if !containsString(got.Finalizers, foreignFinalizer) { + t.Fatalf("foreign finalizer must be preserved, got %v", got.Finalizers) } } diff --git 
a/pkg/connection/reconcile_tenant.go b/pkg/connection/reconcile_tenant.go index f70e95f6..83529444 100644 --- a/pkg/connection/reconcile_tenant.go +++ b/pkg/connection/reconcile_tenant.go @@ -23,7 +23,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -102,24 +101,18 @@ func (r *PulsarTenantReconciler) ReconcileTenant(ctx context.Context, pulsarAdmi } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(tenant.DeepCopy()) - if controllerutil.RemoveFinalizer(tenant, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, tenant, patch); err != nil { - log.Error(err, "Failed to remove finalizer") - return err - } + if err := removeFinalizer(ctx, r.conn.client, tenant, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to remove finalizer") + return err } return nil } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(tenant.DeepCopy()) - if controllerutil.AddFinalizer(tenant, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, tenant, patch); err != nil { - log.Error(err, "Failed to add finalizer") - return err - } + if err := ensureFinalizer(ctx, r.conn.client, tenant, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to add finalizer") + return err } if resourcev1alpha1.IsPulsarResourceReady(tenant) && diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index afa56c0b..8b8e42b2 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/types" 
"k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -137,24 +136,18 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(topic.DeepCopy()) - if controllerutil.RemoveFinalizer(topic, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, topic, patch); err != nil { - log.Error(err, "Failed to remove finalizer") - return err - } + if err := removeFinalizer(ctx, r.conn.client, topic, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to remove finalizer") + return err } return nil } if topic.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(topic.DeepCopy()) - if controllerutil.AddFinalizer(topic, resourcev1alpha1.FinalizerName) { - if err := r.conn.client.Patch(ctx, topic, patch); err != nil { - log.Error(err, "Failed to add finalizer") - return err - } + if err := ensureFinalizer(ctx, r.conn.client, topic, resourcev1alpha1.FinalizerName); err != nil { + log.Error(err, "Failed to add finalizer") + return err } } diff --git a/pkg/connection/reconciler.go b/pkg/connection/reconciler.go index e9c4829d..a0bd0341 100644 --- a/pkg/connection/reconciler.go +++ b/pkg/connection/reconciler.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -123,11 +124,8 @@ func (r 
*PulsarConnectionReconciler) Reconcile(ctx context.Context) error { // keep the connection until all resources has been removed // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(r.connection.DeepCopy()) - if controllerutil.RemoveFinalizer(r.connection, resourcev1alpha1.FinalizerName) { - if err := r.client.Patch(ctx, r.connection, patch); err != nil { - return err - } + if err := removeFinalizer(ctx, r.client, r.connection, resourcev1alpha1.FinalizerName); err != nil { + return err } } else { r.log.Info("There are still remaining resources before deleting the connection", "tenants", len(r.tenants), "namespaces", @@ -148,19 +146,30 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error { } log.Info("Reconciling pulsar resources", "resources", r.unreadyResources) - connectionChanged := false // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - patch := client.MergeFrom(r.connection.DeepCopy()) - if r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL != "" { - r.connection.Spec.AdminServiceURL = r.connection.Spec.AdminServiceSecureURL - connectionChanged = true - } - - if controllerutil.AddFinalizer(r.connection, resourcev1alpha1.FinalizerName) { - connectionChanged = true - } - if connectionChanged { - if err := r.client.Patch(ctx, r.connection, patch); err != nil { + // Default the admin service URL and add the finalizer against the live object under + // optimistic-lock retry. Reading the latest object first ensures a stale cached copy + // cannot overwrite a concurrent spec edit or drop a finalizer owned by another writer. 
+ if (r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL != "") || + !controllerutil.ContainsFinalizer(r.connection, resourcev1alpha1.FinalizerName) { + connectionKey := client.ObjectKeyFromObject(r.connection) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if err := r.client.Get(ctx, connectionKey, r.connection); err != nil { + return err + } + connectionChanged := false + if r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL != "" { + r.connection.Spec.AdminServiceURL = r.connection.Spec.AdminServiceSecureURL + connectionChanged = true + } + if controllerutil.AddFinalizer(r.connection, resourcev1alpha1.FinalizerName) { + connectionChanged = true + } + if !connectionChanged { + return nil + } + return r.client.Update(ctx, r.connection) + }); err != nil { return err } } diff --git a/pkg/connection/reconciler_test.go b/pkg/connection/reconciler_test.go index fc6cae53..08908060 100644 --- a/pkg/connection/reconciler_test.go +++ b/pkg/connection/reconciler_test.go @@ -177,6 +177,50 @@ func TestPulsarConnectionDeletionKeepsRemainingResourceGuardWhenAlwaysUpdateEnab } } +// TestPulsarConnectionReconcilePreservesConcurrentAdminURLEdit verifies that defaulting +// spec.AdminServiceURL from the secure URL never overwrites a value written concurrently by +// another writer. The reconciler reads the live object under conflict-retry before +// defaulting, so a stale cached copy (AdminServiceURL still empty) cannot clobber a +// concurrent spec edit. 
+func TestPulsarConnectionReconcilePreservesConcurrentAdminURLEdit(t *testing.T) { + setAlwaysUpdatePulsarResourceForTest(t, true) + + connection := newReadyTestConnection() + connection.Spec.AdminServiceURL = "" + connection.Spec.AdminServiceSecureURL = "https://secure:6651" + + reconciler, _, _ := newConnectionReconcilerForReadyChildrenTest(t, connection) + reconciler.tenants = []resourcev1alpha1.PulsarTenant{newReadyTestTenant()} + + // A concurrent writer sets AdminServiceURL on the live object after the reconciler + // captured its now-stale copy (which still has AdminServiceURL == ""). + key := types.NamespacedName{Namespace: connection.Namespace, Name: connection.Name} + live := &resourcev1alpha1.PulsarConnection{} + if err := reconciler.client.Get(context.Background(), key, live); err != nil { + t.Fatalf("get live connection: %v", err) + } + live.Spec.AdminServiceURL = "http://user-edit:8080" + if err := reconciler.client.Update(context.Background(), live); err != nil { + t.Fatalf("concurrent admin URL edit: %v", err) + } + + if err := reconciler.Reconcile(context.Background()); err != nil { + t.Fatalf("reconcile connection: %v", err) + } + + got := &resourcev1alpha1.PulsarConnection{} + if err := reconciler.client.Get(context.Background(), key, got); err != nil { + t.Fatalf("get updated connection: %v", err) + } + if got.Spec.AdminServiceURL != "http://user-edit:8080" { + t.Fatalf("concurrent AdminServiceURL edit was overwritten: got %q, want %q", + got.Spec.AdminServiceURL, "http://user-edit:8080") + } + if !containsString(got.Finalizers, resourcev1alpha1.FinalizerName) { + t.Fatalf("expected finalizer %q to be added, got %v", resourcev1alpha1.FinalizerName, got.Finalizers) + } +} + func newConnectionReconcilerForReadyChildrenTest(t *testing.T, connection *resourcev1alpha1.PulsarConnection) (*PulsarConnectionReconciler, *countingConnectionChildReconciler, *int) { t.Helper() From ad1cbfdf6255c66a7be4c1e717e307db3fb69b03 Mon Sep 17 00:00:00 2001 From: Rui 
Fu Date: Fri, 26 Jun 2026 17:56:26 +0800 Subject: [PATCH 3/3] fix(connection): read finalizer target via uncached API reader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The finalizer helpers and the connection reconcile loop re-read the object under retry.RetryOnConflict before writing, but used the manager's default client, whose reads go through the informer cache. Under cache lag that Get can return a stale resourceVersion, the Update 409s, and the retry re-reads the same stale cache — so deletion could still be delayed until the cache converges, missing the "remove the finalizer on the first reconcile, regardless of informer-cache lag" goal. Thread mgr.GetAPIReader() (an uncached, direct-from-apiserver reader) into the PulsarConnectionReconciler and use it for the finalizer Get; writes still go through the normal client. A ContainsFinalizer guard skips the uncached read entirely on the steady-state path (finalizer already present), so direct reads happen only on the first add and on deletion — both rare. This does not change the clobber fix: a stale-resourceVersion Update is fail-safe (409, never a lost update), so concurrent finalizers were already preserved; the uncached read additionally guarantees deletion is not wedged by a lagging cache. Tests: a finalizer removal driven by a persistently stale reader conflicts (the stuck symptom), while the same removal with a live reader succeeds; and the add-path guard issues no API call when the finalizer is already present. 
Co-Authored-By: Claude Opus 4.8 --- controllers/pulsarconnection_controller.go | 5 +- main.go | 1 + pkg/connection/finalizer_helpers.go | 63 ++++++---- pkg/connection/finalizer_helpers_test.go | 114 ++++++++++++++++++ pkg/connection/reconcile_function.go | 4 +- pkg/connection/reconcile_geo_replication.go | 4 +- pkg/connection/reconcile_namespace.go | 4 +- pkg/connection/reconcile_nsisolationpolicy.go | 4 +- pkg/connection/reconcile_package.go | 4 +- pkg/connection/reconcile_permission.go | 4 +- pkg/connection/reconcile_sink.go | 4 +- pkg/connection/reconcile_source.go | 4 +- pkg/connection/reconcile_source_test.go | 1 + pkg/connection/reconcile_tenant.go | 4 +- pkg/connection/reconcile_topic.go | 4 +- pkg/connection/reconciler.go | 10 +- pkg/connection/reconciler_test.go | 1 + 17 files changed, 188 insertions(+), 47 deletions(-) create mode 100644 pkg/connection/finalizer_helpers_test.go diff --git a/controllers/pulsarconnection_controller.go b/controllers/pulsarconnection_controller.go index 0ae51fb4..18989480 100644 --- a/controllers/pulsarconnection_controller.go +++ b/controllers/pulsarconnection_controller.go @@ -43,6 +43,9 @@ import ( // PulsarConnectionReconciler reconciles a PulsarConnection object type PulsarConnectionReconciler struct { client.Client + // APIReader is an uncached reader (mgr.GetAPIReader()) used for finalizer reads that must + // observe a fresh resourceVersion, so deletion is not wedged by informer-cache lag. 
+ APIReader client.Reader Scheme *runtime.Scheme Log logr.Logger Recorder record.EventRecorder @@ -112,7 +115,7 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context, req ctrl.Req r.Log.Info("Reconciling PulsarConnection", "name", pulsarConnection.Name, "namespace", pulsarConnection.Namespace) - reconciler := connection.MakeReconciler(r.Log, r.Client, r.PulsarAdminCreator, pulsarConnection, r.Retryer) + reconciler := connection.MakeReconciler(r.Log, r.Client, r.APIReader, r.PulsarAdminCreator, pulsarConnection, r.Retryer) if err := reconciler.Observe(ctx); err != nil { return ctrl.Result{}, err } diff --git a/main.go b/main.go index c99901ba..65482099 100644 --- a/main.go +++ b/main.go @@ -120,6 +120,7 @@ func main() { if err = (&controllers.PulsarConnectionReconciler{ Client: mgr.GetClient(), + APIReader: mgr.GetAPIReader(), Scheme: mgr.GetScheme(), Log: ctrl.Log.WithName("controllers").WithName("PulsarConnection"), Recorder: mgr.GetEventRecorderFor("pulsarconnection-controller"), diff --git a/pkg/connection/finalizer_helpers.go b/pkg/connection/finalizer_helpers.go index a3c279e8..1290e742 100644 --- a/pkg/connection/finalizer_helpers.go +++ b/pkg/connection/finalizer_helpers.go @@ -22,48 +22,65 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) -// ensureFinalizer adds finalizer to obj, persisting the change against the live API -// object and retrying on optimistic-lock conflicts. +// ensureFinalizer adds finalizer to obj, persisting the change against the live API object +// and retrying on optimistic-lock conflicts. +// +// reader must be an uncached reader (e.g. mgr.GetAPIReader()): the manager's default client +// reads through the informer cache, which can lag behind a recent write and hand back a stale +// resourceVersion, causing the Update to 409 and the retry to keep re-reading the same stale +// copy. 
Reading live guarantees a fresh resourceVersion so the write succeeds on the first +// reconcile regardless of cache lag. Writes use the normal writer client. +// +// Guard: when the cached copy of obj already contains finalizer this returns immediately with +// no API calls, so the common steady-state path (finalizer already present) pays nothing — +// only the first add issues the uncached read. // -// It re-reads obj from the API server before each attempt and mutates only this -// operator's finalizer, so finalizers added concurrently by other controllers are -// preserved. A full-object Update with optimistic concurrency is used (not a JSON -// merge patch, which replaces the whole finalizers list and could drop a concurrent -// finalizer when applied from a stale base). On success obj holds the server's latest -// state, including the refreshed resourceVersion, so callers may safely issue a -// follow-up Status().Update(ctx, obj). -func ensureFinalizer(ctx context.Context, c client.Client, obj client.Object, finalizer string) error { +// Only this operator's finalizer is mutated, so finalizers added concurrently by other +// controllers are preserved. Past the guard, success leaves obj at the server's latest state +// (fresh resourceVersion) for a follow-up Status().Update; the guard path leaves obj untouched. +func ensureFinalizer(ctx context.Context, reader client.Reader, writer client.Client, obj client.Object, finalizer string) error { + if controllerutil.ContainsFinalizer(obj, finalizer) { + return nil + } key := client.ObjectKeyFromObject(obj) return retry.RetryOnConflict(retry.DefaultRetry, func() error { - if err := c.Get(ctx, key, obj); err != nil { + if err := reader.Get(ctx, key, obj); err != nil { return err } if !controllerutil.AddFinalizer(obj, finalizer) { - // Already present; nothing to write. + // Already present on the live object; nothing to write.
return nil } - return c.Update(ctx, obj) + return writer.Update(ctx, obj) }) } -// removeFinalizer removes finalizer from obj, persisting the change against the live -// API object and retrying on optimistic-lock conflicts. +// removeFinalizer removes finalizer from obj, persisting the change against the live API +// object and retrying on optimistic-lock conflicts. +// +// reader must be an uncached reader (e.g. mgr.GetAPIReader()). Re-reading the live object +// before each attempt yields a fresh resourceVersion, so deletion is not wedged by a stale +// informer cache (the original stuck-Terminating symptom), and only this operator's finalizer +// is removed — finalizers owned by other controllers are preserved and the object is not +// garbage-collected prematurely. Writes use the normal writer client. // -// It re-reads obj before each attempt and removes only this operator's finalizer, so it -// never clobbers finalizers owned by other controllers and never deletes the object -// prematurely while a foreign finalizer is still pending. A NotFound result (the object -// was garbage-collected once its last finalizer was gone) is treated as success. -func removeFinalizer(ctx context.Context, c client.Client, obj client.Object, finalizer string) error { +// Guard: when the cached copy of obj does not contain finalizer this returns immediately with +// no API calls. A NotFound result (the object was garbage-collected once its last finalizer +// was removed) is treated as success. 
+func removeFinalizer(ctx context.Context, reader client.Reader, writer client.Client, obj client.Object, finalizer string) error { + if !controllerutil.ContainsFinalizer(obj, finalizer) { + return nil + } key := client.ObjectKeyFromObject(obj) err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - if err := c.Get(ctx, key, obj); err != nil { + if err := reader.Get(ctx, key, obj); err != nil { return err } if !controllerutil.RemoveFinalizer(obj, finalizer) { - // Already absent; nothing to write. + // Already absent on the live object; nothing to write. return nil } - return c.Update(ctx, obj) + return writer.Update(ctx, obj) }) return client.IgnoreNotFound(err) } diff --git a/pkg/connection/finalizer_helpers_test.go b/pkg/connection/finalizer_helpers_test.go new file mode 100644 index 00000000..f83958f6 --- /dev/null +++ b/pkg/connection/finalizer_helpers_test.go @@ -0,0 +1,114 @@ +// Copyright 2026 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package connection + +import ( + "context" + "testing" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" +) + +// TestRemoveFinalizerUsesLiveReaderResourceVersion proves the finalizer write is driven by the +// reader, not by the caller's (possibly stale) in-hand object. A reader that keeps returning a +// stale resourceVersion — as the manager's cached client can under informer lag — makes every +// retry 409, so the removal fails (the original stuck-Terminating symptom). The same removal +// with a live reader reads a fresh resourceVersion and succeeds. In production the reader is the +// uncached mgr.GetAPIReader(), so a lagging informer cache cannot wedge deletion. +// +// This is the case ordinary single-fake-client tests cannot exercise: here the reader and the +// write store are deliberately decoupled to simulate cache lag. +func TestRemoveFinalizerUsesLiveReaderResourceVersion(t *testing.T) { + scheme := newSourceTestScheme(t) + topic := &resourcev1alpha1.PulsarTopic{ + ObjectMeta: metav1.ObjectMeta{ + Name: "t", + Namespace: "ns", + Finalizers: []string{resourcev1alpha1.FinalizerName}, + }, + } + writer := fake.NewClientBuilder().WithScheme(scheme).WithObjects(topic).Build() + key := client.ObjectKeyFromObject(topic) + + // Snapshot the object at its current resourceVersion, then advance the live RV out of band. 
+ staleObj := &resourcev1alpha1.PulsarTopic{} + if err := writer.Get(context.Background(), key, staleObj); err != nil { + t.Fatalf("get snapshot: %v", err) + } + bumped := staleObj.DeepCopy() + bumped.Annotations = map[string]string{"bump": "1"} + if err := writer.Update(context.Background(), bumped); err != nil { + t.Fatalf("bump live resourceVersion: %v", err) + } + + // staleReader always returns the pre-bump snapshot — a persistently stale resourceVersion. + staleReader := interceptor.NewClient(writer, interceptor.Funcs{ + Get: func(_ context.Context, _ client.WithWatch, _ client.ObjectKey, obj client.Object, _ ...client.GetOption) error { + staleObj.DeepCopyInto(obj.(*resourcev1alpha1.PulsarTopic)) + return nil + }, + }) + + // Persistently stale reader: every retry re-reads the stale RV and the Update conflicts. + if err := removeFinalizer(context.Background(), staleReader, writer, staleObj.DeepCopy(), + resourcev1alpha1.FinalizerName); !apierrors.IsConflict(err) { + t.Fatalf("expected a 409 conflict from a persistently stale reader, got: %v", err) + } + + // Live (uncached) reader: the removal reads a fresh RV and succeeds. + if err := removeFinalizer(context.Background(), writer, writer, staleObj.DeepCopy(), + resourcev1alpha1.FinalizerName); err != nil { + t.Fatalf("removeFinalizer with a live reader: %v", err) + } + got := &resourcev1alpha1.PulsarTopic{} + if err := writer.Get(context.Background(), key, got); err != nil { + t.Fatalf("get after removal: %v", err) + } + if controllerutil.ContainsFinalizer(got, resourcev1alpha1.FinalizerName) { + t.Fatalf("operator finalizer should have been removed, got %v", got.Finalizers) + } +} + +// TestEnsureFinalizerGuardSkipsReadWhenPresent verifies the steady-state fast path: when the +// cached object already carries the finalizer, ensureFinalizer issues no API calls at all, so +// routing the read through the uncached API reader adds no per-reconcile apiserver load on the +// common path. 
+func TestEnsureFinalizerGuardSkipsReadWhenPresent(t *testing.T) { + scheme := newSourceTestScheme(t) + writer := fake.NewClientBuilder().WithScheme(scheme).Build() + panicReader := interceptor.NewClient(writer, interceptor.Funcs{ + Get: func(_ context.Context, _ client.WithWatch, _ client.ObjectKey, _ client.Object, _ ...client.GetOption) error { + panic("reader.Get must not be called when the finalizer is already present") + }, + }) + + obj := &resourcev1alpha1.PulsarTopic{ + ObjectMeta: metav1.ObjectMeta{ + Name: "t", + Namespace: "ns", + Finalizers: []string{resourcev1alpha1.FinalizerName}, + }, + } + if err := ensureFinalizer(context.Background(), panicReader, writer, obj, resourcev1alpha1.FinalizerName); err != nil { + t.Fatalf("ensureFinalizer (guard fast path) returned error: %v", err) + } +} diff --git a/pkg/connection/reconcile_function.go b/pkg/connection/reconcile_function.go index 14f3ea9d..5292da44 100644 --- a/pkg/connection/reconcile_function.go +++ b/pkg/connection/reconcile_function.go @@ -100,7 +100,7 @@ func (r *PulsarFunctionReconciler) ReconcileFunction(ctx context.Context, pulsar } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := removeFinalizer(ctx, r.conn.client, instance, resourcev1alpha1.FinalizerName); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, instance, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -109,7 +109,7 @@ func (r *PulsarFunctionReconciler) ReconcileFunction(ctx context.Context, pulsar if instance.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := ensureFinalizer(ctx, r.conn.client, instance, resourcev1alpha1.FinalizerName); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, instance, resourcev1alpha1.FinalizerName); err != nil 
{ log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_geo_replication.go b/pkg/connection/reconcile_geo_replication.go index 05098e7b..b5e7bee3 100644 --- a/pkg/connection/reconcile_geo_replication.go +++ b/pkg/connection/reconcile_geo_replication.go @@ -167,13 +167,13 @@ func (r *PulsarGeoReplicationReconciler) ReconcileGeoReplication(ctx context.Con } } } - if err := removeFinalizer(ctx, r.conn.client, geoReplication, resourcev1alpha1.FinalizerName); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, geoReplication, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } } } - if err := ensureFinalizer(ctx, r.conn.client, geoReplication, resourcev1alpha1.FinalizerName); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, geoReplication, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_namespace.go b/pkg/connection/reconcile_namespace.go index 5986c05d..11fd42e4 100644 --- a/pkg/connection/reconcile_namespace.go +++ b/pkg/connection/reconcile_namespace.go @@ -123,7 +123,7 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := removeFinalizer(ctx, r.conn.client, namespace, resourcev1alpha1.FinalizerName); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, namespace, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -133,7 +133,7 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls if namespace.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := ensureFinalizer(ctx, 
r.conn.client, namespace, resourcev1alpha1.FinalizerName); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, namespace, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_nsisolationpolicy.go b/pkg/connection/reconcile_nsisolationpolicy.go index b37c994a..df9c68dc 100644 --- a/pkg/connection/reconcile_nsisolationpolicy.go +++ b/pkg/connection/reconcile_nsisolationpolicy.go @@ -104,7 +104,7 @@ func (r *PulsarNSIsolationPolicyReconciler) ReconcilePolicy(ctx context.Context, } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := removeFinalizer(ctx, r.conn.client, policy, resourcev1alpha1.FinalizerName); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, policy, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -113,7 +113,7 @@ func (r *PulsarNSIsolationPolicyReconciler) ReconcilePolicy(ctx context.Context, } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := ensureFinalizer(ctx, r.conn.client, policy, resourcev1alpha1.FinalizerName); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, policy, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_package.go b/pkg/connection/reconcile_package.go index b7873f6b..79ba44f7 100644 --- a/pkg/connection/reconcile_package.go +++ b/pkg/connection/reconcile_package.go @@ -153,7 +153,7 @@ func (r *PulsarPackageReconciler) ReconcilePackage(ctx context.Context, pulsarAd } } - if err := removeFinalizer(ctx, r.conn.client, pkg, resourcev1alpha1.FinalizerName); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, pkg, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed 
to remove finalizer") return err } @@ -161,7 +161,7 @@ func (r *PulsarPackageReconciler) ReconcilePackage(ctx context.Context, pulsarAd } if pkg.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { - if err := ensureFinalizer(ctx, r.conn.client, pkg, resourcev1alpha1.FinalizerName); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, pkg, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_permission.go b/pkg/connection/reconcile_permission.go index 16b30c88..b2642163 100644 --- a/pkg/connection/reconcile_permission.go +++ b/pkg/connection/reconcile_permission.go @@ -101,7 +101,7 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu } } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := removeFinalizer(ctx, r.conn.client, permission, resourcev1alpha1.FinalizerName); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, permission, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -110,7 +110,7 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu if permission.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := ensureFinalizer(ctx, r.conn.client, permission, resourcev1alpha1.FinalizerName); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, permission, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_sink.go b/pkg/connection/reconcile_sink.go index 7aa1c6e1..4344ff83 100644 --- a/pkg/connection/reconcile_sink.go +++ b/pkg/connection/reconcile_sink.go @@ -98,7 +98,7 @@ func (r *PulsarSinkReconciler) ReconcileSink(ctx 
context.Context, pulsarAdmin ad } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := removeFinalizer(ctx, r.conn.client, sink, resourcev1alpha1.FinalizerName); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, sink, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -107,7 +107,7 @@ func (r *PulsarSinkReconciler) ReconcileSink(ctx context.Context, pulsarAdmin ad if sink.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := ensureFinalizer(ctx, r.conn.client, sink, resourcev1alpha1.FinalizerName); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, sink, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_source.go b/pkg/connection/reconcile_source.go index 7dadfdb9..526df361 100644 --- a/pkg/connection/reconcile_source.go +++ b/pkg/connection/reconcile_source.go @@ -99,7 +99,7 @@ func (r *PulsarSourceReconciler) ReconcileSource(ctx context.Context, pulsarAdmi } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := removeFinalizer(ctx, r.conn.client, source, resourcev1alpha1.FinalizerName); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, source, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -108,7 +108,7 @@ func (r *PulsarSourceReconciler) ReconcileSource(ctx context.Context, pulsarAdmi if source.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := ensureFinalizer(ctx, r.conn.client, source, resourcev1alpha1.FinalizerName); err != nil { 
+ if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, source, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_source_test.go b/pkg/connection/reconcile_source_test.go index e0144590..43d4cbc8 100644 --- a/pkg/connection/reconcile_source_test.go +++ b/pkg/connection/reconcile_source_test.go @@ -76,6 +76,7 @@ func newSourceReconciler(t *testing.T, k8sClient client.Client) *PulsarSourceRec connection: newReadyTestConnection(), log: logr.Discard(), client: k8sClient, + apiReader: k8sClient, }, log: logr.Discard(), } diff --git a/pkg/connection/reconcile_tenant.go b/pkg/connection/reconcile_tenant.go index 83529444..f6c01cb3 100644 --- a/pkg/connection/reconcile_tenant.go +++ b/pkg/connection/reconcile_tenant.go @@ -101,7 +101,7 @@ func (r *PulsarTenantReconciler) ReconcileTenant(ctx context.Context, pulsarAdmi } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := removeFinalizer(ctx, r.conn.client, tenant, resourcev1alpha1.FinalizerName); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, tenant, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -110,7 +110,7 @@ func (r *PulsarTenantReconciler) ReconcileTenant(ctx context.Context, pulsarAdmi } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := ensureFinalizer(ctx, r.conn.client, tenant, resourcev1alpha1.FinalizerName); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, tenant, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index 8b8e42b2..5ef86e5f 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -136,7 +136,7 @@ func 
(r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := removeFinalizer(ctx, r.conn.client, topic, resourcev1alpha1.FinalizerName); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, topic, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -145,7 +145,7 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin if topic.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := ensureFinalizer(ctx, r.conn.client, topic, resourcev1alpha1.FinalizerName); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, topic, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconciler.go b/pkg/connection/reconciler.go index a0bd0341..f64f5ce8 100644 --- a/pkg/connection/reconciler.go +++ b/pkg/connection/reconciler.go @@ -44,6 +44,7 @@ type PulsarConnectionReconciler struct { connection *resourcev1alpha1.PulsarConnection log logr.Logger client client.Client + apiReader client.Reader creator admin.PulsarAdminCreator tenants []resourcev1alpha1.PulsarTenant namespaces []resourcev1alpha1.PulsarNamespace @@ -67,13 +68,14 @@ type PulsarConnectionReconciler struct { var _ reconciler.Interface = &PulsarConnectionReconciler{} // MakeReconciler creates resource reconcilers -func MakeReconciler(log logr.Logger, k8sClient client.Client, creator admin.PulsarAdminCreator, +func MakeReconciler(log logr.Logger, k8sClient client.Client, apiReader client.Reader, creator admin.PulsarAdminCreator, connection *resourcev1alpha1.PulsarConnection, retryer *utils.ReconcileRetryer) reconciler.Interface { r := &PulsarConnectionReconciler{ log: log, 
connection: connection, creator: creator, client: k8sClient, + apiReader: apiReader, retryer: retryer, } r.reconcilers = []reconciler.Interface{ @@ -124,7 +126,7 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error { // keep the connection until all resources has been removed // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if err := removeFinalizer(ctx, r.client, r.connection, resourcev1alpha1.FinalizerName); err != nil { + if err := removeFinalizer(ctx, r.apiReader, r.client, r.connection, resourcev1alpha1.FinalizerName); err != nil { return err } } else { @@ -154,7 +156,9 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error { !controllerutil.ContainsFinalizer(r.connection, resourcev1alpha1.FinalizerName) { connectionKey := client.ObjectKeyFromObject(r.connection) if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - if err := r.client.Get(ctx, connectionKey, r.connection); err != nil { + // Read the live object (uncached) so the retry always starts from a fresh + // resourceVersion; the write below still uses the normal cached client. + if err := r.apiReader.Get(ctx, connectionKey, r.connection); err != nil { return err } connectionChanged := false diff --git a/pkg/connection/reconciler_test.go b/pkg/connection/reconciler_test.go index 08908060..adb8586a 100644 --- a/pkg/connection/reconciler_test.go +++ b/pkg/connection/reconciler_test.go @@ -241,6 +241,7 @@ func newConnectionReconcilerForReadyChildrenTest(t *testing.T, connection *resou connection: connection, log: logr.Discard(), client: k8sClient, + apiReader: k8sClient, creator: func(admin.PulsarAdminConfig) (admin.PulsarAdmin, error) { adminCreateCalls++ return &admin.DummyPulsarAdmin{}, nil