diff --git a/cmd/milo/apiserver/server.go b/cmd/milo/apiserver/server.go index d6858f9c..f2702a6a 100644 --- a/cmd/milo/apiserver/server.go +++ b/cmd/milo/apiserver/server.go @@ -11,6 +11,7 @@ import ( "github.com/spf13/cobra" crd "go.miloapis.com/milo/config/crd" "go.miloapis.com/milo/internal/apiserver/admission/plugin/namespace/lifecycle" + "go.miloapis.com/milo/internal/apiserver/storage/etcdshared" projectstorage "go.miloapis.com/milo/internal/apiserver/storage/project" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" @@ -79,6 +80,7 @@ var ( eventsProviderTimeoutSeconds int eventsProviderRetries int eventsForwardExtras []string + sharedETCDClientPoolSize int ) // NewCommand creates a *cobra.Command object with default parameters @@ -199,6 +201,7 @@ func NewCommand() *cobra.Command { fs.IntVar(&eventsProviderTimeoutSeconds, "events-provider-timeout", 30, "Activity provider request timeout in seconds") fs.IntVar(&eventsProviderRetries, "events-provider-retries", 3, "Activity provider request retries") fs.StringSliceVar(&eventsForwardExtras, "events-forward-extras", []string{"iam.miloapis.com/parent-api-group", "iam.miloapis.com/parent-type", "iam.miloapis.com/parent-name"}, "User extras keys to forward to Activity for events") + fs.IntVar(&sharedETCDClientPoolSize, "shared-etcd-client-pool-size", 32, "Number of etcd client connections opened per transport and round-robined across all project control plane watch caches. Higher values spread watch progress traffic across more gRPC streams at the cost of more connections. Minimum 1.") cols, _, _ := term.TerminalSize(cmd.OutOrStdout()) cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols) @@ -240,6 +243,9 @@ func Run(ctx context.Context, opts options.CompletedOptions) error { klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK")) + etcdshared.SetSharedClientPoolSize(sharedETCDClientPoolSize) + klog.InfoS("Shared etcd client pool configured", "size", sharedETCDClientPoolSize) + config, err := NewConfig(opts) if err != nil { return err diff --git a/config/apiserver/deployment.yaml b/config/apiserver/deployment.yaml index a42c0936..2da9b290 100644 --- a/config/apiserver/deployment.yaml +++ b/config/apiserver/deployment.yaml @@ -84,12 +84,19 @@ spec: - --events-provider-retries=$(EVENTS_PROVIDER_RETRIES) - --events-forward-extras=$(EVENTS_FORWARD_EXTRAS) - --shutdown-delay-duration=$(SHUTDOWN_DELAY_DURATION) + # Number of pooled etcd connections per transport for the shared + # watch-cache client; tune per environment via SHARED_ETCD_CLIENT_POOL_SIZE + - --shared-etcd-client-pool-size=$(SHARED_ETCD_CLIENT_POOL_SIZE) env: # Feature gates configuration # Sessions and UserIdentities are GA (enabled by default) # Add EventsProxy=true to enable Events proxy to Activity service - name: FEATURE_GATES value: "" + # Pooled etcd connections per transport for the shared watch-cache + # client. Override per environment to tune watch progress spread. + - name: SHARED_ETCD_CLIENT_POOL_SIZE + value: "32" - name: LOG_LEVEL value: "4" - name: LOGGING_FORMAT diff --git a/internal/apiserver/storage/etcdshared/client.go b/internal/apiserver/storage/etcdshared/client.go new file mode 100644 index 00000000..00b4ee5b --- /dev/null +++ b/internal/apiserver/storage/etcdshared/client.go @@ -0,0 +1,322 @@ +package etcdshared + +import ( + "context" + "fmt" + "net" + "net/url" + "slices" + "strings" + "sync" + "time" + + grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus" + "go.etcd.io/etcd/client/pkg/v3/transport" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.uber.org/zap" + "google.golang.org/grpc" + "k8s.io/klog/v2" + + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/wait" + genericfeatures "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/server/egressselector" + "k8s.io/apiserver/pkg/storage/etcd3" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" + "k8s.io/apiserver/pkg/storage/storagebackend" + utilfeature "k8s.io/apiserver/pkg/util/feature" + tracing "k8s.io/component-base/tracing" +) + +const ( + keepaliveTime = 30 * time.Second + keepaliveTimeout = 10 * time.Second + dialTimeout = 20 * time.Second + + dbMetricsMonitorJitter = 0.5 +) + +var etcd3ClientLogger = zap.NewNop().Named("etcd-client-shared") + +// newSharedETCDClient builds a *kubernetes.Client from a transport config. It is +// a package var so tests can substitute a dial-free fake. It is a faithful copy +// of the unexported newETCD3Client in +// k8s.io/apiserver/pkg/storage/storagebackend/factory: only the transport +// (endpoints + TLS) determines the connection, which is identical across every +// project control plane and resource type. +var newSharedETCDClient = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) { + tlsInfo := transport.TLSInfo{ + CertFile: c.CertFile, + KeyFile: c.KeyFile, + TrustedCAFile: c.TrustedCAFile, + } + tlsConfig, err := tlsInfo.ClientConfig() + if err != nil { + return nil, err + } + if len(c.CertFile) == 0 && len(c.KeyFile) == 0 && len(c.TrustedCAFile) == 0 { + tlsConfig = nil + } + networkContext := egressselector.Etcd.AsNetworkContext() + var egressDialer utilnet.DialFunc + if c.EgressLookup != nil { + egressDialer, err = c.EgressLookup(networkContext) + if err != nil { + return nil, err + } + } + dialOptions := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithChainUnaryInterceptor(grpcprom.UnaryClientInterceptor), + grpc.WithChainStreamInterceptor(grpcprom.StreamClientInterceptor), + } + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { + tracingOpts := []otelgrpc.Option{ + otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents), + otelgrpc.WithPropagators(tracing.Propagators()), + otelgrpc.WithTracerProvider(c.TracerProvider), + } + dialOptions = append(dialOptions, + grpc.WithStatsHandler(otelgrpc.NewClientHandler(tracingOpts...))) + } + if egressDialer != nil { + dialer := func(ctx context.Context, addr string) (net.Conn, error) { + if strings.Contains(addr, "//") { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + addr = u.Host + } + return egressDialer(ctx, "tcp", addr) + } + dialOptions = append(dialOptions, grpc.WithContextDialer(dialer)) + } + + cfg := clientv3.Config{ + DialTimeout: dialTimeout, + DialKeepAliveTime: keepaliveTime, + DialKeepAliveTimeout: keepaliveTimeout, + DialOptions: dialOptions, + Endpoints: c.ServerList, + TLS: tlsConfig, + Logger: etcd3ClientLogger, + } + + return kubernetes.New(cfg) +} + +// sharedClientPoolSize is the number of etcd connections opened per transport +// and round-robined across all project control planes and resource types. A +// single connection multiplexes every watch-cache watch over one gRPC watch +// stream; at our scale (~hundreds of projects x dozens of resources that is +// tens of thousands of watches) one stream becomes a head-of-line bottleneck +// and per-prefix caches fall progressively behind the global revision, which +// surfaces as "Too large resource version" / cache consistency failures and +// breaks streaming WatchList. Spreading watches across a small pool keeps the +// per-connection watch count in the range a normal apiserver handles while +// still collapsing the tens of thousands of per-(project x resource) +// connections this package replaced — the memory win is preserved (a few dozen +// connections, not one per resource). It is tunable via +// --shared-etcd-client-pool-size; SetSharedClientPoolSize must be called once at +// startup before any storage is built. +var sharedClientPoolSize = 32 + +// SetSharedClientPoolSize overrides the per-transport pool size. It is intended +// to be called exactly once during apiserver startup, before the first storage +// (and therefore the first acquireClient) is created, so no synchronization is +// needed against acquireClient. Values below 1 are clamped to 1. +func SetSharedClientPoolSize(n int) { + if n < 1 { + n = 1 + } + sharedClientPoolSize = n +} + +type runningClient struct { + clients []*kubernetes.Client + stopDBSizeMonitor func() + next uint64 + refs int +} + +var ( + clientsMu sync.Mutex + clients = map[string]*runningClient{} +) + +// transportKey derives the shared-client cache key. Only the transport +// (endpoints + TLS) determines the connection. +func transportKey(c storagebackend.TransportConfig) string { + endpoints := strings.Join(slices.Sorted(slices.Values(c.ServerList)), ",") + return fmt.Sprintf("%s|%s|%s|%s", endpoints, c.CertFile, c.KeyFile, c.TrustedCAFile) +} + +// acquireClient returns one etcd client from a fixed-size pool shared per +// transport config. All project control planes and resource types that share +// the same transport reuse the same pool of underlying gRPC connections, +// assigned round-robin; per-project isolation is enforced by the etcd key +// prefix at the store layer, not by the connection. Each client's KV is wrapped +// once with the latency tracker (it is stateless and request-context scoped, so +// a single wrapper per client is safe) and a single DB-size monitor is started +// for the pool. The returned release func closes the pool only when the last +// reference for the transport is released. +func acquireClient(c storagebackend.TransportConfig, dbMetricPollInterval time.Duration) (*kubernetes.Client, func(), error) { + clientsMu.Lock() + defer clientsMu.Unlock() + + key := transportKey(c) + rc, found := clients[key] + if !found { + pool := make([]*kubernetes.Client, 0, sharedClientPoolSize) + for i := 0; i < sharedClientPoolSize; i++ { + client, err := newSharedETCDClient(c) + if err != nil { + for _, pc := range pool { + _ = pc.Close() + } + return nil, nil, err + } + client.KV = etcd3.NewETCDLatencyTracker(client.KV) + pool = append(pool, client) + } + + // The DB-size monitor dedups per endpoint internally, so one client + // from the pool is sufficient to drive it. + stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(pool[0].Client, dbMetricPollInterval) + if err != nil { + for _, pc := range pool { + _ = pc.Close() + } + return nil, nil, err + } + + rc = &runningClient{ + clients: pool, + stopDBSizeMonitor: stopDBSizeMonitor, + } + clients[key] = rc + } + + rc.refs++ + client := rc.clients[rc.next%uint64(len(rc.clients))] + rc.next++ + + return client, func() { + clientsMu.Lock() + defer clientsMu.Unlock() + + rc := clients[key] + rc.refs-- + if rc.refs == 0 { + rc.stopDBSizeMonitor() + for _, pc := range rc.clients { + _ = pc.Close() + } + delete(clients, key) + } + }, nil +} + +type runningCompactor struct { + interval time.Duration + client *clientv3.Client + compactor etcd3.Compactor + cancel func() + refs int +} + +var ( + compactorsMu sync.Mutex + compactors = map[string]*runningCompactor{} +) + +// startCompactorOnce starts one compactor per transport, mirroring the +// refcounting semantics of the unexported startCompactorOnce in the upstream +// factory package. The compactor uses its own dedicated client (it must outlive +// individual stores and is never KV-wrapped). +func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration) (etcd3.Compactor, func(), error) { + compactorsMu.Lock() + defer compactorsMu.Unlock() + + if interval == 0 { + return nil, func() {}, nil + } + key := fmt.Sprintf("%v", c) + if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval { + client, err := newSharedETCDClient(c) + if err != nil { + return nil, nil, err + } + compactorClient := client.Client + + if foundBefore { + compactor.cancel() + } else { + compactor = &runningCompactor{} + compactors[key] = compactor + } + + compactor.interval = interval + compactor.client = compactorClient + cmp := etcd3.StartCompactorPerEndpoint(compactorClient, interval) + compactor.compactor = cmp + compactor.cancel = cmp.Stop + } + + compactors[key].refs++ + + return compactors[key].compactor, func() { + compactorsMu.Lock() + defer compactorsMu.Unlock() + + compactor := compactors[key] + compactor.refs-- + if compactor.refs == 0 { + compactor.cancel() + compactor.client.Close() + delete(compactors, key) + } + }, nil +} + +var ( + dbMetricsMonitorsMu sync.Mutex + dbMetricsMonitors = map[string]struct{}{} +) + +// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and +// update etcd_db_total_size_in_bytes per endpoint. Faithful copy of the +// upstream factory helper; deduped per endpoint within this package. +func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) { + if interval == 0 { + return func() {}, nil + } + dbMetricsMonitorsMu.Lock() + defer dbMetricsMonitorsMu.Unlock() + + ctx, cancel := context.WithCancel(context.Background()) + for _, ep := range client.Endpoints() { + if _, found := dbMetricsMonitors[ep]; found { + continue + } + dbMetricsMonitors[ep] = struct{}{} + endpoint := ep + klog.V(4).Infof("Start monitoring storage db size metric for endpoint %s with polling interval %v", endpoint, interval) + go wait.JitterUntilWithContext(ctx, func(context.Context) { + epStatus, err := client.Maintenance.Status(ctx, endpoint) + if err != nil { + klog.V(4).Infof("Failed to get storage db size for ep %s: %v", endpoint, err) + metrics.UpdateEtcdDbSize(endpoint, -1) + } else { + metrics.UpdateEtcdDbSize(endpoint, epStatus.DbSize) + } + }, interval, dbMetricsMonitorJitter, true) + } + + return func() { + cancel() + }, nil +} diff --git a/internal/apiserver/storage/etcdshared/client_test.go b/internal/apiserver/storage/etcdshared/client_test.go new file mode 100644 index 00000000..4ffad253 --- /dev/null +++ b/internal/apiserver/storage/etcdshared/client_test.go @@ -0,0 +1,125 @@ +package etcdshared + +import ( + "context" + "testing" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" + + "k8s.io/apiserver/pkg/storage/storagebackend" +) + +func newFakeClient() *kubernetes.Client { + return &kubernetes.Client{Client: clientv3.NewCtxClient(context.Background())} +} + +func closed(c *kubernetes.Client) bool { + return c.Ctx().Err() != nil +} + +func withFakeClientConstructor(t *testing.T) *int { + t.Helper() + calls := 0 + orig := newSharedETCDClient + newSharedETCDClient = func(storagebackend.TransportConfig) (*kubernetes.Client, error) { + calls++ + return newFakeClient(), nil + } + t.Cleanup(func() { + newSharedETCDClient = orig + clientsMu.Lock() + clients = map[string]*runningClient{} + clientsMu.Unlock() + }) + return &calls +} + +func mkTransport(servers ...string) storagebackend.TransportConfig { + return storagebackend.TransportConfig{ServerList: servers} +} + +func TestAcquireClient_SharedAcrossProjects(t *testing.T) { + calls := withFakeClientConstructor(t) + tc := mkTransport("https://etcd:2379") + + c1, rel1, err := acquireClient(tc, 0) + if err != nil { + t.Fatalf("first acquire: %v", err) + } + c2, rel2, err := acquireClient(tc, 0) + if err != nil { + t.Fatalf("second acquire: %v", err) + } + + // The pool for a transport is dialed once, lazily, on first acquire. + if *calls != sharedClientPoolSize { + t.Fatalf("expected pool dialed once (%d clients), got %d", sharedClientPoolSize, *calls) + } + if c1 == nil || c2 == nil { + t.Fatalf("expected non-nil clients from the pool") + } + + // Releasing one project's storage must NOT close the pool while another holds a ref. + rel1() + if closed(c1) { + t.Fatalf("pool closed while a reference is still held") + } + + // Last release closes the pool and drops the cache entry. + rel2() + if !closed(c1) || !closed(c2) { + t.Fatalf("pool not closed after final release") + } + + clientsMu.Lock() + _, present := clients[transportKey(tc)] + clientsMu.Unlock() + if present { + t.Fatalf("cache entry not removed after final release") + } +} + +func TestAcquireClient_DistinctTransportsDoNotShare(t *testing.T) { + calls := withFakeClientConstructor(t) + + cA, relA, err := acquireClient(mkTransport("https://a:2379"), 0) + if err != nil { + t.Fatalf("acquire A: %v", err) + } + cB, relB, err := acquireClient(mkTransport("https://b:2379"), 0) + if err != nil { + t.Fatalf("acquire B: %v", err) + } + + if *calls != 2*sharedClientPoolSize { + t.Fatalf("expected two pools dialed (%d clients), got %d", 2*sharedClientPoolSize, *calls) + } + if cA == cB { + t.Fatalf("distinct transports unexpectedly shared a client") + } + + relA() + relB() + if !closed(cA) || !closed(cB) { + t.Fatalf("clients not closed after release") + } +} + +func TestAcquireClient_ReClosedAfterFullReleaseCycle(t *testing.T) { + calls := withFakeClientConstructor(t) + tc := mkTransport("https://etcd:2379") + + _, rel1, _ := acquireClient(tc, 0) + rel1() + if *calls != sharedClientPoolSize { + t.Fatalf("expected one pool dialed (%d clients), got %d", sharedClientPoolSize, *calls) + } + + // After the entry was torn down, a fresh acquire dials a new pool. + _, rel2, _ := acquireClient(tc, 0) + if *calls != 2*sharedClientPoolSize { + t.Fatalf("expected re-dial after teardown (%d clients), got %d", 2*sharedClientPoolSize, *calls) + } + rel2() +} diff --git a/internal/apiserver/storage/etcdshared/decorator.go b/internal/apiserver/storage/etcdshared/decorator.go new file mode 100644 index 00000000..79c1a68e --- /dev/null +++ b/internal/apiserver/storage/etcdshared/decorator.go @@ -0,0 +1,150 @@ +package etcdshared + +import ( + "sync" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" + "k8s.io/klog/v2" + + "k8s.io/apimachinery/pkg/runtime" + genericfeatures "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/storage" + cacherstorage "k8s.io/apiserver/pkg/storage/cacher" + "k8s.io/apiserver/pkg/storage/etcd3" + etcdfeature "k8s.io/apiserver/pkg/storage/feature" + "k8s.io/apiserver/pkg/storage/storagebackend" + "k8s.io/apiserver/pkg/storage/storagebackend/factory" + "k8s.io/apiserver/pkg/storage/value/encrypt/identity" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/tools/cache" +) + +// newRawStorage builds an etcd3 store backed by the transport-keyed shared +// client. It is a copy of the unexported newETCD3Storage in the upstream +// factory package with one change: the client is acquired from the refcounted +// shared cache instead of dialing a fresh connection per (project x resource). +func newRawStorage(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, factory.DestroyFunc, error) { + compactor, stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval) + if err != nil { + return nil, nil, err + } + + client, releaseClient, err := acquireClient(c.Transport, c.DBMetricPollInterval) + if err != nil { + stopCompactor() + return nil, nil, err + } + + // etcd3.New calls DefaultFeatureSupportChecker.CheckClient using the client we + // pass it. The per-store client below has no endpoints (NewCtxClient), so the + // check would be a no-op. Call it explicitly for the pool client here instead. + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ConsistentListFromCache) || + utilfeature.DefaultFeatureGate.Enabled(genericfeatures.WatchList) { + etcdfeature.DefaultFeatureSupportChecker.CheckClient( + client.Ctx(), client.Client, storage.RequestWatchProgress) + } + + // Each store gets its own gRPC watch stream over the shared TCP connection so + // that watch creations don't serialize across stores sharing the same pool slot. + // KV, Lease, and Maintenance still use the shared pool client. + // + // NewCtxClient allocates a fresh *clientv3.Client with its own mutexes — no + // mutex-bearing struct copy. We then graft only the interface fields we need. + perStoreBase := clientv3.NewCtxClient(client.Ctx()) + perStoreBase.KV = client.KV + perStoreBase.Lease = client.Client.Lease + perStoreBase.Maintenance = client.Client.Maintenance + perStoreBase.Watcher = clientv3.NewWatcher(client.Client) + perStoreClient := &kubernetes.Client{Client: perStoreBase} + perStoreClient.Kubernetes = perStoreClient + + transformer := c.Transformer + if transformer == nil { + transformer = identity.NewEncryptCheckTransformer() + } + + versioner := storage.APIObjectVersioner{} + decoder := etcd3.NewDefaultDecoder(c.Codec, versioner) + + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AllowUnsafeMalformedObjectDeletion) { + transformer = etcd3.WithCorruptObjErrorHandlingTransformer(transformer) + decoder = etcd3.WithCorruptObjErrorHandlingDecoder(decoder) + } + store, err := etcd3.New(perStoreClient, compactor, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner) + if err != nil { + stopCompactor() + perStoreBase.Watcher.Close() + releaseClient() + return nil, nil, err + } + var once sync.Once + destroyFunc := func() { + once.Do(func() { + stopCompactor() + store.Close() + perStoreBase.Watcher.Close() + releaseClient() + }) + } + var st storage.Interface = store + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AllowUnsafeMalformedObjectDeletion) { + st = etcd3.NewStoreWithUnsafeCorruptObjectDeletion(st, c.GroupResource) + } + return st, destroyFunc, nil +} + +// StorageWithSharedCacher mirrors genericregistry.StorageWithCacher but builds +// the raw store from the transport-keyed shared etcd client. The cacher wrapping +// is identical to upstream and never touches the client. +func StorageWithSharedCacher() generic.StorageDecorator { + return func( + storageConfig *storagebackend.ConfigForResource, + resourcePrefix string, + keyFunc func(obj runtime.Object) (string, error), + newFunc func() runtime.Object, + newListFunc func() runtime.Object, + getAttrsFunc storage.AttrFunc, + triggerFuncs storage.IndexerFuncs, + indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) { + + s, d, err := newRawStorage(*storageConfig, newFunc, newListFunc, resourcePrefix) + if err != nil { + return s, d, err + } + if klogV := klog.V(5); klogV.Enabled() { + klogV.InfoS("Storage caching is enabled (shared etcd client)", "type", newFunc()) + } + + cacherConfig := cacherstorage.Config{ + Storage: s, + Versioner: storage.APIObjectVersioner{}, + GroupResource: storageConfig.GroupResource, + EventsHistoryWindow: storageConfig.EventsHistoryWindow, + ResourcePrefix: resourcePrefix, + KeyFunc: keyFunc, + NewFunc: newFunc, + NewListFunc: newListFunc, + GetAttrsFunc: getAttrsFunc, + IndexerFuncs: triggerFuncs, + Indexers: indexers, + Codec: storageConfig.Codec, + } + cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig) + if err != nil { + return nil, func() {}, err + } + delegator := cacherstorage.NewCacheDelegator(cacher, s) + var once sync.Once + destroyFunc := func() { + once.Do(func() { + delegator.Stop() + cacher.Stop() + d() + }) + } + + return delegator, destroyFunc, nil + } +} diff --git a/internal/apiserver/storage/project/restoptions.go b/internal/apiserver/storage/project/restoptions.go index 17278259..bcbcd9f1 100644 --- a/internal/apiserver/storage/project/restoptions.go +++ b/internal/apiserver/storage/project/restoptions.go @@ -6,7 +6,8 @@ import ( "k8s.io/client-go/rest" generic "k8s.io/apiserver/pkg/registry/generic" - genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" + + "go.miloapis.com/milo/internal/apiserver/storage/etcdshared" ) // Wrap the upstream RESTOptionsGetter to install a per-project decorator. @@ -37,10 +38,6 @@ func (g roGetter) GetRESTOptions(gr schema.GroupResource, example runtime.Object } // Ensure we always wrap with our project-aware decorator. - if opts.Decorator == nil { - opts.Decorator = ProjectAwareDecorator(gr, genericregistry.StorageWithCacher(), g.loopbackConfig) - } else { - opts.Decorator = ProjectAwareDecorator(gr, opts.Decorator, g.loopbackConfig) - } + opts.Decorator = ProjectAwareDecorator(gr, etcdshared.StorageWithSharedCacher(), g.loopbackConfig) return opts, nil }