Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions src/lean_spec/subspecs/api/endpoints/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1 @@
"""API endpoint specifications."""

from . import aggregator, checkpoints, fork_choice, health, metrics, states

__all__ = [
"aggregator",
"checkpoints",
"fork_choice",
"health",
"metrics",
"states",
]
56 changes: 0 additions & 56 deletions src/lean_spec/subspecs/api/endpoints/blocks.py

This file was deleted.

3 changes: 1 addition & 2 deletions src/lean_spec/subspecs/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@

from aiohttp import web

from .endpoints import aggregator, blocks, checkpoints, fork_choice, health, metrics, states
from .endpoints import aggregator, checkpoints, fork_choice, health, metrics, states

Handler = Callable[[web.Request], Awaitable[web.Response]]
"""Type alias for aiohttp request handlers."""

ROUTES: dict[str, Handler] = {
"/lean/v0/health": health.handle,
"/lean/v0/states/finalized": states.handle_finalized,
"/lean/v0/blocks/finalized": blocks.handle_finalized,
"/lean/v0/checkpoints/justified": checkpoints.handle_justified,
"/lean/v0/fork_choice": fork_choice.handle,
"/metrics": metrics.handle,
Expand Down
18 changes: 1 addition & 17 deletions src/lean_spec/subspecs/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

from aiohttp import web

from lean_spec.forks import LstarSpec, SignedBlock, Store
from lean_spec.types import Bytes32
from lean_spec.forks import LstarSpec, Store

from .aggregator_controller import AggregatorController
from .routes import ADMIN_ROUTES, ROUTES
Expand Down Expand Up @@ -72,17 +71,6 @@ class ApiServer:
store_getter: Callable[[], Store | None] | None = None
"""Callable that returns the current Store instance."""

signed_block_getter: Callable[[Bytes32], SignedBlock | None] | None = None
"""
Callable that returns the SignedBlock for a given block root, if available.

Used by ``/lean/v0/blocks/finalized`` to serve the anchor block alongside
the finalized state for checkpoint sync. Implementations must retain the
SignedBlock for at least ``store.latest_finalized.root``; ``Store.blocks``
holds only unsigned ``Block`` objects, so a separate signed-block source
is required (e.g. a long-lived anchor cache or a SignedBlock-aware store).
"""

aggregator_controller: AggregatorController | None = None
"""
Optional controller for toggling the aggregator role at runtime.
Expand Down Expand Up @@ -113,10 +101,6 @@ async def start(self) -> None:
# Store the store_getter in app for handlers that need store access
app["store_getter"] = self.store_getter

# Expose the signed-block lookup for endpoints serving SignedBlocks
# (e.g. /lean/v0/blocks/finalized for checkpoint-sync anchor block).
app["signed_block_getter"] = self.signed_block_getter

# Expose the fork spec for handlers that drive consensus computations.
app["spec"] = self.spec

Expand Down
2 changes: 1 addition & 1 deletion src/lean_spec/subspecs/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ async def _log_justified_finalized_periodically(self) -> None:
break
store = self.sync_service.store
peers_connected = sum(
1 for p in self.sync_service.peer_manager.get_all_peers() if p.is_connected()
1 for p in self.sync_service.peer_manager.peers.values() if p.is_connected()
)
metrics.lean_current_slot.set(self.clock.current_slot())
metrics.lean_connected_peers.set(peers_connected)
Expand Down
33 changes: 0 additions & 33 deletions src/lean_spec/subspecs/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,6 @@ def put_block(self, block: SpecBlockType, root: Bytes32) -> None:
"""
...

def has_block(self, root: Bytes32) -> bool:
"""
Check if a block exists in storage.

Args:
root: SSZ hash tree root of the block.

Returns:
True if block exists.
"""
...

# State Operations

def get_state(self, root: Bytes32) -> SpecStateType | None:
Expand All @@ -93,18 +81,6 @@ def put_state(self, state: SpecStateType, root: Bytes32) -> None:
"""
...

def has_state(self, root: Bytes32) -> bool:
"""
Check if a state exists in storage.

Args:
root: Block root hash associated with the state.

Returns:
True if state exists.
"""
...

# Checkpoint Operations

def get_justified_checkpoint(self) -> Checkpoint | None:
Expand Down Expand Up @@ -237,15 +213,6 @@ def put_genesis_time(self, genesis_time: Uint64) -> None:

# Transaction Control

def commit(self) -> None:
"""
Commit pending writes to durable storage.

All writes via put_* methods are buffered until commit() or batch_write().
Callers must explicitly commit after writes.
"""
...

@contextmanager
def batch_write(self) -> Generator[None]:
"""
Expand Down
38 changes: 0 additions & 38 deletions src/lean_spec/subspecs/storage/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,23 +166,6 @@ def put_block(self, block: SpecBlockType, root: Bytes32) -> None:
except sqlite3.Error as e:
raise StorageWriteError(f"Failed to write block {root.hex()}: {e}") from e

def has_block(self, root: Bytes32) -> bool:
"""Check if a block exists in storage."""
try:
cursor = self._conn.cursor()

# SELECT 1 is an existence check optimization.
#
# We only care whether a row exists, not its contents.
# This avoids deserializing potentially large SSZ data.
cursor.execute(
f"SELECT 1 FROM {BLOCKS.TABLE_NAME} WHERE root = ?",
(bytes(root),),
)
return cursor.fetchone() is not None
except sqlite3.Error as e:
raise StorageReadError(f"Failed to check block existence {root.hex()}: {e}") from e

# State Operations

#
Expand Down Expand Up @@ -231,20 +214,6 @@ def put_state(self, state: SpecStateType, root: Bytes32) -> None:
except sqlite3.Error as e:
raise StorageWriteError(f"Failed to write state for block {root.hex()}: {e}") from e

def has_state(self, root: Bytes32) -> bool:
"""Check if a state exists in storage."""
try:
cursor = self._conn.cursor()
cursor.execute(
f"SELECT 1 FROM {STATES.TABLE_NAME} WHERE root = ?",
(bytes(root),),
)
return cursor.fetchone() is not None
except sqlite3.Error as e:
raise StorageReadError(
f"Failed to check state existence for block {root.hex()}: {e}"
) from e

# Checkpoint Operations

#
Expand Down Expand Up @@ -501,13 +470,6 @@ def put_genesis_time(self, genesis_time: Uint64) -> None:

# Transaction Control

def commit(self) -> None:
"""Commit pending writes to durable storage."""
try:
self._conn.commit()
except sqlite3.Error as e:
raise StorageWriteError(f"Failed to commit: {e}") from e

@contextmanager
def batch_write(self) -> Generator[None]:
"""
Expand Down
104 changes: 1 addition & 103 deletions src/lean_spec/subspecs/sync/checkpoint_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import httpx

from lean_spec.forks import SignedBlock, State
from lean_spec.forks import State
from lean_spec.subspecs.chain.config import VALIDATOR_REGISTRY_LIMIT
from lean_spec.subspecs.ssz.hash import hash_tree_root

Expand All @@ -35,16 +35,6 @@
FINALIZED_STATE_ENDPOINT: Final = "/lean/v0/states/finalized"
"""API endpoint for fetching finalized state. Follows Beacon API conventions."""

FINALIZED_BLOCK_ENDPOINT: Final = "/lean/v0/blocks/finalized"
"""API endpoint for fetching the SignedBlock matching the finalized state.

``Store.create_store`` requires both the finalized state and the anchor block
(it asserts ``anchor_block.state_root == hash_tree_root(state)`` and seeds
``store.blocks[anchor_root] = anchor_block``). The state endpoint alone is
insufficient; this endpoint ships the matching SignedBlock so the pair can
be obtained atomically via two requests.
"""


class CheckpointSyncError(Exception):
"""
Expand Down Expand Up @@ -113,98 +103,6 @@ async def fetch_finalized_state(url: str, state_class: type[State]) -> State:
raise CheckpointSyncError(f"Failed to fetch state: {e}") from e


async def fetch_finalized_block(url: str) -> SignedBlock:
"""
Fetch the SignedBlock matching the finalized state via checkpoint sync.

Retrieves the SSZ-encoded SignedBlock at ``store.latest_finalized.root``.
The caller is responsible for verifying that
``signed_block.block.state_root == hash_tree_root(state)`` before passing
the pair to ``Store.create_store``.

Args:
url: Base URL of the node API (e.g., "http://localhost:5052").

Returns:
The finalized SignedBlock object.

Raises:
CheckpointSyncError: If the request fails or block bytes are invalid.
"""
base_url = url.rstrip("/")
full_url = f"{base_url}{FINALIZED_BLOCK_ENDPOINT}"

logger.info("Fetching finalized signed block from %s", full_url)

headers = {"Accept": "application/octet-stream"}

try:
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
response = await client.get(full_url, headers=headers)
response.raise_for_status()

ssz_data = response.content
logger.info("Downloaded %d bytes of SSZ signed block data", len(ssz_data))

signed_block = SignedBlock.decode_bytes(ssz_data)
logger.info(
"Deserialized signed block at slot %s",
signed_block.block.slot,
)

return signed_block

except httpx.RequestError as exc:
raise CheckpointSyncError(
f"Network error while connecting to {exc.request.url}: {exc}"
) from exc
except httpx.HTTPStatusError as exc:
raise CheckpointSyncError(
f"HTTP error {exc.response.status_code}: {exc.response.text[:200]}"
) from exc
except Exception as e:
raise CheckpointSyncError(f"Failed to fetch signed block: {e}") from e


async def fetch_finalized_anchor(url: str, state_class: type[State]) -> tuple[State, SignedBlock]:
"""
Fetch the ``(state, signed_block)`` pair required by ``Store.create_store``.

Issues two requests in sequence: ``/lean/v0/states/finalized`` then
``/lean/v0/blocks/finalized``. Both endpoints serve the snapshot at
``store.latest_finalized.root``; on the server side these reads are
expected to be consistent against a single finalized checkpoint.

Verifies the block / state pairing before returning. If the server
advanced finalization between the two requests the pairing assertion
will fail and the caller should retry.

Args:
url: Base URL of the node API.
state_class: State class used to decode the state SSZ bytes.

Returns:
Tuple of (finalized state, finalized signed block).

Raises:
CheckpointSyncError: If either fetch fails, or the block's
``state_root`` does not equal ``hash_tree_root(state)``.
"""
state = await fetch_finalized_state(url, state_class)
signed_block = await fetch_finalized_block(url)

expected_state_root = hash_tree_root(state)
if signed_block.block.state_root != expected_state_root:
raise CheckpointSyncError(
"Anchor block / state mismatch: "
f"signed_block.block.state_root={signed_block.block.state_root.hex()} "
f"hash_tree_root(state)={expected_state_root.hex()}. "
"Server may have advanced finalization between requests; retry."
)

return state, signed_block


def verify_checkpoint_state(state: State) -> bool:
"""
Verify that a checkpoint state is structurally valid.
Expand Down
Loading
Loading