Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions events/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package events
import (
"context"
"fmt"
"net/http"

"connectrpc.com/connect"
"connectrpc.com/otelconnect"
Expand Down Expand Up @@ -38,7 +37,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
if sc.BaseURL != "" {
runServiceURL = sc.BaseURL
}
runClient := workflowconnect.NewInternalRunServiceClient(http.DefaultClient, runServiceURL, connect.WithInterceptors(otelInterceptor))
runClient := workflowconnect.NewInternalRunServiceClient(app.InternalHTTPClient(), runServiceURL, connect.WithInterceptors(otelInterceptor))

eventsSvc := service.NewEventsProxyService(runClient)

Expand Down
3 changes: 1 addition & 2 deletions executor/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"fmt"
"net/http"
"os"

"connectrpc.com/connect"
Expand Down Expand Up @@ -155,7 +154,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
if eventsServiceURL == "" {
eventsServiceURL = cfg.EventsServiceURL
}
eventsClient := workflowconnect.NewEventsProxyServiceClient(http.DefaultClient, eventsServiceURL, connect.WithInterceptors(otelInterceptor))
eventsClient := workflowconnect.NewEventsProxyServiceClient(app.InternalHTTPClient(), eventsServiceURL, connect.WithInterceptors(otelInterceptor))
catalogCfg := catalog.GetConfig()
cacheServiceURL := sc.BaseURL
if cacheServiceURL == "" {
Expand Down
24 changes: 24 additions & 0 deletions flytestdlib/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,30 @@ func httpProtocols() *http.Protocols {
return protocols
}

// InternalHTTPClient returns an *http.Client tuned for service-to-service
// connect-rpc calls within the cluster (e.g. executor -> events-proxy ->
// run-service).
//
// http.DefaultClient uses HTTP/1.1 with MaxIdleConnsPerHost=2, so under
// concurrent load requests serialize on two connections and pay a fresh TCP
// handshake per call -- which dominates p99 send latency and caps throughput.
// This client speaks unencrypted HTTP/2 (h2c), so all concurrent RPCs multiplex
// over a single connection. The internal servers advertise h2c via
// httpProtocols(); these hops are plaintext http:// so we force cleartext h2
// with prior knowledge. The generous idle pool only matters on an h1 fallback.
Comment on lines +210 to +216
func InternalHTTPClient() *http.Client {
protocols := &http.Protocols{}
protocols.SetUnencryptedHTTP2(true)
return &http.Client{
Transport: &http.Transport{
Protocols: protocols,
MaxIdleConns: 256,
MaxIdleConnsPerHost: 256,
IdleConnTimeout: 90 * time.Second,
},
}
}
Comment on lines +217 to +228

// requestGzipDecompressMiddleware pre-decompresses request bodies that carry
// Content-Encoding: gzip before they reach the connect-rpc handler.
//
Expand Down
Loading