From ed7140ff4e4910c52ab84e0bac4421c50aad4d2b Mon Sep 17 00:00:00 2001 From: Alex Savanovich <40720931+savme@users.noreply.github.com> Date: Tue, 16 Jun 2026 18:50:05 +0200 Subject: [PATCH] fix: collapse concurrent watch manager creates with singleflight --- internal/quota/admission/plugin.go | 101 ++++++++++++++++------------- 1 file changed, 55 insertions(+), 46 deletions(-) diff --git a/internal/quota/admission/plugin.go b/internal/quota/admission/plugin.go index d2dee948..ec462c5d 100644 --- a/internal/quota/admission/plugin.go +++ b/internal/quota/admission/plugin.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/singleflight" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -97,9 +98,10 @@ type ResourceQuotaEnforcementPlugin struct { // being marked ready until the resource type's cache has been synced. resourceTypeValidator validation.ResourceTypeValidator - watchManagers sync.Map // map[string]ClaimWatchManager (projectID -> watch manager, "" = root) - config *AdmissionPluginConfig - logger logr.Logger + watchManagers sync.Map // map[string]ClaimWatchManager (projectID -> watch manager, "" = root) + watchManagerGroup singleflight.Group // collapses concurrent creates for the same projectID + config *AdmissionPluginConfig + logger logr.Logger } // Ensure ResourceQuotaEnforcementPlugin implements the required initializer interfaces @@ -257,59 +259,66 @@ func (p *ResourceQuotaEnforcementPlugin) getWatchManager(ctx context.Context) (C return cached.(ClaimWatchManager), nil } - var client dynamic.Interface - var err error - if projectID == "" { - client = p.dynamicClient - } else { - client, err = p.getProjectClient(projectID) - if err != nil { - return nil, fmt.Errorf("failed to get project client for watch manager: %w", err) + v, err, _ := p.watchManagerGroup.Do(projectID, func() (any, error) { + // Re-check: a prior Do() for the same key may have already stored a watch + // manager while this call was queued. + if cached, ok := p.watchManagers.Load(projectID); ok { + return cached.(ClaimWatchManager), nil } - } - logger := p.logger.WithName("watch-manager") - if projectID != "" { - logger = logger.WithValues("project", projectID) - } + var client dynamic.Interface + var clientErr error + if projectID == "" { + client = p.dynamicClient + } else { + client, clientErr = p.getProjectClient(projectID) + if clientErr != nil { + return nil, fmt.Errorf("failed to get project client for watch manager: %w", clientErr) + } + } - wm := NewWatchManager(client, logger, projectID) + logger := p.logger.WithName("watch-manager") + if projectID != "" { + logger = logger.WithValues("project", projectID) + } - if wmWithCallback, ok := wm.(*watchManager); ok { - wmWithCallback.SetTTLExpiredCallback(func() { - p.logger.Info("Watch manager TTL expired, removing from cache", - "project", projectID) - p.watchManagers.Delete(projectID) - }) - } + wm := NewWatchManager(client, logger, projectID) - // Dedicated timeout prevents admission request timeout from affecting watch manager startup - startupTimeout := 30 * time.Second - startupCtx, startupCancel := context.WithTimeout(context.Background(), startupTimeout) - defer startupCancel() + if wmWithCallback, ok := wm.(*watchManager); ok { + wmWithCallback.SetTTLExpiredCallback(func() { + p.logger.Info("Watch manager TTL expired, removing from cache", + "project", projectID) + p.watchManagers.Delete(projectID) + }) + } - startChan := make(chan error, 1) - go func() { - startChan <- wm.Start(startupCtx) - }() + // Dedicated timeout prevents admission request timeout from affecting watch manager startup + startupTimeout := 30 * time.Second + startupCtx, startupCancel := context.WithTimeout(context.Background(), startupTimeout) + defer startupCancel() - select { - case err := <-startChan: - if err != nil { - return nil, fmt.Errorf("failed to start watch manager: %w", err) + startChan := make(chan error, 1) + go func() { + startChan <- wm.Start(startupCtx) + }() + + select { + case startErr := <-startChan: + if startErr != nil { + return nil, fmt.Errorf("failed to start watch manager: %w", startErr) + } + case <-startupCtx.Done(): + return nil, fmt.Errorf("watch manager startup timed out after %v: %w", startupTimeout, startupCtx.Err()) } - case <-startupCtx.Done(): - return nil, fmt.Errorf("watch manager startup timed out after %v: %w", startupTimeout, startupCtx.Err()) - } - actual, _ := p.watchManagers.LoadOrStore(projectID, wm) - if projectID == "" { - p.logger.V(2).Info("Created and started watch manager") - } else { - p.logger.V(2).Info("Created and started watch manager", - "project", projectID) + p.watchManagers.Store(projectID, wm) + logger.V(2).Info("Created and started watch manager") + return wm, nil + }) + if err != nil { + return nil, err } - return actual.(ClaimWatchManager), nil + return v.(ClaimWatchManager), nil } // Validate implements admission.ValidationInterface and orchestrates the main admission flow