Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion crates/apollo_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -903,10 +903,14 @@ impl Batcher {
)
.await?;

// Synced blocks are not executed locally, so no accessed keys are available; the block is
// committed via `CommitBlock`.
self.write_commitment_results_and_add_new_task(
height,
state_diff,
optional_state_diff_commitment,
#[cfg(feature = "os_input")]
None,
)
.await?;

Expand Down Expand Up @@ -971,7 +975,7 @@ impl Batcher {
.unzip();

#[cfg(feature = "os_input")]
self.write_block_accessed_keys(
let accessed_keys = self.write_block_accessed_keys(
height,
&block_execution_artifacts.commitment_state_diff,
&block_execution_artifacts.execution_data.proof_facts_block_numbers,
Expand All @@ -982,6 +986,8 @@ impl Batcher {
height,
state_diff.clone(), // TODO(Nimrod): Remove the clone here.
Some(state_diff_commitment),
#[cfg(feature = "os_input")]
Some(accessed_keys),
)
.await?;

Expand Down Expand Up @@ -1518,6 +1524,7 @@ impl Batcher {
height: BlockNumber,
state_diff: ThinStateDiff,
optional_state_diff_commitment: Option<StateDiffCommitment>,
#[cfg(feature = "os_input")] accessed_keys: Option<AccessedKeys>,
) -> BatcherResult<()> {
self.get_commitment_results_and_write_to_storage()?;
self.commitment_manager
Expand All @@ -1528,6 +1535,8 @@ impl Batcher {
&self.config.static_config.first_block_with_partial_block_hash,
self.storage_reader.clone(),
&mut self.storage_writer,
#[cfg(feature = "os_input")]
accessed_keys,
)
.await
.expect("The commitment offset unexpectedly doesn't match the given block height.");
Expand Down
6 changes: 6 additions & 0 deletions crates/apollo_batcher/src/batcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use apollo_mempool_types::communication::{
};
use apollo_mempool_types::mempool_types::CommitBlockArgs;
use apollo_state_sync_types::state_sync_types::SyncBlock;
#[cfg(feature = "os_input")]
use apollo_storage::accessed_keys::AccessedKeys;
use apollo_storage::db::DbError;
use apollo_storage::test_utils::get_test_storage;
use apollo_storage::{StorageError, StorageReader, StorageWriter};
Expand Down Expand Up @@ -1192,6 +1194,8 @@ async fn add_sync_block(
let mut storage_reader = MockBatcherStorageReader::new();
storage_reader.expect_state_diff_height().returning(move || Ok(block_number));
storage_reader.expect_global_root_height().returning(move || Ok(block_number));
#[cfg(feature = "os_input")]
storage_reader.expect_get_accessed_keys().returning(|_| Ok(Some(AccessedKeys::default())));

let mut storage_writer = MockBatcherStorageWriter::new();
storage_writer
Expand Down Expand Up @@ -1342,6 +1346,8 @@ async fn add_sync_block_for_first_new_block() {
storage_reader
.expect_global_root_height()
.returning(|| Ok(FIRST_BLOCK_NUMBER_WITH_PARTIAL_BLOCK_HASH));
#[cfg(feature = "os_input")]
storage_reader.expect_get_accessed_keys().returning(|_| Ok(Some(AccessedKeys::default())));
let mut mock_dependencies = MockDependencies { storage_reader, ..Default::default() };

// Expect setting the block hash for the last old block (i.e the parent of the first new block).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ use apollo_batcher_config::config::{
CommitmentManagerConfig,
FirstBlockWithPartialBlockHash,
};
#[cfg(feature = "os_input")]
use apollo_committer_types::committer_types::ReadPathsAndCommitBlockRequest;
use apollo_committer_types::committer_types::{
CommitBlockRequest,
CommitBlockResponse,
RevertBlockRequest,
};
use apollo_committer_types::communication::{CommitterRequestLabelValue, SharedCommitterClient};
#[cfg(feature = "os_input")]
use apollo_storage::accessed_keys::AccessedKeys as StorageAccessedKeys;
use lru::LruCache;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::block_hash::block_hash_calculator::{
Expand Down Expand Up @@ -91,6 +95,7 @@ impl<S: StateCommitterTrait> CommitmentManager<S> {

/// Adds a commitment task to the state committer. If the task height does not match the
/// task offset, an error is returned.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn add_commitment_task<
R: BatcherStorageReader + ?Sized,
W: BatcherStorageWriter + ?Sized,
Expand All @@ -102,6 +107,9 @@ impl<S: StateCommitterTrait> CommitmentManager<S> {
first_block_with_partial_block_hash: &Option<FirstBlockWithPartialBlockHash>,
storage_reader: Arc<R>,
storage_writer: &mut Box<W>,
// When present, the task issues `ReadPathsAndCommitBlock` to also fetch the Patricia
// witnesses; otherwise it falls back to `CommitBlock`.
#[cfg(feature = "os_input")] accessed_keys: Option<StorageAccessedKeys>,
) -> CommitmentManagerResult<()> {
if height != self.commitment_task_offset {
return Err(CommitmentManagerError::WrongCommitmentTaskHeight {
Expand All @@ -110,19 +118,28 @@ impl<S: StateCommitterTrait> CommitmentManager<S> {
state_diff_commitment,
});
}
let commitment_task_input = CommitterTaskInput::Commit(CommitBlockRequest {
height,
state_diff,
state_diff_commitment,
});
let commit_request = CommitBlockRequest { height, state_diff, state_diff_commitment };
#[cfg(feature = "os_input")]
let task_input = match accessed_keys {
Some(accessed_keys) => {
CommitterTaskInput::ReadPathsAndCommitBlock(ReadPathsAndCommitBlockRequest {
commit: commit_request,
accessed_keys,
})
}
None => CommitterTaskInput::Commit(commit_request),
};
#[cfg(not(feature = "os_input"))]
let task_input = CommitterTaskInput::Commit(commit_request);
let commit_label = task_input.task_type();
self.add_task_with_retries(
commitment_task_input,
task_input,
first_block_with_partial_block_hash,
storage_reader,
storage_writer,
)
.await?;
self.successfully_added_commitment_task(height, state_diff_commitment);
self.successfully_added_commitment_task(height, state_diff_commitment, commit_label);
Ok(())
}

Expand Down Expand Up @@ -320,11 +337,12 @@ impl<S: StateCommitterTrait> CommitmentManager<S> {
&mut self,
height: BlockNumber,
state_diff_commitment: Option<StateDiffCommitment>,
commit_label: CommitterRequestLabelValue,
) {
self.task_timer.start_timer(CommitterRequestLabelValue::CommitBlock, height);
self.task_timer.start_timer(commit_label, height);
debug!(
"Sent commitment task for block {height} and state diff {state_diff_commitment:?} to \
state committer."
"Sent {commit_label:?} task for block {height} and state diff \
{state_diff_commitment:?} to state committer."
);
self.increase_commitment_task_offset();
}
Expand Down Expand Up @@ -399,13 +417,22 @@ impl<S: StateCommitterTrait> CommitmentManager<S> {
Err(err) => panic!("Failed to read hash commitment for height {height}: {err}"),
}
};
// If accessed keys were persisted for this height, the task fetches the Patricia witnesses
// via `ReadPathsAndCommitBlock`; otherwise it falls back to `CommitBlock`.
#[cfg(feature = "os_input")]
let accessed_keys =
batcher_storage_reader.get_accessed_keys(height).unwrap_or_else(|err| {
panic!("Failed to read accessed keys for height {height}: {err}")
});
self.add_commitment_task(
height,
state_diff,
state_diff_commitment,
&batcher_config.static_config.first_block_with_partial_block_hash,
batcher_storage_reader,
storage_writer,
#[cfg(feature = "os_input")]
accessed_keys,
)
.await
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ use apollo_batcher_config::config::{
CommitmentManagerConfig,
FirstBlockWithPartialBlockHash,
};
#[cfg(feature = "os_input")]
use apollo_committer_types::committer_types::ReadPathsAndCommitBlockResponse;
use apollo_committer_types::committer_types::{CommitBlockResponse, RevertBlockResponse};
use apollo_committer_types::communication::MockCommitterClient;
#[cfg(feature = "os_input")]
use apollo_storage::accessed_keys::AccessedKeys;
use apollo_storage::StorageResult;
use assert_matches::assert_matches;
use mockall::predicate::eq;
Expand All @@ -24,6 +28,8 @@ use crate::commitment_manager::commitment_manager_impl::{
CommitmentManager,
};
use crate::commitment_manager::errors::CommitmentManagerError;
#[cfg(feature = "os_input")]
use crate::test_utils::empty_patricia_proofs;
use crate::test_utils::{
get_number_of_items_in_channel_from_receiver,
get_number_of_items_in_channel_from_sender,
Expand Down Expand Up @@ -59,8 +65,25 @@ fn mock_dependencies() -> MockDependencies {
committer_client.expect_revert_block().returning(|_| {
Box::pin(async { Ok(RevertBlockResponse::RevertedTo(GlobalRoot::default())) })
});
#[cfg(feature = "os_input")]
committer_client.expect_read_paths_and_commit_block().returning(|_| {
Box::pin(async {
Ok(ReadPathsAndCommitBlockResponse {
global_root: GlobalRoot::default(),
patricia_proofs: empty_patricia_proofs(),
})
})
});
#[cfg(not(feature = "os_input"))]
let storage_reader = MockBatcherStorageReader::new();
#[cfg(feature = "os_input")]
let storage_reader = {
let mut storage_reader = MockBatcherStorageReader::new();
storage_reader.expect_get_accessed_keys().returning(|_| Ok(Some(AccessedKeys::default())));
storage_reader
};
MockDependencies {
storage_reader: MockBatcherStorageReader::new(),
storage_reader,
storage_writer: Box::new(MockBatcherStorageWriter::new()),
batcher_config,
committer_client,
Expand Down Expand Up @@ -141,6 +164,8 @@ async fn fill_channels(
first_block_with_partial_block_hash,
storage_reader.clone(),
storage_writer,
#[cfg(feature = "os_input")]
Some(AccessedKeys::default()),
)
.await
.unwrap_or_else(|_| panic!("Failed to add commitment task with correct height."));
Expand All @@ -159,6 +184,8 @@ async fn fill_channels(
first_block_with_partial_block_hash,
storage_reader.clone(),
storage_writer,
#[cfg(feature = "os_input")]
Some(AccessedKeys::default()),
)
.await
.unwrap_or_else(|_| panic!("Failed to add commitment task with correct height."));
Expand All @@ -171,6 +198,8 @@ async fn fill_channels(
first_block_with_partial_block_hash,
storage_reader.clone(),
storage_writer,
#[cfg(feature = "os_input")]
Some(AccessedKeys::default()),
)
.await
.unwrap_or_else(|_| panic!("Failed to add commitment task with correct height."));
Expand Down Expand Up @@ -240,6 +269,64 @@ async fn test_add_missing_commitment_tasks(mut mock_dependencies: MockDependenci
assert_eq!(result.height, global_root_height);
}

/// When no accessed keys are stored for a height, the catch-up flow must fall back to `CommitBlock`
/// (not `ReadPathsAndCommitBlock`).
#[cfg(feature = "os_input")]
#[rstest]
#[tokio::test]
async fn test_add_missing_commitment_tasks_without_accessed_keys(
mut mock_dependencies: MockDependencies,
) {
let global_root_height = INITIAL_HEIGHT.prev().unwrap();
// Discard the fixture's catch-all expectations; mockall matches expectations in FIFO order,
// so they would shadow the diverging expectations this test sets below.
mock_dependencies.storage_reader.checkpoint();
mock_dependencies.committer_client.checkpoint();
mock_dependencies
.committer_client
.expect_commit_block()
.returning(|_| Box::pin(async { Ok(CommitBlockResponse::default()) }));
// The block has no stored accessed keys, so the witness-fetching endpoint must not be called.
mock_dependencies.committer_client.expect_read_paths_and_commit_block().never();

mock_dependencies
.storage_reader
.expect_global_root_height()
.returning(move || Ok(global_root_height));
mock_dependencies
.storage_reader
.expect_get_parent_hash_and_partial_block_hash_components()
.with(eq(global_root_height))
.returning(|height| get_dummy_parent_hash_and_partial_block_hash_components(&height));
mock_dependencies
.storage_reader
.expect_get_state_diff()
.with(eq(global_root_height))
.returning(|_| Ok(Some(test_state_diff())));
mock_dependencies
.storage_reader
.expect_get_accessed_keys()
.with(eq(global_root_height))
.returning(|_| Ok(None));

let batcher_config = mock_dependencies.batcher_config.clone();
let (mut commitment_manager, storage_reader, mut storage_writer) =
create_commitment_manager(mock_dependencies).await;

commitment_manager
.add_missing_commitment_tasks(
INITIAL_HEIGHT,
&batcher_config,
storage_reader,
&mut storage_writer,
)
.await;

let results = await_items(&mut commitment_manager.results_receiver, 1).await;
let result = results.first().unwrap().clone().expect_commitment();
assert_eq!(result.height, global_root_height);
}

#[rstest]
#[tokio::test]
async fn test_add_commitment_task(mut mock_dependencies: MockDependencies) {
Expand All @@ -260,6 +347,8 @@ async fn test_add_commitment_task(mut mock_dependencies: MockDependencies) {
&None,
storage_reader.clone(),
&mut storage_writer,
#[cfg(feature = "os_input")]
Some(AccessedKeys::default()),
)
.await;
assert_matches!(
Expand All @@ -282,6 +371,8 @@ async fn test_add_commitment_task(mut mock_dependencies: MockDependencies) {
&None,
storage_reader.clone(),
&mut storage_writer,
#[cfg(feature = "os_input")]
Some(AccessedKeys::default()),
)
.await
.unwrap_or_else(|_| panic!("Failed to add commitment task with correct height."));
Expand Down Expand Up @@ -330,6 +421,8 @@ async fn test_add_task_wait_for_full_channel(mut mock_dependencies: MockDependen
&None,
storage_reader.clone(),
&mut storage_writer,
#[cfg(feature = "os_input")]
Some(AccessedKeys::default()),
)
.await
.unwrap();
Expand Down Expand Up @@ -388,6 +481,8 @@ async fn test_add_task_panic_on_full_channel(mut mock_dependencies: MockDependen
&None,
storage_reader.clone(),
&mut storage_writer,
#[cfg(feature = "os_input")]
Some(AccessedKeys::default()),
)
.await
.expect("This call should panic.")
Expand Down Expand Up @@ -422,6 +517,8 @@ async fn test_get_commitment_results(mut mock_dependencies: MockDependencies) {
&None,
storage_reader.clone(),
&mut storage_writer,
#[cfg(feature = "os_input")]
Some(AccessedKeys::default()),
)
.await
.unwrap();
Expand All @@ -433,6 +530,8 @@ async fn test_get_commitment_results(mut mock_dependencies: MockDependencies) {
&None,
storage_reader,
&mut storage_writer,
#[cfg(feature = "os_input")]
Some(AccessedKeys::default()),
)
.await
.unwrap();
Expand Down Expand Up @@ -461,6 +560,8 @@ async fn add_commitments_and_revert_tasks(
&None,
storage_reader.clone(),
storage_writer,
#[cfg(feature = "os_input")]
Some(AccessedKeys::default()),
)
.await
.unwrap();
Expand Down
Loading
Loading