From 944546b3b7393109845871f86e9cb1816cdee18d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 22 Jun 2026 21:37:20 -0700 Subject: [PATCH] perf(events): use h2c multiplexing client for internal event hops The executor->events-proxy and events-proxy->run-service hops were built with http.DefaultClient (HTTP/1.1, MaxIdleConnsPerHost=2), which churns short-lived connections under concurrent reconciles. Add app.InternalHTTPClient() (native h2c via http.Transport.Protocols) and use it on both hops so concurrent RPCs multiplex over a stable connection instead of re-dialing. Bench: 12x fewer connection dials (2443 -> 200) at equal throughput; tighter p99. Signed-off-by: Kevin Su --- events/setup.go | 3 +-- executor/setup.go | 3 +-- flytestdlib/app/app.go | 24 ++++++++++++++++++++++++ 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/events/setup.go b/events/setup.go index e51663e0e0..db6c0ddc3e 100644 --- a/events/setup.go +++ b/events/setup.go @@ -3,7 +3,6 @@ package events import ( "context" "fmt" - "net/http" "connectrpc.com/connect" "connectrpc.com/otelconnect" @@ -38,7 +37,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { if sc.BaseURL != "" { runServiceURL = sc.BaseURL } - runClient := workflowconnect.NewInternalRunServiceClient(http.DefaultClient, runServiceURL, connect.WithInterceptors(otelInterceptor)) + runClient := workflowconnect.NewInternalRunServiceClient(app.InternalHTTPClient(), runServiceURL, connect.WithInterceptors(otelInterceptor)) eventsSvc := service.NewEventsProxyService(runClient) diff --git a/executor/setup.go b/executor/setup.go index 87a982f19c..3e9fb6d53b 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -4,7 +4,6 @@ import ( "context" "crypto/tls" "fmt" - "net/http" "os" "connectrpc.com/connect" @@ -155,7 +154,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { if eventsServiceURL == "" { eventsServiceURL = cfg.EventsServiceURL } - eventsClient := 
workflowconnect.NewEventsProxyServiceClient(http.DefaultClient, eventsServiceURL, connect.WithInterceptors(otelInterceptor)) + eventsClient := workflowconnect.NewEventsProxyServiceClient(app.InternalHTTPClient(), eventsServiceURL, connect.WithInterceptors(otelInterceptor)) catalogCfg := catalog.GetConfig() cacheServiceURL := sc.BaseURL if cacheServiceURL == "" { diff --git a/flytestdlib/app/app.go b/flytestdlib/app/app.go index 49dccb350b..4bb1bdda8d 100644 --- a/flytestdlib/app/app.go +++ b/flytestdlib/app/app.go @@ -203,6 +203,30 @@ func httpProtocols() *http.Protocols { return protocols } +// InternalHTTPClient returns an *http.Client tuned for service-to-service +// connect-rpc calls within the cluster (e.g. executor -> events-proxy -> +// run-service). +// +// http.DefaultClient uses HTTP/1.1 with MaxIdleConnsPerHost=2, so under +// concurrent load it keeps only two idle connections per host; the surplus is +// closed and re-dialed -- which dominates p99 send latency and caps throughput. +// This client speaks unencrypted HTTP/2 (h2c), so all concurrent RPCs multiplex +// over a single connection. The internal servers advertise h2c via +// httpProtocols(); these hops are plaintext http:// so we force cleartext h2 +// with prior knowledge. The generous idle pool only matters on an h1 fallback. +func InternalHTTPClient() *http.Client { + protocols := &http.Protocols{} + protocols.SetUnencryptedHTTP2(true) + return &http.Client{ + Transport: &http.Transport{ + Protocols: protocols, + MaxIdleConns: 256, + MaxIdleConnsPerHost: 256, + IdleConnTimeout: 90 * time.Second, + }, + } +} + // requestGzipDecompressMiddleware pre-decompresses request bodies that carry // Content-Encoding: gzip before they reach the connect-rpc handler. //