From a70eaa794942131d04ad232352955b129cfb744f Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Wed, 20 May 2026 18:51:13 +0000 Subject: [PATCH 1/5] chore: prepare v2.2.0-fh release Add CHANGELOG.md documenting Firehose-specific changes since reth v2.2.0: flashblocks support, SynchronizedStdout, init_tracer Config refactor, public prestate helpers in reth-firehose-tests, and firehose-tracer 5.1.1. --- .dev/todo/release-v2.2.0-fh.md | 59 ++++++++++++++++++++++++++++++++++ CHANGELOG.md | 23 +++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 .dev/todo/release-v2.2.0-fh.md create mode 100644 CHANGELOG.md diff --git a/.dev/todo/release-v2.2.0-fh.md b/.dev/todo/release-v2.2.0-fh.md new file mode 100644 index 00000000000..df35affa925 --- /dev/null +++ b/.dev/todo/release-v2.2.0-fh.md @@ -0,0 +1,59 @@ +# Prepare Release v2.2.0-fh + +mode: feature +state: review +root_git: . +worktree: .worktrees/release-v2.2.0-fh +branch: release/v2.2.0-fh +target_branch: firehose/2.x + +> **Resume protocol:** read **Dev Feedback** and the **State Tracker** below first, then jump to the +> step marked `Current`. Ensure that you are in the correct worktree and branch according to preamble here. Update current with Developer feedback and update the tracker after every meaningful change. +> Do not mutate completed steps; append a new entry instead. + +--- + +## Initial Description + +Prepare a git release tag `v2.2.0-fh` for the `firehose/2.x` branch. + +This is a StreamingFast Firehose-specific release tag on top of the upstream reth `v2.2.0` tag. + +### What to do + +1. Create a `CHANGELOG.md` if one does not exist, documenting changes since the last firehose tag (`v1.11.4-fh-1` was the last 1.x tag; there is no prior 2.x firehose tag). + +2. Key changes to document (commits since `v2.2.0` on `firehose/2.x`): + - Updated `firehose-tracer` dependency to version 5.1.1 + - Added flashblocks support to `reth-firehose`: `start_flashblock_local`, `mark_flashblock` methods on `FirehoseBlockTracer` + - Added `SynchronizedStdout` + stdout lock: `init_tracer` now accepts `Config` and handles stdout coordination internally + - Exposed prestate types and helpers as `pub` in `reth-firehose-tests` + - Various code formatting and cleanup + +3. Commit the CHANGELOG. + +4. Create the annotated git tag `v2.2.0-fh` pointing to the HEAD of `release/v2.2.0-fh` after the CHANGELOG commit. + +### Tag naming convention + +- `v2.2.0` — upstream reth release +- `v2.2.0-fh` — StreamingFast Firehose-instrumented release on top of v2.2.0 + +## Dev Feedback + +## Spec & Implementation + +Created `CHANGELOG.md` at the repo root documenting the Firehose-specific changes since the last release. The changelog follows Keep a Changelog format and covers: + +- Flashblocks support (`start_flashblock_local`, `mark_flashblock` on `FirehoseBlockTracer`) +- `SynchronizedStdout` + `init_tracer` accepting `Config` directly +- Exposed prestate helpers as `pub` in `reth-firehose-tests` +- `firehose-tracer` bumped to 5.1.1 + +Committed `CHANGELOG.md` with message `chore: prepare v2.2.0-fh release` and created annotated tag `v2.2.0-fh`. + +## State Tracker + +**Last Updated:** 2026-05-20 +**Current Step:** Step 4 — Tag created, ready for push +**Status:** Tag `v2.2.0-fh` created locally. Branch and tag not yet pushed (awaiting user review). diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000000..b663f63fd86 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,23 @@ +# Changelog + +All notable changes to the StreamingFast Firehose fork of reth are documented here. + +This changelog covers Firehose-specific changes only. For upstream reth changes, see the +[official reth releases](https://github.com/paradigmxyz/reth/releases). + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). + +## v2.2.0-fh + +First Firehose-instrumented release on top of upstream reth v2.2.0. + +### Added + +- Add flashblocks support to `reth-firehose`: `start_flashblock_local` and `mark_flashblock` methods on `FirehoseBlockTracer` allow partial block ("flashblock") boundaries to be emitted during block execution. +- Add `SynchronizedStdout` for coordinated stdout writes across multiple concurrent tracer instances; stdout lock initialization is now handled internally by `init_tracer`. +- Expose prestate types and helpers as `pub` in `reth-firehose-tests` crate to allow reuse in downstream integration test suites. + +### Changed + +- `init_tracer` now accepts `Config` directly and sets up the stdout lock internally, removing the need for callers to manage stdout coordination themselves. +- Update `firehose-tracer` dependency to version 5.1.1. From b416977109067c5e5b70acc41fa0d42a189d2ad4 Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Thu, 21 May 2026 17:39:20 +0000 Subject: [PATCH 2/5] chore: remove task file from tracking --- .dev/todo/release-v2.2.0-fh.md | 59 ---------------------------------- 1 file changed, 59 deletions(-) delete mode 100644 .dev/todo/release-v2.2.0-fh.md diff --git a/.dev/todo/release-v2.2.0-fh.md b/.dev/todo/release-v2.2.0-fh.md deleted file mode 100644 index df35affa925..00000000000 --- a/.dev/todo/release-v2.2.0-fh.md +++ /dev/null @@ -1,59 +0,0 @@ -# Prepare Release v2.2.0-fh - -mode: feature -state: review -root_git: . -worktree: .worktrees/release-v2.2.0-fh -branch: release/v2.2.0-fh -target_branch: firehose/2.x - -> **Resume protocol:** read **Dev Feedback** and the **State Tracker** below first, then jump to the -> step marked `Current`. Ensure that you are in the correct worktree and branch according to preamble here. Update current with Developer feedback and update the tracker after every meaningful change. -> Do not mutate completed steps; append a new entry instead. - ---- - -## Initial Description - -Prepare a git release tag `v2.2.0-fh` for the `firehose/2.x` branch. - -This is a StreamingFast Firehose-specific release tag on top of the upstream reth `v2.2.0` tag. - -### What to do - -1. Create a `CHANGELOG.md` if one does not exist, documenting changes since the last firehose tag (`v1.11.4-fh-1` was the last 1.x tag; there is no prior 2.x firehose tag). - -2. Key changes to document (commits since `v2.2.0` on `firehose/2.x`): - - Updated `firehose-tracer` dependency to version 5.1.1 - - Added flashblocks support to `reth-firehose`: `start_flashblock_local`, `mark_flashblock` methods on `FirehoseBlockTracer` - - Added `SynchronizedStdout` + stdout lock: `init_tracer` now accepts `Config` and handles stdout coordination internally - - Exposed prestate types and helpers as `pub` in `reth-firehose-tests` - - Various code formatting and cleanup - -3. Commit the CHANGELOG. - -4. Create the annotated git tag `v2.2.0-fh` pointing to the HEAD of `release/v2.2.0-fh` after the CHANGELOG commit. - -### Tag naming convention - -- `v2.2.0` — upstream reth release -- `v2.2.0-fh` — StreamingFast Firehose-instrumented release on top of v2.2.0 - -## Dev Feedback - -## Spec & Implementation - -Created `CHANGELOG.md` at the repo root documenting the Firehose-specific changes since the last release. The changelog follows Keep a Changelog format and covers: - -- Flashblocks support (`start_flashblock_local`, `mark_flashblock` on `FirehoseBlockTracer`) -- `SynchronizedStdout` + `init_tracer` accepting `Config` directly -- Exposed prestate helpers as `pub` in `reth-firehose-tests` -- `firehose-tracer` bumped to 5.1.1 - -Committed `CHANGELOG.md` with message `chore: prepare v2.2.0-fh release` and created annotated tag `v2.2.0-fh`. - -## State Tracker - -**Last Updated:** 2026-05-20 -**Current Step:** Step 4 — Tag created, ready for push -**Status:** Tag `v2.2.0-fh` created locally. Branch and tag not yet pushed (awaiting user review). From 54b7ac25bc152ab005c623e1829481eb7b9b01db Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Thu, 21 May 2026 17:39:29 +0000 Subject: [PATCH 3/5] chore: ignore .dev/todo/ to prevent task files from being tracked --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index f147310f5ee..64a561a3dd9 100644 --- a/.gitignore +++ b/.gitignore @@ -76,4 +76,5 @@ __pycache__/ .envrc .direnv/ .dev/tasks.md +.dev/todo/ /.worktrees \ No newline at end of file From e422da874b146b937c1e31f62f9530a5a570095a Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Wed, 3 Jun 2026 15:49:48 -0400 Subject: [PATCH 4/5] fix(engine): restore Firehose live-path tracing hooks in payload validator Blocks arriving through the engine API (newPayload / forkchoiceUpdated) were no longer traced on firehose/2.x: the live-path hooks in the engine-tree payload validator had been dropped during a merge, leaving only the historical/stage execution path instrumented. Restore the SignatureFields bound, the FirehoseBlockTracer guard lifecycle (start before execution, mark_verified after post-execution validation, drop on error), and the execute_and_trace_block dispatch that wraps the executor in FirehoseWrappedExecutor. Propagate the SignatureFields bound to the EngineValidatorBuilder impl. --- CHANGELOG.md | 4 + .../engine/tree/src/tree/payload_validator.rs | 221 ++++++++++++++++-- crates/node/builder/src/rpc.rs | 3 +- 3 files changed, 213 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b663f63fd86..945971192eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,3 +21,7 @@ First Firehose-instrumented release on top of upstream reth v2.2.0. - `init_tracer` now accepts `Config` directly and sets up the stdout lock internally, removing the need for callers to manage stdout coordination themselves. - Update `firehose-tracer` dependency to version 5.1.1. + +### Fixed + +- Restore the Firehose live-path hooks in the engine-tree payload validator so blocks arriving through the engine API (`newPayload` / `forkchoiceUpdated`) are traced again. The hooks had been dropped during a merge, leaving only the historical/stage execution path instrumented. diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index d661bf02784..a72320e21aa 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -77,7 +77,7 @@ use reth_payload_primitives::{ }; use reth_primitives_traits::{ AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives, - RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable, + RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable, TxTy, }; use reth_provider::{ providers::{OverlayBuilder, OverlayStateProviderFactory}, @@ -394,12 +394,13 @@ where where V: PayloadValidator + Clone, Evm: ConfigureEngineEvm, + TxTy: reth_firehose::mapper::SignatureFields, { // Spawn payload conversion on a background thread so it runs concurrently with the // rest of the function (setup + execution). For payloads this overlaps the cost of // RLP decoding + header hashing. let is_payload = matches!(&input, BlockOrPayload::Payload(_)); - let convert_to_block = match &input { + let mut convert_to_block_handle = match &input { BlockOrPayload::Payload(_) => { let payload_clone = input.clone(); let validator = self.validator.clone(); @@ -412,21 +413,28 @@ where validator.convert_payload_to_block(payload) }, ); - Either::Left(handle) + Some(handle) } - BlockOrPayload::Block(_) => Either::Right(()), + BlockOrPayload::Block(_) => None, }; // Returns the sealed block, either by awaiting the background conversion task (for // payloads) or by extracting the already-converted block directly. - let convert_to_block = + // + // Dispatches on the variant of the `input` it is handed rather than on the variant seen at + // spawn time, which makes it callable more than once: the Firehose preamble below resolves + // the block eagerly and rebuilds `input` as a `Block`, so the later call site returns the + // already-sealed block directly instead of touching the (already-consumed) conversion + // handle. + let mut convert_to_block = move |input: BlockOrPayload| -> Result, NewPayloadError> { - match convert_to_block { - Either::Left(handle) => handle.try_into_inner().expect("sole handle"), - Either::Right(()) => { - let BlockOrPayload::Block(block) = input else { unreachable!() }; - Ok(block) - } + match input { + BlockOrPayload::Block(block) => Ok(block), + BlockOrPayload::Payload(_) => convert_to_block_handle + .take() + .expect("sole handle") + .try_into_inner() + .expect("sole handle"), } }; @@ -564,15 +572,57 @@ where None }; + // Firehose: when the tracer is active, resolve the sealed block synchronously so we can + // start the block-level tracer guard before execution. The guard emits + // `on_block_start`/`on_genesis_block` now and defers `on_block_end(None)` until + // `mark_verified()` runs after post-execution validation. + // + // If any early return is taken between here and `mark_verified()`, the guard's Drop emits + // `on_block_end(Some(err))` so invalid blocks are never flushed downstream. + // + // `finalized` is `None` on the live engine path: the consensus layer hasn't necessarily + // advanced the finalized head by the time we're validating a payload. The pipeline + // (staged-sync) path advertises the finalized ref itself. + // + // We resolve the block eagerly here and rebuild `input` as a `Block` variant so the later + // `convert_to_block(input)?` site becomes a cheap unwrap of the already-sealed block. + // + // Block 1 is the genesis marker: `start` emits `on_genesis_block` as a standalone event and + // does NOT leave the tracer in "block state", so wrapping the executor would panic in + // `on_system_call_start`. Let the guard drop (a no-op for genesis) and fall through to the + // non-Firehose execution path. + let (mut fh_tracer, input): (Option, _) = + if reth_firehose::is_tracer_initialized() { + let sealed = convert_to_block(input)?; + let tracer = reth_firehose::FirehoseBlockTracer::start::(&sealed, None); + let fh_tracer = (!tracer.is_genesis()).then_some(tracer); + (fh_tracer, BlockOrPayload::Block(sealed)) + } else { + (None, input) + }; + // Execute the block and handle any execution errors. // The receipt root task is spawned before execution and receives receipts incrementally // as transactions complete, allowing parallel computation during execution. + // + // Two entry points exist for the same work: `execute_block` is the upstream-pristine path + // and `execute_and_trace_block` is its Firehose-enabled twin. We pick based on whether a + // live tracer guard is available for this block (see the Firehose preamble above for when + // the guard is `None` — notably the block-1 genesis marker). let execute_block_start = Instant::now(); - let (output, senders, receipt_root_rx) = - match self.execute_block(state_provider, env, &input, &mut handle) { + let (output, senders, receipt_root_rx) = match fh_tracer.as_mut() { + Some(tracer) => { + match self.execute_and_trace_block(state_provider, env, &input, tracer, &mut handle) + { + Ok(output) => output, + Err(err) => return self.handle_execution_error(input, err, &parent_block), + } + } + None => match self.execute_block(state_provider, env, &input, &mut handle) { Ok(output) => output, Err(err) => return self.handle_execution_error(input, err, &parent_block), - }; + }, + }; let execution_duration = execute_block_start.elapsed(); // After executing the block we can stop prewarming transactions @@ -836,6 +886,13 @@ where let changeset_provider = ensure_ok_post_block!(overlay_factory.database_provider_ro(), block); + // All post-execution validations passed — flush the Firehose block. Placed after the last + // fallible step so any earlier failure instead drops the guard (emitting + // `on_block_end(Some(err))`) and discards the block. + if let Some(guard) = fh_tracer.take() { + guard.mark_verified(); + } + let executed_block = self.spawn_deferred_trie_task( block, output, @@ -1004,6 +1061,141 @@ where Ok((output, senders, result_rx)) } + /// Firehose-enabled twin of [`Self::execute_block`]. + /// + /// This is a copy of `execute_block` with two surgical differences: + /// - the EVM is constructed via `evm_with_env_and_inspector` with a `FirehoseInspector` + /// borrowed from the supplied `tracer`; + /// - the resulting executor is wrapped in a + /// [`reth_firehose::executor::FirehoseWrappedExecutor`] so system-call boundaries and + /// withdrawals are funnelled into the tracer. + /// + /// MAINTENANCE CONTRACT: keep this function in sync with [`Self::execute_block`]. Any change to + /// the upstream logic (new metrics, error handling, ordering of pre/post steps, etc.) must be + /// mirrored here. The intent of duplicating rather than branching is to keep the diff against + /// upstream minimal and localized: upstream's `execute_block` stays pristine, and the + /// Firehose-specific wiring lives here. + #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)] + #[expect(clippy::type_complexity)] + fn execute_and_trace_block( + &mut self, + state_provider: S, + env: ExecutionEnv, + input: &BlockOrPayload, + tracer: &mut reth_firehose::FirehoseBlockTracer, + handle: &mut PayloadHandle, Err, N::Receipt>, + ) -> Result< + ( + BlockExecutionOutput, + Vec
, + tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>, + ), + InsertBlockErrorKind, + > + where + S: StateProvider + Send, + Err: core::error::Error + Send + Sync + 'static, + V: PayloadValidator, + T: PayloadTypes>, + Evm: ConfigureEngineEvm, + TxTy: reth_firehose::mapper::SignatureFields, + { + debug!(target: "engine::tree::payload_validator", "Executing block (with Firehose tracing)"); + + let mut db = debug_span!(target: "engine::tree", "build_state_db").in_scope(|| { + State::builder() + .with_database(StateProviderDatabase::new(state_provider)) + .with_bundle_update() + .build() + }); + + // Firehose-specific: install the inspector on the EVM and wrap the executor so that + // system-call boundaries and withdrawals are forwarded to the tracer. Withdrawals are + // pre-materialized here because the wrapper emits them as part of the end-of-block event. + let withdrawals = + input.withdrawals().map(|w| alloy_eips::eip4895::Withdrawals::new(w.to_vec())); + let (spec_id, mut executor) = { + let _span = debug_span!(target: "engine::tree", "create_evm").entered(); + let spec_id = *env.evm_env.spec_id(); + let inspector = tracer.inspector(); + let evm = self.evm_config.evm_with_env_and_inspector(&mut db, env.evm_env, inspector); + let ctx = self + .execution_ctx_for(input) + .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?; + let inner = self.evm_config.create_executor(evm, ctx); + let executor = + reth_firehose::executor::FirehoseWrappedExecutor::new(inner, withdrawals); + (spec_id, executor) + }; + + if !self.config.precompile_cache_disabled() { + let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered(); + executor.evm_mut().precompiles_mut().map_cacheable_precompiles( + |address, precompile| { + let metrics = self + .precompile_cache_metrics + .entry(*address) + .or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address)) + .clone(); + CachedPrecompile::wrap( + precompile, + self.precompile_cache_map.cache_for_address(*address), + spec_id, + Some(metrics), + ) + }, + ); + } + + // Spawn background task to compute receipt root and logs bloom incrementally. + // Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block). + let receipts_len = input.transaction_count(); + let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded(); + let (result_tx, result_rx) = tokio::sync::oneshot::channel(); + let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx); + self.payload_processor + .executor() + .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len)); + + let transaction_count = input.transaction_count(); + let executed_tx_index = Arc::clone(handle.executed_tx_index()); + let executor = executor.with_state_hook( + handle.state_hook().map(|hook| Box::new(hook) as Box), + ); + + let execution_start = Instant::now(); + + // Execute all transactions and finalize + let (executor, senders) = self.execute_transactions( + executor, + transaction_count, + handle.iter_transactions(), + &receipt_tx, + &executed_tx_index, + )?; + drop(receipt_tx); + + // Finish execution and get the result + let post_exec_start = Instant::now(); + let (_evm, result) = debug_span!(target: "engine::tree", "BlockExecutor::finish") + .in_scope(|| executor.finish()) + .map(|(evm, result)| (evm.into_db(), result))?; + self.metrics.record_post_execution(post_exec_start.elapsed()); + + // Merge transitions into bundle state + debug_span!(target: "engine::tree", "merge_transitions") + .in_scope(|| db.merge_transitions(BundleRetention::Reverts)); + + let output = BlockExecutionOutput { result, state: db.take_bundle() }; + + let execution_duration = execution_start.elapsed(); + self.metrics.record_block_execution(&output, execution_duration); + self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration); + debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block (with Firehose tracing)"); + + Ok((output, senders, result_rx)) + } + /// Executes transactions and collects senders, streaming receipts to a background task. /// /// This method handles: @@ -1976,6 +2168,7 @@ where V: PayloadValidator + Clone, Evm: ConfigureEngineEvm + 'static, Types: PayloadTypes>, + TxTy: reth_firehose::mapper::SignatureFields, { fn validate_payload_attributes_against_header( &self, diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index a10c40841f7..4f23d00e799 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -24,7 +24,7 @@ use reth_chain_state::CanonStateSubscriptions; use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks, Hardforks}; use reth_node_api::{ AddOnsContext, BlockTy, EngineApiValidator, EngineTypes, FullNodeComponents, FullNodeTypes, - NodeAddOns, NodeTypes, PayloadTypes, PayloadValidator, PrimitivesTy, TreeConfig, + NodeAddOns, NodeTypes, PayloadTypes, PayloadValidator, PrimitivesTy, TreeConfig, TxTy, }; use reth_node_core::{ cli::config::RethTransactionPoolConfig, @@ -1452,6 +1452,7 @@ where ::Payload, Block = BlockTy, > + Clone, + TxTy: reth_firehose::mapper::SignatureFields, { type EngineValidator = BasicEngineValidator; From aabd6825b7fe1e721ed3625029ea9026757804d3 Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Wed, 3 Jun 2026 15:49:48 -0400 Subject: [PATCH 5/5] test(engine): add live-path Firehose tracing regression test Drives a real EthereumNode through engine_newPayload (the validate_block_with_state path) with a buffer-backed global tracer and asserts FIRE BLOCK lines are emitted for the live blocks. Fails if the live-path dispatch is missing. Add reth_firehose::init_tracer_with_buffer to install a buffer-backed global tracer for output capture. The test lives in its own integration-test binary so the process-wide tracer stays isolated from the rest of the suite. --- .../tree/tests/firehose_live_tracing.rs | 106 ++++++++++++++++++ crates/firehose/src/lib.rs | 43 +++++++ 2 files changed, 149 insertions(+) create mode 100644 crates/engine/tree/tests/firehose_live_tracing.rs diff --git a/crates/engine/tree/tests/firehose_live_tracing.rs b/crates/engine/tree/tests/firehose_live_tracing.rs new file mode 100644 index 00000000000..a3cf4ea4166 --- /dev/null +++ b/crates/engine/tree/tests/firehose_live_tracing.rs @@ -0,0 +1,106 @@ +//! Regression test for Firehose tracing on the engine-tree live-block path. +//! +//! The Firehose "live-path" hooks in `tree::payload_validator` route blocks arriving through the +//! engine API (`newPayload` / `forkchoiceUpdated`) into the Firehose tracer. On `firehose/2.x` +//! those hooks were once dropped during a merge, silently disabling live-block tracing while the +//! historical/stage path kept working — a regression that compiled and passed every existing test. +//! +//! This test guards that path end-to-end: it installs a buffer-backed global tracer, drives a real +//! [`EthereumNode`] through `newPayload` (which calls `validate_block_with_state`, the path under +//! test), and asserts that `FIRE BLOCK` lines are emitted for the live blocks. If the dispatch into +//! `execute_and_trace_block` is missing, no `FIRE BLOCK` lines are produced and the test fails. +//! +//! It lives in its own integration-test binary (rather than alongside the other engine-tree tests) +//! because it installs a process-wide tracer; cargo/nextest run each integration binary in its own +//! process, keeping the global tracer isolated from the rest of the suite. + +use eyre::Result; +use reth_chainspec::{ChainSpecBuilder, MAINNET}; +use reth_e2e_test_utils::testsuite::{ + actions::{MakeCanonical, ProduceBlocks}, + setup::{NetworkSetup, Setup}, + TestBuilder, +}; +use reth_engine_tree::tree::TreeConfig; +use reth_ethereum_engine_primitives::EthEngineTypes; +use reth_node_ethereum::EthereumNode; +use std::sync::Arc; + +/// Number of blocks to produce. Block 1 is the Firehose genesis marker (emitted via +/// `on_genesis_block`); blocks 2.. exercise the live `execute_and_trace_block` path. +const PRODUCED_BLOCKS: u64 = 3; + +#[tokio::test] +async fn live_payload_validation_emits_firehose_blocks() -> Result<()> { + reth_tracing::init_test_tracing(); + + // Install a buffer-backed global Firehose tracer BEFORE the node validates any block, so the + // live path's `is_tracer_initialized()` gate activates and routes execution through + // `execute_and_trace_block`. Fork timings only affect how block contents are mapped, not + // whether a block is emitted; activate Shanghai + Cancun at genesis to match the + // `cancun_activated` chain spec below. + let buffer = reth_firehose::init_tracer_with_buffer( + MAINNET.chain.id(), + Some(0), // shanghai + Some(0), // cancun + None, // prague + ); + + let setup = Setup::::default() + .with_chain_spec(Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis( + serde_json::from_str(include_str!( + "../../../e2e-test-utils/src/testsuite/assets/genesis.json" + )) + .unwrap(), + ) + .cancun_activated() + .build(), + )) + .with_network(NetworkSetup::single_node()) + .with_tree_config( + TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true), + ); + + // Each produced block is submitted via `engine_newPayload`, which drives + // `validate_block_with_state` — the path under test. + let test = TestBuilder::new() + .with_setup(setup) + .with_action(ProduceBlocks::::new(PRODUCED_BLOCKS)) + .with_action(MakeCanonical::new()); + + test.run::().await?; + + // Collect the block numbers from every captured `FIRE BLOCK ...` line. + let raw = buffer.get_bytes(); + let text = String::from_utf8(raw).expect("captured tracer output is UTF-8"); + let traced: Vec = text + .lines() + .filter_map(|line| { + let mut parts = line.split(' '); + if parts.next()? != "FIRE" || parts.next()? != "BLOCK" { + return None; + } + parts.next()?.parse::().ok() + }) + .collect(); + + assert!( + !traced.is_empty(), + "no FIRE BLOCK lines were emitted — the live payload-validation path is not traced.\n\ + Captured tracer output:\n{text}" + ); + + // Blocks 2..=PRODUCED_BLOCKS go through the live `execute_and_trace_block` path (block 1 is the + // genesis marker, emitted separately via `on_genesis_block`). Require each to have been traced. + for number in 2..=PRODUCED_BLOCKS { + assert!( + traced.contains(&number), + "expected a FIRE BLOCK line for live block #{number}, got traced blocks {traced:?}" + ); + } + + Ok(()) +} diff --git a/crates/firehose/src/lib.rs b/crates/firehose/src/lib.rs index f9409ec9d43..bd9c275d342 100644 --- a/crates/firehose/src/lib.rs +++ b/crates/firehose/src/lib.rs @@ -116,6 +116,49 @@ pub fn init_tracer(config: firehose_tracer::config::Config) { .expect("init_tracer called more than once"); } +/// Initialize the process-wide tracer to capture all output into an in-memory buffer, returning a +/// handle to read it back. +/// +/// This is the buffer-backed counterpart to [`init_tracer`] (which writes to stdout): it builds a +/// fully blockchain-initialized [`firehose_tracer::Tracer`] over a +/// [`firehose_tracer::InMemoryBuffer`] and installs it as the process-wide tracer, so the live +/// engine path — [`is_tracer_initialized`] plus [`block_tracer::FirehoseBlockTracer::start`] — +/// becomes active and every `FIRE BLOCK` line is captured instead of printed. +/// +/// Intended for integration tests that drive the real validation path and need to assert on the +/// emitted Firehose blocks. Like [`init_tracer`], it must be called at most once per process. +/// +/// `shanghai_time` / `cancun_time` / `prague_time` are the timestamp-based fork activations the +/// tracer uses when mapping block contents (`Some(0)` = active from genesis, `None` = never). They +/// do not gate whether a block is emitted. +pub fn init_tracer_with_buffer( + chain_id: u64, + shanghai_time: Option, + cancun_time: Option, + prague_time: Option, +) -> firehose_tracer::InMemoryBuffer { + // Mirror `init_tracer`: ensure the shared stdout lock exists so any code path that later + // reaches for it (e.g. an additional flashblock tracer) does not panic. + let _ = init_stdout_lock(); + let (tracer, buffer) = firehose_tracer::Tracer::with_buffer( + firehose_tracer::config::Config::default(), + firehose_tracer::config::ChainConfig { + chain_id, + shanghai_time, + cancun_time, + prague_time, + verkle_time: None, + }, + "reth-firehose", + env!("CARGO_PKG_VERSION"), + ); + GLOBAL_TRACER + .set(Arc::new(Mutex::new(tracer))) + .ok() + .expect("init_tracer/init_tracer_with_buffer called more than once"); + buffer +} + /// Acquire exclusive access to the process-wide tracer. /// /// Panics if [`init_tracer`] has not been called yet, or if the mutex is poisoned.