Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,5 @@ __pycache__/
.envrc
.direnv/
.dev/tasks.md
.dev/todo/
/.worktrees
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Changelog

All notable changes to the StreamingFast Firehose fork of reth are documented here.

This changelog covers Firehose-specific changes only. For upstream reth changes, see the
[official reth releases](https://github.com/paradigmxyz/reth/releases).

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## v2.2.0-fh

First Firehose-instrumented release on top of upstream reth v2.2.0.

### Added

- Add flashblocks support to `reth-firehose`: `start_flashblock_local` and `mark_flashblock` methods on `FirehoseBlockTracer` allow partial block ("flashblock") boundaries to be emitted during block execution.
- Add `SynchronizedStdout` for coordinated stdout writes across multiple concurrent tracer instances; stdout lock initialization is now handled internally by `init_tracer`.
- Expose prestate types and helpers as `pub` in `reth-firehose-tests` crate to allow reuse in downstream integration test suites.

### Changed

- `init_tracer` now accepts `Config` directly and sets up the stdout lock internally, removing the need for callers to manage stdout coordination themselves.
- Update `firehose-tracer` dependency to version 5.1.1.

### Fixed

- Restore the Firehose live-path hooks in the engine-tree payload validator so blocks arriving through the engine API (`newPayload` / `forkchoiceUpdated`) are traced again. The hooks had been dropped during a merge, leaving only the historical/stage execution path instrumented.
221 changes: 207 additions & 14 deletions crates/engine/tree/src/tree/payload_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ use reth_payload_primitives::{
};
use reth_primitives_traits::{
AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives,
RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable, TxTy,
};
use reth_provider::{
providers::{OverlayBuilder, OverlayStateProviderFactory},
Expand Down Expand Up @@ -394,12 +394,13 @@ where
where
V: PayloadValidator<T, Block = N::Block> + Clone,
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
TxTy<N>: reth_firehose::mapper::SignatureFields,
{
// Spawn payload conversion on a background thread so it runs concurrently with the
// rest of the function (setup + execution). For payloads this overlaps the cost of
// RLP decoding + header hashing.
let is_payload = matches!(&input, BlockOrPayload::Payload(_));
let convert_to_block = match &input {
let mut convert_to_block_handle = match &input {
BlockOrPayload::Payload(_) => {
let payload_clone = input.clone();
let validator = self.validator.clone();
Expand All @@ -412,21 +413,28 @@ where
validator.convert_payload_to_block(payload)
},
);
Either::Left(handle)
Some(handle)
}
BlockOrPayload::Block(_) => Either::Right(()),
BlockOrPayload::Block(_) => None,
};

// Returns the sealed block, either by awaiting the background conversion task (for
// payloads) or by extracting the already-converted block directly.
let convert_to_block =
//
// Dispatches on the variant of the `input` it is handed rather than on the variant seen at
// spawn time, which makes it callable more than once: the Firehose preamble below resolves
// the block eagerly and rebuilds `input` as a `Block`, so the later call site returns the
// already-sealed block directly instead of touching the (already-consumed) conversion
// handle.
let mut convert_to_block =
move |input: BlockOrPayload<T>| -> Result<SealedBlock<N::Block>, NewPayloadError> {
match convert_to_block {
Either::Left(handle) => handle.try_into_inner().expect("sole handle"),
Either::Right(()) => {
let BlockOrPayload::Block(block) = input else { unreachable!() };
Ok(block)
}
match input {
BlockOrPayload::Block(block) => Ok(block),
BlockOrPayload::Payload(_) => convert_to_block_handle
.take()
.expect("sole handle")
.try_into_inner()
.expect("sole handle"),
}
};

Expand Down Expand Up @@ -564,15 +572,57 @@ where
None
};

// Firehose: when the tracer is active, resolve the sealed block synchronously so we can
// start the block-level tracer guard before execution. The guard emits
// `on_block_start`/`on_genesis_block` now and defers `on_block_end(None)` until
// `mark_verified()` runs after post-execution validation.
//
// If any early return is taken between here and `mark_verified()`, the guard's Drop emits
// `on_block_end(Some(err))` so invalid blocks are never flushed downstream.
//
// `finalized` is `None` on the live engine path: the consensus layer hasn't necessarily
// advanced the finalized head by the time we're validating a payload. The pipeline
// (staged-sync) path advertises the finalized ref itself.
//
// We resolve the block eagerly here and rebuild `input` as a `Block` variant so the later
// `convert_to_block(input)?` site becomes a cheap unwrap of the already-sealed block.
//
// Block 1 is the genesis marker: `start` emits `on_genesis_block` as a standalone event and
// does NOT leave the tracer in "block state", so wrapping the executor would panic in
// `on_system_call_start`. Let the guard drop (a no-op for genesis) and fall through to the
// non-Firehose execution path.
let (mut fh_tracer, input): (Option<reth_firehose::FirehoseBlockTracer>, _) =
if reth_firehose::is_tracer_initialized() {
let sealed = convert_to_block(input)?;
let tracer = reth_firehose::FirehoseBlockTracer::start::<N>(&sealed, None);
let fh_tracer = (!tracer.is_genesis()).then_some(tracer);
(fh_tracer, BlockOrPayload::Block(sealed))
} else {
(None, input)
};

// Execute the block and handle any execution errors.
// The receipt root task is spawned before execution and receives receipts incrementally
// as transactions complete, allowing parallel computation during execution.
//
// Two entry points exist for the same work: `execute_block` is the upstream-pristine path
// and `execute_and_trace_block` is its Firehose-enabled twin. We pick based on whether a
// live tracer guard is available for this block (see the Firehose preamble above for when
// the guard is `None` — notably the block-1 genesis marker).
let execute_block_start = Instant::now();
let (output, senders, receipt_root_rx) =
match self.execute_block(state_provider, env, &input, &mut handle) {
let (output, senders, receipt_root_rx) = match fh_tracer.as_mut() {
Some(tracer) => {
match self.execute_and_trace_block(state_provider, env, &input, tracer, &mut handle)
{
Ok(output) => output,
Err(err) => return self.handle_execution_error(input, err, &parent_block),
}
}
None => match self.execute_block(state_provider, env, &input, &mut handle) {
Ok(output) => output,
Err(err) => return self.handle_execution_error(input, err, &parent_block),
};
},
};
let execution_duration = execute_block_start.elapsed();

// After executing the block we can stop prewarming transactions
Expand Down Expand Up @@ -836,6 +886,13 @@ where
let changeset_provider =
ensure_ok_post_block!(overlay_factory.database_provider_ro(), block);

// All post-execution validations passed — flush the Firehose block. Placed after the last
// fallible step so any earlier failure instead drops the guard (emitting
// `on_block_end(Some(err))`) and discards the block.
if let Some(guard) = fh_tracer.take() {
guard.mark_verified();
}

let executed_block = self.spawn_deferred_trie_task(
block,
output,
Expand Down Expand Up @@ -1004,6 +1061,141 @@ where
Ok((output, senders, result_rx))
}

/// Firehose-enabled twin of [`Self::execute_block`].
///
/// This is a copy of `execute_block` with two surgical differences:
/// - the EVM is constructed via `evm_with_env_and_inspector` with a `FirehoseInspector`
/// borrowed from the supplied `tracer`;
/// - the resulting executor is wrapped in a
/// [`reth_firehose::executor::FirehoseWrappedExecutor`] so system-call boundaries and
/// withdrawals are funnelled into the tracer.
///
/// MAINTENANCE CONTRACT: keep this function in sync with [`Self::execute_block`]. Any change to
/// the upstream logic (new metrics, error handling, ordering of pre/post steps, etc.) must be
/// mirrored here. The intent of duplicating rather than branching is to keep the diff against
/// upstream minimal and localized: upstream's `execute_block` stays pristine, and the
/// Firehose-specific wiring lives here.
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
#[expect(clippy::type_complexity)]
fn execute_and_trace_block<S, Err, T>(
&mut self,
state_provider: S,
env: ExecutionEnv<Evm>,
input: &BlockOrPayload<T>,
tracer: &mut reth_firehose::FirehoseBlockTracer,
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
) -> Result<
(
BlockExecutionOutput<N::Receipt>,
Vec<Address>,
tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
),
InsertBlockErrorKind,
>
where
S: StateProvider + Send,
Err: core::error::Error + Send + Sync + 'static,
V: PayloadValidator<T, Block = N::Block>,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
TxTy<N>: reth_firehose::mapper::SignatureFields,
{
debug!(target: "engine::tree::payload_validator", "Executing block (with Firehose tracing)");

let mut db = debug_span!(target: "engine::tree", "build_state_db").in_scope(|| {
State::builder()
.with_database(StateProviderDatabase::new(state_provider))
.with_bundle_update()
.build()
});

// Firehose-specific: install the inspector on the EVM and wrap the executor so that
// system-call boundaries and withdrawals are forwarded to the tracer. Withdrawals are
// pre-materialized here because the wrapper emits them as part of the end-of-block event.
let withdrawals =
input.withdrawals().map(|w| alloy_eips::eip4895::Withdrawals::new(w.to_vec()));
let (spec_id, mut executor) = {
let _span = debug_span!(target: "engine::tree", "create_evm").entered();
let spec_id = *env.evm_env.spec_id();
let inspector = tracer.inspector();
let evm = self.evm_config.evm_with_env_and_inspector(&mut db, env.evm_env, inspector);
let ctx = self
.execution_ctx_for(input)
.map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
let inner = self.evm_config.create_executor(evm, ctx);
let executor =
reth_firehose::executor::FirehoseWrappedExecutor::new(inner, withdrawals);
(spec_id, executor)
};

if !self.config.precompile_cache_disabled() {
let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered();
executor.evm_mut().precompiles_mut().map_cacheable_precompiles(
|address, precompile| {
let metrics = self
.precompile_cache_metrics
.entry(*address)
.or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
.clone();
CachedPrecompile::wrap(
precompile,
self.precompile_cache_map.cache_for_address(*address),
spec_id,
Some(metrics),
)
},
);
}

// Spawn background task to compute receipt root and logs bloom incrementally.
// Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block).
let receipts_len = input.transaction_count();
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
self.payload_processor
.executor()
.spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));

let transaction_count = input.transaction_count();
let executed_tx_index = Arc::clone(handle.executed_tx_index());
let executor = executor.with_state_hook(
handle.state_hook().map(|hook| Box::new(hook) as Box<dyn OnStateHook>),
);

let execution_start = Instant::now();

// Execute all transactions and finalize
let (executor, senders) = self.execute_transactions(
executor,
transaction_count,
handle.iter_transactions(),
&receipt_tx,
&executed_tx_index,
)?;
drop(receipt_tx);

// Finish execution and get the result
let post_exec_start = Instant::now();
let (_evm, result) = debug_span!(target: "engine::tree", "BlockExecutor::finish")
.in_scope(|| executor.finish())
.map(|(evm, result)| (evm.into_db(), result))?;
self.metrics.record_post_execution(post_exec_start.elapsed());

// Merge transitions into bundle state
debug_span!(target: "engine::tree", "merge_transitions")
.in_scope(|| db.merge_transitions(BundleRetention::Reverts));

let output = BlockExecutionOutput { result, state: db.take_bundle() };

let execution_duration = execution_start.elapsed();
self.metrics.record_block_execution(&output, execution_duration);
self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block (with Firehose tracing)");

Ok((output, senders, result_rx))
}

/// Executes transactions and collects senders, streaming receipts to a background task.
///
/// This method handles:
Expand Down Expand Up @@ -1976,6 +2168,7 @@ where
V: PayloadValidator<Types, Block = N::Block> + Clone,
Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
TxTy<N>: reth_firehose::mapper::SignatureFields,
{
fn validate_payload_attributes_against_header(
&self,
Expand Down
Loading
Loading