Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 55 additions & 46 deletions internal/quota/admission/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/singleflight"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -97,9 +98,10 @@ type ResourceQuotaEnforcementPlugin struct {
// being marked ready until the resource type's cache has been synced.
resourceTypeValidator validation.ResourceTypeValidator

watchManagers sync.Map // map[string]ClaimWatchManager (projectID -> watch manager, "" = root)
config *AdmissionPluginConfig
logger logr.Logger
watchManagers sync.Map // map[string]ClaimWatchManager (projectID -> watch manager, "" = root)
watchManagerGroup singleflight.Group // collapses concurrent creates for the same projectID
config *AdmissionPluginConfig
logger logr.Logger
}

// Ensure ResourceQuotaEnforcementPlugin implements the required initializer interfaces
Expand Down Expand Up @@ -257,59 +259,66 @@ func (p *ResourceQuotaEnforcementPlugin) getWatchManager(ctx context.Context) (C
return cached.(ClaimWatchManager), nil
}

var client dynamic.Interface
var err error
if projectID == "" {
client = p.dynamicClient
} else {
client, err = p.getProjectClient(projectID)
if err != nil {
return nil, fmt.Errorf("failed to get project client for watch manager: %w", err)
v, err, _ := p.watchManagerGroup.Do(projectID, func() (any, error) {
// Re-check: a prior Do() for the same key may have already stored a watch
// manager while this call was queued.
if cached, ok := p.watchManagers.Load(projectID); ok {
return cached.(ClaimWatchManager), nil
}
}

logger := p.logger.WithName("watch-manager")
if projectID != "" {
logger = logger.WithValues("project", projectID)
}
var client dynamic.Interface
var clientErr error
if projectID == "" {
client = p.dynamicClient
} else {
client, clientErr = p.getProjectClient(projectID)
if clientErr != nil {
return nil, fmt.Errorf("failed to get project client for watch manager: %w", clientErr)
}
}

wm := NewWatchManager(client, logger, projectID)
logger := p.logger.WithName("watch-manager")
if projectID != "" {
logger = logger.WithValues("project", projectID)
}

if wmWithCallback, ok := wm.(*watchManager); ok {
wmWithCallback.SetTTLExpiredCallback(func() {
p.logger.Info("Watch manager TTL expired, removing from cache",
"project", projectID)
p.watchManagers.Delete(projectID)
})
}
wm := NewWatchManager(client, logger, projectID)

// Dedicated timeout prevents admission request timeout from affecting watch manager startup
startupTimeout := 30 * time.Second
startupCtx, startupCancel := context.WithTimeout(context.Background(), startupTimeout)
defer startupCancel()
if wmWithCallback, ok := wm.(*watchManager); ok {
wmWithCallback.SetTTLExpiredCallback(func() {
p.logger.Info("Watch manager TTL expired, removing from cache",
"project", projectID)
p.watchManagers.Delete(projectID)
})
}

startChan := make(chan error, 1)
go func() {
startChan <- wm.Start(startupCtx)
}()
// Dedicated timeout prevents admission request timeout from affecting watch manager startup
startupTimeout := 30 * time.Second
startupCtx, startupCancel := context.WithTimeout(context.Background(), startupTimeout)
defer startupCancel()

select {
case err := <-startChan:
if err != nil {
return nil, fmt.Errorf("failed to start watch manager: %w", err)
startChan := make(chan error, 1)
go func() {
startChan <- wm.Start(startupCtx)
}()

select {
case startErr := <-startChan:
if startErr != nil {
return nil, fmt.Errorf("failed to start watch manager: %w", startErr)
}
case <-startupCtx.Done():
return nil, fmt.Errorf("watch manager startup timed out after %v: %w", startupTimeout, startupCtx.Err())
}
case <-startupCtx.Done():
return nil, fmt.Errorf("watch manager startup timed out after %v: %w", startupTimeout, startupCtx.Err())
}

actual, _ := p.watchManagers.LoadOrStore(projectID, wm)
if projectID == "" {
p.logger.V(2).Info("Created and started watch manager")
} else {
p.logger.V(2).Info("Created and started watch manager",
"project", projectID)
p.watchManagers.Store(projectID, wm)
logger.V(2).Info("Created and started watch manager")
return wm, nil
})
if err != nil {
return nil, err
}
return actual.(ClaimWatchManager), nil
return v.(ClaimWatchManager), nil
}

// Validate implements admission.ValidationInterface and orchestrates the main admission flow
Expand Down
Loading