diff --git a/controllers/pulsarconnection_controller.go b/controllers/pulsarconnection_controller.go index 0ae51fb4..18989480 100644 --- a/controllers/pulsarconnection_controller.go +++ b/controllers/pulsarconnection_controller.go @@ -43,6 +43,9 @@ import ( // PulsarConnectionReconciler reconciles a PulsarConnection object type PulsarConnectionReconciler struct { client.Client + // APIReader is an uncached reader (mgr.GetAPIReader()) used for finalizer reads that must + // observe a fresh resourceVersion, so deletion is not wedged by informer-cache lag. + APIReader client.Reader Scheme *runtime.Scheme Log logr.Logger Recorder record.EventRecorder @@ -112,7 +115,7 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context, req ctrl.Req r.Log.Info("Reconciling PulsarConnection", "name", pulsarConnection.Name, "namespace", pulsarConnection.Namespace) - reconciler := connection.MakeReconciler(r.Log, r.Client, r.PulsarAdminCreator, pulsarConnection, r.Retryer) + reconciler := connection.MakeReconciler(r.Log, r.Client, r.APIReader, r.PulsarAdminCreator, pulsarConnection, r.Retryer) if err := reconciler.Observe(ctx); err != nil { return ctrl.Result{}, err } diff --git a/main.go b/main.go index c99901ba..65482099 100644 --- a/main.go +++ b/main.go @@ -120,6 +120,7 @@ func main() { if err = (&controllers.PulsarConnectionReconciler{ Client: mgr.GetClient(), + APIReader: mgr.GetAPIReader(), Scheme: mgr.GetScheme(), Log: ctrl.Log.WithName("controllers").WithName("PulsarConnection"), Recorder: mgr.GetEventRecorderFor("pulsarconnection-controller"), diff --git a/pkg/connection/finalizer_helpers.go b/pkg/connection/finalizer_helpers.go new file mode 100644 index 00000000..1290e742 --- /dev/null +++ b/pkg/connection/finalizer_helpers.go @@ -0,0 +1,86 @@ +// Copyright 2025 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "context" + + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +// ensureFinalizer adds finalizer to obj, persisting the change against the live API object +// and retrying on optimistic-lock conflicts. +// +// reader must be an uncached reader (e.g. mgr.GetAPIReader()): the manager's default client +// reads through the informer cache, which can lag behind a recent write and hand back a stale +// resourceVersion, causing the Update to 409 and the retry to keep re-reading the same stale +// copy. Reading live guarantees a fresh resourceVersion so the write succeeds on the first +// reconcile regardless of cache lag. Writes use the normal writer client. +// +// Guard: when the cached copy of obj already contains finalizer this returns immediately with +// no API calls, so the common steady-state path (finalizer already present) pays nothing — +// only the first add issues the uncached read. +// +// Only this operator's finalizer is mutated, so finalizers added concurrently by other +// controllers are preserved. On success obj holds the server's latest state (including the +// refreshed resourceVersion), so callers may safely issue a follow-up Status().Update. 
+func ensureFinalizer(ctx context.Context, reader client.Reader, writer client.Client, obj client.Object, finalizer string) error {
+	if controllerutil.ContainsFinalizer(obj, finalizer) {
+		return nil
+	}
+	return mutateFinalizer(ctx, reader, writer, obj, finalizer, controllerutil.AddFinalizer)
+}
+
+// removeFinalizer removes finalizer from obj, persisting the change against the live API
+// object and retrying on optimistic-lock conflicts.
+//
+// reader must be an uncached reader (e.g. mgr.GetAPIReader()). Re-reading the live object
+// before each attempt yields a fresh resourceVersion, so deletion is not wedged by a stale
+// informer cache (the original stuck-Terminating symptom), and only this operator's finalizer
+// is removed — finalizers owned by other controllers are preserved and the object is not
+// garbage-collected prematurely. Writes use the normal writer client.
+//
+// Guard: when the cached copy of obj does not contain finalizer this returns immediately with
+// no API calls. A NotFound result (the object was garbage-collected once its last finalizer
+// was removed) is treated as success.
+func removeFinalizer(ctx context.Context, reader client.Reader, writer client.Client, obj client.Object, finalizer string) error {
+	if !controllerutil.ContainsFinalizer(obj, finalizer) {
+		return nil
+	}
+	err := mutateFinalizer(ctx, reader, writer, obj, finalizer, controllerutil.RemoveFinalizer)
+	return client.IgnoreNotFound(err)
+}
+
+// mutateFinalizer is the read-modify-write loop shared by ensureFinalizer and removeFinalizer:
+// re-read obj via reader, apply mutate, write only on change, and retry on conflict. A nil
+// reader falls back to writer so a reconciler wired without an API reader does not panic.
+func mutateFinalizer(ctx context.Context, reader client.Reader, writer client.Client, obj client.Object,
+	finalizer string, mutate func(client.Object, string) bool) error {
+	if reader == nil {
+		reader = writer
+	}
+	key := client.ObjectKeyFromObject(obj)
+	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		if err := reader.Get(ctx, key, obj); err != nil {
+			return err
+		}
+		if !mutate(obj, finalizer) {
+			return nil // live object is already in the desired state; nothing to write
+		}
+		return writer.Update(ctx, obj)
+	})
+}
diff --git a/pkg/connection/finalizer_helpers_test.go b/pkg/connection/finalizer_helpers_test.go
new file mode 100644
index 00000000..f83958f6
--- /dev/null
+++ b/pkg/connection/finalizer_helpers_test.go
@@ -0,0 +1,114 @@
+// Copyright 2026 StreamNative
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package connection
+
+import (
+	"context"
+	"testing"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+	"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+
+	resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
+)
+
+// TestRemoveFinalizerUsesLiveReaderResourceVersion proves the finalizer write is driven by the
+// reader, not by the caller's (possibly stale) in-hand object. A reader that keeps returning a
+// stale resourceVersion — as the manager's cached client can under informer lag — makes every
+// retry 409, so the removal fails (the original stuck-Terminating symptom). The same removal
+// with a live reader reads a fresh resourceVersion and succeeds. In production the reader is the
+// uncached mgr.GetAPIReader(), so a lagging informer cache cannot wedge deletion.
+//
+// This is the case ordinary single-fake-client tests cannot exercise: here the reader and the
+// write store are deliberately decoupled to simulate cache lag.
+func TestRemoveFinalizerUsesLiveReaderResourceVersion(t *testing.T) {
+	ctx := context.Background()
+	topic := &resourcev1alpha1.PulsarTopic{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:       "t",
+			Namespace:  "ns",
+			Finalizers: []string{resourcev1alpha1.FinalizerName},
+		},
+	}
+	writer := fake.NewClientBuilder().WithScheme(newSourceTestScheme(t)).WithObjects(topic).Build()
+	key := client.ObjectKeyFromObject(topic)
+
+	// Freeze a copy at the current resourceVersion, then move the stored object past it.
+	snapshot := &resourcev1alpha1.PulsarTopic{}
+	if err := writer.Get(ctx, key, snapshot); err != nil {
+		t.Fatalf("get snapshot: %v", err)
+	}
+	advanced := snapshot.DeepCopy()
+	advanced.Annotations = map[string]string{"bump": "1"}
+	if err := writer.Update(ctx, advanced); err != nil {
+		t.Fatalf("bump live resourceVersion: %v", err)
+	}
+
+	// laggingReader answers every Get with the frozen snapshot, so its resourceVersion never advances.
+	laggingReader := interceptor.NewClient(writer, interceptor.Funcs{
+		Get: func(_ context.Context, _ client.WithWatch, _ client.ObjectKey, out client.Object, _ ...client.GetOption) error {
+			snapshot.DeepCopyInto(out.(*resourcev1alpha1.PulsarTopic))
+			return nil
+		},
+	})
+
+	// Lagging reader: each attempt writes at the old resourceVersion, so the retries end in a conflict.
+	err := removeFinalizer(ctx, laggingReader, writer, snapshot.DeepCopy(), resourcev1alpha1.FinalizerName)
+	if !apierrors.IsConflict(err) {
+		t.Fatalf("expected a 409 conflict from a persistently stale reader, got: %v", err)
+	}
+
+	// Reading from the store itself yields the current resourceVersion and the removal goes through.
+	err = removeFinalizer(ctx, writer, writer, snapshot.DeepCopy(), resourcev1alpha1.FinalizerName)
+	if err != nil {
+		t.Fatalf("removeFinalizer with a live reader: %v", err)
+	}
+	after := &resourcev1alpha1.PulsarTopic{}
+	if err := writer.Get(ctx, key, after); err != nil {
+		t.Fatalf("get after removal: %v", err)
+	}
+	if controllerutil.ContainsFinalizer(after, resourcev1alpha1.FinalizerName) {
+		t.Fatalf("operator finalizer should have been removed, got %v", after.Finalizers)
+	}
+}
+
+// TestEnsureFinalizerGuardSkipsReadWhenPresent covers the steady-state fast path: an object
+// whose in-hand copy already lists the finalizer must make ensureFinalizer return without
+// touching the reader. The reader used here panics on Get, so any read fails the test
+// immediately.
+func TestEnsureFinalizerGuardSkipsReadWhenPresent(t *testing.T) {
+	writer := fake.NewClientBuilder().WithScheme(newSourceTestScheme(t)).Build()
+	panicReader := interceptor.NewClient(writer, interceptor.Funcs{
+		Get: func(_ context.Context, _ client.WithWatch, _ client.ObjectKey, _ client.Object, _ ...client.GetOption) error {
+			panic("reader.Get must not be called when the finalizer is already present")
+		},
+	})
+
+	obj := &resourcev1alpha1.PulsarTopic{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:       "t",
+			Namespace:  "ns",
+			Finalizers: []string{resourcev1alpha1.FinalizerName},
+		},
+	}
+	err := ensureFinalizer(context.Background(), panicReader, writer, obj, resourcev1alpha1.FinalizerName)
+	if err != nil {
+		t.Fatalf("ensureFinalizer (guard fast path) returned error: %v", err)
+	}
+}
diff --git a/pkg/connection/reconcile_function.go b/pkg/connection/reconcile_function.go
index 6755f06c..5292da44 100644
--- a/pkg/connection/reconcile_function.go
+++ b/pkg/connection/reconcile_function.go
@@ -25,7 +25,6 @@ import (
 	"github.com/streamnative/pulsar-resources-operator/pkg/feature"
 	"k8s.io/apimachinery/pkg/api/meta"
"sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -101,8 +100,7 @@ func (r *PulsarFunctionReconciler) ReconcileFunction(ctx context.Context, pulsar } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(instance, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, instance); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, instance, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -111,8 +109,7 @@ func (r *PulsarFunctionReconciler) ReconcileFunction(ctx context.Context, pulsar if instance.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(instance, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, instance); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, instance, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_geo_replication.go b/pkg/connection/reconcile_geo_replication.go index 54063e5c..b5e7bee3 100644 --- a/pkg/connection/reconcile_geo_replication.go +++ b/pkg/connection/reconcile_geo_replication.go @@ -27,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -168,15 +167,13 @@ func (r *PulsarGeoReplicationReconciler) 
ReconcileGeoReplication(ctx context.Con } } } - controllerutil.RemoveFinalizer(geoReplication, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, geoReplication); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, geoReplication, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } } } - controllerutil.AddFinalizer(geoReplication, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, geoReplication); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, geoReplication, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_namespace.go b/pkg/connection/reconcile_namespace.go index 27e76179..11fd42e4 100644 --- a/pkg/connection/reconcile_namespace.go +++ b/pkg/connection/reconcile_namespace.go @@ -25,7 +25,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -124,8 +123,7 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(namespace, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, namespace); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, namespace, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -135,8 +133,7 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls if namespace.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller 
until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(namespace, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, namespace); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, namespace, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_nsisolationpolicy.go b/pkg/connection/reconcile_nsisolationpolicy.go index 0c78f26a..df9c68dc 100644 --- a/pkg/connection/reconcile_nsisolationpolicy.go +++ b/pkg/connection/reconcile_nsisolationpolicy.go @@ -26,7 +26,6 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/reconciler" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) // PulsarNSIsolationPolicyReconciler reconciles a PulsarNSIsolationPolicy object @@ -105,8 +104,7 @@ func (r *PulsarNSIsolationPolicyReconciler) ReconcilePolicy(ctx context.Context, } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(policy, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, policy); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, policy, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -115,8 +113,7 @@ func (r *PulsarNSIsolationPolicyReconciler) ReconcilePolicy(ctx context.Context, } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(policy, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, policy); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, policy, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git 
a/pkg/connection/reconcile_package.go b/pkg/connection/reconcile_package.go index cc6992bc..79ba44f7 100644 --- a/pkg/connection/reconcile_package.go +++ b/pkg/connection/reconcile_package.go @@ -27,7 +27,6 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/feature" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -154,8 +153,7 @@ func (r *PulsarPackageReconciler) ReconcilePackage(ctx context.Context, pulsarAd } } - controllerutil.RemoveFinalizer(pkg, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, pkg); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, pkg, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -163,8 +161,7 @@ func (r *PulsarPackageReconciler) ReconcilePackage(ctx context.Context, pulsarAd } if pkg.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { - controllerutil.AddFinalizer(pkg, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, pkg); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, pkg, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_permission.go b/pkg/connection/reconcile_permission.go index e2e9a74b..b2642163 100644 --- a/pkg/connection/reconcile_permission.go +++ b/pkg/connection/reconcile_permission.go @@ -24,7 +24,6 @@ import ( "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" 
"github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -102,8 +101,7 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu } } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(permission, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, permission); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, permission, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -112,8 +110,7 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu if permission.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(permission, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, permission); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, permission, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_sink.go b/pkg/connection/reconcile_sink.go index b9aff332..4344ff83 100644 --- a/pkg/connection/reconcile_sink.go +++ b/pkg/connection/reconcile_sink.go @@ -26,7 +26,6 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/reconciler" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) // PulsarSinkReconciler reconciles a PulsarSink object @@ -99,8 +98,7 @@ func (r *PulsarSinkReconciler) ReconcileSink(ctx context.Context, pulsarAdmin ad } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(sink, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, sink); err != nil { + if 
err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, sink, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -109,8 +107,7 @@ func (r *PulsarSinkReconciler) ReconcileSink(ctx context.Context, pulsarAdmin ad if sink.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(sink, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, sink); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, sink, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_source.go b/pkg/connection/reconcile_source.go index 23a687ac..526df361 100644 --- a/pkg/connection/reconcile_source.go +++ b/pkg/connection/reconcile_source.go @@ -26,7 +26,6 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/reconciler" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) // PulsarSourceReconciler reconciles a PulsarSource object @@ -100,8 +99,7 @@ func (r *PulsarSourceReconciler) ReconcileSource(ctx context.Context, pulsarAdmi } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(source, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, source); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, source, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -110,8 +108,7 @@ func (r *PulsarSourceReconciler) ReconcileSource(ctx context.Context, pulsarAdmi if source.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime 
version to newer - controllerutil.AddFinalizer(source, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, source); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, source, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_source_test.go b/pkg/connection/reconcile_source_test.go new file mode 100644 index 00000000..43d4cbc8 --- /dev/null +++ b/pkg/connection/reconcile_source_test.go @@ -0,0 +1,171 @@ +// Copyright 2026 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/logr" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" + "github.com/streamnative/pulsar-resources-operator/pkg/admin" +) + +// foreignFinalizer stands in for a finalizer owned by another controller (e.g. the +// Kubernetes foregroundDeletion finalizer, or a GitOps tool's finalizer). 
+const foreignFinalizer = "other.io/protect"
+
+var sourceKey = types.NamespacedName{Name: "source", Namespace: "test-ns"}
+
+// newSourceTestScheme builds a runtime scheme with the operator's v1alpha1 types registered.
+func newSourceTestScheme(t *testing.T) *runtime.Scheme {
+	t.Helper()
+	s := runtime.NewScheme()
+	if err := resourcev1alpha1.AddToScheme(s); err != nil {
+		t.Fatalf("add resource scheme: %v", err)
+	}
+	return s
+}
+
+// newDeletingTestSource builds a PulsarSource caught mid-deletion: its deletionTimestamp is
+// set and it still holds the operator finalizer. The fake client accepts an object created
+// with a deletionTimestamp provided at least one finalizer is present.
+func newDeletingTestSource() *resourcev1alpha1.PulsarSource {
+	deletedAt := metav1.NewTime(time.Unix(1700000000, 0))
+	src := &resourcev1alpha1.PulsarSource{}
+	src.ObjectMeta = metav1.ObjectMeta{
+		Name:              "source",
+		Namespace:         "test-ns",
+		Generation:        1,
+		Finalizers:        []string{resourcev1alpha1.FinalizerName},
+		DeletionTimestamp: &deletedAt,
+	}
+	src.Spec = resourcev1alpha1.PulsarSourceSpec{
+		Tenant:          "public",
+		Namespace:       "default",
+		Name:            "source",
+		LifecyclePolicy: resourcev1alpha1.CleanUpAfterDeletion,
+	}
+	return src
+}
+
+// newSourceReconciler returns a PulsarSourceReconciler using k8sClient as client and API reader.
+func newSourceReconciler(t *testing.T, k8sClient client.Client) *PulsarSourceReconciler {
+	t.Helper()
+	conn := &PulsarConnectionReconciler{
+		connection: newReadyTestConnection(),
+		log:        logr.Discard(),
+		client:     k8sClient,
+		apiReader:  k8sClient,
+	}
+	return &PulsarSourceReconciler{conn: conn, log: logr.Discard()}
+}
+
+// cachedSourceAfterConcurrentWrite returns the source as the reconciler would have cached
+// it, then advances the server-side object out of band (simulating informer-cache lag or a
+// concurrent writer such as ArgoCD) so the returned copy is stale relative to the server.
+func cachedSourceAfterConcurrentWrite(t *testing.T, k8sClient client.Client, mutateLive func(*resourcev1alpha1.PulsarSource)) *resourcev1alpha1.PulsarSource {
+	t.Helper()
+	ctx := context.Background()
+	snapshot := &resourcev1alpha1.PulsarSource{}
+	if err := k8sClient.Get(ctx, sourceKey, snapshot); err != nil {
+		t.Fatalf("get cached source: %v", err)
+	}
+	serverCopy := snapshot.DeepCopy()
+	mutateLive(serverCopy)
+	if err := k8sClient.Update(ctx, serverCopy); err != nil {
+		t.Fatalf("out-of-band update to advance server state: %v", err)
+	}
+	return snapshot
+}
+
+// TestReconcileSourceDeletionRemovesFinalizerWithStaleResourceVersion guards against the
+// stuck-Terminating regression. The reconciler is handed a copy of a deleting PulsarSource
+// that is older than what the server holds (as under informer-cache lag or a concurrent
+// writer). ReconcileSource must still strip the operator finalizer without surfacing a 409,
+// and afterwards the object must be gone from the store, which the final Get checks by
+// expecting NotFound.
+func TestReconcileSourceDeletionRemovesFinalizerWithStaleResourceVersion(t *testing.T) {
+	ctx := context.Background()
+	fakeClient := fake.NewClientBuilder().
+		WithScheme(newSourceTestScheme(t)).
+		WithStatusSubresource(&resourcev1alpha1.PulsarSource{}).
+		WithObjects(newDeletingTestSource()).
+		Build()
+
+	// Take the reconciler's copy first, then bump the stored object so that copy falls behind.
+	cached := cachedSourceAfterConcurrentWrite(t, fakeClient, func(obj *resourcev1alpha1.PulsarSource) {
+		obj.Annotations = map[string]string{"out-of-band": "bump"}
+	})
+
+	r := newSourceReconciler(t, fakeClient)
+	// The dummy admin's DeletePulsarSource does nothing; only the finalizer path is under test.
+	if err := r.ReconcileSource(ctx, &admin.DummyPulsarAdmin{}, cached); err != nil {
+		t.Fatalf("ReconcileSource on a deleting source with a stale resourceVersion returned an error "+
+			"(want nil, no 409 conflict): %v", err)
+	}
+
+	err := fakeClient.Get(ctx, sourceKey, &resourcev1alpha1.PulsarSource{})
+	if !apierrors.IsNotFound(err) {
+		t.Fatalf("expected source to be garbage-collected after finalizer removal, got err: %v", err)
+	}
+}
+
+// TestReconcileSourceDeletionPreservesForeignFinalizer guards against finalizer clobbering.
+// Between the reconciler taking its copy and acting on it, another controller adds its own
+// finalizer to the stored object, so the reconciler's copy lists only the operator finalizer.
+// ReconcileSource must remove exactly the operator finalizer: the foreign one has to remain
+// (writing the finalizer list from the stale copy, e.g. via a JSON merge patch, would drop
+// it), and the object must stay in the store for as long as that foreign finalizer is
+// pending.
+func TestReconcileSourceDeletionPreservesForeignFinalizer(t *testing.T) {
+	ctx := context.Background()
+	fakeClient := fake.NewClientBuilder().
+		WithScheme(newSourceTestScheme(t)).
+		WithStatusSubresource(&resourcev1alpha1.PulsarSource{}).
+		WithObjects(newDeletingTestSource()).
+		Build()
+
+	// The copy handed to the reconciler lists only the operator finalizer; the stored object
+	// then gains a foreign finalizer from a concurrent controller, leaving that copy stale.
+	cached := cachedSourceAfterConcurrentWrite(t, fakeClient, func(obj *resourcev1alpha1.PulsarSource) {
+		controllerutil.AddFinalizer(obj, foreignFinalizer)
+	})
+
+	r := newSourceReconciler(t, fakeClient)
+	if err := r.ReconcileSource(ctx, &admin.DummyPulsarAdmin{}, cached); err != nil {
+		t.Fatalf("ReconcileSource on a deleting source returned an error: %v", err)
+	}
+
+	remaining := &resourcev1alpha1.PulsarSource{}
+	if err := fakeClient.Get(ctx, sourceKey, remaining); err != nil {
+		t.Fatalf("source must survive while the foreign finalizer is pending, but Get failed: %v", err)
+	}
+	if containsString(remaining.Finalizers, resourcev1alpha1.FinalizerName) {
+		t.Fatalf("operator finalizer should have been removed, got %v", remaining.Finalizers)
+	}
+	if !containsString(remaining.Finalizers, foreignFinalizer) {
+		t.Fatalf("foreign finalizer must be preserved, got %v", remaining.Finalizers)
+	}
+}
diff --git a/pkg/connection/reconcile_tenant.go b/pkg/connection/reconcile_tenant.go
index c866b623..f6c01cb3 100644
--- a/pkg/connection/reconcile_tenant.go
+++ b/pkg/connection/reconcile_tenant.go
@@ -23,7 +23,6 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
 	resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
 	"github.com/streamnative/pulsar-resources-operator/pkg/admin"
@@ -102,8 +101,7 @@ func (r *PulsarTenantReconciler) ReconcileTenant(ctx context.Context, pulsarAdmi
 		}
 
 		// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
-		controllerutil.RemoveFinalizer(tenant, resourcev1alpha1.FinalizerName)
-		if err := r.conn.client.Update(ctx, tenant); err != nil {
+		if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, tenant, resourcev1alpha1.FinalizerName); err != nil {
 			log.Error(err, "Failed to remove finalizer")
 			return err
 		}
@@ -112,8 +110,7 @@ func (r
*PulsarTenantReconciler) ReconcileTenant(ctx context.Context, pulsarAdmi } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(tenant, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, tenant); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, tenant, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index dff0de14..5ef86e5f 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" @@ -137,8 +136,7 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin } // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(topic, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, topic); err != nil { + if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, topic, resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to remove finalizer") return err } @@ -147,8 +145,7 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin if topic.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.AddFinalizer(topic, resourcev1alpha1.FinalizerName) - if err := r.conn.client.Update(ctx, topic); err != nil { + if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, topic, 
resourcev1alpha1.FinalizerName); err != nil { log.Error(err, "Failed to add finalizer") return err } diff --git a/pkg/connection/reconciler.go b/pkg/connection/reconciler.go index ef2aeeb2..f64f5ce8 100644 --- a/pkg/connection/reconciler.go +++ b/pkg/connection/reconciler.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -43,6 +44,7 @@ type PulsarConnectionReconciler struct { connection *resourcev1alpha1.PulsarConnection log logr.Logger client client.Client + apiReader client.Reader creator admin.PulsarAdminCreator tenants []resourcev1alpha1.PulsarTenant namespaces []resourcev1alpha1.PulsarNamespace @@ -66,13 +68,14 @@ type PulsarConnectionReconciler struct { var _ reconciler.Interface = &PulsarConnectionReconciler{} // MakeReconciler creates resource reconcilers -func MakeReconciler(log logr.Logger, k8sClient client.Client, creator admin.PulsarAdminCreator, +func MakeReconciler(log logr.Logger, k8sClient client.Client, apiReader client.Reader, creator admin.PulsarAdminCreator, connection *resourcev1alpha1.PulsarConnection, retryer *utils.ReconcileRetryer) reconciler.Interface { r := &PulsarConnectionReconciler{ log: log, connection: connection, creator: creator, client: k8sClient, + apiReader: apiReader, retryer: retryer, } r.reconcilers = []reconciler.Interface{ @@ -123,8 +126,7 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error { // keep the connection until all resources has been removed // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - controllerutil.RemoveFinalizer(r.connection, resourcev1alpha1.FinalizerName) - if err := r.client.Update(ctx, r.connection); err != nil { + if err := removeFinalizer(ctx, r.apiReader, r.client, 
r.connection, resourcev1alpha1.FinalizerName); err != nil { return err } } else { @@ -146,18 +148,32 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error { } log.Info("Reconciling pulsar resources", "resources", r.unreadyResources) - connectionChanged := false - if r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL != "" { - r.connection.Spec.AdminServiceURL = r.connection.Spec.AdminServiceSecureURL - connectionChanged = true - } - // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer - if controllerutil.AddFinalizer(r.connection, resourcev1alpha1.FinalizerName) { - connectionChanged = true - } - if connectionChanged { - if err := r.client.Update(ctx, r.connection); err != nil { + // Default the admin service URL and add the finalizer against the live object under + // optimistic-lock retry. Reading the latest object first ensures a stale cached copy + // cannot overwrite a concurrent spec edit or drop a finalizer owned by another writer. + if (r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL != "") || + !controllerutil.ContainsFinalizer(r.connection, resourcev1alpha1.FinalizerName) { + connectionKey := client.ObjectKeyFromObject(r.connection) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Read the live object (uncached) so the retry always starts from a fresh + // resourceVersion; the write below still uses the normal cached client. 
+ if err := r.apiReader.Get(ctx, connectionKey, r.connection); err != nil { + return err + } + connectionChanged := false + if r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL != "" { + r.connection.Spec.AdminServiceURL = r.connection.Spec.AdminServiceSecureURL + connectionChanged = true + } + if controllerutil.AddFinalizer(r.connection, resourcev1alpha1.FinalizerName) { + connectionChanged = true + } + if !connectionChanged { + return nil + } + return r.client.Update(ctx, r.connection) + }); err != nil { return err } } diff --git a/pkg/connection/reconciler_test.go b/pkg/connection/reconciler_test.go index fc6cae53..adb8586a 100644 --- a/pkg/connection/reconciler_test.go +++ b/pkg/connection/reconciler_test.go @@ -177,6 +177,50 @@ func TestPulsarConnectionDeletionKeepsRemainingResourceGuardWhenAlwaysUpdateEnab } } +// TestPulsarConnectionReconcilePreservesConcurrentAdminURLEdit verifies that defaulting +// spec.AdminServiceURL from the secure URL never overwrites a value written concurrently by +// another writer. The reconciler reads the live object under conflict-retry before +// defaulting, so a stale cached copy (AdminServiceURL still empty) cannot clobber a +// concurrent spec edit. +func TestPulsarConnectionReconcilePreservesConcurrentAdminURLEdit(t *testing.T) { + setAlwaysUpdatePulsarResourceForTest(t, true) + + connection := newReadyTestConnection() + connection.Spec.AdminServiceURL = "" + connection.Spec.AdminServiceSecureURL = "https://secure:6651" + + reconciler, _, _ := newConnectionReconcilerForReadyChildrenTest(t, connection) + reconciler.tenants = []resourcev1alpha1.PulsarTenant{newReadyTestTenant()} + + // A concurrent writer sets AdminServiceURL on the live object after the reconciler + // captured its now-stale copy (which still has AdminServiceURL == ""). 
+ key := types.NamespacedName{Namespace: connection.Namespace, Name: connection.Name} + live := &resourcev1alpha1.PulsarConnection{} + if err := reconciler.client.Get(context.Background(), key, live); err != nil { + t.Fatalf("get live connection: %v", err) + } + live.Spec.AdminServiceURL = "http://user-edit:8080" + if err := reconciler.client.Update(context.Background(), live); err != nil { + t.Fatalf("concurrent admin URL edit: %v", err) + } + + if err := reconciler.Reconcile(context.Background()); err != nil { + t.Fatalf("reconcile connection: %v", err) + } + + got := &resourcev1alpha1.PulsarConnection{} + if err := reconciler.client.Get(context.Background(), key, got); err != nil { + t.Fatalf("get updated connection: %v", err) + } + if got.Spec.AdminServiceURL != "http://user-edit:8080" { + t.Fatalf("concurrent AdminServiceURL edit was overwritten: got %q, want %q", + got.Spec.AdminServiceURL, "http://user-edit:8080") + } + if !containsString(got.Finalizers, resourcev1alpha1.FinalizerName) { + t.Fatalf("expected finalizer %q to be added, got %v", resourcev1alpha1.FinalizerName, got.Finalizers) + } +} + func newConnectionReconcilerForReadyChildrenTest(t *testing.T, connection *resourcev1alpha1.PulsarConnection) (*PulsarConnectionReconciler, *countingConnectionChildReconciler, *int) { t.Helper() @@ -197,6 +241,7 @@ func newConnectionReconcilerForReadyChildrenTest(t *testing.T, connection *resou connection: connection, log: logr.Discard(), client: k8sClient, + apiReader: k8sClient, creator: func(admin.PulsarAdminConfig) (admin.PulsarAdmin, error) { adminCreateCalls++ return &admin.DummyPulsarAdmin{}, nil