diff --git a/src/lean_spec/subspecs/api/endpoints/__init__.py b/src/lean_spec/subspecs/api/endpoints/__init__.py index 02da1aa01..78ef5b6ae 100644 --- a/src/lean_spec/subspecs/api/endpoints/__init__.py +++ b/src/lean_spec/subspecs/api/endpoints/__init__.py @@ -1,12 +1 @@ """API endpoint specifications.""" - -from . import aggregator, checkpoints, fork_choice, health, metrics, states - -__all__ = [ - "aggregator", - "checkpoints", - "fork_choice", - "health", - "metrics", - "states", -] diff --git a/src/lean_spec/subspecs/api/endpoints/blocks.py b/src/lean_spec/subspecs/api/endpoints/blocks.py deleted file mode 100644 index 001485b03..000000000 --- a/src/lean_spec/subspecs/api/endpoints/blocks.py +++ /dev/null @@ -1,56 +0,0 @@ -"""Blocks endpoint handlers.""" - -from __future__ import annotations - -import asyncio -import logging - -from aiohttp import web - -logger = logging.getLogger(__name__) - - -async def handle_finalized(request: web.Request) -> web.Response: - """ - Handle finalized signed block request. - - Returns the SignedBlock matching ``store.latest_finalized.root`` as raw - SSZ bytes (not snappy compressed). - - Together with ``/lean/v0/states/finalized`` this lets a checkpoint-syncing - node obtain the ``(state, signed_block)`` pair required by - ``Store.create_store`` (which asserts - ``anchor_block.state_root == hash_tree_root(state)`` and seeds - ``store.blocks[anchor_root] = anchor_block``). - - Response: SSZ-encoded SignedBlock (binary, application/octet-stream) - - Status Codes: - 200 OK: SignedBlock returned successfully. - 404 Not Found: Finalized signed block not available on this node - (e.g. server retains only ``Block`` and not ``SignedBlock``). - 503 Service Unavailable: Store / signed-block source not initialized. - """ - signed_block_getter = request.app.get("signed_block_getter") - store_getter = request.app.get("store_getter") - store = store_getter() if store_getter else None - - if store is None: - raise web.HTTPServiceUnavailable(reason="Store not initialized") - - if signed_block_getter is None: - raise web.HTTPServiceUnavailable(reason="Signed block source not configured") - - finalized_root = store.latest_finalized.root - signed_block = signed_block_getter(finalized_root) - - if signed_block is None: - raise web.HTTPNotFound(reason="Finalized signed block not available") - - try: - ssz_bytes = await asyncio.to_thread(signed_block.encode_bytes) - except Exception as e: - logger.error("Failed to encode signed block: %s", e) - raise web.HTTPInternalServerError(reason="Encoding failed") from e - - return web.Response(body=ssz_bytes, content_type="application/octet-stream") diff --git a/src/lean_spec/subspecs/api/routes.py b/src/lean_spec/subspecs/api/routes.py index 094d706b6..ffdc7f8f7 100644 --- a/src/lean_spec/subspecs/api/routes.py +++ b/src/lean_spec/subspecs/api/routes.py @@ -6,7 +6,7 @@ from aiohttp import web -from .endpoints import aggregator, blocks, checkpoints, fork_choice, health, metrics, states +from .endpoints import aggregator, checkpoints, fork_choice, health, metrics, states Handler = Callable[[web.Request], Awaitable[web.Response]] """Type alias for aiohttp request handlers.""" @@ -14,7 +14,6 @@ ROUTES: dict[str, Handler] = { "/lean/v0/health": health.handle, "/lean/v0/states/finalized": states.handle_finalized, - "/lean/v0/blocks/finalized": blocks.handle_finalized, "/lean/v0/checkpoints/justified": checkpoints.handle_justified, "/lean/v0/fork_choice": fork_choice.handle, "/metrics": metrics.handle, diff --git a/src/lean_spec/subspecs/api/server.py b/src/lean_spec/subspecs/api/server.py index 040d19391..9842fd8b0 100644 --- a/src/lean_spec/subspecs/api/server.py +++ b/src/lean_spec/subspecs/api/server.py @@ -14,8 +14,7 @@ from aiohttp import web -from lean_spec.forks import LstarSpec, SignedBlock, Store -from lean_spec.types import Bytes32 +from lean_spec.forks import LstarSpec, Store from .aggregator_controller import AggregatorController from .routes import ADMIN_ROUTES, ROUTES @@ -72,17 +71,6 @@ class ApiServer: store_getter: Callable[[], Store | None] | None = None """Callable that returns the current Store instance.""" - signed_block_getter: Callable[[Bytes32], SignedBlock | None] | None = None - """ - Callable that returns the SignedBlock for a given block root, if available. - - Used by ``/lean/v0/blocks/finalized`` to serve the anchor block alongside - the finalized state for checkpoint sync. Implementations must retain the - SignedBlock for at least ``store.latest_finalized.root``; ``Store.blocks`` - holds only unsigned ``Block`` objects, so a separate signed-block source - is required (e.g. a long-lived anchor cache or a SignedBlock-aware store). - """ - aggregator_controller: AggregatorController | None = None """ Optional controller for toggling the aggregator role at runtime. @@ -113,10 +101,6 @@ async def start(self) -> None: # Store the store_getter in app for handlers that need store access app["store_getter"] = self.store_getter - # Expose the signed-block lookup for endpoints serving SignedBlocks - # (e.g. /lean/v0/blocks/finalized for checkpoint-sync anchor block). - app["signed_block_getter"] = self.signed_block_getter - # Expose the fork spec for handlers that drive consensus computations. app["spec"] = self.spec diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index 05a6cda57..69d689643 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -514,7 +514,7 @@ async def _log_justified_finalized_periodically(self) -> None: break store = self.sync_service.store peers_connected = sum( - 1 for p in self.sync_service.peer_manager.get_all_peers() if p.is_connected() + 1 for p in self.sync_service.peer_manager.peers.values() if p.is_connected() ) metrics.lean_current_slot.set(self.clock.current_slot()) metrics.lean_connected_peers.set(peers_connected) diff --git a/src/lean_spec/subspecs/storage/database.py b/src/lean_spec/subspecs/storage/database.py index 5dd609575..2bf9012bd 100644 --- a/src/lean_spec/subspecs/storage/database.py +++ b/src/lean_spec/subspecs/storage/database.py @@ -57,18 +57,6 @@ def put_block(self, block: SpecBlockType, root: Bytes32) -> None: """ ... - def has_block(self, root: Bytes32) -> bool: - """ - Check if a block exists in storage. - - Args: - root: SSZ hash tree root of the block. - - Returns: - True if block exists. - """ - ... - # State Operations def get_state(self, root: Bytes32) -> SpecStateType | None: @@ -93,18 +81,6 @@ def put_state(self, state: SpecStateType, root: Bytes32) -> None: """ ... - def has_state(self, root: Bytes32) -> bool: - """ - Check if a state exists in storage. - - Args: - root: Block root hash associated with the state. - - Returns: - True if state exists. - """ - ... - # Checkpoint Operations def get_justified_checkpoint(self) -> Checkpoint | None: @@ -237,15 +213,6 @@ def put_genesis_time(self, genesis_time: Uint64) -> None: # Transaction Control - def commit(self) -> None: - """ - Commit pending writes to durable storage. - - All writes via put_* methods are buffered until commit() or batch_write(). - Callers must explicitly commit after writes. - """ - ... - @contextmanager def batch_write(self) -> Generator[None]: """ diff --git a/src/lean_spec/subspecs/storage/sqlite.py b/src/lean_spec/subspecs/storage/sqlite.py index 09d199af3..34b12a75a 100644 --- a/src/lean_spec/subspecs/storage/sqlite.py +++ b/src/lean_spec/subspecs/storage/sqlite.py @@ -166,23 +166,6 @@ def put_block(self, block: SpecBlockType, root: Bytes32) -> None: except sqlite3.Error as e: raise StorageWriteError(f"Failed to write block {root.hex()}: {e}") from e - def has_block(self, root: Bytes32) -> bool: - """Check if a block exists in storage.""" - try: - cursor = self._conn.cursor() - - # SELECT 1 is an existence check optimization. - # - # We only care whether a row exists, not its contents. - # This avoids deserializing potentially large SSZ data. - cursor.execute( - f"SELECT 1 FROM {BLOCKS.TABLE_NAME} WHERE root = ?", - (bytes(root),), - ) - return cursor.fetchone() is not None - except sqlite3.Error as e: - raise StorageReadError(f"Failed to check block existence {root.hex()}: {e}") from e - # State Operations # @@ -231,20 +214,6 @@ def put_state(self, state: SpecStateType, root: Bytes32) -> None: except sqlite3.Error as e: raise StorageWriteError(f"Failed to write state for block {root.hex()}: {e}") from e - def has_state(self, root: Bytes32) -> bool: - """Check if a state exists in storage.""" - try: - cursor = self._conn.cursor() - cursor.execute( - f"SELECT 1 FROM {STATES.TABLE_NAME} WHERE root = ?", - (bytes(root),), - ) - return cursor.fetchone() is not None - except sqlite3.Error as e: - raise StorageReadError( - f"Failed to check state existence for block {root.hex()}: {e}" - ) from e - # Checkpoint Operations # @@ -501,13 +470,6 @@ def put_genesis_time(self, genesis_time: Uint64) -> None: # Transaction Control - def commit(self) -> None: - """Commit pending writes to durable storage.""" - try: - self._conn.commit() - except sqlite3.Error as e: - raise StorageWriteError(f"Failed to commit: {e}") from e - @contextmanager def batch_write(self) -> Generator[None]: """ diff --git a/src/lean_spec/subspecs/sync/checkpoint_sync.py b/src/lean_spec/subspecs/sync/checkpoint_sync.py index cdea0e52f..1a6668b6a 100644 --- a/src/lean_spec/subspecs/sync/checkpoint_sync.py +++ b/src/lean_spec/subspecs/sync/checkpoint_sync.py @@ -23,7 +23,7 @@ import httpx -from lean_spec.forks import SignedBlock, State +from lean_spec.forks import State from lean_spec.subspecs.chain.config import VALIDATOR_REGISTRY_LIMIT from lean_spec.subspecs.ssz.hash import hash_tree_root @@ -35,16 +35,6 @@ FINALIZED_STATE_ENDPOINT: Final = "/lean/v0/states/finalized" """API endpoint for fetching finalized state. Follows Beacon API conventions.""" -FINALIZED_BLOCK_ENDPOINT: Final = "/lean/v0/blocks/finalized" -"""API endpoint for fetching the SignedBlock matching the finalized state. - -``Store.create_store`` requires both the finalized state and the anchor block -(it asserts ``anchor_block.state_root == hash_tree_root(state)`` and seeds -``store.blocks[anchor_root] = anchor_block``). The state endpoint alone is -insufficient; this endpoint ships the matching SignedBlock so the pair can -be obtained atomically via two requests. -""" - class CheckpointSyncError(Exception): """ @@ -113,98 +103,6 @@ async def fetch_finalized_state(url: str, state_class: type[State]) -> State: raise CheckpointSyncError(f"Failed to fetch state: {e}") from e -async def fetch_finalized_block(url: str) -> SignedBlock: - """ - Fetch the SignedBlock matching the finalized state via checkpoint sync. - - Retrieves the SSZ-encoded SignedBlock at ``store.latest_finalized.root``. - The caller is responsible for verifying that - ``signed_block.block.state_root == hash_tree_root(state)`` before passing - the pair to ``Store.create_store``. - - Args: - url: Base URL of the node API (e.g., "http://localhost:5052"). - - Returns: - The finalized SignedBlock object. - - Raises: - CheckpointSyncError: If the request fails or block bytes are invalid. - """ - base_url = url.rstrip("/") - full_url = f"{base_url}{FINALIZED_BLOCK_ENDPOINT}" - - logger.info("Fetching finalized signed block from %s", full_url) - - headers = {"Accept": "application/octet-stream"} - - try: - async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: - response = await client.get(full_url, headers=headers) - response.raise_for_status() - - ssz_data = response.content - logger.info("Downloaded %d bytes of SSZ signed block data", len(ssz_data)) - - signed_block = SignedBlock.decode_bytes(ssz_data) - logger.info( - "Deserialized signed block at slot %s", - signed_block.block.slot, - ) - - return signed_block - - except httpx.RequestError as exc: - raise CheckpointSyncError( - f"Network error while connecting to {exc.request.url}: {exc}" - ) from exc - except httpx.HTTPStatusError as exc: - raise CheckpointSyncError( - f"HTTP error {exc.response.status_code}: {exc.response.text[:200]}" - ) from exc - except Exception as e: - raise CheckpointSyncError(f"Failed to fetch signed block: {e}") from e - - -async def fetch_finalized_anchor(url: str, state_class: type[State]) -> tuple[State, SignedBlock]: - """ - Fetch the ``(state, signed_block)`` pair required by ``Store.create_store``. - - Issues two requests in sequence: ``/lean/v0/states/finalized`` then - ``/lean/v0/blocks/finalized``. Both endpoints serve the snapshot at - ``store.latest_finalized.root``; on the server side these reads are - expected to be consistent against a single finalized checkpoint. - - Verifies the block / state pairing before returning. If the server - advanced finalization between the two requests the pairing assertion - will fail and the caller should retry. - - Args: - url: Base URL of the node API. - state_class: State class used to decode the state SSZ bytes. - - Returns: - Tuple of (finalized state, finalized signed block). - - Raises: - CheckpointSyncError: If either fetch fails, or the block's - ``state_root`` does not equal ``hash_tree_root(state)``. - """ - state = await fetch_finalized_state(url, state_class) - signed_block = await fetch_finalized_block(url) - - expected_state_root = hash_tree_root(state) - if signed_block.block.state_root != expected_state_root: - raise CheckpointSyncError( - "Anchor block / state mismatch: " - f"signed_block.block.state_root={signed_block.block.state_root.hex()} " - f"hash_tree_root(state)={expected_state_root.hex()}. " - "Server may have advanced finalization between requests; retry." - ) - - return state, signed_block - - def verify_checkpoint_state(state: State) -> bool: """ Verify that a checkpoint state is structurally valid. diff --git a/src/lean_spec/subspecs/sync/peer_manager.py b/src/lean_spec/subspecs/sync/peer_manager.py index da679e4bd..b6cd6286b 100644 --- a/src/lean_spec/subspecs/sync/peer_manager.py +++ b/src/lean_spec/subspecs/sync/peer_manager.py @@ -88,38 +88,34 @@ class PeerManager: Tracks peer chain status and provides peer selection for block requests. """ - _peers: dict[PeerId, SyncPeer] = field(default_factory=dict) - """Mapping of peer ID to SyncPeer.""" + peers: dict[PeerId, SyncPeer] = field(default_factory=dict) + """Mapping of peer ID to SyncPeer. Public so callers can iterate or look up directly.""" def __len__(self) -> int: """Return the number of tracked peers.""" - return len(self._peers) + return len(self.peers) def __contains__(self, peer_id: PeerId) -> bool: """Check if a peer is being tracked.""" - return peer_id in self._peers + return peer_id in self.peers def add_peer(self, info: PeerInfo) -> SyncPeer: """Register a new peer or update existing.""" - if info.peer_id in self._peers: - self._peers[info.peer_id].info = info - return self._peers[info.peer_id] + if info.peer_id in self.peers: + self.peers[info.peer_id].info = info + return self.peers[info.peer_id] sync_peer = SyncPeer(info=info) - self._peers[info.peer_id] = sync_peer + self.peers[info.peer_id] = sync_peer return sync_peer def remove_peer(self, peer_id: PeerId) -> SyncPeer | None: """Remove a peer from tracking.""" - return self._peers.pop(peer_id, None) - - def get_peer(self, peer_id: PeerId) -> SyncPeer | None: - """Get a tracked peer by ID.""" - return self._peers.get(peer_id) + return self.peers.pop(peer_id, None) def update_status(self, peer_id: PeerId, status: Status) -> None: """Update a peer's chain status.""" - peer = self._peers.get(peer_id) + peer = self.peers.get(peer_id) if peer is not None: peer.status = status @@ -137,7 +133,7 @@ def select_peer_for_request(self, min_slot: Slot | None = None) -> SyncPeer | No An available SyncPeer, or None if no suitable peer exists. """ candidates: list[SyncPeer] = [] - for peer in self._peers.values(): + for peer in self.peers.values(): if not peer.is_available(): continue if min_slot is not None and not peer.has_slot(min_slot): @@ -159,7 +155,7 @@ def get_network_finalized_slot(self) -> Slot | None: """ slots = ( peer.status.finalized.slot - for peer in self._peers.values() + for peer in self.peers.values() if peer.status is not None and peer.is_connected() ) counter = Counter(slots) @@ -169,18 +165,14 @@ def get_network_finalized_slot(self) -> Slot | None: def on_request_success(self, peer_id: PeerId) -> None: """Record a successful request to a peer.""" - peer = self._peers.get(peer_id) + peer = self.peers.get(peer_id) if peer is not None: peer.on_request_complete() peer.score = min(peer.score + SCORE_SUCCESS_BONUS, MAX_PEER_SCORE) def on_request_failure(self, peer_id: PeerId) -> None: """Record a failed request to a peer.""" - peer = self._peers.get(peer_id) + peer = self.peers.get(peer_id) if peer is not None: peer.on_request_complete() peer.score = max(peer.score - SCORE_FAILURE_PENALTY, MIN_PEER_SCORE) - - def get_all_peers(self) -> list[SyncPeer]: - """Get all tracked peers.""" - return list(self._peers.values()) diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 66feed21b..37c7280d8 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -48,36 +48,6 @@ async def _noop_publish_agg(signed_attestation: SignedAggregatedAttestation) -> """No-op default for aggregated attestation publishing.""" -@dataclass(slots=True) -class SyncProgress: - """ - Current synchronization progress. - - Provides a snapshot of sync state for monitoring and logging. - """ - - state: SyncState - """Current sync state machine state.""" - - local_head_slot: Slot | None = None - """Slot of our current chain head.""" - - network_finalized_slot: Slot | None = None - """Network consensus on finalized slot (mode of peer reports).""" - - blocks_processed: int = 0 - """Total blocks integrated into Store this session.""" - - peers_connected: int = 0 - """Number of connected peers with status.""" - - cache_size: int = 0 - """Number of blocks in pending cache.""" - - orphan_count: int = 0 - """Number of orphan blocks awaiting parents.""" - - @dataclass(slots=True) class SyncService: """Central coordinator for the sync state machine.""" @@ -341,21 +311,6 @@ def head_slot(self) -> Slot: """Return the slot of the current canonical head.""" return self.store.blocks[self.store.head].slot - def get_progress(self) -> SyncProgress: - """Snapshot the current sync state for monitoring and logging.""" - # Mode of peer-reported finalized slots, or None if too few peers reported. - network_slot = self.peer_manager.get_network_finalized_slot() - - return SyncProgress( - state=self.state, - local_head_slot=self.head_slot(), - network_finalized_slot=network_slot, - blocks_processed=self._blocks_processed, - peers_connected=sum(1 for p in self.peer_manager.get_all_peers() if p.is_connected()), - cache_size=len(self.block_cache), - orphan_count=self.block_cache.orphan_count, - ) - async def on_peer_status(self, peer_id: PeerId, status: Status) -> None: """Record a peer's chain status and move to SYNCING if needed.""" self.peer_manager.update_status(peer_id, status) @@ -778,17 +733,3 @@ async def _transition_to(self, new_state: SyncState) -> None: raise ValueError(f"Invalid state transition: {self.state.name} -> {new_state.name}") self.state = new_state - - def reset(self) -> None: - """Return to IDLE and clear caches, counters, and sub-component state.""" - self.state = SyncState.IDLE - self._blocks_processed = 0 - - # Cached blocks may be stale or invalid after a reset. - self.block_cache.clear() - - # Sub-components hold pending requests and orphan trackers that must clear too. - if self._backfill is not None: - self._backfill.reset() - if self._head_sync is not None: - self._head_sync.reset() diff --git a/tests/api/conftest.py b/tests/api/conftest.py index 8dbf208aa..6f9a34c40 100644 --- a/tests/api/conftest.py +++ b/tests/api/conftest.py @@ -9,10 +9,7 @@ import httpx import pytest -from lean_spec.forks import SignedBlock from lean_spec.subspecs.api import AggregatorController, ApiServer, ApiServerConfig -from lean_spec.subspecs.ssz.hash import hash_tree_root -from lean_spec.types import ByteList512KiB, Bytes32 from tests.lean_spec.helpers.builders import make_genesis_data # Default port for auto-started local server @@ -73,25 +70,11 @@ def _create_server(self) -> ApiServer: ) store = genesis.store - # Wrap the genesis Block in a SignedBlock so the - # /lean/v0/blocks/finalized endpoint has something to serve. Real - # implementations are responsible for retaining the SignedBlock for - # at least the finalized root; see ApiServer.signed_block_getter. - anchor_signed_block = SignedBlock( - block=genesis.block, - proof=ByteList512KiB(data=b""), - ) - anchor_root = hash_tree_root(genesis.block) - - def signed_block_lookup(root: Bytes32) -> SignedBlock | None: - return anchor_signed_block if root == anchor_root else None - controller = _make_conformance_controller(initial=False) config = ApiServerConfig(host="127.0.0.1", port=self.port) return ApiServer( config=config, store_getter=lambda: store, - signed_block_getter=signed_block_lookup, aggregator_controller=controller, ) diff --git a/tests/api/endpoints/test_blocks.py b/tests/api/endpoints/test_blocks.py deleted file mode 100644 index 9a7845adf..000000000 --- a/tests/api/endpoints/test_blocks.py +++ /dev/null @@ -1,76 +0,0 @@ -"""Tests for the blocks endpoints.""" - -import httpx - -from lean_spec.forks import SignedBlock - - -def get_finalized_block(server_url: str) -> httpx.Response: - """Fetch the finalized signed block from the server.""" - return httpx.get( - f"{server_url}/lean/v0/blocks/finalized", - headers={"Accept": "application/octet-stream"}, - ) - - -class TestFinalizedBlock: - """Tests for the /lean/v0/blocks/finalized endpoint.""" - - def test_returns_200(self, server_url: str) -> None: - """Finalized block endpoint returns 200 status code.""" - response = get_finalized_block(server_url) - assert response.status_code == 200 - - def test_content_type_is_octet_stream(self, server_url: str) -> None: - """Finalized block endpoint returns octet-stream content type.""" - response = get_finalized_block(server_url) - content_type = response.headers.get("content-type", "") - assert "application/octet-stream" in content_type - - def test_ssz_deserializes(self, server_url: str) -> None: - """Finalized block SSZ bytes deserialize to a valid SignedBlock object.""" - response = get_finalized_block(server_url) - signed_block = SignedBlock.decode_bytes(response.content) - assert signed_block is not None - - def test_block_root_matches_finalized_checkpoint(self, server_url: str) -> None: - """Returned block's hash_tree_root matches the store's finalized root. - - This is the protocol-level invariant the endpoint exists to guarantee: - a checkpoint-syncing client must be able to seed - ``Store.create_store(state, anchor_block)`` such that - ``hash_tree_root(anchor_block) == store.latest_finalized.root`` on the - source node. - """ - from lean_spec.subspecs.ssz.hash import hash_tree_root - - block_response = get_finalized_block(server_url) - signed_block = SignedBlock.decode_bytes(block_response.content) - - checkpoint_response = httpx.get(f"{server_url}/lean/v0/checkpoints/justified") - # Justified checkpoint is the seed checkpoint at genesis-only test node; - # finalized checkpoint exposed via /states/finalized comparison instead. - assert checkpoint_response.status_code == 200 - - block_root = hash_tree_root(signed_block.block) - assert block_root is not None - - def test_state_root_matches_finalized_state(self, server_url: str) -> None: - """Returned block's ``state_root`` equals ``hash_tree_root(state)``. - - This is the assertion made by ``Store.create_store``; if it fails the - ``(state, signed_block)`` pair cannot be used to bootstrap the store. - """ - from lean_spec.forks import State - from lean_spec.subspecs.ssz.hash import hash_tree_root - - block_response = get_finalized_block(server_url) - signed_block = SignedBlock.decode_bytes(block_response.content) - - state_response = httpx.get( - f"{server_url}/lean/v0/states/finalized", - headers={"Accept": "application/octet-stream"}, - ) - state = State.decode_bytes(state_response.content) - - assert signed_block.block.state_root == hash_tree_root(state) diff --git a/tests/lean_spec/subspecs/storage/test_sqlite.py b/tests/lean_spec/subspecs/storage/test_sqlite.py index 700afdf6a..3fcbc2449 100644 --- a/tests/lean_spec/subspecs/storage/test_sqlite.py +++ b/tests/lean_spec/subspecs/storage/test_sqlite.py @@ -38,8 +38,8 @@ class TestBlockOperations: def test_put_and_get_block(self, db: SQLiteDatabase, genesis_block: Block) -> None: """Block can be stored and retrieved by root.""" root = hash_tree_root(genesis_block) - db.put_block(genesis_block, root) - db.commit() + with db.batch_write(): + db.put_block(genesis_block, root) assert db.get_block(root) == genesis_block @@ -48,15 +48,6 @@ def test_get_nonexistent_block(self, db: SQLiteDatabase) -> None: fake_root = Bytes32(b"\x01" * 32) assert db.get_block(fake_root) is None - def test_has_block(self, db: SQLiteDatabase, genesis_block: Block) -> None: - """has_block returns correct existence status.""" - root = hash_tree_root(genesis_block) - - assert not db.has_block(root) - db.put_block(genesis_block, root) - db.commit() - assert db.has_block(root) - def test_put_block_overwrites(self, db: SQLiteDatabase, genesis_state: State) -> None: """Putting a block with same root overwrites previous.""" block1 = Block( @@ -68,11 +59,11 @@ def test_put_block_overwrites(self, db: SQLiteDatabase, genesis_state: State) -> ) root = hash_tree_root(block1) - db.put_block(block1, root) - db.put_block(block1, root) # Same block again - db.commit() + with db.batch_write(): + db.put_block(block1, root) + db.put_block(block1, root) # Same block again - assert db.has_block(root) + assert db.get_block(root) == block1 class TestStateOperations: @@ -81,8 +72,8 @@ class TestStateOperations: def test_put_and_get_state(self, db: SQLiteDatabase, genesis_state: State) -> None: """State can be stored and retrieved by block root.""" root = hash_tree_root(genesis_state) - db.put_state(genesis_state, root) - db.commit() + with db.batch_write(): + db.put_state(genesis_state, root) assert db.get_state(root) == genesis_state @@ -91,15 +82,6 @@ def test_get_nonexistent_state(self, db: SQLiteDatabase) -> None: fake_root = Bytes32(b"\x02" * 32) assert db.get_state(fake_root) is None - def test_has_state(self, db: SQLiteDatabase, genesis_state: State) -> None: - """has_state returns correct existence status.""" - root = hash_tree_root(genesis_state) - - assert not db.has_state(root) - db.put_state(genesis_state, root) - db.commit() - assert db.has_state(root) - class TestCheckpointOperations: """Tests for checkpoint storage operations.""" @@ -107,16 +89,16 @@ class TestCheckpointOperations: def test_put_and_get_justified_checkpoint(self, db: SQLiteDatabase) -> None: """Justified checkpoint can be stored and retrieved.""" checkpoint = Checkpoint(root=Bytes32(b"\x03" * 32), slot=Slot(10)) - db.put_justified_checkpoint(checkpoint) - db.commit() + with db.batch_write(): + db.put_justified_checkpoint(checkpoint) assert db.get_justified_checkpoint() == checkpoint def test_put_and_get_finalized_checkpoint(self, db: SQLiteDatabase) -> None: """Finalized checkpoint can be stored and retrieved.""" checkpoint = Checkpoint(root=Bytes32(b"\x04" * 32), slot=Slot(5)) - db.put_finalized_checkpoint(checkpoint) - db.commit() + with db.batch_write(): + db.put_finalized_checkpoint(checkpoint) assert db.get_finalized_checkpoint() == checkpoint @@ -132,8 +114,8 @@ class TestHeadTracking: def test_put_and_get_head_root(self, db: SQLiteDatabase) -> None: """Head root can be stored and retrieved.""" head_root = Bytes32(b"\x08" * 32) - db.put_head_root(head_root) - db.commit() + with db.batch_write(): + db.put_head_root(head_root) assert db.get_head_root() == head_root @@ -146,12 +128,12 @@ def test_head_root_updates(self, db: SQLiteDatabase) -> None: root1 = Bytes32(b"\x09" * 32) root2 = Bytes32(b"\x0a" * 32) - db.put_head_root(root1) - db.commit() + with db.batch_write(): + db.put_head_root(root1) assert db.get_head_root() == root1 - db.put_head_root(root2) - db.commit() + with db.batch_write(): + db.put_head_root(root2) assert db.get_head_root() == root2 @@ -162,8 +144,8 @@ def test_put_and_get_block_root_by_slot(self, db: SQLiteDatabase) -> None: """Block root can be stored and retrieved by slot.""" slot = Slot(100) root = Bytes32(b"\x0b" * 32) - db.put_block_root_by_slot(slot, root) - db.commit() + with db.batch_write(): + db.put_block_root_by_slot(slot, root) assert db.get_block_root_by_slot(slot) == root @@ -177,13 +159,13 @@ def test_slot_index_reorg(self, db: SQLiteDatabase) -> None: root_a = Bytes32(b"\x0b" * 32) root_b = Bytes32(b"\x0c" * 32) - db.put_block_root_by_slot(slot, root_a) - db.commit() + with db.batch_write(): + db.put_block_root_by_slot(slot, root_a) assert db.get_block_root_by_slot(slot) == root_a # Reorg: overwrite with different root at same slot - db.put_block_root_by_slot(slot, root_b) - db.commit() + with db.batch_write(): + db.put_block_root_by_slot(slot, root_b) assert db.get_block_root_by_slot(slot) == root_b @@ -195,8 +177,8 @@ def test_put_and_get_block_root_by_state_root(self, db: SQLiteDatabase) -> None: state_root = Bytes32(b"\x0d" * 32) block_root = Bytes32(b"\x0e" * 32) - db.put_block_root_by_state_root(state_root, block_root) - db.commit() + with db.batch_write(): + db.put_block_root_by_state_root(state_root, block_root) assert db.get_block_root_by_state_root(state_root) == block_root @@ -211,8 +193,8 @@ class TestGenesisTime: def test_put_and_get_genesis_time(self, db: SQLiteDatabase) -> None: """Genesis time can be stored and retrieved.""" genesis_time = Uint64(1606824023) - db.put_genesis_time(genesis_time) - db.commit() + with db.batch_write(): + db.put_genesis_time(genesis_time) assert db.get_genesis_time() == genesis_time @@ -222,8 +204,8 @@ def test_get_nonexistent_genesis_time(self, db: SQLiteDatabase) -> None: def test_genesis_time_zero(self, db: SQLiteDatabase) -> None: """Genesis time of zero round-trips correctly.""" - db.put_genesis_time(Uint64(0)) - db.commit() + with db.batch_write(): + db.put_genesis_time(Uint64(0)) assert db.get_genesis_time() == Uint64(0) @@ -246,8 +228,8 @@ def test_batch_write_commits_all(self, db: SQLiteDatabase) -> None: def test_batch_write_rolls_back_on_exception(self, db: SQLiteDatabase) -> None: """Writes are rolled back when an exception occurs within batch.""" root = Bytes32(b"\x12" * 32) - db.put_head_root(root) - db.commit() + with db.batch_write(): + db.put_head_root(root) with pytest.raises(ValueError, match="intentional"): with db.batch_write(): @@ -286,8 +268,8 @@ def test_batch_write_with_block_and_state( def test_batch_write_rolls_back_storage_write_error(self, db: SQLiteDatabase) -> None: """StorageWriteError within batch triggers rollback.""" root = Bytes32(b"\x15" * 32) - db.put_head_root(root) - db.commit() + with db.batch_write(): + db.put_head_root(root) with pytest.raises(StorageWriteError): with db.batch_write(): @@ -532,8 +514,8 @@ def test_context_manager(self) -> None: """Database works as context manager.""" with SQLiteDatabase(":memory:", State, Block) as db: root = Bytes32(b"\x0c" * 32) - db.put_head_root(root) - db.commit() + with db.batch_write(): + db.put_head_root(root) assert db.get_head_root() == root def test_close_is_idempotent(self, db: SQLiteDatabase) -> None: diff --git a/tests/lean_spec/subspecs/sync/test_backfill_sync.py b/tests/lean_spec/subspecs/sync/test_backfill_sync.py index 671bee094..5fbd333e7 100644 --- a/tests/lean_spec/subspecs/sync/test_backfill_sync.py +++ b/tests/lean_spec/subspecs/sync/test_backfill_sync.py @@ -293,7 +293,7 @@ async def test_empty_response_does_not_leak_requests_in_flight( unknown_root = Bytes32(b"\xff" * 32) await backfill.fill_missing([unknown_root]) - peer = manager.get_peer(peer_id) + peer = manager.peers.get(peer_id) assert peer == SyncPeer( info=info, requests_in_flight=0, @@ -481,7 +481,7 @@ async def test_failed_range_does_not_advance_watermark( # Watermark unchanged: a future call covering the same range can retry. assert backfill_system._max_range_slot == Slot(0) # The peer recorded a failure (score decreased). - peer = backfill_system.peer_manager.get_peer(peer_id) + peer = backfill_system.peer_manager.peers.get(peer_id) assert peer is not None assert peer.score == INITIAL_PEER_SCORE - SCORE_FAILURE_PENALTY diff --git a/tests/lean_spec/subspecs/sync/test_checkpoint_sync.py b/tests/lean_spec/subspecs/sync/test_checkpoint_sync.py index e3fb709c3..4903e5589 100644 --- a/tests/lean_spec/subspecs/sync/test_checkpoint_sync.py +++ b/tests/lean_spec/subspecs/sync/test_checkpoint_sync.py @@ -7,25 +7,17 @@ import httpx import pytest -from lean_spec.forks import ( - Block, - SignedBlock, -) from lean_spec.forks.lstar import State, Store from lean_spec.forks.lstar.containers.state import Validators from lean_spec.subspecs.api import ApiServer, ApiServerConfig from lean_spec.subspecs.chain.config import VALIDATOR_REGISTRY_LIMIT -from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.subspecs.sync.checkpoint_sync import ( - FINALIZED_BLOCK_ENDPOINT, FINALIZED_STATE_ENDPOINT, CheckpointSyncError, - fetch_finalized_anchor, - fetch_finalized_block, fetch_finalized_state, verify_checkpoint_state, ) -from lean_spec.types import ByteList512KiB, Bytes32, Slot +from lean_spec.types import Slot class _MockTransport(httpx.AsyncBaseTransport): @@ -228,138 +220,3 @@ async def test_client_fetches_and_deserializes_state(self, base_store: Store) -> finally: await server.aclose() - - -def _wrap_as_signed_block(block: Block) -> SignedBlock: - """Build a SignedBlock around a Block using an empty proof envelope. - - The spec retains only Block in Store; tests need a SignedBlock for the - signed-block getter callable. An empty proof is sufficient for these - structural checks, which do not exercise cryptographic verification. - """ - return SignedBlock(block=block, proof=ByteList512KiB(data=b"")) - - -class TestFetchFinalizedBlock: - """Error-handling and integration tests for ``fetch_finalized_block``.""" - - async def test_network_error_raises_checkpoint_sync_error(self) -> None: - """TCP-level failure surfaces as CheckpointSyncError.""" - transport = _MockTransport( - exc=httpx.RequestError( - "connection refused", - request=httpx.Request("GET", f"http://example.com{FINALIZED_BLOCK_ENDPOINT}"), - ) - ) - with ( - patch( - "lean_spec.subspecs.sync.checkpoint_sync.httpx.AsyncClient", - return_value=httpx.AsyncClient(transport=transport), - ), - pytest.raises(CheckpointSyncError, match="Network error"), - ): - await fetch_finalized_block("http://example.com") - - async def test_http_404_raises_checkpoint_sync_error(self) -> None: - """A 404 (anchor block not retained on server) surfaces clearly.""" - transport = _MockTransport(status=404, content=b"Not Found") - with ( - patch( - "lean_spec.subspecs.sync.checkpoint_sync.httpx.AsyncClient", - return_value=httpx.AsyncClient(transport=transport), - ), - pytest.raises(CheckpointSyncError, match="HTTP error 404"), - ): - await fetch_finalized_block("http://example.com") - - async def test_corrupt_ssz_raises_checkpoint_sync_error(self) -> None: - """Malformed body fails at SignedBlock deserialization.""" - transport = _MockTransport(content=b"\xff\xfe corrupt") - with ( - patch( - "lean_spec.subspecs.sync.checkpoint_sync.httpx.AsyncClient", - return_value=httpx.AsyncClient(transport=transport), - ), - pytest.raises(CheckpointSyncError, match="Failed to fetch signed block"), - ): - await fetch_finalized_block("http://example.com") - - async def test_client_fetches_signed_block( - self, base_store: Store, genesis_block: Block - ) -> None: - """Client fetches and deserializes the anchor SignedBlock from server.""" - anchor_signed_block = _wrap_as_signed_block(genesis_block) - anchor_root = hash_tree_root(genesis_block) - - def signed_block_lookup(root: Bytes32) -> SignedBlock | None: - return anchor_signed_block if root == anchor_root else None - - config = ApiServerConfig(port=15059) - server = ApiServer( - config=config, - store_getter=lambda: base_store, - signed_block_getter=signed_block_lookup, - ) - await server.start() - try: - signed_block = await fetch_finalized_block("http://127.0.0.1:15059") - assert signed_block.block.slot == Slot(0) - assert hash_tree_root(signed_block.block) == anchor_root - finally: - await server.aclose() - - -class TestFetchFinalizedAnchor: - """Integration tests for the combined ``fetch_finalized_anchor`` helper.""" - - async def test_returns_state_block_pair(self, base_store: Store, genesis_block: Block) -> None: - """Pair satisfies ``signed_block.state_root == hash_tree_root(state)``.""" - anchor_signed_block = _wrap_as_signed_block(genesis_block) - anchor_root = hash_tree_root(genesis_block) - - def signed_block_lookup(root: Bytes32) -> SignedBlock | None: - return anchor_signed_block if root == anchor_root else None - - config = ApiServerConfig(port=15060) - server = ApiServer( - config=config, - store_getter=lambda: base_store, - signed_block_getter=signed_block_lookup, - ) - await server.start() - try: - state, signed_block = await fetch_finalized_anchor("http://127.0.0.1:15060", State) - assert state.slot == Slot(0) - assert signed_block.block.state_root == hash_tree_root(state) - finally: - await server.aclose() - - async def test_mismatched_pair_raises(self, base_store: Store) -> None: - """If block.state_root != hash_tree_root(state), raise CheckpointSyncError.""" - # Build a SignedBlock whose state_root deliberately differs from the - # served state's root, simulating a server that advanced finalization - # between the two fetches. - bad_block = Block( - slot=Slot(0), - proposer_index=base_store.blocks[base_store.latest_finalized.root].proposer_index, - parent_root=Bytes32.zero(), - state_root=Bytes32(b"\x01" * 32), - body=base_store.blocks[base_store.latest_finalized.root].body, - ) - bad_signed = _wrap_as_signed_block(bad_block) - - def signed_block_lookup(_root: Bytes32) -> SignedBlock | None: - return bad_signed - - config = ApiServerConfig(port=15061) - server = ApiServer( - config=config, - store_getter=lambda: base_store, - signed_block_getter=signed_block_lookup, - ) - await server.start() - try: - with pytest.raises(CheckpointSyncError, match="Anchor block / state mismatch"): - await fetch_finalized_anchor("http://127.0.0.1:15061", State) - finally: - await server.aclose() diff --git a/tests/lean_spec/subspecs/sync/test_peer_manager.py b/tests/lean_spec/subspecs/sync/test_peer_manager.py index d8f14576a..f172a963c 100644 --- a/tests/lean_spec/subspecs/sync/test_peer_manager.py +++ b/tests/lean_spec/subspecs/sync/test_peer_manager.py @@ -144,17 +144,17 @@ def test_remove_nonexistent_peer(self) -> None: assert manager.remove_peer(peer("16Uiu2HAmNonexistent")) is None def test_get_peer(self, connected_peer_info: PeerInfo) -> None: - """Getting a peer by ID returns the SyncPeer.""" + """Looking up a tracked peer by ID returns the SyncPeer.""" manager = PeerManager() manager.add_peer(connected_peer_info) - peer = manager.get_peer(connected_peer_info.peer_id) + peer = manager.peers.get(connected_peer_info.peer_id) assert peer == SyncPeer(info=connected_peer_info) def test_get_nonexistent_peer(self) -> None: - """Getting a nonexistent peer returns None.""" + """Looking up an unknown peer returns None.""" manager = PeerManager() - assert manager.get_peer(peer("16Uiu2HAmNonexistent")) is None + assert manager.peers.get(peer("16Uiu2HAmNonexistent")) is None class TestPeerManagerStatusTracking: @@ -167,7 +167,7 @@ def test_update_status(self, connected_peer_info: PeerInfo, sample_status: Statu manager.update_status(connected_peer_info.peer_id, sample_status) - peer = manager.get_peer(connected_peer_info.peer_id) + peer = manager.peers.get(connected_peer_info.peer_id) assert peer == SyncPeer(info=connected_peer_info, status=sample_status) def test_update_status_nonexistent_peer(self, sample_status: Status) -> None: @@ -322,27 +322,6 @@ def test_on_request_failure_nonexistent_peer(self) -> None: manager.on_request_failure(peer("16Uiu2HAmNonexistent")) -class TestPeerManagerGetAllPeers: - """Tests for PeerManager get_all_peers method.""" - - def test_get_all_peers_empty(self) -> None: - """get_all_peers returns empty list when no peers.""" - manager = PeerManager() - assert manager.get_all_peers() == [] - - def test_get_all_peers_returns_all(self, peer_id: PeerId, peer_id_2: PeerId) -> None: - """get_all_peers returns all tracked peers.""" - manager = PeerManager() - - info1 = PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED) - info2 = PeerInfo(peer_id=peer_id_2, state=ConnectionState.CONNECTED) - manager.add_peer(info1) - manager.add_peer(info2) - - peers = manager.get_all_peers() - assert peers == [SyncPeer(info=info1), SyncPeer(info=info2)] - - class TestPeerScoring: """Tests for peer scoring and weighted selection.""" @@ -399,6 +378,6 @@ def test_overloaded_peer_skipped(self, peer_id: PeerId, peer_id_2: PeerId) -> No peer1.requests_in_flight = MAX_CONCURRENT_REQUESTS - peer2 = manager.get_peer(peer_id_2) + peer2 = manager.peers.get(peer_id_2) selected = manager.select_peer_for_request() assert selected == peer2 diff --git a/tests/lean_spec/subspecs/sync/test_service.py b/tests/lean_spec/subspecs/sync/test_service.py index 754e8d5c6..e3b99d39c 100644 --- a/tests/lean_spec/subspecs/sync/test_service.py +++ b/tests/lean_spec/subspecs/sync/test_service.py @@ -16,7 +16,7 @@ from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.subspecs.storage.database import Database from lean_spec.subspecs.sync.config import MAX_PENDING_ATTESTATIONS -from lean_spec.subspecs.sync.service import SyncProgress, SyncService +from lean_spec.subspecs.sync.service import SyncService from lean_spec.subspecs.sync.states import SyncState from lean_spec.types import Bytes32, Checkpoint, Slot, ValidatorIndex from tests.lean_spec.helpers import ( @@ -246,137 +246,6 @@ async def test_caches_orphan_in_syncing_state( assert sync_service.block_cache.orphan_count == 1 -class TestProgressReporting: - """Tests for sync progress reporting.""" - - def test_progress_reflects_current_state( - self, - sync_service: SyncService, - peer_id: PeerId, - ) -> None: - """get_progress accurately reflects service state.""" - # Initial progress - progress = sync_service.get_progress() - assert progress == SyncProgress( - state=SyncState.IDLE, - local_head_slot=Slot(0), - network_finalized_slot=None, - blocks_processed=0, - peers_connected=1, - cache_size=0, - orphan_count=0, - ) - - # After processing some blocks - sync_service.state = SyncState.SYNCING - sync_service._blocks_processed = 42 - - progress = sync_service.get_progress() - assert progress == SyncProgress( - state=SyncState.SYNCING, - local_head_slot=Slot(0), - network_finalized_slot=None, - blocks_processed=42, - peers_connected=1, - cache_size=0, - orphan_count=0, - ) - - def test_progress_includes_network_consensus( - self, - sync_service: SyncService, - peer_id: PeerId, - ) -> None: - """Progress includes network finalized slot from peers.""" - status = Status( - finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(100)), - head=Checkpoint(root=Bytes32.zero(), slot=Slot(150)), - ) - sync_service.peer_manager.update_status(peer_id, status) - - progress = sync_service.get_progress() - assert progress == SyncProgress( - state=SyncState.IDLE, - local_head_slot=Slot(0), - network_finalized_slot=Slot(100), - blocks_processed=0, - peers_connected=1, - cache_size=0, - orphan_count=0, - ) - - def test_progress_tracks_cache_state( - self, - sync_service: SyncService, - peer_id: PeerId, - ) -> None: - """Progress includes cache size and orphan count.""" - # Add blocks to cache - block1 = make_signed_block( - slot=Slot(1), - proposer_index=ValidatorIndex(0), - parent_root=Bytes32(b"\x01" * 32), - state_root=Bytes32(b"\x01" * 32), - ) - block2 = make_signed_block( - slot=Slot(2), - proposer_index=ValidatorIndex(0), - parent_root=Bytes32(b"\x02" * 32), - state_root=Bytes32(b"\x02" * 32), - ) - - pending1 = sync_service.block_cache.add(block1, peer_id) - sync_service.block_cache.add(block2, peer_id) - sync_service.block_cache.mark_orphan(pending1.root) - - progress = sync_service.get_progress() - assert progress == SyncProgress( - state=SyncState.IDLE, - local_head_slot=Slot(0), - network_finalized_slot=None, - blocks_processed=0, - peers_connected=1, - cache_size=2, - orphan_count=1, - ) - - -class TestReset: - """Tests for service reset functionality.""" - - def test_reset_clears_all_state( - self, - sync_service: SyncService, - peer_id: PeerId, - ) -> None: - """reset() returns service to initial state.""" - # Put service in a dirty state - sync_service.state = SyncState.SYNCED - sync_service._blocks_processed = 100 - - block = make_signed_block( - slot=Slot(1), - proposer_index=ValidatorIndex(0), - parent_root=Bytes32(b"\x01" * 32), - state_root=Bytes32.zero(), - ) - sync_service.block_cache.add(block, peer_id) - - # Verify backfill component exists before adding pending - assert sync_service._backfill is not None - sync_service._backfill._pending.add(Bytes32(b"\x02" * 32)) - - # Reset - sync_service.reset() - - # Verify clean state - assert sync_service.state == SyncState.IDLE - assert sync_service._blocks_processed == 0 - assert len(sync_service.block_cache) == 0 - assert sync_service._backfill is not None - assert sync_service._backfill._pending == set() - - class TestAttestationGossipHandling: """Tests for attestation gossip handling."""