Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 162 additions & 92 deletions Cargo.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ mod tests {
.iter_mut()
.find(|account| account.address() == sstore_contract)
.expect("SSTORE contract must be present in BAL");
account.storage_root = Some(B256::from([0x99; 32]));
account.storage_root = Some(alloy_eip7928::StorageRoot::Root(B256::from([0x99; 32])));

let tampered_hash = alloy_eip7928::compute_block_access_list_hash(&tampered_bal);
let block = empty_amsterdam_block(tampered_hash);
Expand Down
32 changes: 30 additions & 2 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::tree::{
ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
WaitForCaches,
};
use alloy_eip7928::bal::DecodedBal;
use alloy_eip7928::{bal::DecodedBal, StorageRoot};
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
use alloy_primitives::B256;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
Expand All @@ -30,6 +30,7 @@ use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
use reth_trie::{
hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory, HashedPostState,
};
use reth_trie_common::EMPTY_ROOT_HASH;
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
root::ParallelStateRootError,
Expand Down Expand Up @@ -83,6 +84,15 @@ pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
/// prewarm workers exceeds the execution time saved.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;

/// Converts a BAL storage root into the trie root used for state-root computation.
pub(crate) const fn bal_storage_root_to_b256(storage_root: Option<StorageRoot>) -> Option<B256> {
match storage_root {
Some(StorageRoot::Empty) => Some(EMPTY_ROOT_HASH),
Some(StorageRoot::Root(root)) => Some(root),
None => None,
}
}

/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
type IteratorTx<Evm, I> = RecoveredTx<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>;

Expand Down Expand Up @@ -1019,7 +1029,10 @@ where
#[cfg(test)]
mod tests {
use crate::tree::{
payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
payload_processor::{
bal_storage_root_to_b256, evm_state_to_hashed_post_state, ExecutionEnv,
PayloadProcessor,
},
precompile_cache::PrecompileCacheMap,
ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
};
Expand Down Expand Up @@ -1050,6 +1063,21 @@ mod tests {
SavedCache::new(hash, execution_cache)
}

#[test]
fn bal_storage_root_to_b256_maps_optional_empty_and_root() {
let root = B256::from([0x42; 32]);

assert_eq!(
bal_storage_root_to_b256(Some(alloy_eip7928::StorageRoot::Empty)),
Some(reth_trie_common::EMPTY_ROOT_HASH)
);
assert_eq!(
bal_storage_root_to_b256(Some(alloy_eip7928::StorageRoot::Root(root))),
Some(root)
);
assert_eq!(bal_storage_root_to_b256(None), None);
}

#[test]
fn execution_cache_allows_single_checkout() {
let execution_cache = PayloadExecutionCache::default();
Expand Down
22 changes: 20 additions & 2 deletions crates/engine/tree/src/tree/payload_processor/prewarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
//! 3. When actual block execution happens, it benefits from the warmed cache

use crate::tree::{
payload_processor::multiproof::StateRootMessage,
payload_processor::{bal_storage_root_to_b256, multiproof::StateRootMessage},
precompile_cache::{CachedPrecompile, PrecompileCacheMap},
CachedStateCacheMetrics, CachedStateMetrics, CachedStateProvider, ExecutionEnv,
PayloadExecutionCache, SavedCache, StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::bal::DecodedBal;
use alloy_eips::eip4895::Withdrawal;
use alloy_primitives::{keccak256, StorageKey, B256};
use alloy_primitives::{keccak256, map::B256Map, StorageKey, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use metrics::{Counter, Gauge, Histogram};
use rayon::prelude::*;
Expand All @@ -34,6 +34,7 @@ use reth_provider::{
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::{pool::WorkerPool, Runtime};
use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
use revm_primitives::hardfork::SpecId;
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::{self, channel, Receiver, Sender},
Expand Down Expand Up @@ -354,6 +355,8 @@ where
let ctx = self.ctx.clone();
let to_sparse_trie_task = self.to_sparse_trie_task.clone();
let executor = self.executor.clone();
let is_bogota_active =
Into::<SpecId>::into(*self.ctx.env.evm_env.spec_id()).is_enabled_in(SpecId::BOGOTA);
let parent_span = Span::current();
let prefetch_parent_span = parent_span.clone();
let stream_parent_span = parent_span;
Expand All @@ -374,6 +377,21 @@ where
let parent_span = branch_span.clone();
let _span = branch_span.entered();

if is_bogota_active {
let storage_roots = stream_bal
.as_bal()
.iter()
.filter_map(|account| {
bal_storage_root_to_b256(account.storage_root_value())
.map(|root| (keccak256(account.address()), root))
})
.collect::<B256Map<_>>();
if !storage_roots.is_empty() {
let _ =
to_sparse_trie_task.send(StateRootMessage::StorageRoots(storage_roots));
}
}

stream_bal.as_bal().par_iter().for_each(|account_changes| {
WorkerPool::with_worker_mut(|worker| {
let provider =
Expand Down
36 changes: 29 additions & 7 deletions crates/engine/tree/src/tree/payload_processor/sparse_trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = Configurab
account_updates: B256Map<LeafUpdate>,
/// Storage trie updates. hashed address -> slot -> update.
storage_updates: B256Map<B256Map<LeafUpdate>>,
/// Precomputed post-block storage roots from the Bogota BAL.
precomputed_storage_roots: B256Map<B256>,

/// Account updates that are buffered but were not yet applied to the trie.
new_account_updates: B256Map<LeafUpdate>,
Expand Down Expand Up @@ -158,6 +160,7 @@ where
max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
account_updates: Default::default(),
storage_updates: Default::default(),
precomputed_storage_roots: Default::default(),
new_account_updates: Default::default(),
new_storage_updates: Default::default(),
pending_account_updates: Default::default(),
Expand Down Expand Up @@ -208,6 +211,7 @@ where
StateRootMessage::HashedStateUpdate(state) => {
SparseTrieTaskMessage::HashedState(state)
}
StateRootMessage::StorageRoots(roots) => SparseTrieTaskMessage::StorageRoots(roots),
};
if hashed_state_tx.send(msg).is_err() {
break;
Expand Down Expand Up @@ -427,6 +431,9 @@ where
SparseTrieTaskMessage::HashedState(hashed_state) => {
self.on_hashed_state_update(hashed_state)
}
SparseTrieTaskMessage::StorageRoots(roots) => {
self.precomputed_storage_roots.extend(roots);
}
SparseTrieTaskMessage::FinishedStateUpdates => {
let _ = self
.final_hashed_state_tx
Expand Down Expand Up @@ -694,6 +701,9 @@ where
let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
Vec::with_capacity(addresses_to_compute_roots.len());
for address in addresses_to_compute_roots {
if self.precomputed_storage_roots.contains_key(&address) {
continue;
}
if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
!trie.is_root_cached()
{
Expand Down Expand Up @@ -754,29 +764,39 @@ where
let span = trace_span!("promote_updates", promoted = tracing::field::Empty).entered();
// Now handle pending account updates that can be upgraded to a proper update.
let account_rlp_buf = &mut self.account_rlp_buf;
let account_updates = &mut self.account_updates;
let storage_updates = &self.storage_updates;
let precomputed_storage_roots = &self.precomputed_storage_roots;
let trie = &mut self.trie;
let storage_root = |trie: &mut SparseStateTrie<A, S>, address: &B256| {
precomputed_storage_roots
.get(address)
.copied()
.or_else(|| trie.storage_root(address))
};
let mut num_promoted = 0;
self.pending_account_updates.retain(|addr, account| {
if let Some(updates) = self.storage_updates.get(addr) {
if let Some(updates) = storage_updates.get(addr) {
if !updates.is_empty() {
// If account has pending storage updates, it is still pending.
return true;
} else if let Some(account) = account.take() {
let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
let storage_root = storage_root(trie, addr).expect("updates are drained, storage trie should be revealed by now");
let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
account_updates.insert(*addr, LeafUpdate::Changed(encoded));
num_promoted += 1;
return false;
}
}

// Get the current account state either from the trie or from latest account update.
let trie_account = match self.account_updates.get(addr) {
let trie_account = match account_updates.get(addr) {
Some(LeafUpdate::Changed(encoded)) => {
Some(encoded).filter(|encoded| !encoded.is_empty())
}
// Needs to be revealed first
Some(LeafUpdate::Touched) => return true,
None => self.trie.get_account_value(addr),
None => trie.get_account_value(addr),
};

let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
Expand All @@ -790,11 +810,11 @@ where

(account, storage_root)
} else {
(trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
(trie_account.map(Into::into), storage_root(trie, addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
};

let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
account_updates.insert(*addr, LeafUpdate::Changed(encoded));
num_promoted += 1;

false
Expand Down Expand Up @@ -904,6 +924,8 @@ impl PendingTargets {
enum SparseTrieTaskMessage {
/// A hashed state update ready to be processed.
HashedState(HashedPostState),
/// Precomputed post-block storage roots keyed by hashed account address.
StorageRoots(B256Map<B256>),
/// Prefetch proof targets (passed through directly).
PrefetchProofs(MultiProofTargetsV2),
/// Signals that all state updates have been received.
Expand Down
26 changes: 23 additions & 3 deletions crates/engine/tree/src/tree/payload_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::tree::{
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
instrumented_state::{InstrumentedStateProvider, StateProviderMetrics, StateProviderStats},
multiproof::{StateRootComputeOutcome, StateRootHandle},
payload_processor::PayloadProcessor,
payload_processor::{bal_storage_root_to_b256, PayloadProcessor},
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
types::{InsertPayloadResult, ValidationOutput},
CacheWaitDurations, CachedStateProvider, EngineApiMetrics, EngineApiTreeState, ExecutionEnv,
Expand All @@ -52,7 +52,11 @@ use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::{bal::DecodedBal, compute_block_access_list_hash, BlockAccessList};
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
use alloy_evm::Evm;
use alloy_primitives::{map::B256Set, B256};
use alloy_primitives::{
keccak256,
map::{B256Map, B256Set},
B256,
};
use reth_tasks::LazyHandle;
#[cfg(feature = "trie-debug")]
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
Expand Down Expand Up @@ -573,6 +577,8 @@ where
// Execute the block and handle any execution errors.
// The receipt root task is spawned before execution and receives receipts incrementally
// as transactions complete, allowing parallel computation during execution.
let is_bogota_active =
Into::<SpecId>::into(*env.evm_env.spec_id()).is_enabled_in(SpecId::BOGOTA);
let execute_block_start = Instant::now();
let execution_result = if parallel_bal_execution {
self.execute_block_bal(env, &input, &handle, &make_state_provider)
Expand Down Expand Up @@ -651,7 +657,7 @@ where
&output,
&mut ctx,
receipt_root_bloom,
built_bal
built_bal.clone()
),
block
);
Expand Down Expand Up @@ -770,6 +776,7 @@ where
provider_factory,
overlay_builder,
&hashed_state,
built_bal.as_ref().filter(|_| is_bogota_active),
) {
Ok(result) => {
let elapsed = root_time.elapsed();
Expand Down Expand Up @@ -1339,6 +1346,7 @@ where
provider_factory: P,
overlay_builder: OverlayBuilder<N>,
hashed_state: &LazyHashedPostState,
built_bal: Option<&BlockAccessList>,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let hashed_state = hashed_state.get();
// The `hashed_state` argument will be taken into account as part of the overlay, but we
Expand All @@ -1349,9 +1357,21 @@ where
overlay_builder.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
let overlay_factory = OverlayStateProviderFactory::new(provider_factory, overlay_builder);
ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
.with_precomputed_storage_roots(Self::bal_storage_roots(built_bal))
.incremental_root_with_updates()
}

fn bal_storage_roots(built_bal: Option<&BlockAccessList>) -> B256Map<B256> {
built_bal
.into_iter()
.flat_map(|bal| bal.iter())
.filter_map(|account| {
bal_storage_root_to_b256(account.storage_root_value())
.map(|storage_root| (keccak256(account.address()), storage_root))
})
.collect()
}

/// Compute state root for the given hashed post state in serial.
///
/// Uses the same provider construction path as main execution and computes the state root and
Expand Down
Loading
Loading