Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proxyd/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ type Backend struct {
maxWSConns int
outOfServiceInterval time.Duration
stripTrailingXFF bool
ingressRPC string
proxydIP string

skipIsSyncingCheck bool
Expand Down
70 changes: 70 additions & 0 deletions proxyd/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package proxyd

import (
"context"
"io"
"net/http"
"net/http/httptest"
"strings"
Expand Down Expand Up @@ -211,6 +212,75 @@ func TestAllowedStatusCodes(t *testing.T) {
require.Equal(t, http.StatusServiceUnavailable, res[0].Error.HTTPErrorCode)
}

func TestIngressForwarding(t *testing.T) {
backendRequests := make(chan []byte, 10)
ingressRequests := make(chan []byte, 10)

// Mock backend server
backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
backendRequests <- body
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"jsonrpc":"2.0","result":"0x1234","id":1}`))
}))
defer backendServer.Close()

// Mock ingress server
ingressServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
ingressRequests <- body
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"jsonrpc":"2.0","result":"ok","id":1}`))
}))
defer ingressServer.Close()

// Create backend with ingress RPC configured
backend := NewBackend(
"test-backend",
backendServer.URL,
"",
semaphore.NewWeighted(10),
WithIngressRPC(ingressServer.URL),
)

// Create a test RPC request
rpcReq := &RPCReq{
JSONRPC: "2.0",
Method: "eth_blockNumber",
Params: []byte(`[]`),
ID: []byte(`1`),
}

// Forward the request
ctx := context.Background()
res, err := backend.Forward(ctx, []*RPCReq{rpcReq}, false)

// Verify the backend request was successful
require.NoError(t, err)
require.Len(t, res, 1)
require.False(t, res[0].IsError())

// Verify both backend and ingress received the request
select {
case backendBody := <-backendRequests:
require.Contains(t, string(backendBody), "eth_blockNumber")
require.Contains(t, string(backendBody), `"id":1`)
case <-time.After(100 * time.Millisecond):
t.Fatal("Backend did not receive request")
}

// Give a bit more time for the async ingress request to complete
select {
case ingressBody := <-ingressRequests:
require.Contains(t, string(ingressBody), "eth_blockNumber")
require.Contains(t, string(ingressBody), `"id":1`)
case <-time.After(500 * time.Millisecond):
t.Fatal("Ingress did not receive request")
}
}

func getHttpResponseCodeCount(statusCode string) float64 {
metricFamilies, err := prometheus.DefaultGatherer.Gather()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions proxyd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ type Config struct {
SenderRateLimit SenderRateLimitConfig `toml:"sender_rate_limit"`
InteropValidationConfig InteropValidationConfig `toml:"interop_validation"`
TxValidationMiddlewareConfig TxValidationMiddlewareConfig `toml:"tx_validation_middleware"`
IngressRPC string `toml:"ingress_rpc"`
}

type InteropValidationConfig struct {
Expand Down
21 changes: 21 additions & 0 deletions proxyd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,12 @@ var (
"backend_name",
})

ingressRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "tips_ingress_requests_total",
Help: "Count of total requests forwarded to TIPS ingress service.",
})

backendProbeChecksTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "backend_probe_checks_total",
Expand All @@ -497,6 +503,13 @@ var (
}, []string{
"backend_name",
})

ingressRequestDurationSum = promauto.NewSummary(prometheus.SummaryOpts{
Namespace: MetricsNamespace,
Name: "tips_ingress_request_duration_seconds",
Help: "Summary of TIPS ingress request response times broken down by backend.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
})
)

func RecordRedisError(source string) {
Expand Down Expand Up @@ -686,6 +699,14 @@ func RecordBackendProbeDuration(backendName string, duration time.Duration) {
backendProbeDurationHist.WithLabelValues(backendName).Observe(duration.Seconds())
}

func RecordIngressRequest() {
ingressRequestsTotal.Inc()
}

func RecordIngressRequestDuration(duration time.Duration) {
ingressRequestDurationSum.Observe(duration.Seconds())
}

func boolToFloat64(b bool) float64 {
if b {
return 1
Expand Down
1 change: 1 addition & 0 deletions proxyd/proxyd.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func Start(config *Config) (*Server, func(), error) {
apiKeys,
config.TxValidationMiddlewareConfig,
time.Duration(config.Server.GracefulShutdownSeconds)*time.Second,
config.IngressRPC,
)
if err != nil {
return nil, nil, fmt.Errorf("error creating server: %w", err)
Expand Down
46 changes: 43 additions & 3 deletions proxyd/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package proxyd

import (
"bytes"
"context"
"crypto/rand"
"crypto/subtle"
Expand Down Expand Up @@ -90,6 +91,8 @@ type Server struct {
interopValidatingConfig InteropValidationConfig
interopStrategy InteropStrategy
publicAccess bool
ingressRpc string
ingressRpcClient *http.Client
enableTxHashLogging bool

enableTxValidation bool
Expand Down Expand Up @@ -131,6 +134,7 @@ func NewServer(
limExemptKeys []string,
txValidationConfig TxValidationMiddlewareConfig,
gracefulShutdownDuration time.Duration,
ingressRpc string,
) (*Server, error) {
if cache == nil {
cache = &NoopRPCCache{}
Expand Down Expand Up @@ -259,6 +263,13 @@ func NewServer(
txValidationClient: txValidationClient,
txValidationFailOpen: txValidationFailOpen,
gracefulShutdownDuration: gracefulShutdownDuration,
ingressRpc: ingressRpc,
ingressRpcClient: &http.Client{
Timeout: defaultRPCTimeout,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
},
}, nil
}

Expand Down Expand Up @@ -646,7 +657,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
// Apply a sender-based rate limit if it is enabled. Note that sender-based rate
// limits apply regardless of origin or user-agent. As such, they don't use the
// isLimited method.
if parsedReq.Method == "eth_sendRawTransaction" || parsedReq.Method == "eth_sendRawTransactionConditional" {
if parsedReq.Method == "eth_sendRawTransaction" || parsedReq.Method == "eth_sendRawTransactionConditional" || parsedReq.Method == "eth_sendRawTransactionSync" {
tx, err := s.convertSendReqToSendTx(ctx, parsedReq)
if err != nil {
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, err)
Expand All @@ -663,6 +674,31 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
responses[i] = NewRPCErrorRes(parsedReq.ID, err)
continue
}

// Send async copy to ingress service, don't wait for error handling
if s.ingressRpc != "" {
body, err := json.Marshal(parsedReq)
if err != nil {
log.Error("unable to marshal JSON RPC request", "source", "rpc", "err", err)
} else {
go func() {
RecordIngressRequest()

ingressStart := time.Now()
req, err := http.NewRequest(http.MethodPost, s.ingressRpc, bytes.NewBuffer(body))
req.Header.Set("Content-Type", "application/json")

resp, err := s.ingressRpcClient.Do(req)
if err != nil {
log.Warn("failed to proxy to ingress rpc", "err", err)
return
}

defer resp.Body.Close()
RecordIngressRequestDuration(time.Since(ingressStart))
}()
}
}
}

// Apply transaction validation middleware if enabled and method is configured.
Expand Down Expand Up @@ -907,15 +943,19 @@ func (s *Server) convertSendReqToSendTx(ctx context.Context, req *RPCReq) (*type
log.Debug("raw transaction conditional request has invalid number of params", "req_id", GetReqID(ctx))
// The error below is identical to the one Geth responds with.
return nil, ErrInvalidParams("missing value for required argument 0 or 1")
} else if req.Method == "eth_sendRawTransactionSync" && (len(params) == 0 || len(params) > 2) {
log.Debug("raw transaction sync request has invalid number of params", "req_id", GetReqID(ctx))
// The error below is identical to the one Geth responds with.
return nil, ErrInvalidParams("missing value for required argument 0")
}

address, ok := params[0].(string)
signedTransaction, ok := params[0].(string)
if !ok {
return nil, ErrParseErr
}

var data hexutil.Bytes
if err := data.UnmarshalText([]byte(address)); err != nil {
if err := data.UnmarshalText([]byte(signedTransaction)); err != nil {
log.Debug("error decoding raw tx data", "err", err, "req_id", GetReqID(ctx))
// Geth returns the raw error from UnmarshalText.
return nil, ErrInvalidParams(err.Error())
Expand Down
Loading