diff --git a/proxyd/backend.go b/proxyd/backend.go index b1d035b4..7e6b8a75 100644 --- a/proxyd/backend.go +++ b/proxyd/backend.go @@ -311,6 +311,7 @@ type Backend struct { maxWSConns int outOfServiceInterval time.Duration stripTrailingXFF bool + ingressRPC string proxydIP string skipIsSyncingCheck bool diff --git a/proxyd/backend_test.go b/proxyd/backend_test.go index e8ed9d71..479cb797 100644 --- a/proxyd/backend_test.go +++ b/proxyd/backend_test.go @@ -2,6 +2,7 @@ package proxyd import ( "context" + "io" "net/http" "net/http/httptest" "strings" @@ -211,6 +212,75 @@ func TestAllowedStatusCodes(t *testing.T) { require.Equal(t, http.StatusServiceUnavailable, res[0].Error.HTTPErrorCode) } +func TestIngressForwarding(t *testing.T) { + backendRequests := make(chan []byte, 10) + ingressRequests := make(chan []byte, 10) + + // Mock backend server + backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + backendRequests <- body + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"jsonrpc":"2.0","result":"0x1234","id":1}`)) + })) + defer backendServer.Close() + + // Mock ingress server + ingressServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + ingressRequests <- body + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"jsonrpc":"2.0","result":"ok","id":1}`)) + })) + defer ingressServer.Close() + + // Create backend with ingress RPC configured + backend := NewBackend( + "test-backend", + backendServer.URL, + "", + semaphore.NewWeighted(10), + WithIngressRPC(ingressServer.URL), + ) + + // Create a test RPC request + rpcReq := &RPCReq{ + JSONRPC: "2.0", + Method: "eth_blockNumber", + Params: []byte(`[]`), + ID: []byte(`1`), + } + + // Forward the request + ctx := context.Background() + res, err := backend.Forward(ctx, []*RPCReq{rpcReq}, false) + + // Verify the backend request was successful + require.NoError(t, err) + require.Len(t, res, 1) + require.False(t, res[0].IsError()) + + // Verify both backend and ingress received the request + select { + case backendBody := <-backendRequests: + require.Contains(t, string(backendBody), "eth_blockNumber") + require.Contains(t, string(backendBody), `"id":1`) + case <-time.After(100 * time.Millisecond): + t.Fatal("Backend did not receive request") + } + + // Give a bit more time for the async ingress request to complete + select { + case ingressBody := <-ingressRequests: + require.Contains(t, string(ingressBody), "eth_blockNumber") + require.Contains(t, string(ingressBody), `"id":1`) + case <-time.After(500 * time.Millisecond): + t.Fatal("Ingress did not receive request") + } +} + func getHttpResponseCodeCount(statusCode string) float64 { metricFamilies, err := prometheus.DefaultGatherer.Gather() if err != nil { diff --git a/proxyd/config.go b/proxyd/config.go index 6d09d9e1..7b07dc24 100644 --- a/proxyd/config.go +++ b/proxyd/config.go @@ -249,6 +249,7 @@ type Config struct { SenderRateLimit SenderRateLimitConfig `toml:"sender_rate_limit"` InteropValidationConfig InteropValidationConfig `toml:"interop_validation"` TxValidationMiddlewareConfig TxValidationMiddlewareConfig `toml:"tx_validation_middleware"` + IngressRPC string `toml:"ingress_rpc"` } type InteropValidationConfig struct { diff --git a/proxyd/metrics.go b/proxyd/metrics.go index 0400c574..9a029b3d 100644 --- a/proxyd/metrics.go +++ b/proxyd/metrics.go @@ -480,6 +480,12 @@ var ( "backend_name", }) + ingressRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "tips_ingress_requests_total", + Help: "Count of total requests forwarded to TIPS ingress service.", + }) + backendProbeChecksTotal = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: MetricsNamespace, Name: "backend_probe_checks_total", @@ -497,6 +503,13 @@ var ( }, []string{ "backend_name", }) + + ingressRequestDurationSum = promauto.NewSummary(prometheus.SummaryOpts{ + Namespace: MetricsNamespace, + Name: "tips_ingress_request_duration_seconds", + Help: "Summary of TIPS ingress request response times broken down by backend.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }) ) func RecordRedisError(source string) { @@ -686,6 +699,14 @@ func RecordBackendProbeDuration(backendName string, duration time.Duration) { backendProbeDurationHist.WithLabelValues(backendName).Observe(duration.Seconds()) } +func RecordIngressRequest() { + ingressRequestsTotal.Inc() +} + +func RecordIngressRequestDuration(duration time.Duration) { + ingressRequestDurationSum.Observe(duration.Seconds()) +} + func boolToFloat64(b bool) float64 { if b { return 1 diff --git a/proxyd/proxyd.go b/proxyd/proxyd.go index b3d23417..944d242e 100644 --- a/proxyd/proxyd.go +++ b/proxyd/proxyd.go @@ -508,6 +508,7 @@ func Start(config *Config) (*Server, func(), error) { apiKeys, config.TxValidationMiddlewareConfig, time.Duration(config.Server.GracefulShutdownSeconds)*time.Second, + config.IngressRPC, ) if err != nil { return nil, nil, fmt.Errorf("error creating server: %w", err) diff --git a/proxyd/server.go b/proxyd/server.go index 9c951381..4217aae9 100644 --- a/proxyd/server.go +++ b/proxyd/server.go @@ -1,6 +1,7 @@ package proxyd import ( + "bytes" "context" "crypto/rand" "crypto/subtle" @@ -90,6 +91,8 @@ type Server struct { interopValidatingConfig InteropValidationConfig interopStrategy InteropStrategy publicAccess bool + ingressRpc string + ingressRpcClient *http.Client enableTxHashLogging bool enableTxValidation bool @@ -131,6 +134,7 @@ func NewServer( limExemptKeys []string, txValidationConfig TxValidationMiddlewareConfig, gracefulShutdownDuration time.Duration, + ingressRpc string, ) (*Server, error) { if cache == nil { cache = &NoopRPCCache{} @@ -259,6 +263,13 @@ func NewServer( txValidationClient: txValidationClient, txValidationFailOpen: txValidationFailOpen, gracefulShutdownDuration: gracefulShutdownDuration, + ingressRpc: ingressRpc, + ingressRpcClient: &http.Client{ + Timeout: defaultRPCTimeout, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + }, }, nil } @@ -646,7 +657,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL // Apply a sender-based rate limit if it is enabled. Note that sender-based rate // limits apply regardless of origin or user-agent. As such, they don't use the // isLimited method. - if parsedReq.Method == "eth_sendRawTransaction" || parsedReq.Method == "eth_sendRawTransactionConditional" { + if parsedReq.Method == "eth_sendRawTransaction" || parsedReq.Method == "eth_sendRawTransactionConditional" || parsedReq.Method == "eth_sendRawTransactionSync" { tx, err := s.convertSendReqToSendTx(ctx, parsedReq) if err != nil { RecordRPCError(ctx, BackendProxyd, parsedReq.Method, err) @@ -663,6 +674,31 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL responses[i] = NewRPCErrorRes(parsedReq.ID, err) continue } + + // Send async copy to ingress service, don't wait for error handling + if s.ingressRpc != "" { + body, err := json.Marshal(parsedReq) + if err != nil { + log.Error("unable to marshal JSON RPC request", "source", "rpc", "err", err) + } else { + go func() { + RecordIngressRequest() + + ingressStart := time.Now() + req, err := http.NewRequest(http.MethodPost, s.ingressRpc, bytes.NewBuffer(body)) + req.Header.Set("Content-Type", "application/json") + + resp, err := s.ingressRpcClient.Do(req) + if err != nil { + log.Warn("failed to proxy to ingress rpc", "err", err) + return + } + + defer resp.Body.Close() + RecordIngressRequestDuration(time.Since(ingressStart)) + }() + } + } } // Apply transaction validation middleware if enabled and method is configured. @@ -907,15 +943,19 @@ func (s *Server) convertSendReqToSendTx(ctx context.Context, req *RPCReq) (*type log.Debug("raw transaction conditional request has invalid number of params", "req_id", GetReqID(ctx)) // The error below is identical to the one Geth responds with. return nil, ErrInvalidParams("missing value for required argument 0 or 1") + } else if req.Method == "eth_sendRawTransactionSync" && (len(params) == 0 || len(params) > 2) { + log.Debug("raw transaction sync request has invalid number of params", "req_id", GetReqID(ctx)) + // The error below is identical to the one Geth responds with. + return nil, ErrInvalidParams("missing value for required argument 0") } - address, ok := params[0].(string) + signedTransaction, ok := params[0].(string) if !ok { return nil, ErrParseErr } var data hexutil.Bytes - if err := data.UnmarshalText([]byte(address)); err != nil { + if err := data.UnmarshalText([]byte(signedTransaction)); err != nil { log.Debug("error decoding raw tx data", "err", err, "req_id", GetReqID(ctx)) // Geth returns the raw error from UnmarshalText. return nil, ErrInvalidParams(err.Error())