diff --git a/events/setup.go b/events/setup.go index e51663e0e0..db6c0ddc3e 100644 --- a/events/setup.go +++ b/events/setup.go @@ -3,7 +3,6 @@ package events import ( "context" "fmt" - "net/http" "connectrpc.com/connect" "connectrpc.com/otelconnect" @@ -38,7 +37,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { if sc.BaseURL != "" { runServiceURL = sc.BaseURL } - runClient := workflowconnect.NewInternalRunServiceClient(http.DefaultClient, runServiceURL, connect.WithInterceptors(otelInterceptor)) + runClient := workflowconnect.NewInternalRunServiceClient(app.InternalHTTPClient(), runServiceURL, connect.WithInterceptors(otelInterceptor)) eventsSvc := service.NewEventsProxyService(runClient) diff --git a/executor/setup.go b/executor/setup.go index 87a982f19c..3e9fb6d53b 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -4,7 +4,6 @@ import ( "context" "crypto/tls" "fmt" - "net/http" "os" "connectrpc.com/connect" @@ -155,7 +154,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { if eventsServiceURL == "" { eventsServiceURL = cfg.EventsServiceURL } - eventsClient := workflowconnect.NewEventsProxyServiceClient(http.DefaultClient, eventsServiceURL, connect.WithInterceptors(otelInterceptor)) + eventsClient := workflowconnect.NewEventsProxyServiceClient(app.InternalHTTPClient(), eventsServiceURL, connect.WithInterceptors(otelInterceptor)) catalogCfg := catalog.GetConfig() cacheServiceURL := sc.BaseURL if cacheServiceURL == "" { diff --git a/flytestdlib/app/app.go b/flytestdlib/app/app.go index 49dccb350b..4bb1bdda8d 100644 --- a/flytestdlib/app/app.go +++ b/flytestdlib/app/app.go @@ -203,6 +203,30 @@ func httpProtocols() *http.Protocols { return protocols } +// InternalHTTPClient returns an *http.Client tuned for service-to-service +// connect-rpc calls within the cluster (e.g. executor -> events-proxy -> +// run-service). +// +// http.DefaultClient uses HTTP/1.1 with MaxIdleConnsPerHost=2, so under +// concurrent load requests serialize on two connections and pay a fresh TCP +// handshake per call -- which dominates p99 send latency and caps throughput. +// This client speaks unencrypted HTTP/2 (h2c), so all concurrent RPCs multiplex +// over a single connection. The internal servers advertise h2c via +// httpProtocols(); these hops are plaintext http:// so we force cleartext h2 +// with prior knowledge. The generous idle pool only matters on an h1 fallback. +func InternalHTTPClient() *http.Client { + protocols := &http.Protocols{} + protocols.SetUnencryptedHTTP2(true) + return &http.Client{ + Transport: &http.Transport{ + Protocols: protocols, + MaxIdleConns: 256, + MaxIdleConnsPerHost: 256, + IdleConnTimeout: 90 * time.Second, + }, + } +} + // requestGzipDecompressMiddleware pre-decompresses request bodies that carry // Content-Encoding: gzip before they reach the connect-rpc handler. //