Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion controllers/pulsarconnection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ import (
// PulsarConnectionReconciler reconciles a PulsarConnection object
type PulsarConnectionReconciler struct {
client.Client
// APIReader is an uncached reader (mgr.GetAPIReader()) used for finalizer reads that must
// observe a fresh resourceVersion, so deletion is not wedged by informer-cache lag.
APIReader client.Reader
Scheme *runtime.Scheme
Log logr.Logger
Recorder record.EventRecorder
Expand Down Expand Up @@ -112,7 +115,7 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context, req ctrl.Req

r.Log.Info("Reconciling PulsarConnection", "name", pulsarConnection.Name, "namespace", pulsarConnection.Namespace)

reconciler := connection.MakeReconciler(r.Log, r.Client, r.PulsarAdminCreator, pulsarConnection, r.Retryer)
reconciler := connection.MakeReconciler(r.Log, r.Client, r.APIReader, r.PulsarAdminCreator, pulsarConnection, r.Retryer)
if err := reconciler.Observe(ctx); err != nil {
return ctrl.Result{}, err
}
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func main() {

if err = (&controllers.PulsarConnectionReconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("PulsarConnection"),
Recorder: mgr.GetEventRecorderFor("pulsarconnection-controller"),
Expand Down
86 changes: 86 additions & 0 deletions pkg/connection/finalizer_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2025 StreamNative
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package connection

import (
"context"

"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// ensureFinalizer adds finalizer to obj, persisting the change against the live API object
// and retrying on optimistic-lock conflicts.
//
// reader must be an uncached reader (e.g. mgr.GetAPIReader()): the manager's default client
// reads through the informer cache, which can lag behind a recent write and hand back a stale
// resourceVersion, causing the Update to 409 and the retry to keep re-reading the same stale
// copy. Reading live guarantees a fresh resourceVersion so the write succeeds on the first
// reconcile regardless of cache lag. Writes use the normal writer client.
//
// Guard: when the cached copy of obj already contains finalizer this returns immediately with
// no API calls, so the common steady-state path (finalizer already present) pays nothing —
// only the first add issues the uncached read.
//
// Only this operator's finalizer is mutated, so finalizers added concurrently by other
// controllers are preserved. On success obj holds the server's latest state (including the
// refreshed resourceVersion), so callers may safely issue a follow-up Status().Update.
func ensureFinalizer(ctx context.Context, reader client.Reader, writer client.Client, obj client.Object, finalizer string) error {
if controllerutil.ContainsFinalizer(obj, finalizer) {
return nil
}
key := client.ObjectKeyFromObject(obj)
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := reader.Get(ctx, key, obj); err != nil {
return err
}
if !controllerutil.AddFinalizer(obj, finalizer) {
// Already present on the live object; nothing to write.
return nil
}
return writer.Update(ctx, obj)
})
}

// removeFinalizer removes finalizer from obj, persisting the change against the live API
// object and retrying on optimistic-lock conflicts.
//
// reader must be an uncached reader (e.g. mgr.GetAPIReader()). Re-reading the live object
// before each attempt yields a fresh resourceVersion, so deletion is not wedged by a stale
// informer cache (the original stuck-Terminating symptom), and only this operator's finalizer
// is removed — finalizers owned by other controllers are preserved and the object is not
// garbage-collected prematurely. Writes use the normal writer client.
//
// Guard: when the cached copy of obj does not contain finalizer this returns immediately with
// no API calls. A NotFound result (the object was garbage-collected once its last finalizer
// was removed) is treated as success.
func removeFinalizer(ctx context.Context, reader client.Reader, writer client.Client, obj client.Object, finalizer string) error {
if !controllerutil.ContainsFinalizer(obj, finalizer) {
return nil
}
key := client.ObjectKeyFromObject(obj)
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := reader.Get(ctx, key, obj); err != nil {
return err
}
if !controllerutil.RemoveFinalizer(obj, finalizer) {
// Already absent on the live object; nothing to write.
return nil
}
return writer.Update(ctx, obj)
})
return client.IgnoreNotFound(err)
}
114 changes: 114 additions & 0 deletions pkg/connection/finalizer_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2026 StreamNative
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package connection

import (
"context"
"testing"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
)

// TestRemoveFinalizerUsesLiveReaderResourceVersion proves the finalizer write is driven by the
// reader, not by the caller's (possibly stale) in-hand object. A reader that keeps returning a
// stale resourceVersion — as the manager's cached client can under informer lag — makes every
// retry 409, so the removal fails (the original stuck-Terminating symptom). The same removal
// with a live reader reads a fresh resourceVersion and succeeds. In production the reader is the
// uncached mgr.GetAPIReader(), so a lagging informer cache cannot wedge deletion.
//
// This is the case ordinary single-fake-client tests cannot exercise: here the reader and the
// write store are deliberately decoupled to simulate cache lag.
func TestRemoveFinalizerUsesLiveReaderResourceVersion(t *testing.T) {
scheme := newSourceTestScheme(t)
topic := &resourcev1alpha1.PulsarTopic{
ObjectMeta: metav1.ObjectMeta{
Name: "t",
Namespace: "ns",
Finalizers: []string{resourcev1alpha1.FinalizerName},
},
}
writer := fake.NewClientBuilder().WithScheme(scheme).WithObjects(topic).Build()
key := client.ObjectKeyFromObject(topic)

// Snapshot the object at its current resourceVersion, then advance the live RV out of band.
staleObj := &resourcev1alpha1.PulsarTopic{}
if err := writer.Get(context.Background(), key, staleObj); err != nil {
t.Fatalf("get snapshot: %v", err)
}
bumped := staleObj.DeepCopy()
bumped.Annotations = map[string]string{"bump": "1"}
if err := writer.Update(context.Background(), bumped); err != nil {
t.Fatalf("bump live resourceVersion: %v", err)
}

// staleReader always returns the pre-bump snapshot — a persistently stale resourceVersion.
staleReader := interceptor.NewClient(writer, interceptor.Funcs{
Get: func(_ context.Context, _ client.WithWatch, _ client.ObjectKey, obj client.Object, _ ...client.GetOption) error {
staleObj.DeepCopyInto(obj.(*resourcev1alpha1.PulsarTopic))
return nil
},
})

// Persistently stale reader: every retry re-reads the stale RV and the Update conflicts.
if err := removeFinalizer(context.Background(), staleReader, writer, staleObj.DeepCopy(),
resourcev1alpha1.FinalizerName); !apierrors.IsConflict(err) {
t.Fatalf("expected a 409 conflict from a persistently stale reader, got: %v", err)
}

// Live (uncached) reader: the removal reads a fresh RV and succeeds.
if err := removeFinalizer(context.Background(), writer, writer, staleObj.DeepCopy(),
resourcev1alpha1.FinalizerName); err != nil {
t.Fatalf("removeFinalizer with a live reader: %v", err)
}
got := &resourcev1alpha1.PulsarTopic{}
if err := writer.Get(context.Background(), key, got); err != nil {
t.Fatalf("get after removal: %v", err)
}
if controllerutil.ContainsFinalizer(got, resourcev1alpha1.FinalizerName) {
t.Fatalf("operator finalizer should have been removed, got %v", got.Finalizers)
}
}

// TestEnsureFinalizerGuardSkipsReadWhenPresent verifies the steady-state fast path: when the
// cached object already carries the finalizer, ensureFinalizer issues no API calls at all, so
// routing the read through the uncached API reader adds no per-reconcile apiserver load on the
// common path.
func TestEnsureFinalizerGuardSkipsReadWhenPresent(t *testing.T) {
scheme := newSourceTestScheme(t)
writer := fake.NewClientBuilder().WithScheme(scheme).Build()
panicReader := interceptor.NewClient(writer, interceptor.Funcs{
Get: func(_ context.Context, _ client.WithWatch, _ client.ObjectKey, _ client.Object, _ ...client.GetOption) error {
panic("reader.Get must not be called when the finalizer is already present")
},
})

obj := &resourcev1alpha1.PulsarTopic{
ObjectMeta: metav1.ObjectMeta{
Name: "t",
Namespace: "ns",
Finalizers: []string{resourcev1alpha1.FinalizerName},
},
}
if err := ensureFinalizer(context.Background(), panicReader, writer, obj, resourcev1alpha1.FinalizerName); err != nil {
t.Fatalf("ensureFinalizer (guard fast path) returned error: %v", err)
}
}
7 changes: 2 additions & 5 deletions pkg/connection/reconcile_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/streamnative/pulsar-resources-operator/pkg/feature"
"k8s.io/apimachinery/pkg/api/meta"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
"github.com/streamnative/pulsar-resources-operator/pkg/admin"
Expand Down Expand Up @@ -101,8 +100,7 @@ func (r *PulsarFunctionReconciler) ReconcileFunction(ctx context.Context, pulsar
}

// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
controllerutil.RemoveFinalizer(instance, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, instance); err != nil {
if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, instance, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to remove finalizer")
return err
}
Expand All @@ -111,8 +109,7 @@ func (r *PulsarFunctionReconciler) ReconcileFunction(ctx context.Context, pulsar

if instance.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion {
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
controllerutil.AddFinalizer(instance, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, instance); err != nil {
if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, instance, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to add finalizer")
return err
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/connection/reconcile_geo_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
"github.com/streamnative/pulsar-resources-operator/pkg/admin"
Expand Down Expand Up @@ -168,15 +167,13 @@ func (r *PulsarGeoReplicationReconciler) ReconcileGeoReplication(ctx context.Con
}
}
}
controllerutil.RemoveFinalizer(geoReplication, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, geoReplication); err != nil {
if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, geoReplication, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to remove finalizer")
return err
}
}
}
controllerutil.AddFinalizer(geoReplication, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, geoReplication); err != nil {
if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, geoReplication, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to add finalizer")
return err
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/connection/reconcile_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
"github.com/streamnative/pulsar-resources-operator/pkg/admin"
Expand Down Expand Up @@ -124,8 +123,7 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls
}

// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
controllerutil.RemoveFinalizer(namespace, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, namespace); err != nil {
if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, namespace, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to remove finalizer")
return err
}
Expand All @@ -135,8 +133,7 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls

if namespace.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion {
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
controllerutil.AddFinalizer(namespace, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, namespace); err != nil {
if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, namespace, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to add finalizer")
return err
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/connection/reconcile_nsisolationpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/streamnative/pulsar-resources-operator/pkg/reconciler"
"k8s.io/apimachinery/pkg/api/meta"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// PulsarNSIsolationPolicyReconciler reconciles a PulsarNSIsolationPolicy object
Expand Down Expand Up @@ -105,8 +104,7 @@ func (r *PulsarNSIsolationPolicyReconciler) ReconcilePolicy(ctx context.Context,
}

// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
controllerutil.RemoveFinalizer(policy, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, policy); err != nil {
if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, policy, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to remove finalizer")
return err
}
Expand All @@ -115,8 +113,7 @@ func (r *PulsarNSIsolationPolicyReconciler) ReconcilePolicy(ctx context.Context,
}

// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
controllerutil.AddFinalizer(policy, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, policy); err != nil {
if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, policy, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to add finalizer")
return err
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/connection/reconcile_package.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/streamnative/pulsar-resources-operator/pkg/feature"
"k8s.io/apimachinery/pkg/api/meta"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
"github.com/streamnative/pulsar-resources-operator/pkg/admin"
Expand Down Expand Up @@ -154,17 +153,15 @@ func (r *PulsarPackageReconciler) ReconcilePackage(ctx context.Context, pulsarAd
}
}

controllerutil.RemoveFinalizer(pkg, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, pkg); err != nil {
if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, pkg, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to remove finalizer")
return err
}
return nil
}

if pkg.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion {
controllerutil.AddFinalizer(pkg, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, pkg); err != nil {
if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, pkg, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to add finalizer")
return err
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/connection/reconcile_permission.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
"github.com/streamnative/pulsar-resources-operator/pkg/admin"
Expand Down Expand Up @@ -102,8 +101,7 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu
}
}
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
controllerutil.RemoveFinalizer(permission, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, permission); err != nil {
if err := removeFinalizer(ctx, r.conn.apiReader, r.conn.client, permission, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to remove finalizer")
return err
}
Expand All @@ -112,8 +110,7 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu

if permission.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion {
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
controllerutil.AddFinalizer(permission, resourcev1alpha1.FinalizerName)
if err := r.conn.client.Update(ctx, permission); err != nil {
if err := ensureFinalizer(ctx, r.conn.apiReader, r.conn.client, permission, resourcev1alpha1.FinalizerName); err != nil {
log.Error(err, "Failed to add finalizer")
return err
}
Expand Down
Loading
Loading