From 69a27ca469f4de55173e3b08a8ca811152ee862d Mon Sep 17 00:00:00 2001 From: Scot Wells Date: Thu, 25 Jun 2026 12:38:47 -0500 Subject: [PATCH 1/7] perf(apiserver): share etcd client across project control planes The apiserver opened a dedicated etcd connection for every (project x resource) pair and never shared them. Across ~500 project control planes this produced tens of thousands of mostly-idle connections that dominated apiserver memory through per-connection gRPC read/write buffers and the goroutines each connection spawns. Share a single etcd client per transport config across all projects and resources. Per-project isolation is the etcd key prefix, applied at the store layer, so the connection carries no per-project state and is safe to pool. Clients are reference-counted and closed only when the last project storage using them is torn down. Implemented as a self-contained etcdshared package with no upstream or vendor changes: it builds a shared-client raw storage backend and wraps it with the unchanged upstream cacher, then points the project-aware decorator at it. 
Claude-Session: https://claude.ai/code/session_01PgQX8ky2mbuEieE7BR5Eu8 --- .../apiserver/storage/etcdshared/client.go | 277 ++++++++++++++++++ .../storage/etcdshared/client_test.go | 124 ++++++++ .../apiserver/storage/etcdshared/decorator.go | 122 ++++++++ .../apiserver/storage/project/restoptions.go | 5 +- 4 files changed, 526 insertions(+), 2 deletions(-) create mode 100644 internal/apiserver/storage/etcdshared/client.go create mode 100644 internal/apiserver/storage/etcdshared/client_test.go create mode 100644 internal/apiserver/storage/etcdshared/decorator.go diff --git a/internal/apiserver/storage/etcdshared/client.go b/internal/apiserver/storage/etcdshared/client.go new file mode 100644 index 00000000..03baa796 --- /dev/null +++ b/internal/apiserver/storage/etcdshared/client.go @@ -0,0 +1,277 @@ +package etcdshared + +import ( + "context" + "fmt" + "net" + "net/url" + "strings" + "sync" + "time" + + grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus" + "go.etcd.io/etcd/client/pkg/v3/transport" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.uber.org/zap" + "google.golang.org/grpc" + "k8s.io/klog/v2" + + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/wait" + genericfeatures "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/server/egressselector" + "k8s.io/apiserver/pkg/storage/etcd3" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" + "k8s.io/apiserver/pkg/storage/storagebackend" + utilfeature "k8s.io/apiserver/pkg/util/feature" + tracing "k8s.io/component-base/tracing" +) + +const ( + keepaliveTime = 30 * time.Second + keepaliveTimeout = 10 * time.Second + dialTimeout = 20 * time.Second + + dbMetricsMonitorJitter = 0.5 +) + +var etcd3ClientLogger = zap.NewNop().Named("etcd-client-shared") + +// newSharedETCDClient builds a *kubernetes.Client from a transport config. 
It is +// a package var so tests can substitute a dial-free fake. It is a faithful copy +// of the unexported newETCD3Client in +// k8s.io/apiserver/pkg/storage/storagebackend/factory: only the transport +// (endpoints + TLS) determines the connection, which is identical across every +// project control plane and resource type. +var newSharedETCDClient = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) { + tlsInfo := transport.TLSInfo{ + CertFile: c.CertFile, + KeyFile: c.KeyFile, + TrustedCAFile: c.TrustedCAFile, + } + tlsConfig, err := tlsInfo.ClientConfig() + if err != nil { + return nil, err + } + if len(c.CertFile) == 0 && len(c.KeyFile) == 0 && len(c.TrustedCAFile) == 0 { + tlsConfig = nil + } + networkContext := egressselector.Etcd.AsNetworkContext() + var egressDialer utilnet.DialFunc + if c.EgressLookup != nil { + egressDialer, err = c.EgressLookup(networkContext) + if err != nil { + return nil, err + } + } + dialOptions := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithChainUnaryInterceptor(grpcprom.UnaryClientInterceptor), + grpc.WithChainStreamInterceptor(grpcprom.StreamClientInterceptor), + } + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { + tracingOpts := []otelgrpc.Option{ + otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents), + otelgrpc.WithPropagators(tracing.Propagators()), + otelgrpc.WithTracerProvider(c.TracerProvider), + } + dialOptions = append(dialOptions, + grpc.WithStatsHandler(otelgrpc.NewClientHandler(tracingOpts...))) + } + if egressDialer != nil { + dialer := func(ctx context.Context, addr string) (net.Conn, error) { + if strings.Contains(addr, "//") { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + addr = u.Host + } + return egressDialer(ctx, "tcp", addr) + } + dialOptions = append(dialOptions, grpc.WithContextDialer(dialer)) + } + + cfg := clientv3.Config{ + DialTimeout: dialTimeout, + DialKeepAliveTime: keepaliveTime, + 
DialKeepAliveTimeout: keepaliveTimeout, + DialOptions: dialOptions, + Endpoints: c.ServerList, + TLS: tlsConfig, + Logger: etcd3ClientLogger, + } + + return kubernetes.New(cfg) +} + +type runningClient struct { + client *kubernetes.Client + stopDBSizeMonitor func() + refs int +} + +var ( + clientsMu sync.Mutex + clients = map[string]*runningClient{} +) + +// transportKey derives the shared-client cache key. Only the transport +// (endpoints + TLS) determines the connection. +func transportKey(c storagebackend.TransportConfig) string { + return fmt.Sprintf("%v", c) +} + +// acquireClient returns a single shared etcd client per transport config. All +// project control planes and resource types that share the same transport reuse +// the same underlying gRPC connection; per-project isolation is enforced by the +// etcd key prefix at the store layer, not by the connection. The client's KV is +// wrapped once with the latency tracker (it is stateless and request-context +// scoped, so a single wrapper is safe and avoids compounding wrappers across +// thousands of resources) and a single DB-size monitor is started for it. The +// returned release func closes the client only when the last reference for the +// transport is released. 
+func acquireClient(c storagebackend.TransportConfig, dbMetricPollInterval time.Duration) (*kubernetes.Client, func(), error) { + clientsMu.Lock() + defer clientsMu.Unlock() + + key := transportKey(c) + rc, found := clients[key] + if !found { + client, err := newSharedETCDClient(c) + if err != nil { + return nil, nil, err + } + client.KV = etcd3.NewETCDLatencyTracker(client.KV) + + stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client.Client, dbMetricPollInterval) + if err != nil { + _ = client.Close() + return nil, nil, err + } + + rc = &runningClient{ + client: client, + stopDBSizeMonitor: stopDBSizeMonitor, + } + clients[key] = rc + } + + rc.refs++ + + return rc.client, func() { + clientsMu.Lock() + defer clientsMu.Unlock() + + rc := clients[key] + rc.refs-- + if rc.refs == 0 { + rc.stopDBSizeMonitor() + _ = rc.client.Close() + delete(clients, key) + } + }, nil +} + +type runningCompactor struct { + interval time.Duration + client *clientv3.Client + compactor etcd3.Compactor + cancel func() + refs int +} + +var ( + compactorsMu sync.Mutex + compactors = map[string]*runningCompactor{} +) + +// startCompactorOnce starts one compactor per transport, mirroring the +// refcounting semantics of the unexported startCompactorOnce in the upstream +// factory package. The compactor uses its own dedicated client (it must outlive +// individual stores and is never KV-wrapped). 
+func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration) (etcd3.Compactor, func(), error) { + compactorsMu.Lock() + defer compactorsMu.Unlock() + + if interval == 0 { + return nil, func() {}, nil + } + key := fmt.Sprintf("%v", c) + if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval { + client, err := newSharedETCDClient(c) + if err != nil { + return nil, nil, err + } + compactorClient := client.Client + + if foundBefore { + compactor.cancel() + } else { + compactor = &runningCompactor{} + compactors[key] = compactor + } + + compactor.interval = interval + compactor.client = compactorClient + cmp := etcd3.StartCompactorPerEndpoint(compactorClient, interval) + compactor.compactor = cmp + compactor.cancel = cmp.Stop + } + + compactors[key].refs++ + + return compactors[key].compactor, func() { + compactorsMu.Lock() + defer compactorsMu.Unlock() + + compactor := compactors[key] + compactor.refs-- + if compactor.refs == 0 { + compactor.cancel() + compactor.client.Close() + delete(compactors, key) + } + }, nil +} + +var ( + dbMetricsMonitorsMu sync.Mutex + dbMetricsMonitors = map[string]struct{}{} +) + +// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and +// update etcd_db_total_size_in_bytes per endpoint. Faithful copy of the +// upstream factory helper; deduped per endpoint within this package. 
+func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) { + if interval == 0 { + return func() {}, nil + } + dbMetricsMonitorsMu.Lock() + defer dbMetricsMonitorsMu.Unlock() + + ctx, cancel := context.WithCancel(context.Background()) + for _, ep := range client.Endpoints() { + if _, found := dbMetricsMonitors[ep]; found { + continue + } + dbMetricsMonitors[ep] = struct{}{} + endpoint := ep + klog.V(4).Infof("Start monitoring storage db size metric for endpoint %s with polling interval %v", endpoint, interval) + go wait.JitterUntilWithContext(ctx, func(context.Context) { + epStatus, err := client.Maintenance.Status(ctx, endpoint) + if err != nil { + klog.V(4).Infof("Failed to get storage db size for ep %s: %v", endpoint, err) + metrics.UpdateEtcdDbSize(endpoint, -1) + } else { + metrics.UpdateEtcdDbSize(endpoint, epStatus.DbSize) + } + }, interval, dbMetricsMonitorJitter, true) + } + + return func() { + cancel() + }, nil +} diff --git a/internal/apiserver/storage/etcdshared/client_test.go b/internal/apiserver/storage/etcdshared/client_test.go new file mode 100644 index 00000000..cccb06a3 --- /dev/null +++ b/internal/apiserver/storage/etcdshared/client_test.go @@ -0,0 +1,124 @@ +package etcdshared + +import ( + "context" + "testing" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" + + "k8s.io/apiserver/pkg/storage/storagebackend" +) + +func newFakeClient() *kubernetes.Client { + return &kubernetes.Client{Client: clientv3.NewCtxClient(context.Background())} +} + +func closed(c *kubernetes.Client) bool { + return c.Ctx().Err() != nil +} + +func withFakeClientConstructor(t *testing.T) *int { + t.Helper() + calls := 0 + orig := newSharedETCDClient + newSharedETCDClient = func(storagebackend.TransportConfig) (*kubernetes.Client, error) { + calls++ + return newFakeClient(), nil + } + t.Cleanup(func() { + newSharedETCDClient = orig + clientsMu.Lock() + clients = map[string]*runningClient{} + 
clientsMu.Unlock() + }) + return &calls +} + +func mkTransport(servers ...string) storagebackend.TransportConfig { + return storagebackend.TransportConfig{ServerList: servers} +} + +func TestAcquireClient_SharedAcrossProjects(t *testing.T) { + calls := withFakeClientConstructor(t) + tc := mkTransport("https://etcd:2379") + + c1, rel1, err := acquireClient(tc, 0) + if err != nil { + t.Fatalf("first acquire: %v", err) + } + c2, rel2, err := acquireClient(tc, 0) + if err != nil { + t.Fatalf("second acquire: %v", err) + } + + if *calls != 1 { + t.Fatalf("expected client dialed once, got %d", *calls) + } + if c1 != c2 { + t.Fatalf("expected same shared client pointer across acquisitions") + } + + // Releasing one project's storage must NOT close the client while another holds a ref. + rel1() + if closed(c1) { + t.Fatalf("client closed while a reference is still held") + } + + // Last release closes exactly once and drops the cache entry. + rel2() + if !closed(c1) { + t.Fatalf("client not closed after final release") + } + + clientsMu.Lock() + _, present := clients[transportKey(tc)] + clientsMu.Unlock() + if present { + t.Fatalf("cache entry not removed after final release") + } +} + +func TestAcquireClient_DistinctTransportsDoNotShare(t *testing.T) { + calls := withFakeClientConstructor(t) + + cA, relA, err := acquireClient(mkTransport("https://a:2379"), 0) + if err != nil { + t.Fatalf("acquire A: %v", err) + } + cB, relB, err := acquireClient(mkTransport("https://b:2379"), 0) + if err != nil { + t.Fatalf("acquire B: %v", err) + } + + if *calls != 2 { + t.Fatalf("expected two clients dialed, got %d", *calls) + } + if cA == cB { + t.Fatalf("distinct transports unexpectedly shared a client") + } + + relA() + relB() + if !closed(cA) || !closed(cB) { + t.Fatalf("clients not closed after release") + } +} + +func TestAcquireClient_ReClosedAfterFullReleaseCycle(t *testing.T) { + calls := withFakeClientConstructor(t) + tc := mkTransport("https://etcd:2379") + + _, rel1, _ := 
acquireClient(tc, 0) + rel1() + if *calls != 1 { + t.Fatalf("expected one dial, got %d", *calls) + } + + // After the entry was torn down, a fresh acquire dials a new client. + _, rel2, _ := acquireClient(tc, 0) + if *calls != 2 { + t.Fatalf("expected re-dial after teardown, got %d", *calls) + } + rel2() +} diff --git a/internal/apiserver/storage/etcdshared/decorator.go b/internal/apiserver/storage/etcdshared/decorator.go new file mode 100644 index 00000000..703020f0 --- /dev/null +++ b/internal/apiserver/storage/etcdshared/decorator.go @@ -0,0 +1,122 @@ +package etcdshared + +import ( + "sync" + + "k8s.io/klog/v2" + + "k8s.io/apimachinery/pkg/runtime" + genericfeatures "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/storage" + cacherstorage "k8s.io/apiserver/pkg/storage/cacher" + "k8s.io/apiserver/pkg/storage/etcd3" + "k8s.io/apiserver/pkg/storage/storagebackend" + "k8s.io/apiserver/pkg/storage/storagebackend/factory" + "k8s.io/apiserver/pkg/storage/value/encrypt/identity" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/tools/cache" +) + +// newRawStorage builds an etcd3 store backed by the transport-keyed shared +// client. It is a copy of the unexported newETCD3Storage in the upstream +// factory package with one change: the client is acquired from the refcounted +// shared cache instead of dialing a fresh connection per (project x resource). 
+func newRawStorage(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, factory.DestroyFunc, error) { + compactor, stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval) + if err != nil { + return nil, nil, err + } + + client, releaseClient, err := acquireClient(c.Transport, c.DBMetricPollInterval) + if err != nil { + stopCompactor() + return nil, nil, err + } + + transformer := c.Transformer + if transformer == nil { + transformer = identity.NewEncryptCheckTransformer() + } + + versioner := storage.APIObjectVersioner{} + decoder := etcd3.NewDefaultDecoder(c.Codec, versioner) + + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AllowUnsafeMalformedObjectDeletion) { + transformer = etcd3.WithCorruptObjErrorHandlingTransformer(transformer) + decoder = etcd3.WithCorruptObjErrorHandlingDecoder(decoder) + } + store, err := etcd3.New(client, compactor, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner) + if err != nil { + stopCompactor() + releaseClient() + return nil, nil, err + } + var once sync.Once + destroyFunc := func() { + once.Do(func() { + stopCompactor() + store.Close() + releaseClient() + }) + } + var st storage.Interface = store + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AllowUnsafeMalformedObjectDeletion) { + st = etcd3.NewStoreWithUnsafeCorruptObjectDeletion(st, c.GroupResource) + } + return st, destroyFunc, nil +} + +// StorageWithSharedCacher mirrors genericregistry.StorageWithCacher but builds +// the raw store from the transport-keyed shared etcd client. The cacher wrapping +// is identical to upstream and never touches the client. 
+func StorageWithSharedCacher() generic.StorageDecorator { + return func( + storageConfig *storagebackend.ConfigForResource, + resourcePrefix string, + keyFunc func(obj runtime.Object) (string, error), + newFunc func() runtime.Object, + newListFunc func() runtime.Object, + getAttrsFunc storage.AttrFunc, + triggerFuncs storage.IndexerFuncs, + indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) { + + s, d, err := newRawStorage(*storageConfig, newFunc, newListFunc, resourcePrefix) + if err != nil { + return s, d, err + } + if klogV := klog.V(5); klogV.Enabled() { + klogV.InfoS("Storage caching is enabled (shared etcd client)", "type", newFunc()) + } + + cacherConfig := cacherstorage.Config{ + Storage: s, + Versioner: storage.APIObjectVersioner{}, + GroupResource: storageConfig.GroupResource, + EventsHistoryWindow: storageConfig.EventsHistoryWindow, + ResourcePrefix: resourcePrefix, + KeyFunc: keyFunc, + NewFunc: newFunc, + NewListFunc: newListFunc, + GetAttrsFunc: getAttrsFunc, + IndexerFuncs: triggerFuncs, + Indexers: indexers, + Codec: storageConfig.Codec, + } + cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig) + if err != nil { + return nil, func() {}, err + } + delegator := cacherstorage.NewCacheDelegator(cacher, s) + var once sync.Once + destroyFunc := func() { + once.Do(func() { + delegator.Stop() + cacher.Stop() + d() + }) + } + + return delegator, destroyFunc, nil + } +} diff --git a/internal/apiserver/storage/project/restoptions.go b/internal/apiserver/storage/project/restoptions.go index 17278259..3e9b2c04 100644 --- a/internal/apiserver/storage/project/restoptions.go +++ b/internal/apiserver/storage/project/restoptions.go @@ -6,7 +6,8 @@ import ( "k8s.io/client-go/rest" generic "k8s.io/apiserver/pkg/registry/generic" - genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" + + "go.miloapis.com/milo/internal/apiserver/storage/etcdshared" ) // Wrap the upstream RESTOptionsGetter to install a per-project decorator. 
@@ -38,7 +39,7 @@ func (g roGetter) GetRESTOptions(gr schema.GroupResource, example runtime.Object // Ensure we always wrap with our project-aware decorator. if opts.Decorator == nil { - opts.Decorator = ProjectAwareDecorator(gr, genericregistry.StorageWithCacher(), g.loopbackConfig) + opts.Decorator = ProjectAwareDecorator(gr, etcdshared.StorageWithSharedCacher(), g.loopbackConfig) } else { opts.Decorator = ProjectAwareDecorator(gr, opts.Decorator, g.loopbackConfig) } From 41e22748d00bc71edb0a0412c1a8494a60668868 Mon Sep 17 00:00:00 2001 From: Alex Savanovich <40720931+savme@users.noreply.github.com> Date: Fri, 26 Jun 2026 14:37:33 +0200 Subject: [PATCH 2/7] perf(apiserver): specific fields in transportKey --- internal/apiserver/storage/etcdshared/client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/apiserver/storage/etcdshared/client.go b/internal/apiserver/storage/etcdshared/client.go index 03baa796..e3b1ec56 100644 --- a/internal/apiserver/storage/etcdshared/client.go +++ b/internal/apiserver/storage/etcdshared/client.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "net/url" + "slices" "strings" "sync" "time" @@ -121,7 +122,8 @@ var ( // transportKey derives the shared-client cache key. Only the transport // (endpoints + TLS) determines the connection. func transportKey(c storagebackend.TransportConfig) string { - return fmt.Sprintf("%v", c) + endpoints := strings.Join(slices.Sorted(slices.Values(c.ServerList)), ",") + return fmt.Sprintf("%s|%s|%s|%s", endpoints, c.CertFile, c.KeyFile, c.TrustedCAFile) } // acquireClient returns a single shared etcd client per transport config. 
All From a777ab0159a5974a6e3410ec076cd649188aad28 Mon Sep 17 00:00:00 2001 From: Alex Savanovich <40720931+savme@users.noreply.github.com> Date: Fri, 26 Jun 2026 16:32:42 +0200 Subject: [PATCH 3/7] perf(apiserver): always use etcdshared decorator --- internal/apiserver/storage/project/restoptions.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/internal/apiserver/storage/project/restoptions.go b/internal/apiserver/storage/project/restoptions.go index 3e9b2c04..bcbcd9f1 100644 --- a/internal/apiserver/storage/project/restoptions.go +++ b/internal/apiserver/storage/project/restoptions.go @@ -38,10 +38,6 @@ func (g roGetter) GetRESTOptions(gr schema.GroupResource, example runtime.Object } // Ensure we always wrap with our project-aware decorator. - if opts.Decorator == nil { - opts.Decorator = ProjectAwareDecorator(gr, etcdshared.StorageWithSharedCacher(), g.loopbackConfig) - } else { - opts.Decorator = ProjectAwareDecorator(gr, opts.Decorator, g.loopbackConfig) - } + opts.Decorator = ProjectAwareDecorator(gr, etcdshared.StorageWithSharedCacher(), g.loopbackConfig) return opts, nil } From 05d51677b462eb63092369a1a8fc99feeade1b2e Mon Sep 17 00:00:00 2001 From: Scot Wells Date: Fri, 26 Jun 2026 15:24:30 -0500 Subject: [PATCH 4/7] perf(apiserver): pool shared etcd clients per transport Sharing a single etcd client across every project control plane collapsed ~50k watch streams onto one client (~19.7k watchers multiplexed ~60 per stream). etcd delivery stayed healthy, but milo watch-cache p99 read-wait pegged at the 3s block-timeout and consistency-check errors climbed steadily, because per-cacher RequestWatchProgress fans out across every watcher on the shared client - O(N^2) progress amplification - so idle per-prefix caches never reach the global revision a consistent read demands. Streaming WatchList consumers (network-services-operator) then fail their initial cache sync and crashloop. 
Pool sharedClientPoolSize (32) connections per transport and assign stores round-robin. This cuts watchers-per-client ~32x, shrinking the progress fan-out per client by the same factor, while still collapsing the tens of thousands of per-(project x resource) connections the package replaced - the memory win is preserved (a few dozen connections, not one per resource). --- .../apiserver/storage/etcdshared/client.go | 68 +++++++++++++------ .../storage/etcdshared/client_test.go | 33 ++++----- 2 files changed, 66 insertions(+), 35 deletions(-) diff --git a/internal/apiserver/storage/etcdshared/client.go b/internal/apiserver/storage/etcdshared/client.go index e3b1ec56..c329bbb4 100644 --- a/internal/apiserver/storage/etcdshared/client.go +++ b/internal/apiserver/storage/etcdshared/client.go @@ -108,9 +108,24 @@ var newSharedETCDClient = func(c storagebackend.TransportConfig) (*kubernetes.Cl return kubernetes.New(cfg) } +// sharedClientPoolSize is the number of etcd connections opened per transport +// and round-robined across all project control planes and resource types. A +// single connection multiplexes every watch-cache watch over one gRPC watch +// stream; at our scale (hundreds of projects x dozens of resources, i.e. +// tens of thousands of watches) one stream becomes a head-of-line bottleneck +// and per-prefix caches fall progressively behind the global revision, which +// surfaces as "Too large resource version" / cache consistency failures and +// breaks streaming WatchList. Spreading watches across a small pool keeps the +// per-connection watch count in the range a normal apiserver handles while +// still collapsing the tens of thousands of per-(project x resource) +// connections this package replaced — the memory win is preserved (a few dozen +// connections, not one per resource). 
+const sharedClientPoolSize = 32 + type runningClient struct { - client *kubernetes.Client + clients []*kubernetes.Client stopDBSizeMonitor func() + next uint64 refs int } @@ -126,15 +141,15 @@ func transportKey(c storagebackend.TransportConfig) string { return fmt.Sprintf("%s|%s|%s|%s", endpoints, c.CertFile, c.KeyFile, c.TrustedCAFile) } -// acquireClient returns a single shared etcd client per transport config. All -// project control planes and resource types that share the same transport reuse -// the same underlying gRPC connection; per-project isolation is enforced by the -// etcd key prefix at the store layer, not by the connection. The client's KV is -// wrapped once with the latency tracker (it is stateless and request-context -// scoped, so a single wrapper is safe and avoids compounding wrappers across -// thousands of resources) and a single DB-size monitor is started for it. The -// returned release func closes the client only when the last reference for the -// transport is released. +// acquireClient returns one etcd client from a fixed-size pool shared per +// transport config. All project control planes and resource types that share +// the same transport reuse the same pool of underlying gRPC connections, +// assigned round-robin; per-project isolation is enforced by the etcd key +// prefix at the store layer, not by the connection. Each client's KV is wrapped +// once with the latency tracker (it is stateless and request-context scoped, so +// a single wrapper per client is safe) and a single DB-size monitor is started +// for the pool. The returned release func closes the pool only when the last +// reference for the transport is released. 
func acquireClient(c storagebackend.TransportConfig, dbMetricPollInterval time.Duration) (*kubernetes.Client, func(), error) { clientsMu.Lock() defer clientsMu.Unlock() @@ -142,28 +157,41 @@ func acquireClient(c storagebackend.TransportConfig, dbMetricPollInterval time.D key := transportKey(c) rc, found := clients[key] if !found { - client, err := newSharedETCDClient(c) - if err != nil { - return nil, nil, err + pool := make([]*kubernetes.Client, 0, sharedClientPoolSize) + for i := 0; i < sharedClientPoolSize; i++ { + client, err := newSharedETCDClient(c) + if err != nil { + for _, pc := range pool { + _ = pc.Close() + } + return nil, nil, err + } + client.KV = etcd3.NewETCDLatencyTracker(client.KV) + pool = append(pool, client) } - client.KV = etcd3.NewETCDLatencyTracker(client.KV) - stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client.Client, dbMetricPollInterval) + // The DB-size monitor dedups per endpoint internally, so one client + // from the pool is sufficient to drive it. 
+ stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(pool[0].Client, dbMetricPollInterval) if err != nil { - _ = client.Close() + for _, pc := range pool { + _ = pc.Close() + } return nil, nil, err } rc = &runningClient{ - client: client, + clients: pool, stopDBSizeMonitor: stopDBSizeMonitor, } clients[key] = rc } rc.refs++ + client := rc.clients[rc.next%uint64(len(rc.clients))] + rc.next++ - return rc.client, func() { + return client, func() { clientsMu.Lock() defer clientsMu.Unlock() @@ -171,7 +199,9 @@ func acquireClient(c storagebackend.TransportConfig, dbMetricPollInterval time.D rc.refs-- if rc.refs == 0 { rc.stopDBSizeMonitor() - _ = rc.client.Close() + for _, pc := range rc.clients { + _ = pc.Close() + } delete(clients, key) } }, nil diff --git a/internal/apiserver/storage/etcdshared/client_test.go b/internal/apiserver/storage/etcdshared/client_test.go index cccb06a3..4ffad253 100644 --- a/internal/apiserver/storage/etcdshared/client_test.go +++ b/internal/apiserver/storage/etcdshared/client_test.go @@ -52,23 +52,24 @@ func TestAcquireClient_SharedAcrossProjects(t *testing.T) { t.Fatalf("second acquire: %v", err) } - if *calls != 1 { - t.Fatalf("expected client dialed once, got %d", *calls) + // The pool for a transport is dialed once, lazily, on first acquire. + if *calls != sharedClientPoolSize { + t.Fatalf("expected pool dialed once (%d clients), got %d", sharedClientPoolSize, *calls) } - if c1 != c2 { - t.Fatalf("expected same shared client pointer across acquisitions") + if c1 == nil || c2 == nil { + t.Fatalf("expected non-nil clients from the pool") } - // Releasing one project's storage must NOT close the client while another holds a ref. + // Releasing one project's storage must NOT close the pool while another holds a ref. rel1() if closed(c1) { - t.Fatalf("client closed while a reference is still held") + t.Fatalf("pool closed while a reference is still held") } - // Last release closes exactly once and drops the cache entry. 
+ // Last release closes the pool and drops the cache entry. rel2() - if !closed(c1) { - t.Fatalf("client not closed after final release") + if !closed(c1) || !closed(c2) { + t.Fatalf("pool not closed after final release") } clientsMu.Lock() @@ -91,8 +92,8 @@ func TestAcquireClient_DistinctTransportsDoNotShare(t *testing.T) { t.Fatalf("acquire B: %v", err) } - if *calls != 2 { - t.Fatalf("expected two clients dialed, got %d", *calls) + if *calls != 2*sharedClientPoolSize { + t.Fatalf("expected two pools dialed (%d clients), got %d", 2*sharedClientPoolSize, *calls) } if cA == cB { t.Fatalf("distinct transports unexpectedly shared a client") @@ -111,14 +112,14 @@ func TestAcquireClient_ReClosedAfterFullReleaseCycle(t *testing.T) { _, rel1, _ := acquireClient(tc, 0) rel1() - if *calls != 1 { - t.Fatalf("expected one dial, got %d", *calls) + if *calls != sharedClientPoolSize { + t.Fatalf("expected one pool dialed (%d clients), got %d", sharedClientPoolSize, *calls) } - // After the entry was torn down, a fresh acquire dials a new client. + // After the entry was torn down, a fresh acquire dials a new pool. _, rel2, _ := acquireClient(tc, 0) - if *calls != 2 { - t.Fatalf("expected re-dial after teardown, got %d", *calls) + if *calls != 2*sharedClientPoolSize { + t.Fatalf("expected re-dial after teardown (%d clients), got %d", 2*sharedClientPoolSize, *calls) } rel2() } From d7ee21f4a40a465cabc380553044543cb4642f34 Mon Sep 17 00:00:00 2001 From: Scot Wells Date: Sat, 27 Jun 2026 10:48:32 -0500 Subject: [PATCH 5/7] perf(apiserver): make shared etcd client pool size configurable Replace the hardcoded sharedClientPoolSize const with a --shared-etcd-client-pool-size flag (default 32, min 1) so the per-transport pool can be tuned per environment without rebuilding the image. The parsed value is pushed into the etcdshared package once at apiserver startup, before any storage is built. 
Key changes: - Add SetSharedClientPoolSize to the etcdshared package; sharedClientPoolSize becomes a package var clamped to a floor of 1 - Register --shared-etcd-client-pool-size and apply it at the top of Run --- cmd/milo/apiserver/server.go | 6 ++++++ internal/apiserver/storage/etcdshared/client.go | 17 +++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/cmd/milo/apiserver/server.go b/cmd/milo/apiserver/server.go index c17bea35..c24c48e4 100644 --- a/cmd/milo/apiserver/server.go +++ b/cmd/milo/apiserver/server.go @@ -11,6 +11,7 @@ import ( "github.com/spf13/cobra" crd "go.miloapis.com/milo/config/crd" "go.miloapis.com/milo/internal/apiserver/admission/plugin/namespace/lifecycle" + "go.miloapis.com/milo/internal/apiserver/storage/etcdshared" projectstorage "go.miloapis.com/milo/internal/apiserver/storage/project" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" @@ -79,6 +80,7 @@ var ( eventsProviderTimeoutSeconds int eventsProviderRetries int eventsForwardExtras []string + sharedETCDClientPoolSize int ) // NewCommand creates a *cobra.Command object with default parameters @@ -199,6 +201,7 @@ func NewCommand() *cobra.Command { fs.IntVar(&eventsProviderTimeoutSeconds, "events-provider-timeout", 30, "Activity provider request timeout in seconds") fs.IntVar(&eventsProviderRetries, "events-provider-retries", 3, "Activity provider request retries") fs.StringSliceVar(&eventsForwardExtras, "events-forward-extras", []string{"iam.miloapis.com/parent-api-group", "iam.miloapis.com/parent-type", "iam.miloapis.com/parent-name"}, "User extras keys to forward to Activity for events") + fs.IntVar(&sharedETCDClientPoolSize, "shared-etcd-client-pool-size", 32, "Number of etcd client connections opened per transport and round-robined across all project control plane watch caches. 
Higher values spread watch progress traffic across more gRPC streams at the cost of more connections. Minimum 1.") cols, _, _ := term.TerminalSize(cmd.OutOrStdout()) cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols) @@ -240,6 +243,9 @@ func Run(ctx context.Context, opts options.CompletedOptions) error { klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK")) + etcdshared.SetSharedClientPoolSize(sharedETCDClientPoolSize) + klog.InfoS("Shared etcd client pool configured", "size", sharedETCDClientPoolSize) + config, err := NewConfig(opts) if err != nil { return err diff --git a/internal/apiserver/storage/etcdshared/client.go b/internal/apiserver/storage/etcdshared/client.go index c329bbb4..00b4ee5b 100644 --- a/internal/apiserver/storage/etcdshared/client.go +++ b/internal/apiserver/storage/etcdshared/client.go @@ -119,8 +119,21 @@ var newSharedETCDClient = func(c storagebackend.TransportConfig) (*kubernetes.Cl // per-connection watch count in the range a normal apiserver handles while // still collapsing the tens of thousands of per-(project x resource) // connections this package replaced — the memory win is preserved (a few dozen -// connections, not one per resource). -const sharedClientPoolSize = 32 +// connections, not one per resource). It is tunable via +// --shared-etcd-client-pool-size; SetSharedClientPoolSize must be called once at +// startup before any storage is built. +var sharedClientPoolSize = 32 + +// SetSharedClientPoolSize overrides the per-transport pool size. It is intended +// to be called exactly once during apiserver startup, before the first storage +// (and therefore the first acquireClient) is created, so no synchronization is +// needed against acquireClient. Values below 1 are clamped to 1. 
+func SetSharedClientPoolSize(n int) { + if n < 1 { + n = 1 + } + sharedClientPoolSize = n +} type runningClient struct { clients []*kubernetes.Client From f336b60dc11d6352d66bdbfa7497d42220b2e663 Mon Sep 17 00:00:00 2001 From: Scot Wells Date: Sat, 27 Jun 2026 10:49:37 -0500 Subject: [PATCH 6/7] chore(apiserver): expose shared etcd client pool size via deployment env Map the new --shared-etcd-client-pool-size flag to a SHARED_ETCD_CLIENT_POOL_SIZE env var (default 32) in the apiserver deployment so the pool can be tuned per environment without rebuilding. --- config/apiserver/deployment.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/config/apiserver/deployment.yaml b/config/apiserver/deployment.yaml index a42c0936..2da9b290 100644 --- a/config/apiserver/deployment.yaml +++ b/config/apiserver/deployment.yaml @@ -84,12 +84,19 @@ spec: - --events-provider-retries=$(EVENTS_PROVIDER_RETRIES) - --events-forward-extras=$(EVENTS_FORWARD_EXTRAS) - --shutdown-delay-duration=$(SHUTDOWN_DELAY_DURATION) + # Number of pooled etcd connections per transport for the shared + # watch-cache client; tune per environment via SHARED_ETCD_CLIENT_POOL_SIZE + - --shared-etcd-client-pool-size=$(SHARED_ETCD_CLIENT_POOL_SIZE) env: # Feature gates configuration # Sessions and UserIdentities are GA (enabled by default) # Add EventsProxy=true to enable Events proxy to Activity service - name: FEATURE_GATES value: "" + # Pooled etcd connections per transport for the shared watch-cache + # client. Override per environment to tune watch progress spread. 
+ - name: SHARED_ETCD_CLIENT_POOL_SIZE + value: "32" - name: LOG_LEVEL value: "4" - name: LOGGING_FORMAT From 1790fe5346736e5909d938cfd7d07b9e50085147 Mon Sep 17 00:00:00 2001 From: Alex Savanovich <40720931+savme@users.noreply.github.com> Date: Mon, 29 Jun 2026 12:28:11 +0200 Subject: [PATCH 7/7] chore(apiserver): ensure watch gRPC streams are multiplexed --- .../apiserver/storage/etcdshared/decorator.go | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/internal/apiserver/storage/etcdshared/decorator.go b/internal/apiserver/storage/etcdshared/decorator.go index 703020f0..79c1a68e 100644 --- a/internal/apiserver/storage/etcdshared/decorator.go +++ b/internal/apiserver/storage/etcdshared/decorator.go @@ -3,6 +3,8 @@ package etcdshared import ( "sync" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/runtime" @@ -11,6 +13,7 @@ import ( "k8s.io/apiserver/pkg/storage" cacherstorage "k8s.io/apiserver/pkg/storage/cacher" "k8s.io/apiserver/pkg/storage/etcd3" + etcdfeature "k8s.io/apiserver/pkg/storage/feature" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend/factory" "k8s.io/apiserver/pkg/storage/value/encrypt/identity" @@ -34,6 +37,29 @@ func newRawStorage(c storagebackend.ConfigForResource, newFunc, newListFunc func return nil, nil, err } + // etcd3.New calls DefaultFeatureSupportChecker.CheckClient using the client we + // pass it. The per-store client below has no endpoints (NewCtxClient), so the + // check would be a no-op. Call it explicitly for the pool client here instead. 
+ if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ConsistentListFromCache) || + utilfeature.DefaultFeatureGate.Enabled(genericfeatures.WatchList) { + etcdfeature.DefaultFeatureSupportChecker.CheckClient( + client.Ctx(), client.Client, storage.RequestWatchProgress) + } + + // Each store gets its own gRPC watch stream over the shared TCP connection so + // that watch creations don't serialize across stores sharing the same pool slot. + // KV, Lease, and Maintenance still use the shared pool client. + // + // NewCtxClient allocates a fresh *clientv3.Client with its own mutexes — no + // mutex-bearing struct copy. We then graft only the interface fields we need. + perStoreBase := clientv3.NewCtxClient(client.Ctx()) + perStoreBase.KV = client.KV + perStoreBase.Lease = client.Client.Lease + perStoreBase.Maintenance = client.Client.Maintenance + perStoreBase.Watcher = clientv3.NewWatcher(client.Client) + perStoreClient := &kubernetes.Client{Client: perStoreBase} + perStoreClient.Kubernetes = perStoreClient + transformer := c.Transformer if transformer == nil { transformer = identity.NewEncryptCheckTransformer() @@ -46,9 +72,10 @@ func newRawStorage(c storagebackend.ConfigForResource, newFunc, newListFunc func transformer = etcd3.WithCorruptObjErrorHandlingTransformer(transformer) decoder = etcd3.WithCorruptObjErrorHandlingDecoder(decoder) } - store, err := etcd3.New(client, compactor, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner) + store, err := etcd3.New(perStoreClient, compactor, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner) if err != nil { stopCompactor() + perStoreBase.Watcher.Close() releaseClient() return nil, nil, err } @@ -57,6 +84,7 @@ func newRawStorage(c storagebackend.ConfigForResource, newFunc, newListFunc func once.Do(func() { stopCompactor() store.Close() + 
perStoreBase.Watcher.Close() releaseClient() }) }