diff --git a/src/commands/node_manager_start.py b/src/commands/node_manager_start.py index b394b9d4..75868c1b 100644 --- a/src/commands/node_manager_start.py +++ b/src/commands/node_manager_start.py @@ -26,7 +26,7 @@ LOG_PLAIN, settings, ) -from src.node_manager.execution import create_operator_validators_scanner +from src.node_manager.execution import scan_node_manager_validators_events from src.node_manager.startup_check import startup_checks from src.node_manager.tasks import NodeManagerTask, StateSyncTask from src.validators.database import ( @@ -35,6 +35,7 @@ VaultValidatorCrud, ) from src.validators.keystores.load import load_keystore +from src.validators.tasks import load_genesis_validators logger = logging.getLogger(__name__) @@ -206,14 +207,19 @@ async def _start( VaultValidatorCrud().setup() CheckpointCrud().setup() - keystore = await load_keystore() - validators_scanner = create_operator_validators_scanner(operator_address) + # load network validators from ipfs dump + await load_genesis_validators() + keystore = await load_keystore() # start operator tasks chain_state = await get_chain_finalized_head() logger.info('Syncing validator events...') - await validators_scanner.process_new_events(chain_state.block_number) + await scan_node_manager_validators_events( + operator_address=operator_address, + block_number=chain_state.block_number, + is_startup=True, + ) logger.info('Updating oracles cache...') await update_oracles_cache() @@ -227,7 +233,6 @@ async def _start( NodeManagerTask( operator_address=operator_address, keystore=keystore, - validators_scanner=validators_scanner, ).run(interrupt_handler), StateSyncTask(operator_address).run(interrupt_handler), ) diff --git a/src/node_manager/execution.py b/src/node_manager/execution.py index 3d23279a..ea0a2a8d 100644 --- a/src/node_manager/execution.py +++ b/src/node_manager/execution.py @@ -17,6 +17,10 @@ from src.config.settings import settings from src.node_manager.typings import OperatorStateUpdateParams from src.validators.database import CheckpointCrud, VaultValidatorCrud +from src.validators.execution import ( + NetworkValidatorsProcessor, + NetworkValidatorsStartupProcessor, +) from src.validators.typings import VaultValidator logger = logging.getLogger(__name__) @@ -49,15 +53,27 @@ async def process_events(self, events: list[EventData], to_block: BlockNumber) - if validators: VaultValidatorCrud().save_vault_validators(validators) - CheckpointCrud().update_validators_checkpoint(to_block) + CheckpointCrud().update_validators_checkpoint(block_number=to_block) -def create_operator_validators_scanner( - operator_address: ChecksumAddress, -) -> EventScanner: - """Create a reusable EventScanner for NodesManager ValidatorsRegistered events.""" - processor = OperatorValidatorsProcessor(operator_address) - return EventScanner(processor, argument_filters={'operator': operator_address}) +async def scan_node_manager_validators_events( + operator_address: ChecksumAddress, block_number: BlockNumber, is_startup: bool +) -> None: + """Scans new vault and network validators for the given block number.""" + network_validators_processor: NetworkValidatorsStartupProcessor | NetworkValidatorsProcessor + if is_startup: + network_validators_processor = NetworkValidatorsStartupProcessor() + else: + network_validators_processor = NetworkValidatorsProcessor() + + network_validators_scanner = EventScanner(network_validators_processor) + await network_validators_scanner.process_new_events(block_number) + + operator_validators_processor = OperatorValidatorsProcessor(operator_address) + operator_validators_scanner = EventScanner( + operator_validators_processor, argument_filters={'operator': operator_address} + ) + await operator_validators_scanner.process_new_events(block_number) def _parse_public_keys(public_keys_bytes: bytes) -> list[HexStr]: diff --git a/src/node_manager/oracles.py b/src/node_manager/oracles.py index d8c23b2a..3d71c42e 100644 --- a/src/node_manager/oracles.py +++ b/src/node_manager/oracles.py @@ -11,7 +11,7 @@ from sw_utils.common import urljoin from sw_utils.typings import ProtocolConfig from web3 import Web3 -from web3.types import Gwei, Wei +from web3.types import Wei from src.common.contracts import validators_registry_contract from src.common.exceptions import ( @@ -241,7 +241,7 @@ async def create_approval_request( async def poll_funding_approval( - validator_fundings: dict[HexStr, Gwei], + validators: Sequence[Validator], operator_address: ChecksumAddress, protocol_config: ProtocolConfig, ) -> list[HexStr]: @@ -259,7 +259,7 @@ async def poll_funding_approval( if request is None or deadline is None or deadline <= current_timestamp: deadline = current_timestamp + protocol_config.signature_validity_period request = create_funding_request( - validator_fundings=validator_fundings, + validators=validators, operator_address=operator_address, deadline=deadline, ) @@ -276,19 +276,31 @@ async def poll_funding_approval( def create_funding_request( - validator_fundings: dict[HexStr, Gwei], + validators: Sequence[Validator], operator_address: ChecksumAddress, deadline: int, ) -> NodeManagerFundingRequest: """Build a NodesManager funding request for validator top-ups.""" - return NodeManagerFundingRequest( + + request = NodeManagerFundingRequest( operator_address=operator_address, - public_keys=list(validator_fundings.keys()), - amounts=[int(amount) for amount in validator_fundings.values()], + public_keys=[], + amounts=[], + deposit_signatures=[], deadline=deadline, validators_manager_signature=_sign_deadline(deadline), ) + for validator in validators: + if validator.deposit_signature is None: + raise ValueError('Deposit signature is required for validator') + + request.public_keys.append(validator.public_key) + request.deposit_signatures.append(validator.deposit_signature) + request.amounts.append(validator.amount) + + return request + # Generic oracle request helpers @@ -500,5 +512,5 @@ async def _send_request( def _sign_deadline(deadline: int) -> HexStr: """EIP-191 personal_sign of the deadline timestamp.""" - message = encode_defunct(text=str(deadline)) + message = encode_defunct(primitive=deadline.to_bytes(32, byteorder='big')) return HexStr(wallet.sign_message(message).signature.hex()) diff --git a/src/node_manager/register_validators.py b/src/node_manager/register_validators.py index 09aeb02b..3f5714d3 100644 --- a/src/node_manager/register_validators.py +++ b/src/node_manager/register_validators.py @@ -5,7 +5,6 @@ from sw_utils.typings import Bytes32 from web3 import Web3 from web3.exceptions import ContractLogicError -from web3.types import Gwei from src.common.clients import execution_client from src.common.contracts import nodes_manager_contract, validators_registry_contract @@ -93,17 +92,9 @@ async def register_validators( async def fund_validators( operator_address: ChecksumAddress, signatures: list[HexStr], - validator_fundings: dict[HexStr, Gwei], + validators: Sequence[Validator], ) -> HexStr | None: """Submit fundValidators transaction to NodesManager contract.""" - validators = [ - Validator( - public_key=public_key, - amount=amount, - deposit_signature=HexStr(Web3.to_hex(bytes(96))), - ) - for public_key, amount in validator_fundings.items() - ] tx_validators = [ Web3.to_bytes(tx_validator) for tx_validator in encode_tx_validator_list(validators=validators) diff --git a/src/node_manager/tasks.py b/src/node_manager/tasks.py index ff547db5..0ec32ce3 100644 --- a/src/node_manager/tasks.py +++ b/src/node_manager/tasks.py @@ -1,7 +1,7 @@ import logging from eth_typing import ChecksumAddress -from sw_utils import EventScanner, InterruptHandler +from sw_utils import InterruptHandler from sw_utils.typings import ProtocolConfig from web3 import Web3 from web3.types import Gwei, Wei @@ -16,6 +16,7 @@ from src.config.settings import settings from src.node_manager.execution import ( fetch_operator_state_from_ipfs, + scan_node_manager_validators_events, submit_state_sync_transaction, ) from src.node_manager.oracles import ( @@ -27,7 +28,10 @@ from src.validators.consensus import fetch_compounding_validators_balances from src.validators.keystores.base import BaseKeystore from src.validators.tasks import get_deposits_amounts, get_funding_amounts -from src.validators.utils import get_validators_for_registration +from src.validators.utils import ( + get_validators_for_funding, + get_validators_for_registration, +) logger = logging.getLogger(__name__) @@ -39,15 +43,17 @@ def __init__( self, operator_address: ChecksumAddress, keystore: BaseKeystore, - validators_scanner: EventScanner, ) -> None: self.operator_address = operator_address self.keystore = keystore - self.validators_scanner = validators_scanner async def process_block(self, interrupt_handler: InterruptHandler) -> None: chain_head = await get_chain_finalized_head() - await self.validators_scanner.process_new_events(chain_head.block_number) + await scan_node_manager_validators_events( + operator_address=self.operator_address, + block_number=chain_head.block_number, + is_startup=False, + ) if not await check_gas_price(high_priority=True): logger.debug('Gas price too high, skipping validators registration') @@ -143,16 +149,20 @@ async def _process_funding( if not validator_fundings: return amount + validators = await get_validators_for_funding(self.keystore, validator_fundings) + if not validators: + logger.warning('No available validators keystores for funding') + return amount + funded_total = Gwei(0) batch_limit = protocol_config.validators_approval_batch_limit # Process in batches - funding_items = list(validator_fundings.items()) - for i in range(0, len(funding_items), batch_limit): - batch = dict(funding_items[i : i + batch_limit]) + for i in range(0, len(validators), batch_limit): + batch = validators[i : i + batch_limit] signatures = await poll_funding_approval( - validator_fundings=batch, + validators=batch, operator_address=operator_address, protocol_config=protocol_config, ) @@ -160,13 +170,13 @@ async def _process_funding( tx_hash = await fund_validators( operator_address=self.operator_address, signatures=signatures, - validator_fundings=batch, + validators=batch, ) if tx_hash: - batch_total = sum(batch.values()) + batch_total = sum(v.amount for v in batch) funded_total = Gwei(funded_total + batch_total) - pub_keys = ', '.join(batch.keys()) + pub_keys = ', '.join([v.public_key for v in batch]) logger.info('Funded community vault validators %s: tx=%s', pub_keys, tx_hash) else: logger.warning('Community vault funding batch failed, stopping funding') diff --git a/src/node_manager/tests/test_oracles_http.py b/src/node_manager/tests/test_oracles_http.py index e429bb1a..227d5a47 100644 --- a/src/node_manager/tests/test_oracles_http.py +++ b/src/node_manager/tests/test_oracles_http.py @@ -235,6 +235,7 @@ def _make_funding_request() -> NodeManagerFundingRequest: operator_address=faker.eth_address(), public_keys=[faker.validator_public_key()], amounts=[ether_to_gwei(32)], + deposit_signatures=[faker.validator_signature()], deadline=1000, validators_manager_signature=faker.account_signature(), ) diff --git a/src/node_manager/tests/test_register_validators.py b/src/node_manager/tests/test_register_validators.py index d26af18b..b8b8ec9c 100644 --- a/src/node_manager/tests/test_register_validators.py +++ b/src/node_manager/tests/test_register_validators.py @@ -1,15 +1,15 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from eth_typing import ChecksumAddress, HexStr +from eth_typing import ChecksumAddress from sw_utils.tests.factories import faker from web3 import Web3 from web3.exceptions import ContractLogicError -from web3.types import Gwei from src.common.tests.utils import ether_to_gwei from src.node_manager.register_validators import fund_validators, register_validators from src.node_manager.typings import NodeManagerRegistrationOraclesApproval +from src.validators.typings import Validator MODULE = 'src.node_manager.register_validators' @@ -199,7 +199,7 @@ async def test_success( result = await fund_validators( operator_address=OPERATOR_ADDR, signatures=[faker.account_signature()], - validator_fundings=_make_validator_fundings(), + validators=_make_validators(), ) assert result is not None @@ -216,7 +216,7 @@ async def test_transaction_error_returns_none( result = await fund_validators( operator_address=OPERATOR_ADDR, signatures=[faker.account_signature()], - validator_fundings=_make_validator_fundings(), + validators=_make_validators(), ) assert result is None @@ -239,15 +239,19 @@ async def test_failed_tx_receipt_returns_none( result = await fund_validators( operator_address=OPERATOR_ADDR, signatures=[faker.account_signature()], - validator_fundings=_make_validator_fundings(), + validators=_make_validators(), ) assert result is None -def _make_validator_fundings() -> dict[HexStr, Gwei]: - return { - faker.validator_public_key(): ether_to_gwei(32), - } +def _make_validators() -> list[Validator]: + return [ + Validator( + public_key=faker.validator_public_key(), + amount=ether_to_gwei(32), + deposit_signature=faker.validator_signature(), + ) + ] def _make_approval() -> NodeManagerRegistrationOraclesApproval: diff --git a/src/node_manager/tests/test_tasks.py b/src/node_manager/tests/test_tasks.py index 534bd77f..7bdc27da 100644 --- a/src/node_manager/tests/test_tasks.py +++ b/src/node_manager/tests/test_tasks.py @@ -3,7 +3,6 @@ import pytest from eth_typing import ChecksumAddress from hexbytes import HexBytes -from sw_utils import EventScanner from sw_utils.tests.factories import faker from sw_utils.typings import ProtocolConfig from web3 import Web3 @@ -29,11 +28,11 @@ @pytest.fixture(autouse=True) def _mock_scan_events() -> None: - """Mock chain head for all tests calling process_block.""" + """Mock chain head and validator event scanning for all tests calling process_block.""" chain_head = MagicMock(block_number=100) with patch( f'{MODULE}.get_chain_finalized_head', new_callable=AsyncMock, return_value=chain_head - ): + ), patch(f'{MODULE}.scan_node_manager_validators_events', new_callable=AsyncMock): yield @@ -256,12 +255,14 @@ class TestProcessFunding: @patch(f'{MODULE}.fund_validators', new_callable=AsyncMock, return_value='0xtxhash') @patch(f'{MODULE}.poll_funding_approval', new_callable=AsyncMock, return_value=['0xsig']) + @patch(f'{MODULE}.get_validators_for_funding', new_callable=AsyncMock) @patch(f'{MODULE}.get_funding_amounts') @patch(f'{MODULE}.fetch_compounding_validators_balances', new_callable=AsyncMock) async def test_single_batch_success( self, mock_balances: AsyncMock, mock_funding_amounts: MagicMock, + mock_get_validators: AsyncMock, mock_poll: AsyncMock, mock_fund: AsyncMock, ) -> None: @@ -269,6 +270,7 @@ async def test_single_batch_success( public_key = faker.validator_public_key() mock_balances.return_value = {public_key: ether_to_gwei(32)} mock_funding_amounts.return_value = {public_key: ether_to_gwei(10)} + mock_get_validators.return_value = [_make_validator(public_key, ether_to_gwei(10))] task = _make_task() remaining = await task._process_funding( @@ -283,12 +285,14 @@ async def test_single_batch_success( @patch(f'{MODULE}.fund_validators', new_callable=AsyncMock, return_value='0xtxhash') @patch(f'{MODULE}.poll_funding_approval', new_callable=AsyncMock, return_value=['0xsig']) + @patch(f'{MODULE}.get_validators_for_funding', new_callable=AsyncMock) @patch(f'{MODULE}.get_funding_amounts') @patch(f'{MODULE}.fetch_compounding_validators_balances', new_callable=AsyncMock) async def test_multi_batch( self, mock_balances: AsyncMock, mock_funding_amounts: MagicMock, + mock_get_validators: AsyncMock, mock_poll: AsyncMock, mock_fund: AsyncMock, ) -> None: @@ -296,6 +300,7 @@ async def test_multi_batch( keys = [faker.validator_public_key() for _ in range(3)] mock_balances.return_value = {k: ether_to_gwei(32) for k in keys} mock_funding_amounts.return_value = {k: ether_to_gwei(10) for k in keys} + mock_get_validators.return_value = [_make_validator(k, ether_to_gwei(10)) for k in keys] config = _make_protocol_config() config.validators_approval_batch_limit = 2 @@ -313,12 +318,14 @@ async def test_multi_batch( @patch(f'{MODULE}.fund_validators', new_callable=AsyncMock) @patch(f'{MODULE}.poll_funding_approval', new_callable=AsyncMock, return_value=['0xsig']) + @patch(f'{MODULE}.get_validators_for_funding', new_callable=AsyncMock) @patch(f'{MODULE}.get_funding_amounts') @patch(f'{MODULE}.fetch_compounding_validators_balances', new_callable=AsyncMock) async def test_partial_failure_stops_funding( self, mock_balances: AsyncMock, mock_funding_amounts: MagicMock, + mock_get_validators: AsyncMock, mock_poll: AsyncMock, mock_fund: AsyncMock, ) -> None: @@ -326,6 +333,7 @@ async def test_partial_failure_stops_funding( keys = [faker.validator_public_key() for _ in range(3)] mock_balances.return_value = {k: ether_to_gwei(32) for k in keys} mock_funding_amounts.return_value = {k: ether_to_gwei(10) for k in keys} + mock_get_validators.return_value = [_make_validator(k, ether_to_gwei(10)) for k in keys] mock_fund.side_effect = ['0xtxhash', None] config = _make_protocol_config() @@ -562,10 +570,15 @@ def _make_protocol_config() -> MagicMock: def _make_task() -> NodeManagerTask: keystore = MagicMock() - validators_scanner = MagicMock(spec=EventScanner) - validators_scanner.process_new_events = AsyncMock() return NodeManagerTask( operator_address=OPERATOR_ADDR, keystore=keystore, - validators_scanner=validators_scanner, + ) + + +def _make_validator(public_key: str, amount: Gwei) -> Validator: + return Validator( + public_key=public_key, + amount=amount, + deposit_signature=faker.validator_signature(), ) diff --git a/src/node_manager/typings.py b/src/node_manager/typings.py index 62fe5db3..4f87d8f9 100644 --- a/src/node_manager/typings.py +++ b/src/node_manager/typings.py @@ -37,6 +37,7 @@ class NodeManagerFundingRequest: operator_address: ChecksumAddress public_keys: list[HexStr] amounts: list[int] + deposit_signatures: list[HexStr] deadline: int validators_manager_signature: HexStr diff --git a/src/validators/utils.py b/src/validators/utils.py index 60fdd7ec..3d1b0102 100644 --- a/src/validators/utils.py +++ b/src/validators/utils.py @@ -41,6 +41,25 @@ async def get_validators_for_registration( return validators +async def get_validators_for_funding( + keystore: BaseKeystore, + validator_fundings: dict[HexStr, Gwei], +) -> Sequence[Validator]: + """Returns list of validators for funding.""" + validators = [] + for public_key, amount in validator_fundings.items(): + deposit_data = await keystore.get_deposit_data(public_key=public_key, amount=amount) + validators.append( + Validator( + public_key=Web3.to_hex(deposit_data['pubkey']), + deposit_signature=Web3.to_hex(deposit_data['signature']), + amount=Gwei(int(deposit_data['amount'])), + ) + ) + + return validators + + def get_withdrawal_credentials() -> Bytes32: """Returns withdrawal credentials based on the validator type.""" if settings.validator_type == ValidatorType.V1: