diff --git a/.gitignore b/.gitignore index f147310f5ee..64a561a3dd9 100644 --- a/.gitignore +++ b/.gitignore @@ -76,4 +76,5 @@ __pycache__/ .envrc .direnv/ .dev/tasks.md +.dev/todo/ /.worktrees \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000000..945971192eb --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,27 @@ +# Changelog + +All notable changes to the StreamingFast Firehose fork of reth are documented here. + +This changelog covers Firehose-specific changes only. For upstream reth changes, see the +[official reth releases](https://github.com/paradigmxyz/reth/releases). + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). + +## v2.2.0-fh + +First Firehose-instrumented release on top of upstream reth v2.2.0. + +### Added + +- Add flashblocks support to `reth-firehose`: `start_flashblock_local` and `mark_flashblock` methods on `FirehoseBlockTracer` allow partial block ("flashblock") boundaries to be emitted during block execution. +- Add `SynchronizedStdout` for coordinated stdout writes across multiple concurrent tracer instances; stdout lock initialization is now handled internally by `init_tracer`. +- Expose prestate types and helpers as `pub` in `reth-firehose-tests` crate to allow reuse in downstream integration test suites. + +### Changed + +- `init_tracer` now accepts `Config` directly and sets up the stdout lock internally, removing the need for callers to manage stdout coordination themselves. +- Update `firehose-tracer` dependency to version 5.1.1. + +### Fixed + +- Restore the Firehose live-path hooks in the engine-tree payload validator so blocks arriving through the engine API (`newPayload` / `forkchoiceUpdated`) are traced again. The hooks had been dropped during a merge, leaving only the historical/stage execution path instrumented. 
diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index d661bf02784..a72320e21aa 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -77,7 +77,7 @@ use reth_payload_primitives::{ }; use reth_primitives_traits::{ AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives, - RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable, + RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable, TxTy, }; use reth_provider::{ providers::{OverlayBuilder, OverlayStateProviderFactory}, @@ -394,12 +394,13 @@ where where V: PayloadValidator + Clone, Evm: ConfigureEngineEvm, + TxTy: reth_firehose::mapper::SignatureFields, { // Spawn payload conversion on a background thread so it runs concurrently with the // rest of the function (setup + execution). For payloads this overlaps the cost of // RLP decoding + header hashing. let is_payload = matches!(&input, BlockOrPayload::Payload(_)); - let convert_to_block = match &input { + let mut convert_to_block_handle = match &input { BlockOrPayload::Payload(_) => { let payload_clone = input.clone(); let validator = self.validator.clone(); @@ -412,21 +413,28 @@ where validator.convert_payload_to_block(payload) }, ); - Either::Left(handle) + Some(handle) } - BlockOrPayload::Block(_) => Either::Right(()), + BlockOrPayload::Block(_) => None, }; // Returns the sealed block, either by awaiting the background conversion task (for // payloads) or by extracting the already-converted block directly. 
- let convert_to_block = + // + // Dispatches on the variant of the `input` it is handed rather than on the variant seen at + // spawn time, which makes it callable more than once: the Firehose preamble below resolves + // the block eagerly and rebuilds `input` as a `Block`, so the later call site returns the + // already-sealed block directly instead of touching the (already-consumed) conversion + // handle. Only one call may pass a `Payload`; a second would panic on the taken handle. + let mut convert_to_block = move |input: BlockOrPayload| -> Result, NewPayloadError> { - match convert_to_block { - Either::Left(handle) => handle.try_into_inner().expect("sole handle"), - Either::Right(()) => { - let BlockOrPayload::Block(block) = input else { unreachable!() }; - Ok(block) - } + match input { + BlockOrPayload::Block(block) => Ok(block), + BlockOrPayload::Payload(_) => convert_to_block_handle + .take() + .expect("sole handle") + .try_into_inner() + .expect("sole handle"), } }; @@ -564,15 +572,57 @@ where None }; + // Firehose: when the tracer is active, resolve the sealed block synchronously so we can + // start the block-level tracer guard before execution. The guard emits + // `on_block_start`/`on_genesis_block` now and defers `on_block_end(None)` until + // `mark_verified()` runs after post-execution validation. + // + // If any early return is taken between here and `mark_verified()`, the guard's Drop emits + // `on_block_end(Some(err))` so invalid blocks are never flushed downstream. + // + // `finalized` is `None` on the live engine path: the consensus layer hasn't necessarily + // advanced the finalized head by the time we're validating a payload. The pipeline + // (staged-sync) path advertises the finalized ref itself. + // + // We resolve the block eagerly here and rebuild `input` as a `Block` variant so the later + // `convert_to_block(input)?` site becomes a cheap unwrap of the already-sealed block. 
+ // + // Block 1 is the genesis marker: `start` emits `on_genesis_block` as a standalone event and + // does NOT leave the tracer in "block state", so wrapping the executor would panic in + // `on_system_call_start`. Let the guard drop (a no-op for genesis) and fall through to the + // non-Firehose execution path. + let (mut fh_tracer, input): (Option, _) = + if reth_firehose::is_tracer_initialized() { + let sealed = convert_to_block(input)?; + let tracer = reth_firehose::FirehoseBlockTracer::start::(&sealed, None); + let fh_tracer = (!tracer.is_genesis()).then_some(tracer); + (fh_tracer, BlockOrPayload::Block(sealed)) + } else { + (None, input) + }; + // Execute the block and handle any execution errors. // The receipt root task is spawned before execution and receives receipts incrementally // as transactions complete, allowing parallel computation during execution. + // + // Two entry points exist for the same work: `execute_block` is the upstream-pristine path + // and `execute_and_trace_block` is its Firehose-enabled twin. We pick based on whether a + // live tracer guard is available for this block (see the Firehose preamble above for when + // the guard is `None` — notably the block-1 genesis marker). 
let execute_block_start = Instant::now(); - let (output, senders, receipt_root_rx) = - match self.execute_block(state_provider, env, &input, &mut handle) { + let (output, senders, receipt_root_rx) = match fh_tracer.as_mut() { + Some(tracer) => { + match self.execute_and_trace_block(state_provider, env, &input, tracer, &mut handle) + { + Ok(output) => output, + Err(err) => return self.handle_execution_error(input, err, &parent_block), + } + } + None => match self.execute_block(state_provider, env, &input, &mut handle) { Ok(output) => output, Err(err) => return self.handle_execution_error(input, err, &parent_block), - }; + }, + }; let execution_duration = execute_block_start.elapsed(); // After executing the block we can stop prewarming transactions @@ -836,6 +886,13 @@ where let changeset_provider = ensure_ok_post_block!(overlay_factory.database_provider_ro(), block); + // All post-execution validations passed — flush the Firehose block. Placed after the last + // fallible step so any earlier failure instead drops the guard (emitting + // `on_block_end(Some(err))`) and discards the block. + if let Some(guard) = fh_tracer.take() { + guard.mark_verified(); + } + let executed_block = self.spawn_deferred_trie_task( block, output, @@ -1004,6 +1061,141 @@ where Ok((output, senders, result_rx)) } + /// Firehose-enabled twin of [`Self::execute_block`]. + /// + /// This is a copy of `execute_block` with two surgical differences: + /// - the EVM is constructed via `evm_with_env_and_inspector` with a `FirehoseInspector` + /// borrowed from the supplied `tracer`; + /// - the resulting executor is wrapped in a + /// [`reth_firehose::executor::FirehoseWrappedExecutor`] so system-call boundaries and + /// withdrawals are funnelled into the tracer. + /// + /// MAINTENANCE CONTRACT: keep this function in sync with [`Self::execute_block`]. Any change to + /// the upstream logic (new metrics, error handling, ordering of pre/post steps, etc.) must be + /// mirrored here. 
The intent of duplicating rather than branching is to keep the diff against + /// upstream minimal and localized: upstream's `execute_block` stays pristine, and the + /// Firehose-specific wiring lives here. + #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)] + #[expect(clippy::type_complexity)] + fn execute_and_trace_block( + &mut self, + state_provider: S, + env: ExecutionEnv, + input: &BlockOrPayload, + tracer: &mut reth_firehose::FirehoseBlockTracer, + handle: &mut PayloadHandle, Err, N::Receipt>, + ) -> Result< + ( + BlockExecutionOutput, + Vec
, + tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>, + ), + InsertBlockErrorKind, + > + where + S: StateProvider + Send, + Err: core::error::Error + Send + Sync + 'static, + V: PayloadValidator, + T: PayloadTypes>, + Evm: ConfigureEngineEvm, + TxTy: reth_firehose::mapper::SignatureFields, + { + debug!(target: "engine::tree::payload_validator", "Executing block (with Firehose tracing)"); + + let mut db = debug_span!(target: "engine::tree", "build_state_db").in_scope(|| { + State::builder() + .with_database(StateProviderDatabase::new(state_provider)) + .with_bundle_update() + .build() + }); + + // Firehose-specific: install the inspector on the EVM and wrap the executor so that + // system-call boundaries and withdrawals are forwarded to the tracer. Withdrawals are + // pre-materialized here because the wrapper emits them as part of the end-of-block event. + let withdrawals = + input.withdrawals().map(|w| alloy_eips::eip4895::Withdrawals::new(w.to_vec())); + let (spec_id, mut executor) = { + let _span = debug_span!(target: "engine::tree", "create_evm").entered(); + let spec_id = *env.evm_env.spec_id(); + let inspector = tracer.inspector(); + let evm = self.evm_config.evm_with_env_and_inspector(&mut db, env.evm_env, inspector); + let ctx = self + .execution_ctx_for(input) + .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?; + let inner = self.evm_config.create_executor(evm, ctx); + let executor = + reth_firehose::executor::FirehoseWrappedExecutor::new(inner, withdrawals); + (spec_id, executor) + }; + + if !self.config.precompile_cache_disabled() { + let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered(); + executor.evm_mut().precompiles_mut().map_cacheable_precompiles( + |address, precompile| { + let metrics = self + .precompile_cache_metrics + .entry(*address) + .or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address)) + .clone(); + CachedPrecompile::wrap( + precompile, + 
self.precompile_cache_map.cache_for_address(*address), + spec_id, + Some(metrics), + ) + }, + ); + } + + // Spawn background task to compute receipt root and logs bloom incrementally. + // Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block). + let receipts_len = input.transaction_count(); + let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded(); + let (result_tx, result_rx) = tokio::sync::oneshot::channel(); + let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx); + self.payload_processor + .executor() + .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len)); + + let transaction_count = input.transaction_count(); + let executed_tx_index = Arc::clone(handle.executed_tx_index()); + let executor = executor.with_state_hook( + handle.state_hook().map(|hook| Box::new(hook) as Box), + ); + + let execution_start = Instant::now(); + + // Execute all transactions and finalize + let (executor, senders) = self.execute_transactions( + executor, + transaction_count, + handle.iter_transactions(), + &receipt_tx, + &executed_tx_index, + )?; + drop(receipt_tx); + + // Finish execution and get the result + let post_exec_start = Instant::now(); + let (_evm, result) = debug_span!(target: "engine::tree", "BlockExecutor::finish") + .in_scope(|| executor.finish()) + .map(|(evm, result)| (evm.into_db(), result))?; + self.metrics.record_post_execution(post_exec_start.elapsed()); + + // Merge transitions into bundle state + debug_span!(target: "engine::tree", "merge_transitions") + .in_scope(|| db.merge_transitions(BundleRetention::Reverts)); + + let output = BlockExecutionOutput { result, state: db.take_bundle() }; + + let execution_duration = execution_start.elapsed(); + self.metrics.record_block_execution(&output, execution_duration); + self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration); + debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, 
"Executed block (with Firehose tracing)"); + + Ok((output, senders, result_rx)) + } + /// Executes transactions and collects senders, streaming receipts to a background task. /// /// This method handles: @@ -1976,6 +2168,7 @@ where V: PayloadValidator + Clone, Evm: ConfigureEngineEvm + 'static, Types: PayloadTypes>, + TxTy: reth_firehose::mapper::SignatureFields, { fn validate_payload_attributes_against_header( &self, diff --git a/crates/engine/tree/tests/firehose_live_tracing.rs b/crates/engine/tree/tests/firehose_live_tracing.rs new file mode 100644 index 00000000000..a3cf4ea4166 --- /dev/null +++ b/crates/engine/tree/tests/firehose_live_tracing.rs @@ -0,0 +1,106 @@ +//! Regression test for Firehose tracing on the engine-tree live-block path. +//! +//! The Firehose "live-path" hooks in `tree::payload_validator` route blocks arriving through the +//! engine API (`newPayload` / `forkchoiceUpdated`) into the Firehose tracer. On `firehose/2.x` +//! those hooks were once dropped during a merge, silently disabling live-block tracing while the +//! historical/stage path kept working — a regression that compiled and passed every existing test. +//! +//! This test guards that path end-to-end: it installs a buffer-backed global tracer, drives a real +//! [`EthereumNode`] through `newPayload` (which calls `validate_block_with_state`, the path under +//! test), and asserts that `FIRE BLOCK` lines are emitted for the live blocks. If the dispatch into +//! `execute_and_trace_block` is missing, no `FIRE BLOCK` lines are produced and the test fails. +//! +//! It lives in its own integration-test binary (rather than alongside the other engine-tree tests) +//! because it installs a process-wide tracer; cargo/nextest run each integration binary in its own +//! process, keeping the global tracer isolated from the rest of the suite. 
+ +use eyre::Result; +use reth_chainspec::{ChainSpecBuilder, MAINNET}; +use reth_e2e_test_utils::testsuite::{ + actions::{MakeCanonical, ProduceBlocks}, + setup::{NetworkSetup, Setup}, + TestBuilder, +}; +use reth_engine_tree::tree::TreeConfig; +use reth_ethereum_engine_primitives::EthEngineTypes; +use reth_node_ethereum::EthereumNode; +use std::sync::Arc; + +/// Number of blocks to produce. Block 1 is the Firehose genesis marker (emitted via +/// `on_genesis_block`); blocks 2.. exercise the live `execute_and_trace_block` path. +const PRODUCED_BLOCKS: u64 = 3; + +#[tokio::test] +async fn live_payload_validation_emits_firehose_blocks() -> Result<()> { + reth_tracing::init_test_tracing(); + + // Install a buffer-backed global Firehose tracer BEFORE the node validates any block, so the + // live path's `is_tracer_initialized()` gate activates and routes execution through + // `execute_and_trace_block`. Fork timings only affect how block contents are mapped, not + // whether a block is emitted; activate Shanghai + Cancun at genesis to match the + // `cancun_activated` chain spec below. + let buffer = reth_firehose::init_tracer_with_buffer( + MAINNET.chain.id(), + Some(0), // shanghai + Some(0), // cancun + None, // prague + ); + + let setup = Setup::::default() + .with_chain_spec(Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis( + serde_json::from_str(include_str!( + "../../../e2e-test-utils/src/testsuite/assets/genesis.json" + )) + .unwrap(), + ) + .cancun_activated() + .build(), + )) + .with_network(NetworkSetup::single_node()) + .with_tree_config( + TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true), + ); + + // Each produced block is submitted via `engine_newPayload`, which drives + // `validate_block_with_state` — the path under test. 
+ let test = TestBuilder::new() + .with_setup(setup) + .with_action(ProduceBlocks::::new(PRODUCED_BLOCKS)) + .with_action(MakeCanonical::new()); + + test.run::().await?; + + // Collect the block numbers from every captured `FIRE BLOCK ...` line. + let raw = buffer.get_bytes(); + let text = String::from_utf8(raw).expect("captured tracer output is UTF-8"); + let traced: Vec = text + .lines() + .filter_map(|line| { + let mut parts = line.split(' '); + if parts.next()? != "FIRE" || parts.next()? != "BLOCK" { + return None; + } + parts.next()?.parse::().ok() + }) + .collect(); + + assert!( + !traced.is_empty(), + "no FIRE BLOCK lines were emitted — the live payload-validation path is not traced.\n\ + Captured tracer output:\n{text}" + ); + + // Blocks 2..=PRODUCED_BLOCKS go through the live `execute_and_trace_block` path (block 1 is the + // genesis marker, emitted separately via `on_genesis_block`). Require each to have been traced. + for number in 2..=PRODUCED_BLOCKS { + assert!( + traced.contains(&number), + "expected a FIRE BLOCK line for live block #{number}, got traced blocks {traced:?}" + ); + } + + Ok(()) +} diff --git a/crates/firehose/src/lib.rs b/crates/firehose/src/lib.rs index f9409ec9d43..bd9c275d342 100644 --- a/crates/firehose/src/lib.rs +++ b/crates/firehose/src/lib.rs @@ -116,6 +116,49 @@ pub fn init_tracer(config: firehose_tracer::config::Config) { .expect("init_tracer called more than once"); } +/// Initialize the process-wide tracer to capture all output into an in-memory buffer, returning a +/// handle to read it back. 
+/// +/// This is the buffer-backed counterpart to [`init_tracer`] (which writes to stdout): it builds a +/// fully blockchain-initialized [`firehose_tracer::Tracer`] over a +/// [`firehose_tracer::InMemoryBuffer`] and installs it as the process-wide tracer, so the live +/// engine path — [`is_tracer_initialized`] plus [`block_tracer::FirehoseBlockTracer::start`] — +/// becomes active and every `FIRE BLOCK` line is captured instead of printed. +/// +/// Intended for integration tests that drive the real validation path and need to assert on the +/// emitted Firehose blocks. Panics if [`init_tracer`] or this function was already called. +/// +/// `shanghai_time` / `cancun_time` / `prague_time` are the timestamp-based fork activations the +/// tracer uses when mapping block contents (`Some(0)` = active from genesis, `None` = never). They +/// do not gate whether a block is emitted. +pub fn init_tracer_with_buffer( + chain_id: u64, + shanghai_time: Option, + cancun_time: Option, + prague_time: Option, +) -> firehose_tracer::InMemoryBuffer { + // Mirror `init_tracer`: ensure the shared stdout lock exists so any code path that later + // reaches for it (e.g. an additional flashblock tracer) does not panic. + let _ = init_stdout_lock(); + let (tracer, buffer) = firehose_tracer::Tracer::with_buffer( + firehose_tracer::config::Config::default(), + firehose_tracer::config::ChainConfig { + chain_id, + shanghai_time, + cancun_time, + prague_time, + verkle_time: None, + }, + "reth-firehose", + env!("CARGO_PKG_VERSION"), + ); + GLOBAL_TRACER + .set(Arc::new(Mutex::new(tracer))) + .ok() + .expect("init_tracer/init_tracer_with_buffer called more than once"); + buffer +} + /// Acquire exclusive access to the process-wide tracer. /// /// Panics if [`init_tracer`] has not been called yet, or if the mutex is poisoned. 
diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index a10c40841f7..4f23d00e799 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -24,7 +24,7 @@ use reth_chain_state::CanonStateSubscriptions; use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks, Hardforks}; use reth_node_api::{ AddOnsContext, BlockTy, EngineApiValidator, EngineTypes, FullNodeComponents, FullNodeTypes, - NodeAddOns, NodeTypes, PayloadTypes, PayloadValidator, PrimitivesTy, TreeConfig, + NodeAddOns, NodeTypes, PayloadTypes, PayloadValidator, PrimitivesTy, TreeConfig, TxTy, }; use reth_node_core::{ cli::config::RethTransactionPoolConfig, @@ -1452,6 +1452,7 @@ where ::Payload, Block = BlockTy, > + Clone, + TxTy: reth_firehose::mapper::SignatureFields, { type EngineValidator = BasicEngineValidator;