From e009c8a7d80f7b6eeef3449ab98a34d084243355 Mon Sep 17 00:00:00 2001 From: smcio Date: Fri, 8 May 2026 12:27:37 +0100 Subject: [PATCH 1/6] rpcserver: implement sendTransaction method --- cmd/sendtxprobe/main.go | 277 ++++++++++++ pkg/rpcserver/errors.go | 46 +- pkg/rpcserver/rpcserver.go | 53 +++ pkg/rpcserver/send_transaction.go | 575 +++++++++++++++++++++++++ pkg/rpcserver/send_transaction_test.go | 307 +++++++++++++ 5 files changed, 1249 insertions(+), 9 deletions(-) create mode 100644 cmd/sendtxprobe/main.go create mode 100644 pkg/rpcserver/send_transaction.go create mode 100644 pkg/rpcserver/send_transaction_test.go diff --git a/cmd/sendtxprobe/main.go b/cmd/sendtxprobe/main.go new file mode 100644 index 00000000..b1708ea8 --- /dev/null +++ b/cmd/sendtxprobe/main.go @@ -0,0 +1,277 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "os" + "os/signal" + "strings" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/gagliardetto/solana-go/programs/system" + "github.com/gagliardetto/solana-go/rpc" +) + +const ( + defaultAirdropLamports = 20_000_000 + defaultTransferLamports = 1_000_000 + defaultCluster = "testnet" +) + +type config struct { + mithrilRPC string + cluster string + clusterRPC string + airdropLamports uint64 + transferLamports uint64 + skipPreflight bool + timeout time.Duration + pollInterval time.Duration + postAirdropSettle time.Duration +} + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + cfg, err := parseFlags() + if err != nil { + fmt.Fprintf(os.Stderr, "sendtxprobe: %v\n", err) + os.Exit(2) + } + + if err := run(ctx, cfg); err != nil { + fmt.Fprintf(os.Stderr, "sendtxprobe failed: %v\n", err) + os.Exit(1) + } +} + +func parseFlags() (config, error) { + cfg := config{} + + flag.StringVar(&cfg.mithrilRPC, "mithril-rpc", "", "HTTP URL for the Mithril RPC endpoint to exercise") + flag.StringVar(&cfg.cluster, "cluster", defaultCluster, "Public Solana cluster for funding/confirmation: testnet or devnet") + flag.StringVar(&cfg.clusterRPC, "cluster-rpc", "", "Override the public cluster RPC URL used for airdrop and confirmation") + flag.Uint64Var(&cfg.airdropLamports, "airdrop-lamports", defaultAirdropLamports, "Lamports to request from the public faucet") + flag.Uint64Var(&cfg.transferLamports, "transfer-lamports", defaultTransferLamports, "Lamports to send through Mithril") + flag.BoolVar(&cfg.skipPreflight, "skip-preflight", false, "Pass skipPreflight=true to Mithril sendTransaction") + flag.DurationVar(&cfg.timeout, "timeout", 3*time.Minute, "Max time to wait for the airdrop and transfer confirmations") + flag.DurationVar(&cfg.pollInterval, "poll-interval", 2*time.Second, "Polling interval for signature confirmation") + flag.DurationVar(&cfg.postAirdropSettle, "post-airdrop-settle", 5*time.Second, "Extra wait after the airdrop confirms before submitting via Mithril") + flag.Parse() + + if cfg.mithrilRPC == "" { + return config{}, errors.New("must provide -mithril-rpc") + } + if cfg.transferLamports == 0 { + return config{}, errors.New("-transfer-lamports must be greater than zero") + } + if cfg.airdropLamports <= cfg.transferLamports { + return config{}, fmt.Errorf("-airdrop-lamports (%d) must be greater than -transfer-lamports (%d)", cfg.airdropLamports, cfg.transferLamports) + } + if cfg.timeout <= 0 { + return config{}, errors.New("-timeout must be greater than zero") + } + if cfg.pollInterval <= 0 { + return config{}, errors.New("-poll-interval must be greater than zero") + } + if cfg.clusterRPC == "" { + clusterRPC, err := clusterRPCFor(cfg.cluster) + if err != nil { + return config{}, err + } + cfg.clusterRPC = clusterRPC + } + + return cfg, nil +} + +func run(ctx context.Context, cfg config) error { + clusterClient := rpc.New(cfg.clusterRPC) + defer clusterClient.Close() + + mithrilClient := rpc.New(cfg.mithrilRPC) + defer mithrilClient.Close() + + sender := solana.NewWallet() + recipient := solana.NewWallet() + + fmt.Printf("Mithril RPC: %s\n", cfg.mithrilRPC) + fmt.Printf("Cluster RPC: %s\n", cfg.clusterRPC) + fmt.Printf("Sender: %s\n", sender.PublicKey()) + fmt.Printf("Recipient: %s\n", recipient.PublicKey()) + + fmt.Printf("Requesting airdrop of %d lamports for sender...\n", cfg.airdropLamports) + airdropSig, err := clusterClient.RequestAirdrop(ctx, sender.PublicKey(), cfg.airdropLamports, rpc.CommitmentConfirmed) + if err != nil { + return formatAirdropError(cfg.cluster, cfg.clusterRPC, err) + } + fmt.Printf("Airdrop signature: %s\n", airdropSig) + + if err := waitForSignature(ctx, clusterClient, airdropSig, cfg.timeout, cfg.pollInterval, "airdrop"); err != nil { + return err + } + + senderBalance, err := clusterClient.GetBalance(ctx, sender.PublicKey(), rpc.CommitmentConfirmed) + if err != nil { + return fmt.Errorf("get sender balance from %s: %w", cfg.clusterRPC, err) + } + fmt.Printf("Sender confirmed balance: %d lamports\n", senderBalance.Value) + if senderBalance.Value < cfg.transferLamports { + return fmt.Errorf("sender balance %d is lower than transfer amount %d", senderBalance.Value, cfg.transferLamports) + } + + if cfg.postAirdropSettle > 0 { + fmt.Printf("Waiting %s for Mithril to catch up with the airdrop...\n", cfg.postAirdropSettle) + timer := time.NewTimer(cfg.postAirdropSettle) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + } + } + + latestBlockhash, err := mithrilClient.GetLatestBlockhash(ctx, rpc.CommitmentConfirmed) + if err != nil { + return fmt.Errorf("getLatestBlockhash via Mithril %s: %w", cfg.mithrilRPC, err) + } + if latestBlockhash == nil || latestBlockhash.Value == nil { + return errors.New("Mithril getLatestBlockhash returned no value") + } + fmt.Printf("Mithril latest blockhash: %s\n", latestBlockhash.Value.Blockhash) + + instruction, err := system.NewTransferInstruction( + cfg.transferLamports, + sender.PublicKey(), + recipient.PublicKey(), + ).ValidateAndBuild() + if err != nil { + return fmt.Errorf("build transfer instruction: %w", err) + } + + tx, err := solana.NewTransaction( + []solana.Instruction{instruction}, + latestBlockhash.Value.Blockhash, + solana.TransactionPayer(sender.PublicKey()), + ) + if err != nil { + return fmt.Errorf("build transaction: %w", err) + } + + _, err = tx.Sign(func(key solana.PublicKey) *solana.PrivateKey { + if key.Equals(sender.PublicKey()) { + return &sender.PrivateKey + } + return nil + }) + if err != nil { + return fmt.Errorf("sign transaction: %w", err) + } + + txSig, err := mithrilClient.SendTransactionWithOpts(ctx, tx, rpc.TransactionOpts{ + SkipPreflight: cfg.skipPreflight, + PreflightCommitment: rpc.CommitmentConfirmed, + }) + if err != nil { + return fmt.Errorf("sendTransaction via Mithril %s: %w", cfg.mithrilRPC, err) + } + fmt.Printf("Submitted via Mithril sendTransaction: %s\n", txSig) + + if err := waitForSignature(ctx, clusterClient, txSig, cfg.timeout, cfg.pollInterval, "transfer"); err != nil { + return err + } + + recipientBalance, err := clusterClient.GetBalance(ctx, recipient.PublicKey(), rpc.CommitmentConfirmed) + if err != nil { + return fmt.Errorf("get recipient balance from %s: %w", cfg.clusterRPC, err) + } + fmt.Printf("Recipient confirmed balance: %d lamports\n", recipientBalance.Value) + if recipientBalance.Value < cfg.transferLamports { + return fmt.Errorf("recipient balance %d is lower than transfer amount %d", recipientBalance.Value, cfg.transferLamports) + } + + fmt.Printf("sendTransaction flow succeeded through Mithril.\n") + return nil +} + +func clusterRPCFor(cluster string) (string, error) { + switch strings.ToLower(strings.TrimSpace(cluster)) { + case "testnet": + return rpc.TestNet_RPC, nil + case "devnet": + return rpc.DevNet_RPC, nil + default: + return "", fmt.Errorf("unsupported -cluster %q; expected testnet or devnet", cluster) + } +} + +func waitForSignature( + ctx context.Context, + client *rpc.Client, + signature solana.Signature, + timeout time.Duration, + pollInterval time.Duration, + label string, +) error { + waitCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + for { + status, err := getSignatureStatus(waitCtx, client, signature) + if err == nil { + if status.Err != nil { + return fmt.Errorf("%s signature %s failed on cluster: %v", label, signature, status.Err) + } + if signatureConfirmed(status) { + fmt.Printf("%s signature confirmed on cluster: %s\n", label, signature) + return nil + } + } else if !errors.Is(err, rpc.ErrNotFound) { + return fmt.Errorf("getSignatureStatuses for %s signature %s: %w", label, signature, err) + } + + select { + case <-waitCtx.Done(): + return fmt.Errorf("timed out waiting for %s signature %s: %w", label, signature, waitCtx.Err()) + case <-ticker.C: + } + } +} + +func getSignatureStatus(ctx context.Context, client *rpc.Client, signature solana.Signature) (*rpc.SignatureStatusesResult, error) { + statuses, err := client.GetSignatureStatuses(ctx, true, signature) + if err != nil { + return nil, err + } + if len(statuses.Value) == 0 || statuses.Value[0] == nil { + return nil, rpc.ErrNotFound + } + return statuses.Value[0], nil +} + +func signatureConfirmed(status *rpc.SignatureStatusesResult) bool { + if status == nil || status.Err != nil { + return false + } + + switch status.ConfirmationStatus { + case rpc.ConfirmationStatusConfirmed, rpc.ConfirmationStatusFinalized: + return true + default: + return status.Confirmations == nil + } +} + +func formatAirdropError(cluster string, endpoint string, err error) error { + if strings.EqualFold(cluster, "testnet") { + return fmt.Errorf("request airdrop via %s: %w (testnet faucet/public RPC can be flaky; retry, override -cluster-rpc, or try -cluster devnet)", endpoint, err) + } + return fmt.Errorf("request airdrop via %s: %w", endpoint, err) +} diff --git a/pkg/rpcserver/errors.go b/pkg/rpcserver/errors.go index b2562d30..ee2a1654 100644 --- a/pkg/rpcserver/errors.go +++ b/pkg/rpcserver/errors.go @@ -11,14 +11,12 @@ import ( const ( // -32602 is the JSON-RPC standard "Invalid params" code. rpcCodeInvalidParams jsonrpc.ErrorCode = -32602 + // -32002 matches Agave's SendTransactionPreflightFailure. + rpcCodeSendTransactionPreflightFailure jsonrpc.ErrorCode = -32002 // -32016 is Agave's reserved code for MinContextSlotNotReached. rpcCodeMinContextSlotNotReached jsonrpc.ErrorCode = -32016 ) -// MinContextSlotNotReachedError is returned when the caller demands a -// minimum context slot we have not yet reached. The structured payload -// is emitted as the JSON-RPC error's `data` field (Agave-compatible), -// matching the shape `{"contextSlot": N}` that Solana clients consume. type MinContextSlotNotReachedError struct { ContextSlot uint64 } @@ -57,9 +55,6 @@ func (e *MinContextSlotNotReachedError) FromJSONRPCError(rpcErr jsonrpc.JSONRPCE return nil } -// InvalidParamsError maps a Mithril-side argument-validation failure to -// the standard JSON-RPC -32602. Use for shape and conflict checks like -// "sigVerify may not be used with replaceRecentBlockhash". type InvalidParamsError struct { Message string } @@ -81,11 +76,44 @@ func (e *InvalidParamsError) FromJSONRPCError(rpcErr jsonrpc.JSONRPCError) error return nil } -// rpcErrorRegistry returns an Errors set with Mithril's custom codes -// registered. Pass to jsonrpc.NewServer via WithServerErrors. +type SendTransactionPreflightFailureError struct { + Message string + Result SimulateTransactionRespValue +} + +func (e *SendTransactionPreflightFailureError) Error() string { return e.Message } + +func (e *SendTransactionPreflightFailureError) ToJSONRPCError() (jsonrpc.JSONRPCError, error) { + return jsonrpc.JSONRPCError{ + Code: rpcCodeSendTransactionPreflightFailure, + Message: e.Message, + Data: e.Result, + }, nil +} + +func (e *SendTransactionPreflightFailureError) FromJSONRPCError(rpcErr jsonrpc.JSONRPCError) error { + if rpcErr.Code != rpcCodeSendTransactionPreflightFailure { + return fmt.Errorf("unexpected code %d for SendTransactionPreflightFailureError", rpcErr.Code) + } + e.Message = rpcErr.Message + if rpcErr.Data == nil { + e.Result = SimulateTransactionRespValue{} + return nil + } + raw, err := json.Marshal(rpcErr.Data) + if err != nil { + return fmt.Errorf("re-encoding SendTransactionPreflightFailureError data: %w", err) + } + if err := json.Unmarshal(raw, &e.Result); err != nil { + return fmt.Errorf("decoding SendTransactionPreflightFailureError data: %w", err) + } + return nil +} + func rpcErrorRegistry() jsonrpc.Errors { errs := jsonrpc.NewErrors() errs.Register(rpcCodeInvalidParams, new(*InvalidParamsError)) + errs.Register(rpcCodeSendTransactionPreflightFailure, new(*SendTransactionPreflightFailureError)) errs.Register(rpcCodeMinContextSlotNotReached, new(*MinContextSlotNotReachedError)) return errs } diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go index 47b7fc6b..5994d54a 100644 --- a/pkg/rpcserver/rpcserver.go +++ b/pkg/rpcserver/rpcserver.go @@ -1,17 +1,22 @@ package rpcserver import ( + "context" "fmt" "net" "net/http" "net/http/httptest" + "net/netip" "strings" "sync" + "time" "github.com/Overclock-Validator/mithril/pkg/accountsdb" + "github.com/Overclock-Validator/mithril/pkg/mlog" "github.com/Overclock-Validator/mithril/pkg/sealevel" "github.com/filecoin-project/go-jsonrpc" bin "github.com/gagliardetto/binary" + "github.com/gagliardetto/solana-go" ) type RpcServer struct { @@ -23,6 +28,18 @@ type RpcServer struct { epochSchedule *sealevel.SysvarEpochSchedule slotCtx *sealevel.SlotCtx slotCtxMu sync.RWMutex + + leaderTPUCacheMu sync.RWMutex + leaderTPUByIdentity map[solana.PublicKey]netip.AddrPort + leaderTPUCacheUpdatedAt time.Time + clusterNodesRefreshEvery time.Duration + clusterNodesRefreshOnce sync.Once + + clusterRPCEndpoints []string + clusterNodesFetcher clusterNodesFetcher + // packetSender is injectable for tests; production defaults to UDP. + packetSender packetSender + sendTransactionLeaderForwardCount uint64 } func NewRpcServer(acctsDb *accountsdb.AccountsDb, port uint16) *RpcServer { @@ -47,6 +64,11 @@ func NewRpcServer(acctsDb *accountsdb.AccountsDb, port uint16) *RpcServer { rpcServer.rpcService.Register("MithrilRpc", rpcServer) rpcServer.acctsDb = acctsDb rpcServer.epochSchedule = fetchAndUnmarshalEpochScheduleSysvar(acctsDb) + rpcServer.leaderTPUByIdentity = make(map[solana.PublicKey]netip.AddrPort) + rpcServer.clusterNodesRefreshEvery = sendTransactionClusterNodesRefreshEvery + rpcServer.clusterRPCEndpoints = configuredSendTransactionRPCEndpoints() + rpcServer.packetSender = defaultPacketSender + rpcServer.sendTransactionLeaderForwardCount = sendTransactionLeaderForwardCount return rpcServer } @@ -77,5 +99,36 @@ func (rpcServer *RpcServer) getSlotCtx() *sealevel.SlotCtx { } func (rpcServer *RpcServer) Start() { + rpcServer.startClusterNodesRefreshLoop() go http.Serve(rpcServer.listener, rpcServer.rpcService) } + +func (rpcServer *RpcServer) startClusterNodesRefreshLoop() { + rpcServer.clusterNodesRefreshOnce.Do(func() { + if rpcServer.clusterNodesFetcher == nil && len(rpcServer.clusterRPCEndpoints) == 0 { + return + } + + go func() { + if err := rpcServer.refreshLeaderTPUCache(context.Background()); err != nil { + mlog.Log.Warnf("sendTransaction: initial cluster node refresh failed: %v", err) + } + + ticker := time.NewTicker(rpcServer.clusterNodesRefreshInterval()) + defer ticker.Stop() + + for range ticker.C { + if err := rpcServer.refreshLeaderTPUCache(context.Background()); err != nil { + mlog.Log.Warnf("sendTransaction: periodic cluster node refresh failed: %v", err) + } + } + }() + }) +} + +func (rpcServer *RpcServer) clusterNodesRefreshInterval() time.Duration { + if rpcServer.clusterNodesRefreshEvery > 0 { + return rpcServer.clusterNodesRefreshEvery + } + return sendTransactionClusterNodesRefreshEvery +} diff --git a/pkg/rpcserver/send_transaction.go b/pkg/rpcserver/send_transaction.go new file mode 100644 index 00000000..bd19e297 --- /dev/null +++ b/pkg/rpcserver/send_transaction.go @@ -0,0 +1,575 @@ +package rpcserver + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "net" + "net/netip" + "strings" + "time" + + "github.com/Overclock-Validator/mithril/pkg/config" + "github.com/Overclock-Validator/mithril/pkg/features" + "github.com/Overclock-Validator/mithril/pkg/global" + "github.com/Overclock-Validator/mithril/pkg/mlog" + "github.com/Overclock-Validator/mithril/pkg/replay" + "github.com/Overclock-Validator/mithril/pkg/sealevel" + "github.com/filecoin-project/go-jsonrpc" + bin "github.com/gagliardetto/binary" + "github.com/gagliardetto/solana-go" + solanarpc "github.com/gagliardetto/solana-go/rpc" + "github.com/mr-tron/base58" +) + +type clusterNodesFetcher func(context.Context) ([]*solanarpc.GetClusterNodesResult, error) +type packetSender func([]byte, netip.AddrPort) error + +type sendTransactionConfig struct { + encoding string + skipPreflight bool + preflightCommitment string + maxRetries *uint + minContextSlot *uint64 +} + +const ( + maxBase58TxSize = 1683 + maxBase64TxSize = 1644 + packetDataSize = 1232 + sendTransactionLeaderForwardCount = 5 + sendTransactionTargetCount = sendTransactionLeaderForwardCount + 1 + sendTransactionLeaderLookahead = 64 + sendTransactionClusterNodesRefreshEvery = 10 * time.Minute + maxSanitizedInstructionCount = 64 +) + +var errInvalidSanitizedTransaction = &InvalidParamsError{ + Message: "invalid transaction: Transaction failed to sanitize accounts offsets correctly", +} + +func (rpcServer *RpcServer) SendTransaction(ctx context.Context, p jsonrpc.RawParams) (string, error) { + params, err := jsonrpc.DecodeParams[[]interface{}](p) + if err != nil { + return "", &InvalidParamsError{Message: fmt.Sprintf("decoding params: %v", err)} + } + if len(params) < 1 { + return "", &InvalidParamsError{Message: "sendTransaction requires a transaction string as first parameter"} + } + + txStr, ok := params[0].(string) + if !ok { + return "", &InvalidParamsError{Message: "sendTransaction requires a transaction string as first parameter"} + } + + conf := parseSendTransactionConfig(params) + + tx, wire, err := decodeSendTransaction(txStr, conf.encoding) + if err != nil { + return "", err + } + + // Legacy and already-expanded txs can be bounds-checked immediately. + if err := validateSendTransactionSanitize(tx, featuresForSendValidation(rpcServer.getSlotCtx()), shouldValidateInstructionIndexes(tx)); err != nil { + return "", err + } + + if conf.minContextSlot != nil && global.Slot() < *conf.minContextSlot { + return "", &MinContextSlotNotReachedError{ContextSlot: *conf.minContextSlot} + } + + if !conf.skipPreflight { + slotCtx := rpcServer.getSlotCtx() + if slotCtx == nil { + return "", fmt.Errorf("node is not ready for transaction preflight") + } + if err := rpcServer.preflightSendTransaction(ctx, tx, slotCtx); err != nil { + return "", err + } + } + + signature, err := firstSignature(tx) + if err != nil { + return "", err + } + + if err := rpcServer.forwardTransactionToUpcomingLeaders(ctx, wire); err != nil { + return "", err + } + + return signature.String(), nil +} + +func parseSendTransactionConfig(params []interface{}) sendTransactionConfig { + conf := sendTransactionConfig{ + encoding: "base58", + } + if len(params) < 2 { + return conf + } + + confMap, ok := params[1].(map[string]interface{}) + if !ok { + return conf + } + + if encoding, ok := confMap["encoding"].(string); ok { + conf.encoding = encoding + } + if skipPreflight, ok := confMap["skipPreflight"].(bool); ok { + conf.skipPreflight = skipPreflight + } + if preflightCommitment, ok := confMap["preflightCommitment"].(string); ok { + conf.preflightCommitment = preflightCommitment + } + if maxRetries, ok := confMap["maxRetries"].(float64); ok && maxRetries >= 0 { + v := uint(maxRetries) + conf.maxRetries = &v + } + if minContextSlot, ok := confMap["minContextSlot"].(float64); ok && minContextSlot >= 0 { + v := uint64(minContextSlot) + conf.minContextSlot = &v + } + + return conf +} + +func decodeSendTransaction(txStr string, encoding string) (*solana.Transaction, []byte, error) { + if encoding != "base58" && encoding != "base64" { + return nil, nil, &InvalidParamsError{ + Message: fmt.Sprintf("unsupported encoding: %s. Supported encodings: base58, base64", encoding), + } + } + + if encoding == "base58" && len(txStr) > maxBase58TxSize { + return nil, nil, &InvalidParamsError{ + Message: fmt.Sprintf("base58 encoded solana_transaction too large: %d bytes (max: encoded/raw %d/%d)", len(txStr), maxBase58TxSize, packetDataSize), + } + } + if encoding == "base64" && len(txStr) > maxBase64TxSize { + return nil, nil, &InvalidParamsError{ + Message: fmt.Sprintf("base64 encoded solana_transaction too large: %d bytes (max: encoded/raw %d/%d)", len(txStr), maxBase64TxSize, packetDataSize), + } + } + + var ( + wire []byte + err error + ) + switch encoding { + case "base58": + wire, err = base58.Decode(txStr) + if err != nil { + return nil, nil, &InvalidParamsError{Message: fmt.Sprintf("invalid base58 encoding: %v", err)} + } + default: + wire, err = base64.StdEncoding.DecodeString(txStr) + if err != nil { + return nil, nil, &InvalidParamsError{Message: fmt.Sprintf("invalid base64 encoding: %v", err)} + } + } + + if len(wire) > packetDataSize { + return nil, nil, &InvalidParamsError{ + Message: fmt.Sprintf("decoded solana_transaction too large: %d bytes (max: %d bytes)", len(wire), packetDataSize), + } + } + + tx, err := solana.TransactionFromDecoder(bin.NewBinDecoder(wire)) + if err != nil { + return nil, nil, &InvalidParamsError{Message: fmt.Sprintf("failed to deserialize solana_transaction: %v", err)} + } + + return tx, wire, nil +} + +func featuresForSendValidation(slotCtx *sealevel.SlotCtx) *features.Features { + if slotCtx != nil && slotCtx.Features != nil { + return slotCtx.Features + } + return nil +} + +func shouldValidateInstructionIndexes(tx *solana.Transaction) bool { + return !tx.Message.IsVersioned() || tx.Message.AddressTableLookups.NumLookups() == 0 +} + +func validateSendTransactionSanitize(tx *solana.Transaction, feats *features.Features, validateInstructionIndexes bool) error { + hdr := tx.Message.Header + numKeys := len(tx.Message.AccountKeys) + + if hdr.NumReadonlySignedAccounts >= hdr.NumRequiredSignatures || + int(hdr.NumRequiredSignatures) > len(tx.Signatures) || + len(tx.Signatures) > numKeys { + return errInvalidSanitizedTransaction + } + + if feats != nil && + feats.IsActive(features.StaticInstructionLimit) && + len(tx.Message.Instructions) > maxSanitizedInstructionCount { + return errInvalidSanitizedTransaction + } + + if !validateInstructionIndexes { + return nil + } + + for _, ci := range tx.Message.Instructions { + if int(ci.ProgramIDIndex) >= numKeys { + return errInvalidSanitizedTransaction + } + for _, idx := range ci.Accounts { + if int(idx) >= numKeys { + return errInvalidSanitizedTransaction + } + } + } + + return nil +} + +func firstSignature(tx *solana.Transaction) (solana.Signature, error) { + if len(tx.Signatures) == 0 { + return solana.Signature{}, errInvalidSanitizedTransaction + } + return tx.Signatures[0], nil +} + +func (rpcServer *RpcServer) preflightSendTransaction(ctx context.Context, tx *solana.Transaction, slotCtx *sealevel.SlotCtx) error { + if err := rpcServer.resolveAddressTablesForPreflight(ctx, tx, slotCtx); err != nil { + return err + } + + if err := validateSendTransactionSanitize(tx, slotCtx.Features, true); err != nil { + return err + } + + if err := tx.VerifySignatures(); err != nil { + return signaturePreflightFailure() + } + + output := replay.LoadAndExecuteTransaction(replay.LoadAndExecuteTransactionInput{ + SlotCtx: slotCtx, + Transaction: tx, + TxMeta: nil, + IsSimulation: true, + RecordInnerInstructions: false, + }) + + if output.ProcessingResult.TransactionError == nil { + return nil + } + if output.ProcessingResult.TransactionError.ErrorType == replay.TransactionErrorSanitizeFailure { + return errInvalidSanitizedTransaction + } + + txErr := output.ProcessingResult.TransactionError + return &SendTransactionPreflightFailureError{ + Message: fmt.Sprintf("Transaction simulation failed: %s", sendTransactionErrorMessage(txErr)), + Result: sendTransactionFailureResultFromOutput(output), + } +} + +func (rpcServer *RpcServer) resolveAddressTablesForPreflight(ctx context.Context, tx *solana.Transaction, slotCtx *sealevel.SlotCtx) error { + if !tx.Message.IsVersioned() || tx.Message.AddressTableLookups.NumLookups() == 0 { + return nil + } + if rpcServer.acctsDb == nil { + return &InvalidParamsError{Message: "invalid transaction: address lookup table resolution unavailable"} + } + if err := replay.ResolveAddrTableLookupsForTx(ctx, rpcServer.acctsDb, slotCtx.Slot, tx); err != nil { + return &InvalidParamsError{Message: fmt.Sprintf("invalid transaction: %s", err.Error())} + } + return nil +} + +func signaturePreflightFailure() *SendTransactionPreflightFailureError { + return &SendTransactionPreflightFailureError{ + Message: "Transaction simulation failed: Transaction did not pass signature verification", + Result: sendTransactionFailureResult( + "SignatureFailure", + []string{}, + 0, + 0, + nil, + ), + } +} + +func sendTransactionFailureResultFromOutput(output replay.LoadAndExecuteTransactionOutput) SimulateTransactionRespValue { + logs := []string{} + if output.ExecCtx != nil { + if logRecorder, ok := output.ExecCtx.Log.(*sealevel.LogRecorder); ok && logRecorder != nil && logRecorder.Logs != nil { + logs = clampLogs(logRecorder.Logs) + } + } + + var ( + unitsConsumed uint64 + dataSize uint32 + returnData *ReturnDataPayload + ) + + if processedTx := output.ProcessingResult.ProcessedTransaction; processedTx != nil && processedTx.Executed != nil { + executed := processedTx.Executed + unitsConsumed = executed.ExecutionDetails.ExecutedUnits + dataSize = executed.LoadedTransaction.LoadedAccountsDataSize + if executed.ExecutionDetails.ReturnData != nil { + rd := executed.ExecutionDetails.ReturnData + clamped := clampReturnData(rd.Data) + returnData = &ReturnDataPayload{ + ProgramId: rd.ProgramId.String(), + Data: []string{base64.StdEncoding.EncodeToString(clamped), "base64"}, + } + } + } else if output.ExecCtx != nil { + unitsConsumed = output.ExecCtx.ComputeMeter.Used() + dataSize = loadedAccountsDataSizeFromExecCtx(output.ExecCtx) + } + + return sendTransactionFailureResult( + output.ProcessingResult.TransactionError, + logs, + unitsConsumed, + dataSize, + returnData, + ).withFee(output) +} + +func sendTransactionFailureResult(errValue interface{}, logs []string, unitsConsumed uint64, dataSize uint32, returnData *ReturnDataPayload) SimulateTransactionRespValue { + return SimulateTransactionRespValue{ + Err: errValue, + Logs: ptrSlice(logs), + UnitsConsumed: &unitsConsumed, + ReturnData: returnData, + InnerInstructions: nil, + LoadedAccountsDataSize: &dataSize, + Fee: nil, + PreBalances: nil, + PostBalances: nil, + PreTokenBalances: nil, + PostTokenBalances: nil, + LoadedAddresses: nil, + } +} + +func (v SimulateTransactionRespValue) withFee(output replay.LoadAndExecuteTransactionOutput) SimulateTransactionRespValue { + if output.FeeInfo != nil { + fee := output.FeeInfo.TotalFee + v.Fee = &fee + } + return v +} + +func sendTransactionErrorMessage(txErr *replay.TransactionError) string { + if txErr == nil { + return "unknown error" + } + + switch txErr.ErrorType { + case replay.TransactionErrorBlockhashNotFound: + return "Blockhash not found" + case replay.TransactionErrorSignatureFailure: + return "Transaction did not pass signature verification" + case replay.TransactionErrorSanitizeFailure: + return "Transaction failed to sanitize accounts offsets correctly" + case replay.TransactionErrorInstructionError: + if txErr.InstructionError != nil { + return normalizeSendTransactionErrorName(txErr.InstructionError.Error()) + } + } + + return normalizeSendTransactionErrorName(txErr.ErrorType.String()) +} + +func normalizeSendTransactionErrorName(name string) string { + switch { + case strings.HasPrefix(name, "InstrErr"): + return strings.TrimPrefix(name, "InstrErr") + case strings.HasPrefix(name, "TxErr"): + return strings.TrimPrefix(name, "TxErr") + default: + return name + } +} + +func (rpcServer *RpcServer) forwardTransactionToUpcomingLeaders(ctx context.Context, wire []byte) error { + targetCount := int(rpcServer.sendTransactionLeaderForwardCount) + 1 + if targetCount <= 1 { + targetCount = sendTransactionTargetCount + } + + targets, err := rpcServer.resolveUpcomingLeaderTPUAddresses(ctx, targetCount) + if err != nil { + return err + } + + send := rpcServer.packetSender + if send == nil { + send = defaultPacketSender + } + + var sendErrs []error + sentCount := 0 + for _, target := range targets { + if err := send(wire, target); err != nil { + sendErrs = append(sendErrs, fmt.Errorf("%s: %w", target.String(), err)) + continue + } + sentCount++ + } + + if sentCount == 0 { + return fmt.Errorf("failed to forward transaction to any leader TPU: %w", errors.Join(sendErrs...)) + } + if len(sendErrs) > 0 { + mlog.Log.Warnf("sendTransaction: forwarded to %d/%d leader TPUs; partial failures: %v", sentCount, len(targets), errors.Join(sendErrs...)) + } + return nil +} + +func (rpcServer *RpcServer) resolveUpcomingLeaderTPUAddresses(ctx context.Context, want int) ([]netip.AddrPort, error) { + if want <= 0 { + want = 1 + } + + targets, updatedAt := rpcServer.collectUpcomingLeaderTPUAddressesFromCache(want) + cacheStale := updatedAt.IsZero() || time.Since(updatedAt) >= rpcServer.clusterNodesRefreshInterval() + if cacheStale || len(targets) < want { + if err := rpcServer.refreshLeaderTPUCache(ctx); err != nil { + if len(targets) == 0 { + return nil, err + } + mlog.Log.Warnf("sendTransaction: using partial cached TPU target set after refresh failure: %v", err) + return targets, nil + } + targets, _ = rpcServer.collectUpcomingLeaderTPUAddressesFromCache(want) + } + + if len(targets) == 0 { + return nil, fmt.Errorf("unable to resolve TPU addresses for upcoming leaders from current leader schedule") + } + return targets, nil +} + +func (rpcServer *RpcServer) fetchClusterNodes(ctx context.Context) ([]*solanarpc.GetClusterNodesResult, error) { + if rpcServer.clusterNodesFetcher != nil { + return rpcServer.clusterNodesFetcher(ctx) + } + + endpoints := rpcServer.clusterRPCEndpoints + if len(endpoints) == 0 { + endpoints = configuredSendTransactionRPCEndpoints() + } + if len(endpoints) == 0 { + return nil, fmt.Errorf("no cluster RPC endpoints configured for sendTransaction leader resolution") + } + + var lastErr error + for _, endpoint := range endpoints { + client := solanarpc.New(endpoint) + nodes, err := client.GetClusterNodes(ctx) + if err == nil { + return nodes, nil + } + lastErr = err + } + + if lastErr == nil { + lastErr = fmt.Errorf("unable to fetch cluster nodes") + } + return nil, lastErr +} + +func (rpcServer *RpcServer) refreshLeaderTPUCache(ctx context.Context) error { + nodes, err := rpcServer.fetchClusterNodes(ctx) + if err != nil { + return err + } + + leaderTPUs := make(map[solana.PublicKey]netip.AddrPort, len(nodes)) + for _, node := range nodes { + if node == nil || node.TPU == nil || *node.TPU == "" { + continue + } + addr, err := netip.ParseAddrPort(*node.TPU) + if err != nil { + continue + } + leaderTPUs[node.Pubkey] = addr + } + + if len(leaderTPUs) == 0 { + return fmt.Errorf("cluster did not advertise any TPU UDP endpoints") + } + + rpcServer.leaderTPUCacheMu.Lock() + rpcServer.leaderTPUByIdentity = leaderTPUs + rpcServer.leaderTPUCacheUpdatedAt = time.Now() + rpcServer.leaderTPUCacheMu.Unlock() + return nil +} + +func (rpcServer *RpcServer) collectUpcomingLeaderTPUAddressesFromCache(want int) ([]netip.AddrPort, time.Time) { + rpcServer.leaderTPUCacheMu.RLock() + nodeTPUs := make(map[solana.PublicKey]netip.AddrPort, len(rpcServer.leaderTPUByIdentity)) + for leader, addr := range rpcServer.leaderTPUByIdentity { + nodeTPUs[leader] = addr + } + updatedAt := rpcServer.leaderTPUCacheUpdatedAt + rpcServer.leaderTPUCacheMu.RUnlock() + + currentSlot := global.Slot() + targets := make([]netip.AddrPort, 0, want) + seenLeaders := make(map[solana.PublicKey]struct{}, want) + seenTargets := make(map[netip.AddrPort]struct{}, want) + + for offset := uint64(0); offset < sendTransactionLeaderLookahead && len(targets) < want; offset++ { + leader, ok := global.LeaderForSlot(currentSlot + offset) + if !ok { + continue + } + if _, exists := seenLeaders[leader]; exists { + continue + } + seenLeaders[leader] = struct{}{} + + target, ok := nodeTPUs[leader] + if !ok { + continue + } + if _, exists := seenTargets[target]; exists { + continue + } + seenTargets[target] = struct{}{} + targets = append(targets, target) + } + + return targets, updatedAt +} + +func configuredSendTransactionRPCEndpoints() []string { + endpoints := config.GetStringSlice("network.rpc") + if len(endpoints) == 0 { + endpoints = config.GetStringSlice("rpc.rpc") + } + return endpoints +} + +func defaultPacketSender(payload []byte, target netip.AddrPort) error { + conn, err := net.ListenUDP("udp", nil) + if err != nil { + return err + } + defer conn.Close() + + written, err := conn.WriteToUDPAddrPort(payload, target) + if err != nil { + return err + } + if written != len(payload) { + return fmt.Errorf("short write: wrote %d of %d bytes", written, len(payload)) + } + return nil +} diff --git a/pkg/rpcserver/send_transaction_test.go b/pkg/rpcserver/send_transaction_test.go new file mode 100644 index 00000000..95441546 --- /dev/null +++ b/pkg/rpcserver/send_transaction_test.go @@ -0,0 +1,307 @@ +package rpcserver + +import ( + "context" + "encoding/base64" + "encoding/json" + "net" + "testing" + "time" + + "github.com/Overclock-Validator/mithril/pkg/features" + "github.com/Overclock-Validator/mithril/pkg/global" + "github.com/Overclock-Validator/mithril/pkg/leaderschedule" + "github.com/Overclock-Validator/mithril/pkg/sealevel" + "github.com/filecoin-project/go-jsonrpc" + "github.com/gagliardetto/solana-go" + solanarpc "github.com/gagliardetto/solana-go/rpc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseSendTransactionConfig_Defaults(t *testing.T) { + conf := parseSendTransactionConfig([]interface{}{"tx"}) + assert.Equal(t, "base58", conf.encoding) + assert.False(t, conf.skipPreflight) + assert.Nil(t, conf.maxRetries) + assert.Nil(t, conf.minContextSlot) +} + +func TestSendTransaction_RejectsSanitizeFailure(t *testing.T) { + rpcServer := &RpcServer{} + tx := &solana.Transaction{ + Message: solana.Message{ + Header: solana.MessageHeader{NumRequiredSignatures: 1}, + AccountKeys: []solana.PublicKey{{1}}, + }, + } + wire, err := tx.MarshalBinary() + require.NoError(t, err) + + _, err = rpcServer.SendTransaction( + context.Background(), + mustRawParams(t, []interface{}{ + base64.StdEncoding.EncodeToString(wire), + map[string]interface{}{"encoding": "base64", "skipPreflight": true}, + }), + ) + require.Error(t, err) + + invalidParams, ok := err.(*InvalidParamsError) + require.True(t, ok) + assert.Equal(t, "invalid transaction: Transaction failed to sanitize accounts offsets correctly", invalidParams.Message) +} + +func TestSendTransaction_PreflightSignatureFailure(t *testing.T) { + rpcServer := &RpcServer{ + slotCtx: &sealevel.SlotCtx{ + Slot: 123, + Features: features.NewFeaturesDefault(), + }, + } + + tx, wire := testLegacyTransaction(t) + _, err := rpcServer.SendTransaction( + context.Background(), + mustRawParams(t, []interface{}{ + base64.StdEncoding.EncodeToString(wire), + map[string]interface{}{"encoding": "base64"}, + }), + ) + require.Error(t, err) + + preflightErr, ok := err.(*SendTransactionPreflightFailureError) + require.True(t, ok) + assert.Equal(t, "Transaction simulation failed: Transaction did not pass signature verification", preflightErr.Message) + assert.Equal(t, "SignatureFailure", preflightErr.Result.Err) + require.NotNil(t, preflightErr.Result.Logs) + assert.Equal(t, []string{}, *preflightErr.Result.Logs) + require.NotNil(t, preflightErr.Result.UnitsConsumed) + assert.Equal(t, uint64(0), *preflightErr.Result.UnitsConsumed) + require.NotNil(t, preflightErr.Result.LoadedAccountsDataSize) + assert.Equal(t, uint32(0), *preflightErr.Result.LoadedAccountsDataSize) + assert.Nil(t, preflightErr.Result.LoadedAddresses) + + sig, sigErr := firstSignature(tx) + require.NoError(t, sigErr) + assert.Equal(t, tx.Signatures[0], sig) +} + +func TestSendTransaction_SkipPreflight_FansOutToUpcomingLeaders(t *testing.T) { + listenerA := mustListenUDP(t) + defer listenerA.Close() + listenerB := mustListenUDP(t) + defer listenerB.Close() + listenerC := mustListenUDP(t) + defer listenerC.Close() + listenerD := mustListenUDP(t) + defer listenerD.Close() + listenerE := mustListenUDP(t) + defer listenerE.Close() + listenerF := mustListenUDP(t) + defer listenerF.Close() + + leaderA := solana.PublicKey{0xA1} + leaderB := solana.PublicKey{0xB2} + leaderC := solana.PublicKey{0xC3} + leaderD := solana.PublicKey{0xD4} + leaderE := solana.PublicKey{0xE5} + leaderF := solana.PublicKey{0xF6} + + global.SetLeaderSchedule(leaderschedule.NewLeaderScheduleFromKeyedSlots( + map[solana.PublicKey][]uint64{ + leaderA: {0, 1, 2, 3}, + leaderB: {4, 5, 6, 7}, + leaderC: {8, 9, 10, 11}, + leaderD: {12, 13, 14, 15}, + leaderE: {16, 17, 18, 19}, + leaderF: {20, 21, 22, 23}, + }, + 100, + )) + global.SetSlot(100) + defer global.SetLeaderSchedule(nil) + defer global.SetSlot(0) + + tx, wire := testLegacyTransaction(t) + fetchCount := 0 + rpcServer := &RpcServer{ + packetSender: defaultPacketSender, + clusterNodesRefreshEvery: sendTransactionClusterNodesRefreshEvery, + sendTransactionLeaderForwardCount: sendTransactionLeaderForwardCount, + clusterNodesFetcher: func(context.Context) ([]*solanarpc.GetClusterNodesResult, error) { + fetchCount++ + return []*solanarpc.GetClusterNodesResult{ + {Pubkey: leaderA, TPU: stringPtr(listenerA.LocalAddr().String())}, + {Pubkey: leaderB, TPU: stringPtr(listenerB.LocalAddr().String())}, + {Pubkey: leaderC, TPU: stringPtr(listenerC.LocalAddr().String())}, + {Pubkey: leaderD, TPU: stringPtr(listenerD.LocalAddr().String())}, + {Pubkey: leaderE, TPU: stringPtr(listenerE.LocalAddr().String())}, + {Pubkey: leaderF, TPU: stringPtr(listenerF.LocalAddr().String())}, + }, nil + }, + } + + gotSig, err := rpcServer.SendTransaction( + context.Background(), + mustRawParams(t, []interface{}{ + base64.StdEncoding.EncodeToString(wire), + map[string]interface{}{"encoding": "base64", "skipPreflight": true}, + }), + ) + require.NoError(t, err) + assert.Equal(t, tx.Signatures[0].String(), gotSig) + assert.Equal(t, 1, fetchCount, "first send should populate the TPU cache once") + + assert.Equal(t, wire, mustReadUDP(t, listenerA)) + assert.Equal(t, wire, mustReadUDP(t, listenerB)) + assert.Equal(t, wire, mustReadUDP(t, listenerC)) + assert.Equal(t, wire, mustReadUDP(t, listenerD)) + assert.Equal(t, wire, mustReadUDP(t, listenerE)) + assert.Equal(t, wire, mustReadUDP(t, listenerF)) +} + +func TestResolveUpcomingLeaderTPUAddresses_UsesFreshCacheWithoutRefetch(t *testing.T) { + leaderA := solana.PublicKey{0x01} + leaderB := solana.PublicKey{0x02} + leaderC := solana.PublicKey{0x03} + leaderD := solana.PublicKey{0x04} + leaderE := solana.PublicKey{0x05} + leaderF := solana.PublicKey{0x06} + + global.SetLeaderSchedule(leaderschedule.NewLeaderScheduleFromKeyedSlots( + map[solana.PublicKey][]uint64{ + leaderA: {0}, + leaderB: {1}, + leaderC: {2}, + leaderD: {3}, + leaderE: {4}, + leaderF: {5}, + }, + 500, + )) + global.SetSlot(500) + defer global.SetLeaderSchedule(nil) + defer global.SetSlot(0) + + fetchCount := 0 + rpcServer := &RpcServer{ + clusterNodesRefreshEvery: sendTransactionClusterNodesRefreshEvery, + sendTransactionLeaderForwardCount: sendTransactionLeaderForwardCount, + clusterNodesFetcher: func(context.Context) ([]*solanarpc.GetClusterNodesResult, error) { + fetchCount++ + return []*solanarpc.GetClusterNodesResult{ + {Pubkey: leaderA, TPU: stringPtr("127.0.0.1:9001")}, + {Pubkey: leaderB, TPU: stringPtr("127.0.0.1:9002")}, + {Pubkey: leaderC, TPU: stringPtr("127.0.0.1:9003")}, + {Pubkey: leaderD, TPU: stringPtr("127.0.0.1:9004")}, + {Pubkey: leaderE, TPU: stringPtr("127.0.0.1:9005")}, + {Pubkey: leaderF, TPU: stringPtr("127.0.0.1:9006")}, + }, nil + }, + } + + require.NoError(t, rpcServer.refreshLeaderTPUCache(context.Background())) + targets, err := rpcServer.resolveUpcomingLeaderTPUAddresses(context.Background(), sendTransactionTargetCount) + require.NoError(t, err) + require.Len(t, targets, sendTransactionTargetCount) + assert.Equal(t, 1, fetchCount, "fresh cache should satisfy resolution without another RPC poll") +} + +func TestResolveUpcomingLeaderTPUAddresses_RefreshesStaleCache(t *testing.T) { + leaderA := solana.PublicKey{0x11} + leaderB := solana.PublicKey{0x12} + leaderC := solana.PublicKey{0x13} + leaderD := solana.PublicKey{0x14} + leaderE := solana.PublicKey{0x15} + leaderF := solana.PublicKey{0x16} + + global.SetLeaderSchedule(leaderschedule.NewLeaderScheduleFromKeyedSlots( + map[solana.PublicKey][]uint64{ + leaderA: {0}, + leaderB: {1}, + leaderC: {2}, + leaderD: {3}, + leaderE: {4}, + leaderF: {5}, + }, + 900, + )) + global.SetSlot(900) + defer global.SetLeaderSchedule(nil) + defer global.SetSlot(0) + + fetchCount := 0 + rpcServer := &RpcServer{ + clusterNodesRefreshEvery: 10 * time.Minute, + sendTransactionLeaderForwardCount: sendTransactionLeaderForwardCount, + clusterNodesFetcher: func(context.Context) ([]*solanarpc.GetClusterNodesResult, error) { + fetchCount++ + return []*solanarpc.GetClusterNodesResult{ + {Pubkey: leaderA, TPU: stringPtr("127.0.0.1:9101")}, + {Pubkey: leaderB, TPU: stringPtr("127.0.0.1:9102")}, + {Pubkey: leaderC, TPU: stringPtr("127.0.0.1:9103")}, + {Pubkey: leaderD, TPU: stringPtr("127.0.0.1:9104")}, + {Pubkey: leaderE, TPU: stringPtr("127.0.0.1:9105")}, + {Pubkey: leaderF, TPU: stringPtr("127.0.0.1:9106")}, + }, nil + }, + } + + require.NoError(t, rpcServer.refreshLeaderTPUCache(context.Background())) + rpcServer.leaderTPUCacheMu.Lock() + rpcServer.leaderTPUCacheUpdatedAt = time.Now().Add(-11 * time.Minute) + rpcServer.leaderTPUCacheMu.Unlock() + + targets, err := rpcServer.resolveUpcomingLeaderTPUAddresses(context.Background(), sendTransactionTargetCount) + require.NoError(t, err) + require.Len(t, targets, sendTransactionTargetCount) + assert.Equal(t, 2, fetchCount, "stale cache should trigger a refresh before resolving targets") +} + +func mustRawParams(t *testing.T, params []interface{}) jsonrpc.RawParams { + t.Helper() + raw, err := json.Marshal(params) + require.NoError(t, err) + return jsonrpc.RawParams(raw) +} + +func testLegacyTransaction(t *testing.T) (*solana.Transaction, []byte) { + t.Helper() + + tx := &solana.Transaction{ + Signatures: []solana.Signature{{}}, + Message: solana.Message{ + Header: solana.MessageHeader{NumRequiredSignatures: 1}, + AccountKeys: []solana.PublicKey{{0x11}}, + }, + } + + wire, err := tx.MarshalBinary() + require.NoError(t, err) + return tx, wire +} + +func mustListenUDP(t *testing.T) *net.UDPConn { + t.Helper() + + conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0}) + require.NoError(t, err) + return conn +} + +func mustReadUDP(t *testing.T, conn *net.UDPConn) []byte { + t.Helper() + + buf := make([]byte, packetDataSize) + require.NoError(t, conn.SetReadDeadline(time.Now().Add(2*time.Second))) + n, _, err := conn.ReadFromUDP(buf) + require.NoError(t, err) + out := make([]byte, n) + copy(out, buf[:n]) + return out +} + +func stringPtr(v string) *string { + return &v +} From 1af7f3d9205b5f2f6eb2d2a30f03969ae3951fed Mon Sep 17 00:00:00 2001 From: smcio Date: Fri, 8 May 2026 18:09:30 +0200 Subject: [PATCH 2/6] modifications to test program --- cmd/sendtxprobe/main.go | 156 ++++++++++++++++++++-------------------- 1 file changed, 79 insertions(+), 77 deletions(-) diff --git a/cmd/sendtxprobe/main.go b/cmd/sendtxprobe/main.go index b1708ea8..bb14328a 100644 --- a/cmd/sendtxprobe/main.go +++ b/cmd/sendtxprobe/main.go @@ -11,26 +11,20 @@ import ( "time" "github.com/gagliardetto/solana-go" - "github.com/gagliardetto/solana-go/programs/system" "github.com/gagliardetto/solana-go/rpc" ) -const ( - defaultAirdropLamports = 20_000_000 - defaultTransferLamports = 1_000_000 - defaultCluster = "testnet" -) +const defaultCluster = "mainnet-beta" type config struct { - mithrilRPC string - cluster string - clusterRPC string - airdropLamports uint64 - transferLamports uint64 - skipPreflight bool - timeout time.Duration - pollInterval time.Duration - postAirdropSettle time.Duration + mithrilRPC string + cluster string + clusterRPC string + senderPrivateKey string + memoText string + skipPreflight bool + timeout time.Duration + pollInterval time.Duration } func main() { @@ -53,24 +47,20 @@ func parseFlags() (config, error) { cfg := config{} flag.StringVar(&cfg.mithrilRPC, "mithril-rpc", "", "HTTP URL for the Mithril RPC endpoint to exercise") - flag.StringVar(&cfg.cluster, "cluster", defaultCluster, "Public Solana cluster for funding/confirmation: testnet or devnet") - flag.StringVar(&cfg.clusterRPC, "cluster-rpc", "", "Override the public cluster RPC URL used for airdrop and confirmation") - flag.Uint64Var(&cfg.airdropLamports, "airdrop-lamports", defaultAirdropLamports, "Lamports to request from the public faucet") - flag.Uint64Var(&cfg.transferLamports, "transfer-lamports", defaultTransferLamports, "Lamports to send through Mithril") + flag.StringVar(&cfg.cluster, "cluster", defaultCluster, "Cluster for confirmation: mainnet-beta, mainnet, testnet, or devnet") + flag.StringVar(&cfg.clusterRPC, "cluster-rpc", "", "Override the public cluster RPC URL used for confirmation") + flag.StringVar(&cfg.senderPrivateKey, "sender-private-key", "", "Base58 sender private key used to sign the probe transaction") + flag.StringVar(&cfg.memoText, "memo", "", "Optional memo payload; defaults to a generated probe string") flag.BoolVar(&cfg.skipPreflight, "skip-preflight", false, "Pass skipPreflight=true to Mithril sendTransaction") - flag.DurationVar(&cfg.timeout, "timeout", 3*time.Minute, "Max time to wait for the airdrop and transfer confirmations") + flag.DurationVar(&cfg.timeout, "timeout", 3*time.Minute, "Max time to wait for transaction confirmation") flag.DurationVar(&cfg.pollInterval, "poll-interval", 2*time.Second, "Polling interval for signature confirmation") - flag.DurationVar(&cfg.postAirdropSettle, "post-airdrop-settle", 5*time.Second, "Extra wait after the airdrop confirms before submitting via Mithril") flag.Parse() if cfg.mithrilRPC == "" { return config{}, errors.New("must provide -mithril-rpc") } - if cfg.transferLamports == 0 { - return config{}, errors.New("-transfer-lamports must be greater than zero") - } - if cfg.airdropLamports <= cfg.transferLamports { - return config{}, fmt.Errorf("-airdrop-lamports (%d) must be greater than -transfer-lamports (%d)", cfg.airdropLamports, cfg.transferLamports) + if cfg.senderPrivateKey == "" { + return config{}, errors.New("must provide -sender-private-key") } if cfg.timeout <= 0 { return config{}, errors.New("-timeout must be greater than zero") @@ -78,6 +68,13 @@ func parseFlags() (config, error) { if cfg.pollInterval <= 0 { return config{}, errors.New("-poll-interval must be greater than zero") } + + cluster, err := canonicalClusterName(cfg.cluster) + if err != nil { + return config{}, err + } + cfg.cluster = cluster + if cfg.clusterRPC == "" { clusterRPC, err := clusterRPCFor(cfg.cluster) if err != nil { @@ -96,44 +93,21 @@ func run(ctx context.Context, cfg config) error { mithrilClient := rpc.New(cfg.mithrilRPC) defer mithrilClient.Close() - sender := solana.NewWallet() - recipient := solana.NewWallet() + sender, err := loadSenderWallet(cfg.senderPrivateKey) + if err != nil { + return err + } fmt.Printf("Mithril RPC: %s\n", cfg.mithrilRPC) + fmt.Printf("Cluster: %s\n", cfg.cluster) fmt.Printf("Cluster RPC: %s\n", cfg.clusterRPC) fmt.Printf("Sender: %s\n", sender.PublicKey()) - fmt.Printf("Recipient: %s\n", recipient.PublicKey()) - - fmt.Printf("Requesting airdrop of %d lamports for sender...\n", cfg.airdropLamports) - airdropSig, err := clusterClient.RequestAirdrop(ctx, sender.PublicKey(), cfg.airdropLamports, rpc.CommitmentConfirmed) - if err != nil { - return formatAirdropError(cfg.cluster, cfg.clusterRPC, err) - } - fmt.Printf("Airdrop signature: %s\n", airdropSig) - - if err := waitForSignature(ctx, clusterClient, airdropSig, cfg.timeout, cfg.pollInterval, "airdrop"); err != nil { - return err - } - senderBalance, err := clusterClient.GetBalance(ctx, sender.PublicKey(), rpc.CommitmentConfirmed) + preBalance, err := clusterClient.GetBalance(ctx, sender.PublicKey(), rpc.CommitmentConfirmed) if err != nil { return fmt.Errorf("get sender balance from %s: %w", cfg.clusterRPC, err) } - fmt.Printf("Sender confirmed balance: %d lamports\n", senderBalance.Value) - if senderBalance.Value < cfg.transferLamports { - return fmt.Errorf("sender balance %d is lower than transfer amount %d", senderBalance.Value, cfg.transferLamports) - } - - if cfg.postAirdropSettle > 0 { - fmt.Printf("Waiting %s for Mithril to catch up with the airdrop...\n", cfg.postAirdropSettle) - timer := time.NewTimer(cfg.postAirdropSettle) - defer timer.Stop() - select { - case <-ctx.Done(): - return ctx.Err() - case <-timer.C: - } - } + fmt.Printf("Sender balance before submit: %d lamports\n", preBalance.Value) latestBlockhash, err := mithrilClient.GetLatestBlockhash(ctx, rpc.CommitmentConfirmed) if err != nil { @@ -144,14 +118,13 @@ func run(ctx context.Context, cfg config) error { } fmt.Printf("Mithril latest blockhash: %s\n", latestBlockhash.Value.Blockhash) - instruction, err := system.NewTransferInstruction( - cfg.transferLamports, - sender.PublicKey(), - recipient.PublicKey(), - ).ValidateAndBuild() - if err != nil { - return fmt.Errorf("build transfer instruction: %w", err) + memoText := cfg.memoText + if memoText == "" { + memoText = fmt.Sprintf("mithril sendtxprobe %s", time.Now().UTC().Format(time.RFC3339Nano)) } + fmt.Printf("Memo payload: %q\n", memoText) + + instruction := buildMemoInstruction(sender.PublicKey(), memoText) tx, err := solana.NewTransaction( []solana.Instruction{instruction}, @@ -181,34 +154,69 @@ func run(ctx context.Context, cfg config) error { } fmt.Printf("Submitted via Mithril sendTransaction: %s\n", txSig) - if err := waitForSignature(ctx, clusterClient, txSig, cfg.timeout, cfg.pollInterval, "transfer"); err != nil { + if err := waitForSignature(ctx, clusterClient, txSig, cfg.timeout, cfg.pollInterval, "memo"); err != nil { return err } - recipientBalance, err := clusterClient.GetBalance(ctx, recipient.PublicKey(), rpc.CommitmentConfirmed) + postBalance, err := clusterClient.GetBalance(ctx, sender.PublicKey(), rpc.CommitmentConfirmed) if err != nil { - return fmt.Errorf("get recipient balance from %s: %w", cfg.clusterRPC, err) + return fmt.Errorf("get sender post-submit balance from %s: %w", cfg.clusterRPC, err) } - fmt.Printf("Recipient confirmed balance: %d lamports\n", recipientBalance.Value) - if recipientBalance.Value < cfg.transferLamports { - return fmt.Errorf("recipient balance %d is lower than transfer amount %d", recipientBalance.Value, cfg.transferLamports) + fmt.Printf("Sender balance after submit: %d lamports\n", postBalance.Value) + if postBalance.Value > preBalance.Value { + fmt.Printf("Balance delta: +%d lamports\n", postBalance.Value-preBalance.Value) + } else { + fmt.Printf("Balance delta: -%d lamports\n", preBalance.Value-postBalance.Value) } fmt.Printf("sendTransaction flow succeeded through Mithril.\n") return nil } -func clusterRPCFor(cluster string) (string, error) { +func canonicalClusterName(cluster string) (string, error) { switch strings.ToLower(strings.TrimSpace(cluster)) { + case "mainnet", "mainnet-beta": + return "mainnet-beta", nil + case "testnet": + return "testnet", nil + case "devnet": + return "devnet", nil + default: + return "", fmt.Errorf("unsupported -cluster %q; expected mainnet-beta, mainnet, testnet, or devnet", cluster) + } +} + +func clusterRPCFor(cluster string) (string, error) { + switch cluster { + case "mainnet-beta": + return rpc.MainNetBeta_RPC, nil case "testnet": return rpc.TestNet_RPC, nil case "devnet": return rpc.DevNet_RPC, nil default: - return "", fmt.Errorf("unsupported -cluster %q; expected testnet or devnet", cluster) + return "", fmt.Errorf("unsupported -cluster %q; expected mainnet-beta, mainnet, testnet, or devnet", cluster) } } +func loadSenderWallet(privateKey string) (*solana.Wallet, error) { + wallet, err := solana.WalletFromPrivateKeyBase58(privateKey) + if err != nil { + return nil, fmt.Errorf("load sender private key: %w", err) + } + return wallet, nil +} + +func buildMemoInstruction(sender solana.PublicKey, memoText string) solana.Instruction { + return solana.NewInstruction( + solana.MemoProgramID, + solana.AccountMetaSlice{ + solana.Meta(sender).SIGNER(), + }, + []byte(memoText), + ) +} + func waitForSignature( ctx context.Context, client *rpc.Client, @@ -269,9 +277,3 @@ func signatureConfirmed(status *rpc.SignatureStatusesResult) bool { } } -func formatAirdropError(cluster string, endpoint string, err error) error { - if strings.EqualFold(cluster, "testnet") { - return fmt.Errorf("request airdrop via %s: %w (testnet faucet/public RPC can be flaky; retry, override -cluster-rpc, or try -cluster devnet)", endpoint, err) - } - return fmt.Errorf("request airdrop via %s: %w", endpoint, err) -} From 7013400175d0a874217657b66ae2bc1178422d3f Mon Sep 17 00:00:00 2001 From: smcio Date: Sat, 9 May 2026 17:08:09 +0200 Subject: [PATCH 3/6] fix occasional cu divergence --- pkg/sealevel/execution_ctx.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/sealevel/execution_ctx.go b/pkg/sealevel/execution_ctx.go index 0739e8d5..7d74a38f 100644 --- a/pkg/sealevel/execution_ctx.go +++ b/pkg/sealevel/execution_ctx.go @@ -235,6 +235,13 @@ func (execCtx *ExecutionCtx) ProcessInstruction(instrData []byte, instructionAcc } metrics.GlobalBlockReplay.IxPush.AddTimingSince(start) + if len(programIndices) > 0 { + programKey, keyErr := execCtx.TransactionContext.KeyOfAccountAtIndex(programIndices[len(programIndices)-1]) + if keyErr == nil { + execCtx.TransactionContext.SetReturnData(programKey, []byte{}) + } + } + err1 := execCtx.ExecuteInstruction() start = time.Now() From 3cde17c911cf992d22cbc11ce83bf651b2f31995 Mon Sep 17 00:00:00 2001 From: smcio Date: Sat, 9 May 2026 21:35:18 +0200 Subject: [PATCH 4/6] rework cu reporting --- pkg/replay/block.go | 34 +++++++++++++++++++--------------- pkg/replay/transaction.go | 32 +++++++++++++++++++++----------- pkg/sealevel/execution_ctx.go | 23 ++++++++++++----------- 3 files changed, 52 insertions(+), 37 deletions(-) diff --git a/pkg/replay/block.go b/pkg/replay/block.go index 8b549b47..63c0a4b5 100644 --- a/pkg/replay/block.go +++ b/pkg/replay/block.go @@ -2018,19 +2018,16 @@ func ReplayBlocks( slotReplayDuration := time.Since(start) - // Calculate slot stats: vote/non-vote tx counts and total CU + // Calculate slot stats: vote/non-vote tx counts and locally replayed CU. var voteTxCount, nonVoteTxCount int - var totalCU uint64 - for i, tx := range block.Transactions { + for _, tx := range block.Transactions { if tx.IsVote() { voteTxCount++ } else { nonVoteTxCount++ } - if i < len(block.TxMetas) && block.TxMetas[i] != nil && block.TxMetas[i].ComputeUnitsConsumed != nil { - totalCU += *block.TxMetas[i].ComputeUnitsConsumed - } } + totalCU := lastSlotCtx.TotalComputeUnitsConsumed // Get leader from block (set by configureBlock in live mode, or by block source in verify mode) leaderStr := "unknown" @@ -2439,15 +2436,17 @@ func newSlotCtx(block *b.Block, accts accounts.Accounts, parentAccts accounts.Ac return slotCtx } -func sequentialTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, block *b.Block, dbgOpts *DebugOptions) fees.TxFeeInfoAccumulator { +func sequentialTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, block *b.Block, dbgOpts *DebugOptions) (fees.TxFeeInfoAccumulator, uint64) { var txFeeAccumulator fees.TxFeeInfoAccumulator + var totalComputeUnitsConsumed uint64 // process & execute each transaction in turn for idx, tx := range block.Transactions { var txMeta *rpc.TransactionMeta if block.TxMetas != nil { txMeta = block.TxMetas[idx] } - txFeeInfo, txErr := ProcessTransaction(slotCtx, sigverifyWg, tx, txMeta, dbgOpts, nil) + txFeeInfo, txComputeUnitsConsumed, txErr := ProcessTransaction(slotCtx, sigverifyWg, tx, txMeta, dbgOpts, nil) + totalComputeUnitsConsumed += txComputeUnitsConsumed if txMeta == nil { if txFeeInfo == nil { @@ -2481,7 +2480,7 @@ func sequentialTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, bl } txFeeAccumulator.Add(txFeeInfo) } - return txFeeAccumulator + return txFeeAccumulator, totalComputeUnitsConsumed } func lightbringerEntryExecutionBatches(transactions []*solana.Transaction, entry *b.TxEntry, relaxIntraBatchAccountLocks bool) [][]uint64 { @@ -2557,9 +2556,10 @@ func lightbringerEntryExecutionBatches(transactions []*solana.Transaction, entry return batches } -func parallelTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, block *b.Block, rblock *b.Block, txParallelism int, dbgOpts *DebugOptions) fees.TxFeeInfoAccumulator { +func parallelTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, block *b.Block, rblock *b.Block, txParallelism int, dbgOpts *DebugOptions) (fees.TxFeeInfoAccumulator, uint64) { var txFeeAccumulator fees.TxFeeInfoAccumulator txFeeInfos := make([]*fees.TxFeeInfo, len(block.Transactions)) + txComputeUnitsConsumed := make([]uint64, len(block.Transactions)) errs := make([]error, len(block.Transactions)) txDurations := make([]time.Duration, txParallelism) @@ -2585,7 +2585,7 @@ func parallelTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, bloc if idx < len(rblock.TxMetas) { txMeta = rblock.TxMetas[idx] } - txFeeInfos[idx], errs[idx] = ProcessTransaction(slotCtx, sigverifyWg, rblock.Transactions[idx], txMeta, dbgOpts, sealevel.BorrowedAccountArenas[i]) + txFeeInfos[idx], txComputeUnitsConsumed[idx], errs[idx] = ProcessTransaction(slotCtx, sigverifyWg, rblock.Transactions[idx], txMeta, dbgOpts, sealevel.BorrowedAccountArenas[i]) txErr := errs[idx] // check for success-failure return value divergences if txMeta != nil && txErr == nil && txMeta.Err != nil { @@ -2621,7 +2621,7 @@ func parallelTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, bloc if int(idx) < len(rblock.TxMetas) { txMeta = rblock.TxMetas[idx] } - txFeeInfos[idx], errs[idx] = ProcessTransaction(slotCtx, sigverifyWg, rblock.Transactions[idx], txMeta, dbgOpts, sealevel.BorrowedAccountArenas[workerIdx]) + txFeeInfos[idx], txComputeUnitsConsumed[idx], errs[idx] = ProcessTransaction(slotCtx, sigverifyWg, rblock.Transactions[idx], txMeta, dbgOpts, sealevel.BorrowedAccountArenas[workerIdx]) txErr := errs[idx] if txMeta != nil && txErr == nil && txMeta.Err != nil { mlog.Log.Errorf("[run:%s] DIVERGENCE in slot %d: tx %s succeeded locally but failed onchain: %+v", @@ -2656,7 +2656,9 @@ func parallelTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, bloc panic("dependency planner unavailable for non-Lightbringer block") } + var totalComputeUnitsConsumed uint64 for idx, txFeeInfo := range txFeeInfos { + totalComputeUnitsConsumed += txComputeUnitsConsumed[idx] if txFeeInfo == nil { // This happens when IsTransactionAgeValid returns false (blockhash not found) tx := block.Transactions[idx] @@ -2675,7 +2677,7 @@ func parallelTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, bloc txFeeAccumulator.Add(txFeeInfo) } - return txFeeAccumulator + return txFeeAccumulator, totalComputeUnitsConsumed } func ProcessBlock( @@ -2773,15 +2775,17 @@ func ProcessBlock( slotCtx := newSlotCtx(block, accts, parentAccts, acctsDb) slotCtx.TraceCtx = ctx var txFeeAccumulator fees.TxFeeInfoAccumulator + var totalComputeUnitsConsumed uint64 start = time.Now() setReplayStage("tx_loop") txLoopRegion := trace.StartRegion(ctx, "TxLoop") if txParallelism > 0 { - txFeeAccumulator = parallelTxLoop(slotCtx, &sigverifyWg, unresolvedBlock, block, txParallelism, dbgOpts) + txFeeAccumulator, totalComputeUnitsConsumed = parallelTxLoop(slotCtx, &sigverifyWg, unresolvedBlock, block, txParallelism, dbgOpts) } else { - txFeeAccumulator = sequentialTxLoop(slotCtx, &sigverifyWg, block, dbgOpts) + txFeeAccumulator, totalComputeUnitsConsumed = sequentialTxLoop(slotCtx, &sigverifyWg, block, dbgOpts) } + slotCtx.TotalComputeUnitsConsumed = totalComputeUnitsConsumed txLoopRegion.End() metrics.GlobalBlockReplay.TxLoop.AddTimingSince(start) diff --git a/pkg/replay/transaction.go b/pkg/replay/transaction.go index 88ed49a1..8f58582e 100644 --- a/pkg/replay/transaction.go +++ b/pkg/replay/transaction.go @@ -430,7 +430,14 @@ func cloneTransaction(tx *solana.Transaction) (*solana.Transaction, error) { return cloned, nil } -func ProcessTransaction(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, tx *solana.Transaction, txMeta *rpc.TransactionMeta, dbgOpts *DebugOptions, arena *arena.Arena[sealevel.BorrowedAccount]) (*fees.TxFeeInfo, error) { +func processTransactionComputeUnits(execCtx *sealevel.ExecutionCtx) uint64 { + if execCtx == nil { + return 0 + } + return execCtx.ComputeMeter.Used() +} + +func ProcessTransaction(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, tx *solana.Transaction, txMeta *rpc.TransactionMeta, dbgOpts *DebugOptions, arena *arena.Arena[sealevel.BorrowedAccount]) (*fees.TxFeeInfo, uint64, error) { if trace.IsEnabled() && slotCtx.TraceCtx != nil { regionType := "ProcessTransaction" if tx.IsVote() { @@ -452,14 +459,14 @@ func ProcessTransaction(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, if slotCtx.Features.IsActive(features.StaticInstructionLimit) { if len(tx.Message.Instructions) > maxInstrTraceCapacity { - return nil, TxErrSanitizeFailure + return nil, 0, TxErrSanitizeFailure } } start := time.Now() sigverifySnapshot, err := buildSigverifySnapshot(tx, slotCtx.Slot) if err != nil { - return nil, err + return nil, 0, err } sigverifyWg.Add(1) go verifySignatures(sigverifySnapshot, sigverifyWg) @@ -510,28 +517,31 @@ func ProcessTransaction(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, switch txErr.ErrorType { case TransactionErrorSanitizeFailure: - return nil, txErr.InstructionError + return nil, processTransactionComputeUnits(execCtx), txErr.InstructionError case TransactionErrorBlockhashNotFound: - return nil, TxErrInvalidBlockhash + return nil, processTransactionComputeUnits(execCtx), TxErrInvalidBlockhash case TransactionErrorMaxLoadedAccountsDataSizeExceeded, TransactionErrorInvalidProgramForExecution, TransactionErrorProgramAccountNotFound: - return handleFailedTx(slotCtx, tx, instrs, computeBudgetLimits, txErr.InstructionError, nil) + txFeeInfo, err := handleFailedTx(slotCtx, tx, instrs, computeBudgetLimits, txErr.InstructionError, nil) + return txFeeInfo, processTransactionComputeUnits(execCtx), err case TransactionErrorInsufficientFundsForFee: // CalculateAndDeductTxFees failed - return fee info with nil error (matches original behavior) - return output.FeeInfo, nil + return output.FeeInfo, processTransactionComputeUnits(execCtx), nil case TransactionErrorInstructionError: - return handleFailedTx(slotCtx, tx, instrs, computeBudgetLimits, txErr.InstructionError, nil) + txFeeInfo, err := handleFailedTx(slotCtx, tx, instrs, computeBudgetLimits, txErr.InstructionError, nil) + return txFeeInfo, processTransactionComputeUnits(execCtx), err case TransactionErrorInsufficientFundsForRent: - return handleFailedTx(slotCtx, tx, instrs, computeBudgetLimits, nil, txErr.InstructionError) + txFeeInfo, err := handleFailedTx(slotCtx, tx, instrs, computeBudgetLimits, nil, txErr.InstructionError) + return txFeeInfo, processTransactionComputeUnits(execCtx), err default: - return nil, txErr.InstructionError + return nil, processTransactionComputeUnits(execCtx), txErr.InstructionError } } @@ -610,5 +620,5 @@ func ProcessTransaction(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, recordStakeAndVoteAccounts(slotCtx, execCtx, writablePubkeySet) metrics.GlobalBlockReplay.TxUpdateAccounts.AddTimingSince(start) - return txFeeInfo, nil + return txFeeInfo, processTransactionComputeUnits(execCtx), nil } diff --git a/pkg/sealevel/execution_ctx.go b/pkg/sealevel/execution_ctx.go index 7d74a38f..cc7e8152 100644 --- a/pkg/sealevel/execution_ctx.go +++ b/pkg/sealevel/execution_ctx.go @@ -76,17 +76,18 @@ type SlotCtx struct { Features *features.Features VoteTimestampMu *sync.Mutex // VoteTimestampsMu protects VoteTimestamps - VoteTimestamps map[solana.PublicKey]BlockTimestamp - VoteAccts map[solana.PublicKey]uint64 - TotalEpochStake uint64 - FinalBankhash []byte - AcctsLtHash *lthash.LtHash - EpochsAcctHash []byte - Replay bool - LamportsBurnt uint64 - EahWorkaroundBankhash []byte - HasEahWorkaround bool - LatestEvictedBlockhash [32]byte + VoteTimestamps map[solana.PublicKey]BlockTimestamp + VoteAccts map[solana.PublicKey]uint64 + TotalEpochStake uint64 + FinalBankhash []byte + AcctsLtHash *lthash.LtHash + EpochsAcctHash []byte + Replay bool + LamportsBurnt uint64 + TotalComputeUnitsConsumed uint64 + EahWorkaroundBankhash []byte + HasEahWorkaround bool + LatestEvictedBlockhash [32]byte SerializedParameterArena *arena.Arena[byte] From 84ac791dfb2a3b44fd4fd25dfa6a7519a3dabe42 Mon Sep 17 00:00:00 2001 From: smcio Date: Sat, 9 May 2026 21:56:10 +0200 Subject: [PATCH 5/6] set lightbringer.quiet to true by default --- cmd/mithril/configcmd/edit.go | 8 +- cmd/mithril/configcmd/edit_test.go | 8 +- cmd/mithril/dashboardcmd/dashboard.go | 4 +- cmd/mithril/dashboardcmd/data.go | 79 ++++++++------- cmd/mithril/setupcmd/setup.go | 141 ++++++++++++++------------ cmd/mithril/setupcmd/setup_test.go | 6 +- config.example.toml | 5 +- pkg/config/config.go | 10 +- 8 files changed, 144 insertions(+), 117 deletions(-) diff --git a/cmd/mithril/configcmd/edit.go b/cmd/mithril/configcmd/edit.go index fc1a5545..30bbc715 100644 --- a/cmd/mithril/configcmd/edit.go +++ b/cmd/mithril/configcmd/edit.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" + "github.com/Overclock-Validator/mithril/pkg/config" "github.com/Overclock-Validator/mithril/pkg/tui" tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" @@ -296,8 +297,8 @@ func (m editModel) currentItems() []edItem { return items case edScrLightbringerQuiet: return []edItem{ - {label: "Normal logs", value: "false", desc: "Show all info messages (default)"}, - {label: "Quiet mode", value: "true", desc: "Only warnings and errors — recommended for long runs"}, + {label: "Quiet mode", value: "true", desc: "Only warnings and errors — default and recommended for long runs"}, + {label: "Normal logs", value: "false", desc: "Show all info messages"}, {isSep: true}, {label: "← Back", value: "_back"}, } @@ -441,7 +442,7 @@ func (m *editModel) handleSelect(value string) { m.pushInput(edScrGossip) case "disable": m.lbEnabled = false - m.lbQuiet = false // Reset dependent state so disable→re-enable starts clean. + m.lbQuiet = config.LightbringerQuietDefault // Reset dependent state so disable→re-enable starts clean. m.goBack() case "quiet": m.pushMenu(edScrLightbringerQuiet) @@ -906,6 +907,7 @@ func runConfigEdit() { } v := viper.New() + config.ApplyDefaults(v) v.SetConfigFile(configFile) if err := v.ReadInConfig(); err != nil { fmt.Printf("Error reading config: %v\n", err) diff --git a/cmd/mithril/configcmd/edit_test.go b/cmd/mithril/configcmd/edit_test.go index 8717736f..82e4dd2b 100644 --- a/cmd/mithril/configcmd/edit_test.go +++ b/cmd/mithril/configcmd/edit_test.go @@ -19,9 +19,9 @@ func TestEditor_ScrLightbringerQuiet_False(t *testing.T) { assert.False(t, m.lbQuiet, "picking 'false' should set lbQuiet=false") } -// TestEditor_DisableLB_ResetsQuiet verifies that "disable" resets m.lbQuiet -// so a later re-enable starts clean (no stale quiet state carried over). -func TestEditor_DisableLB_ResetsQuiet(t *testing.T) { +// TestEditor_DisableLB_ResetsQuiet verifies that "disable" resets m.lbQuiet to +// the default so a later re-enable starts clean (no stale quiet state carried over). +func TestEditor_DisableLB_ResetsQuietDefault(t *testing.T) { m := &editModel{ screen: edScrLightbringer, lbEnabled: true, @@ -29,7 +29,7 @@ func TestEditor_DisableLB_ResetsQuiet(t *testing.T) { } m.handleSelect("disable") assert.False(t, m.lbEnabled, "disable should set lbEnabled=false") - assert.False(t, m.lbQuiet, "disable should reset lbQuiet to avoid stale state on re-enable") + assert.True(t, m.lbQuiet, "disable should reset lbQuiet to the default to avoid stale state on re-enable") } // TestEditor_EnableLB_PreservesQuiet ensures enable does not clobber quiet. diff --git a/cmd/mithril/dashboardcmd/dashboard.go b/cmd/mithril/dashboardcmd/dashboard.go index b310b3f2..1169533a 100644 --- a/cmd/mithril/dashboardcmd/dashboard.go +++ b/cmd/mithril/dashboardcmd/dashboard.go @@ -767,8 +767,8 @@ func menuOptionsFor(section, key string) []editOption { } case "lightbringer.quiet": return []editOption{ - {label: "false", value: "false", desc: "Show all info messages (default)"}, - {label: "true", value: "true", desc: "Only warnings and errors — recommended for long runs"}, + {label: "true", value: "true", desc: "Only warnings and errors — default and recommended for long runs"}, + {label: "false", value: "false", desc: "Show all info messages"}, } case "log.level": return []editOption{ diff --git a/cmd/mithril/dashboardcmd/data.go b/cmd/mithril/dashboardcmd/data.go index fb675d13..604b22a1 100644 --- a/cmd/mithril/dashboardcmd/data.go +++ b/cmd/mithril/dashboardcmd/data.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/Overclock-Validator/mithril/pkg/config" "github.com/Overclock-Validator/mithril/pkg/tui" "github.com/spf13/viper" ) @@ -60,30 +61,31 @@ func readState(accountsPath string) *nodeState { // ── Config reading ────────────────────────────────────────────────────── type configData struct { - cluster string - rpcEndpoints []string - blockSource string - lbEnabled bool - lbGossip string - lbGrpcAddr string + cluster string + rpcEndpoints []string + blockSource string + lbEnabled bool + lbGossip string + lbGrpcAddr string lbRpcAddr string lbExternalEndpoint string // block.lightbringer_endpoint for external LB mode lbBinaryPath string lbQuiet bool - accountsPath string - snapshotsPath string - shredstorePath string - logsPath string - txpar string - blockMaxRPS string - blockInflight string - rpcPort string - logLevel string - bootstrapMode string + accountsPath string + snapshotsPath string + shredstorePath string + logsPath string + txpar string + blockMaxRPS string + blockInflight string + rpcPort string + logLevel string + bootstrapMode string } func readConfig(configFile string) *configData { v := viper.New() + config.ApplyDefaults(v) v.SetConfigFile(configFile) if err := v.ReadInConfig(); err != nil { return nil @@ -106,42 +108,42 @@ func readConfig(configFile string) *configData { } return &configData{ - cluster: cluster, - rpcEndpoints: v.GetStringSlice("network.rpc"), - blockSource: v.GetString("block.source"), - lbEnabled: v.GetBool("lightbringer.enabled"), - lbGossip: v.GetString("lightbringer.gossip_entrypoint"), - lbGrpcAddr: v.GetString("lightbringer.grpc_addr"), + cluster: cluster, + rpcEndpoints: v.GetStringSlice("network.rpc"), + blockSource: v.GetString("block.source"), + lbEnabled: v.GetBool("lightbringer.enabled"), + lbGossip: v.GetString("lightbringer.gossip_entrypoint"), + lbGrpcAddr: v.GetString("lightbringer.grpc_addr"), lbRpcAddr: v.GetString("lightbringer.rpc_addr"), lbQuiet: v.GetBool("lightbringer.quiet"), lbExternalEndpoint: v.GetString("block.lightbringer_endpoint"), lbBinaryPath: v.GetString("lightbringer.binary_path"), - accountsPath: v.GetString("storage.accounts"), - snapshotsPath: v.GetString("storage.snapshots"), - shredstorePath: v.GetString("storage.shredstore"), - logsPath: logsPath, - txpar: txpar, - blockMaxRPS: v.GetString("block.max_rps"), - blockInflight: v.GetString("block.max_inflight"), - rpcPort: v.GetString("rpc.port"), - logLevel: v.GetString("log.level"), - bootstrapMode: v.GetString("bootstrap.mode"), + accountsPath: v.GetString("storage.accounts"), + snapshotsPath: v.GetString("storage.snapshots"), + shredstorePath: v.GetString("storage.shredstore"), + logsPath: logsPath, + txpar: txpar, + blockMaxRPS: v.GetString("block.max_rps"), + blockInflight: v.GetString("block.max_inflight"), + rpcPort: v.GetString("rpc.port"), + logLevel: v.GetString("log.level"), + bootstrapMode: v.GetString("bootstrap.mode"), } } // ── Service probing ───────────────────────────────────────────────────── type serviceStatus struct { - name string - addr string - up bool + name string + addr string + up bool } // Default service addresses (must match config template defaults). const ( - defaultRPCPort = "8899" - defaultLBGRPC = "127.0.0.1:3001" - defaultLBHTTP = "127.0.0.1:3000" + defaultRPCPort = "8899" + defaultLBGRPC = "127.0.0.1:3001" + defaultLBHTTP = "127.0.0.1:3000" ) func probeServices(cfg *configData) []serviceStatus { @@ -490,6 +492,7 @@ func saveConfigValue(configFile, section, key, value string) error { case fullKey == "network.rpc": // Preserve failover endpoints — read existing array, update first element v := viper.New() + config.ApplyDefaults(v) v.SetConfigFile(configFile) if err := v.ReadInConfig(); err == nil { existing := v.GetStringSlice("network.rpc") diff --git a/cmd/mithril/setupcmd/setup.go b/cmd/mithril/setupcmd/setup.go index 872f8e79..69217667 100644 --- a/cmd/mithril/setupcmd/setup.go +++ b/cmd/mithril/setupcmd/setup.go @@ -76,12 +76,12 @@ const ( scrLightbringer scrGossip scrLightbringerQuiet // log verbosity for managed Lightbringer (only shown when Lightbringer enabled) - scrStorage // accountsPath - scrStorageSnap // snapshotsPath - scrStorageLogs // logsPath + scrStorage // accountsPath + scrStorageSnap // snapshotsPath + scrStorageLogs // logsPath scrBootstrap - scrBlockTuning // maxRPS - scrBlockInflight // maxInflight + scrBlockTuning // maxRPS + scrBlockInflight // maxInflight scrReplay scrConsensus scrSnapshot @@ -102,30 +102,30 @@ type setupModel struct { height int // Input mode - editing bool - inputVal string - inputCur int // cursor position within input - inputErr string + editing bool + inputVal string + inputCur int // cursor position within input + inputErr string // Config values - mode string // quick, full, manual - cluster string - rpcEndpoint string - enableLB bool - gossipEntry string - lbQuiet bool // suppress Lightbringer info/debug logs - accountsPath string - snapshotsPath string - logsPath string - shredstorePath string - bootstrapMode string - blockMaxRPS string - blockInflight string - txpar string + mode string // quick, full, manual + cluster string + rpcEndpoint string + enableLB bool + gossipEntry string + lbQuiet bool // suppress Lightbringer info/debug logs + accountsPath string + snapshotsPath string + logsPath string + shredstorePath string + bootstrapMode string + blockMaxRPS string + blockInflight string + txpar string consensusPolicy string - snapshotKeep string - logLevel string - rpcPort string + snapshotKeep string + logLevel string + rpcPort string // System cpuCores int @@ -139,24 +139,25 @@ func newSetupModel() setupModel { absPath, _ := filepath.Abs(outputPath) storage := config.DefaultStoragePaths() return setupModel{ - screen: scrMode, - cpuCores: runtime.NumCPU(), - disks: DetectDisks(), - cluster: "mainnet-beta", - rpcEndpoint: "https://api.mainnet-beta.solana.com", - accountsPath: storage.Accounts, - snapshotsPath: storage.Snapshots, - logsPath: storage.Logs, - shredstorePath: storage.Shredstore, - bootstrapMode: "auto", - blockMaxRPS: "8", - blockInflight: "8", - txpar: fmt.Sprintf("%d", runtime.NumCPU()*2), + screen: scrMode, + cpuCores: runtime.NumCPU(), + disks: DetectDisks(), + cluster: "mainnet-beta", + rpcEndpoint: "https://api.mainnet-beta.solana.com", + lbQuiet: config.LightbringerQuietDefault, + accountsPath: storage.Accounts, + snapshotsPath: storage.Snapshots, + logsPath: storage.Logs, + shredstorePath: storage.Shredstore, + bootstrapMode: "auto", + blockMaxRPS: "8", + blockInflight: "8", + txpar: fmt.Sprintf("%d", runtime.NumCPU()*2), consensusPolicy: "halt", - snapshotKeep: "1", - logLevel: "info", - rpcPort: "8899", - configPath: absPath, + snapshotKeep: "1", + logLevel: "info", + rpcPort: "8899", + configPath: absPath, } } @@ -281,8 +282,8 @@ func (m setupModel) currentItems() []menuItem { } case scrLightbringerQuiet: return []menuItem{ - menuOptionDesc("Normal logs", "false", "Show all info messages (default)"), - menuOptionDesc("Quiet mode", "true", "Only warnings and errors — recommended for long runs"), + menuOptionDesc("Quiet mode", "true", "Only warnings and errors — default and recommended for long runs"), + menuOptionDesc("Normal logs", "false", "Show all info messages"), menuSeparator(), menuBack(), } @@ -403,7 +404,7 @@ func (m setupModel) handleSelect(value string) (tea.Model, tea.Cmd) { case scrLightbringer: m.enableLB = value == "enable" if !m.enableLB { - m.lbQuiet = false // Reset dependent state so disable→re-enable starts clean. + m.lbQuiet = config.LightbringerQuietDefault // Reset dependent state so disable→re-enable starts clean. } if m.enableLB { m.pushInput(scrGossip) @@ -562,38 +563,52 @@ func (m *setupModel) validateAndApplyInput() bool { m.gossipEntry = val case scrStorage: - if !m.requireNonEmpty(val) { return false } + if !m.requireNonEmpty(val) { + return false + } m.accountsPath = filepath.Clean(val) case scrStorageSnap: - if !m.requireNonEmpty(val) { return false } + if !m.requireNonEmpty(val) { + return false + } m.snapshotsPath = filepath.Clean(val) case scrStorageLogs: - if !m.requireNonEmpty(val) { return false } + if !m.requireNonEmpty(val) { + return false + } m.logsPath = filepath.Clean(val) case scrBlockTuning: if val != "" { - if !m.requirePositiveInt(val) { return false } + if !m.requirePositiveInt(val) { + return false + } m.blockMaxRPS = val } case scrBlockInflight: if val != "" { - if !m.requirePositiveInt(val) { return false } + if !m.requirePositiveInt(val) { + return false + } m.blockInflight = val } case scrReplay: if val != "" { - if !m.requirePositiveInt(val) { return false } + if !m.requirePositiveInt(val) { + return false + } m.txpar = val } case scrRPCPort: if val != "" { - if !m.requirePort(val) { return false } + if !m.requirePort(val) { + return false + } m.rpcPort = val } } @@ -652,13 +667,13 @@ func (m setupModel) View() string { case scrRPC: return banner + "\n" + renderInput("RPC Endpoint", "Solana RPC endpoint for fetching blocks and cluster data\n"+ - "Public endpoint works to start · upgrade to private RPC for production", + "Public endpoint works to start · upgrade to private RPC for production", m.inputVal, m.inputErr, m.inputCur) case scrGossip: return banner + "\n" + renderInput("Gossip Entrypoint", "IP:port of a Solana validator running gossip\n"+ - "Used to receive shreds from the network", + "Used to receive shreds from the network", m.inputVal, m.inputErr, m.inputCur) case scrStorage: @@ -685,38 +700,38 @@ func (m setupModel) View() string { case scrStorageSnap: return banner + "\n" + renderInput("Snapshots Path", "Downloaded snapshots for bootstrapping · ~100 GB for full + incremental\n"+ - "Can be on a slower drive than AccountsDB", + "Can be on a slower drive than AccountsDB", m.inputVal, m.inputErr, m.inputCur) case scrStorageLogs: return banner + "\n" + renderInput("Logs Path", "Runtime logs, replay timings, and diagnostics\n"+ - "Auto-rotated · each run gets its own directory", + "Auto-rotated · each run gets its own directory", m.inputVal, m.inputErr, m.inputCur) case scrBlockTuning: return banner + "\n" + renderInput("Max Requests Per Second", "How aggressively to fetch blocks from RPC\n"+ - "Match your provider's rate limit · typical: 5–10 for public, 50+ for private", + "Match your provider's rate limit · typical: 5–10 for public, 50+ for private", m.inputVal, m.inputErr, m.inputCur) case scrBlockInflight: return banner + "\n" + renderInput("Max Inflight Workers", "Concurrent block fetch workers · should match max RPS\n"+ - fmt.Sprintf("Current max RPS: %s", m.blockMaxRPS), + fmt.Sprintf("Current max RPS: %s", m.blockMaxRPS), m.inputVal, m.inputErr, m.inputCur) case scrReplay: rec := fmt.Sprintf("%d", m.cpuCores*2) return banner + "\n" + renderInput("Transaction Parallelism", fmt.Sprintf("Parallel workers for block execution\n"+ - "Your system: %d cores · recommended: %s workers (2× cores)", m.cpuCores, rec), + "Your system: %d cores · recommended: %s workers (2× cores)", m.cpuCores, rec), m.inputVal, m.inputErr, m.inputCur) case scrRPCPort: return banner + "\n" + renderInput("Mithril RPC Port", "JSON-RPC interface for querying Mithril's state\n"+ - "Default: 8899 · set to 0 to disable", + "Default: 8899 · set to 0 to disable", m.inputVal, m.inputErr, m.inputCur) case scrReview: @@ -842,9 +857,7 @@ func (m setupModel) generateConfig() (tea.Model, tea.Cmd) { fmt.Fprintf(&cfg, "gossip_entrypoint = %q\n", m.gossipEntry) cfg.WriteString("grpc_addr = \"127.0.0.1:3001\"\n") cfg.WriteString("rpc_addr = \"127.0.0.1:3000\"\n") - if m.lbQuiet { - cfg.WriteString("quiet = true\n") - } + fmt.Fprintf(&cfg, "quiet = %t\n", m.lbQuiet) cfg.WriteString("\n") } diff --git a/cmd/mithril/setupcmd/setup_test.go b/cmd/mithril/setupcmd/setup_test.go index 2793f89f..1df6f198 100644 --- a/cmd/mithril/setupcmd/setup_test.go +++ b/cmd/mithril/setupcmd/setup_test.go @@ -23,8 +23,8 @@ func TestHandleSelect_ScrLightbringerQuiet_False(t *testing.T) { } // TestHandleSelect_DisableLB_ResetsQuiet verifies that picking "disable" on -// Lightbringer clears m.lbQuiet so a later re-enable starts clean. -func TestHandleSelect_DisableLB_ResetsQuiet(t *testing.T) { +// Lightbringer resets m.lbQuiet to the default so a later re-enable starts clean. +func TestHandleSelect_DisableLB_ResetsQuietDefault(t *testing.T) { m := setupModel{ screen: scrLightbringer, mode: "full", @@ -34,7 +34,7 @@ func TestHandleSelect_DisableLB_ResetsQuiet(t *testing.T) { out, _ := m.handleSelect("disable") model := out.(setupModel) assert.False(t, model.enableLB, "disable should set enableLB=false") - assert.False(t, model.lbQuiet, "disable should reset lbQuiet=false to avoid stale state on re-enable") + assert.True(t, model.lbQuiet, "disable should reset lbQuiet to the default to avoid stale state on re-enable") } // TestHandleSelect_EnableLB_DoesNotChangeQuiet ensures picking "enable" diff --git a/config.example.toml b/config.example.toml index 25e5eff5..8dad5bb0 100644 --- a/config.example.toml +++ b/config.example.toml @@ -267,8 +267,9 @@ name = "mithril" # block_confirmation_rpc_http = "http://localhost:8899" # block_confirmation_rpc_websocket = "ws://localhost:8899" - # Suppress Lightbringer info/debug log lines (only warn/error). Default: false. - # quiet = true + # Suppress Lightbringer info/debug log lines (only warn/error). Default: true. + # Set quiet = false to show all info messages. + quiet = true # ============================================================================ # [rpc] - Mithril RPC Server diff --git a/pkg/config/config.go b/pkg/config/config.go index 4cbf1aeb..8b9dedc7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -8,6 +8,12 @@ import ( "github.com/spf13/viper" ) +const LightbringerQuietDefault = true + +func ApplyDefaults(v *viper.Viper) { + v.SetDefault("lightbringer.quiet", LightbringerQuietDefault) +} + // LedgerConfig holds ledger-related configuration (matches Firedancer [ledger] section) type LedgerConfig struct { Path string `toml:"path" mapstructure:"path"` // was: blockdir @@ -153,7 +159,7 @@ type LightbringerConfig struct { BlockConfirmRpcHTTP string `toml:"block_confirmation_rpc_http" mapstructure:"block_confirmation_rpc_http"` BlockConfirmRpcWS string `toml:"block_confirmation_rpc_websocket" mapstructure:"block_confirmation_rpc_websocket"` - // Quiet suppresses Lightbringer info/debug logs when true. Written as + // Quiet suppresses Lightbringer info/debug logs when true. Defaults to true. Written as // [log] quiet = true in the generated Lightbringer.toml. Quiet bool `toml:"quiet" mapstructure:"quiet"` } @@ -201,6 +207,8 @@ var ConfigFile string // If no --config flag is provided, defaults to "config.toml" in current directory. // CLI flag precedence is handled separately in initConfigAndBindFlags. func InitConfig() error { + ApplyDefaults(viper.GetViper()) + configPath := ConfigFile if configPath == "" { configPath = "config.toml" // Default config file From 46f041d53437ecbbd52d749cdc43272665047b81 Mon Sep 17 00:00:00 2001 From: smcio Date: Sat, 9 May 2026 22:04:26 +0200 Subject: [PATCH 6/6] rpcserver: increase sendTransactionLeaderForwardCount to 10 --- pkg/rpcserver/send_transaction.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rpcserver/send_transaction.go b/pkg/rpcserver/send_transaction.go index bd19e297..22885a1b 100644 --- a/pkg/rpcserver/send_transaction.go +++ b/pkg/rpcserver/send_transaction.go @@ -38,7 +38,7 @@ const ( maxBase58TxSize = 1683 maxBase64TxSize = 1644 packetDataSize = 1232 - sendTransactionLeaderForwardCount = 5 + sendTransactionLeaderForwardCount = 10 sendTransactionTargetCount = sendTransactionLeaderForwardCount + 1 sendTransactionLeaderLookahead = 64 sendTransactionClusterNodesRefreshEvery = 10 * time.Minute