diff --git a/example/bitcoin.rs b/example/bitcoin.rs index b7984e0b..66ee55bb 100644 --- a/example/bitcoin.rs +++ b/example/bitcoin.rs @@ -62,9 +62,7 @@ async fn main() { tracing::info!("Last block average fee rate: {:#}", avg_fee_rate); break; }, - Event::BlocksDisconnected { accepted: _, disconnected: _} => { - tracing::warn!("Some blocks were reorganized") - }, + Event::IndexedFilter(_) => (), _ => (), } } diff --git a/src/builder.rs b/src/builder.rs index 23f9348a..de039587 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -4,21 +4,18 @@ use std::{path::PathBuf, time::Duration}; use bitcoin::Network; use super::{client::Client, config::NodeConfig, node::Node}; +use crate::chain::checkpoints::HeaderCheckpoint; #[cfg(feature = "rusqlite")] use crate::db::error::SqlInitializationError; #[cfg(feature = "rusqlite")] -use crate::db::sqlite::{headers::SqliteHeaderDb, peers::SqlitePeerDb}; +use crate::db::sqlite::peers::SqlitePeerDb; use crate::network::dns::{DnsResolver, DNS_RESOLVER_PORT}; use crate::network::ConnectionType; -use crate::{ - chain::checkpoints::HeaderCheckpoint, - db::traits::{HeaderStore, PeerStore}, -}; use crate::{PeerStoreSizeConfig, TrustedPeer}; #[cfg(feature = "rusqlite")] /// The default node returned from the [`Builder`]. -pub type NodeDefault = Node; +pub type NodeDefault = Node; const MIN_PEERS: u8 = 1; const MAX_PEERS: u8 = 15; @@ -171,26 +168,10 @@ impl Builder { #[cfg(feature = "rusqlite")] pub fn build(&mut self) -> Result<(NodeDefault, Client), SqlInitializationError> { let peer_store = SqlitePeerDb::new(self.network, self.config.data_path.clone())?; - let header_store = SqliteHeaderDb::new(self.network, self.config.data_path.clone())?; Ok(Node::new( self.network, core::mem::take(&mut self.config), peer_store, - header_store, )) } - - /// Consume the node builder by using custom database implementations, receiving a [`Node`] and [`Client`]. - pub fn build_with_databases( - &mut self, - peer_store: P, - header_store: H, - ) -> (Node, Client) { - Node::new( - self.network, - core::mem::take(&mut self.config), - peer_store, - header_store, - ) - } } diff --git a/src/chain/chain.rs b/src/chain/chain.rs index 06df3ef9..199f8599 100644 --- a/src/chain/chain.rs +++ b/src/chain/chain.rs @@ -1,5 +1,5 @@ extern crate alloc; -use std::{collections::BTreeMap, ops::Range, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc}; use bitcoin::{ block::Header, @@ -14,40 +14,30 @@ use super::{ error::{CFHeaderSyncError, CFilterSyncError, HeaderSyncError}, graph::{AcceptHeaderChanges, BlockTree, HeaderRejection}, CFHeaderChanges, Filter, FilterCheck, FilterHeaderRequest, FilterRequest, FilterRequestState, - HeaderChainChanges, HeightExt, HeightMonitor, PeerId, -}; -use crate::IndexedFilter; -use crate::{ - chain::header_batch::HeadersBatch, - db::{traits::HeaderStore, BlockHeaderChanges}, - dialog::Dialog, - error::HeaderPersistenceError, - messages::{Event, Warning}, - Info, Progress, + HeaderChainChanges, HeightMonitor, PeerId, }; +use crate::{chain::header_batch::HeadersBatch, dialog::Dialog, messages::Event, Info, Progress}; +use crate::{db::BlockHeaderChanges, IndexedFilter}; -const REORG_LOOKBACK: u32 = 7; const FILTER_BASIC: u8 = 0x00; const CF_HEADER_BATCH_SIZE: u32 = 1_999; const FILTER_BATCH_SIZE: u32 = 999; #[derive(Debug)] -pub(crate) struct Chain { +pub(crate) struct Chain { pub(crate) header_chain: BlockTree, request_state: FilterRequestState, network: Network, - db: H, heights: Arc>, dialog: Arc, } -impl Chain { +impl Chain { pub(crate) fn new( network: Network, anchor: Option, dialog: Arc, height_monitor: Arc>, - db: H, quorum_required: u8, ) -> Self { let header_chain = match anchor { @@ -58,7 +48,6 @@ impl Chain { header_chain, request_state: FilterRequestState::new(quorum_required), network, - db, heights: height_monitor, dialog, } @@ -84,81 +73,6 @@ impl Chain { } } - // Load in headers, ideally allowing the difficulty adjustment to be audited and - // reorganizations to be handled gracefully. - pub(crate) async fn load_headers(&mut self) -> Result<(), HeaderPersistenceError> { - // The original height the user requested a scan after - let scan_height = self.header_chain.height(); - // The header relevant to compute the next adjustment - let last_adjustment = scan_height.last_epoch_start(self.network); - // Seven blocks ago - let reorg = scan_height.saturating_sub(REORG_LOOKBACK); - // To handle adjustments and reorgs, we would have the minimum of each of these heights - let min_interesting_height = last_adjustment.min(reorg); - let max_interesting_height = last_adjustment.max(reorg); - // Get the maximum of the two interesting heights. In case the minimum is not available - if let Some(header) = self - .db - .header_at(max_interesting_height) - .await - .ok() - .flatten() - { - self.header_chain = - BlockTree::from_header(max_interesting_height, header, self.network); - } - // If this succeeds, both reorgs and difficulty adjustments can be handled gracefully. - if let Some(header) = self - .db - .header_at(min_interesting_height) - .await - .ok() - .flatten() - { - self.header_chain = - BlockTree::from_header(min_interesting_height, header, self.network); - } - // Now that the block tree is updated to the appropriate start, load in the rest of - // the history from this point onward. This is either: from the user start height, - // from the last difficulty adjustment, or seven blocks ago, depending on what the - // header store was able to provide. - let loaded_headers = self - .db - .load(self.header_chain.height().increment()..) - .await - .map_err(HeaderPersistenceError::Database)?; - for (height, header) in loaded_headers { - let apply_header_changes = self.header_chain.accept_header(header); - match apply_header_changes { - AcceptHeaderChanges::Accepted { connected_at } => { - if height.ne(&connected_at.height) { - self.dialog.send_warning(Warning::CorruptedHeaders); - return Err(HeaderPersistenceError::HeadersDoNotLink); - } - } - AcceptHeaderChanges::Rejected(reject_reason) => match reject_reason { - HeaderRejection::UnknownPrevHash(_) => { - return Err(HeaderPersistenceError::CannotLocateHistory); - } - HeaderRejection::InvalidPow { - expected: _, - got: _, - } => { - crate::debug!( - "Unexpected invalid proof of work when importing a block header" - ); - } - }, - _ => (), - } - } - // Because the user requested a scan after the `scan_height`, the filters below this point - // may be assumed as checked. Note that in a reorg, filters below this height may still be - // retrieved, as this only considers the canonical chain as checked. - self.header_chain.assume_checked_to(scan_height); - Ok(()) - } - // Sync the chain with headers from a peer, adjusting to reorgs if needed pub(crate) async fn sync_chain( &mut self, @@ -182,7 +96,8 @@ impl Chain { connected_at.height, connected_at.header.block_hash() )); - self.db.stage(BlockHeaderChanges::Connected(connected_at)); + self.dialog + .send_event(Event::Chain(BlockHeaderChanges::Connected(connected_at))); } AcceptHeaderChanges::Duplicate => (), AcceptHeaderChanges::ExtendedFork { connected_at } => { @@ -201,15 +116,11 @@ impl Chain { .map(|index| index.header.block_hash()) .collect(); reorged_hashes = Some(removed_hashes); - self.db.stage(BlockHeaderChanges::Reorganized { - accepted: accepted.clone(), - reorganized: disconnected.clone(), - }); - let disconnected_event = Event::BlocksDisconnected { + let changes = BlockHeaderChanges::Reorganized { accepted, - disconnected, + reorganized: disconnected, }; - self.dialog.send_event(disconnected_event); + self.dialog.send_event(Event::Chain(changes)); } AcceptHeaderChanges::Rejected(rejected_header) => match rejected_header { HeaderRejection::InvalidPow { @@ -223,11 +134,6 @@ impl Chain { }, } } - if let Err(e) = self.db.write().await { - self.dialog.send_warning(Warning::FailedPersistence { - warning: format!("Could not save headers to disk: {e}"), - }); - } match reorged_hashes { Some(reorgs) => { self.clear_compact_filter_queue(); @@ -471,42 +377,6 @@ impl Chain { self.header_chain.filters_synced() } - // Fetch a header from the cache or disk. - pub(crate) async fn fetch_header( - &mut self, - height: u32, - ) -> Result, HeaderPersistenceError> { - match self.header_chain.header_at_height(height) { - Some(header) => Ok(Some(header)), - None => { - let header_opt = self.db.header_at(height).await; - if header_opt.is_err() { - self.dialog - .send_warning(Warning::FailedPersistence { - warning: format!( - "Unexpected error fetching a header from the header store at height {height}" - ), - }); - } - header_opt.map_err(HeaderPersistenceError::Database) - } - } - } - - pub(crate) async fn fetch_header_range( - &mut self, - range: Range, - ) -> Result, HeaderPersistenceError> { - let range_opt = self.db.load(range).await; - if range_opt.is_err() { - self.dialog.send_warning(Warning::FailedPersistence { - warning: "Unexpected error fetching a range of headers from the header store" - .to_string(), - }); - } - range_opt.map_err(HeaderPersistenceError::Database) - } - // Reset the compact filter queue because we received a new block pub(crate) fn clear_compact_filter_queue(&mut self) { self.request_state.agreement_state.reset_agreements(); @@ -554,7 +424,7 @@ mod tests { block::Header, consensus::deserialize, p2p::message_filter::{CFHeaders, CFilter}, - BlockHash, FilterHash, FilterHeader, + BlockHash, FilterHash, FilterHeader, Network, }; use corepc_node::serde_json; use tokio::sync::Mutex; @@ -573,16 +443,15 @@ mod tests { anchor: HeaderCheckpoint, height_monitor: Arc>, peers: u8, - ) -> Chain<()> { + ) -> Chain { let (info_tx, _) = tokio::sync::mpsc::channel::(1); let (warn_tx, _) = tokio::sync::mpsc::unbounded_channel::(); let (event_tx, _) = tokio::sync::mpsc::unbounded_channel::(); Chain::new( - bitcoin::Network::Regtest, + Network::Regtest, Some(anchor), Arc::new(Dialog::new(info_tx, warn_tx, event_tx)), height_monitor, - (), peers, ) } diff --git a/src/chain/error.rs b/src/chain/error.rs index bb41944d..8df4e319 100644 --- a/src/chain/error.rs +++ b/src/chain/error.rs @@ -12,7 +12,6 @@ pub(crate) enum HeaderSyncError { MiscalculatedDifficulty, InvalidBits, FloatingHeaders, - DbError, } impl Display for HeaderSyncError { @@ -38,7 +37,6 @@ impl Display for HeaderSyncError { f, "the peer sent us a chain that does not connect to any header of ours." ), - HeaderSyncError::DbError => write!(f, "the database could not load a fork."), HeaderSyncError::InvalidBits => write!( f, "the target work does not adhere to basic transition requirements." @@ -99,7 +97,6 @@ pub enum CFilterSyncError { UnrequestedStophash, UnknownFilterHash, MisalignedFilterHash, - Filter(FilterError), } impl core::fmt::Display for CFilterSyncError { @@ -119,31 +116,12 @@ impl core::fmt::Display for CFilterSyncError { f, "the filter hash from our header chain and this filter hash do not match." ), - CFilterSyncError::Filter(_) => write!( - f, - "the filter experienced an IO error checking for Script inclusions." - ), } } } impl_sourceless_error!(CFilterSyncError); -#[derive(Debug)] -pub enum FilterError { - IORead, -} - -impl core::fmt::Display for FilterError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - FilterError::IORead => write!(f, "unable to read from the filter contents buffer."), - } - } -} - -impl_sourceless_error!(FilterError); - #[derive(Debug)] pub(crate) enum BlockScanError { NoBlockHash, diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 2c323621..8ea11d40 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -26,7 +26,6 @@ use bitcoin::{ use crate::network::PeerId; use cfheader_batch::CFHeaderBatch; -use error::FilterError; type Height = u32; @@ -186,13 +185,10 @@ impl Filter { &self.block_hash } - pub fn contains_any<'a>( - &'a self, - scripts: impl Iterator, - ) -> Result { + pub fn contains_any<'a>(&'a self, scripts: impl Iterator) -> bool { self.block_filter .match_any(&self.block_hash, scripts.map(|script| script.to_bytes())) - .map_err(|_| FilterError::IORead) + .expect("vec readers cannot fail") } pub fn into_filter(self) -> BlockFilter { @@ -242,8 +238,6 @@ trait HeightExt: Clone + Copy + std::hash::Hash + PartialEq + Eq + PartialOrd + fn from_u64_checked(height: u64) -> Option; fn is_adjustment_multiple(&self, params: impl AsRef) -> bool; - - fn last_epoch_start(&self, params: impl AsRef) -> Self; } impl HeightExt for u32 { @@ -258,12 +252,6 @@ impl HeightExt for u32 { fn from_u64_checked(height: u64) -> Option { height.try_into().ok() } - - fn last_epoch_start(&self, params: impl AsRef) -> Self { - let diff_adjustment_interval = params.as_ref().difficulty_adjustment_interval() as u32; - let floor = self / diff_adjustment_interval; - floor * diff_adjustment_interval - } } // Emulation of `GetBlockSubsidy` in Bitcoin Core: https://github.com/bitcoin/bitcoin/blob/master/src/validation.cpp#L1944 @@ -280,7 +268,6 @@ pub(crate) fn block_subsidy(height: u32) -> Amount { #[cfg(test)] mod tests { use super::*; - use bitcoin::Network; #[test] fn test_height_monitor() { @@ -309,18 +296,6 @@ mod tests { assert!(height_monitor.max().unwrap().eq(&12)); } - #[test] - fn test_height_ext() { - assert!(2016.is_adjustment_multiple(Network::Bitcoin)); - assert!(4032.is_adjustment_multiple(Network::Bitcoin)); - let height = 2300; - assert_eq!(height.last_epoch_start(Network::Bitcoin), 2016); - let height = 4033; - assert_eq!(height.last_epoch_start(Network::Bitcoin), 4032); - let height = 4032; - assert_eq!(height.last_epoch_start(Network::Bitcoin), 4032); - } - #[test] fn test_subsidy_calculation() { let first_subsidy = block_subsidy(2); diff --git a/src/client.rs b/src/client.rs index 5854fda0..d0c192c0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,6 @@ -use bitcoin::{block::Header, BlockHash, FeeRate}; use bitcoin::{Amount, Transaction}; -use std::{collections::BTreeMap, ops::Range, time::Duration}; +use bitcoin::{BlockHash, FeeRate}; +use std::time::Duration; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; @@ -11,7 +11,7 @@ use crate::{Event, Info, TrustedPeer, TxBroadcast, Warning}; use super::{error::FetchBlockError, IndexedBlock}; use super::{ - error::{ClientError, FetchFeeRateError, FetchHeaderError}, + error::{ClientError, FetchFeeRateError}, messages::ClientMessage, }; @@ -116,43 +116,6 @@ impl Requester { rx.await.map_err(|_| FetchFeeRateError::RecvError) } - /// Get a header at the specified height, if it exists. - /// - /// # Note - /// - /// The height of the chain is the canonical index of the header in the chain. - /// For example, the genesis block is at a height of zero. - /// - /// # Errors - /// - /// If the node has stopped running. - pub async fn get_header(&self, height: u32) -> Result { - let (tx, rx) = tokio::sync::oneshot::channel::>(); - let message = ClientRequest::new(height, tx); - self.ntx - .send(ClientMessage::GetHeader(message)) - .map_err(|_| FetchHeaderError::SendError)?; - rx.await.map_err(|_| FetchHeaderError::RecvError)? - } - - /// Get a range of headers by the specified range. - /// - /// # Errors - /// - /// If the node has stopped running. - pub async fn get_header_range( - &self, - range: Range, - ) -> Result, FetchHeaderError> { - let (tx, rx) = - tokio::sync::oneshot::channel::, FetchHeaderError>>(); - let message = ClientRequest::new(range, tx); - self.ntx - .send(ClientMessage::GetHeaderBatch(message)) - .map_err(|_| FetchHeaderError::SendError)?; - rx.await.map_err(|_| FetchHeaderError::RecvError)? - } - /// Request a block be fetched. Note that this method will request a block /// from a connected peer's inventory, and may take an indefinite amount of /// time, until a peer responds. diff --git a/src/db/sqlite/headers.rs b/src/db/sqlite/headers.rs index c6d6cfbb..26fc297b 100644 --- a/src/db/sqlite/headers.rs +++ b/src/db/sqlite/headers.rs @@ -10,9 +10,7 @@ use rusqlite::{params, params_from_iter, Connection, Result}; use tokio::sync::Mutex; use crate::db::error::{SqlHeaderStoreError, SqlInitializationError}; -use crate::db::traits::HeaderStore; use crate::db::BlockHeaderChanges; -use crate::prelude::FutureResult; use super::{DATA_DIR, DEFAULT_CWD}; @@ -84,7 +82,8 @@ impl SqliteHeaderDb { Ok(()) } - async fn load<'a>( + /// Load a range of headers by their range of heights. + pub async fn load<'a>( &mut self, range: impl RangeBounds + Send + Sync + 'a, ) -> Result, SqlHeaderStoreError> { @@ -137,7 +136,8 @@ impl SqliteHeaderDb { Ok(headers) } - fn stage(&mut self, changes: BlockHeaderChanges) { + /// State changes to the block header tree. + pub fn stage(&mut self, changes: BlockHeaderChanges) { match changes { BlockHeaderChanges::Connected(indexed_header) => { self.accepted @@ -161,7 +161,8 @@ impl SqliteHeaderDb { } } - async fn write(&mut self) -> Result<(), SqlHeaderStoreError> { + /// Flush the changes. + pub async fn write(&mut self) -> Result<(), SqlHeaderStoreError> { let mut write_lock = self.conn.lock().await; let tx = write_lock.transaction()?; for removed in core::mem::take(&mut self.disconnected) { @@ -180,7 +181,8 @@ impl SqliteHeaderDb { Ok(()) } - async fn height_of( + /// Fetch the height of a block hash. + pub async fn height_of( &mut self, block_hash: &BlockHash, ) -> Result, SqlHeaderStoreError> { @@ -191,7 +193,8 @@ impl SqliteHeaderDb { Ok(row) } - async fn hash_at(&mut self, height: u32) -> Result, SqlHeaderStoreError> { + /// Fetch the hash at a particular height. + pub async fn hash_at(&mut self, height: u32) -> Result, SqlHeaderStoreError> { let write_lock = self.conn.lock().await; let stmt = "SELECT block_hash FROM headers WHERE height = ?1"; let row: Option<[u8; 32]> = @@ -202,7 +205,8 @@ impl SqliteHeaderDb { } } - async fn header_at(&mut self, height: u32) -> Result, SqlHeaderStoreError> { + /// Get the header at a particular height. + pub async fn header_at(&mut self, height: u32) -> Result, SqlHeaderStoreError> { let write_lock = self.conn.lock().await; let stmt = "SELECT * FROM headers WHERE height = ?1"; let query = write_lock.query_row(stmt, params![height], |row| { @@ -220,40 +224,6 @@ impl SqliteHeaderDb { } } -impl HeaderStore for SqliteHeaderDb { - type Error = SqlHeaderStoreError; - - fn load<'a>( - &'a mut self, - range: impl RangeBounds + Send + Sync + 'a, - ) -> FutureResult<'a, BTreeMap, Self::Error> { - Box::pin(self.load(range)) - } - - fn stage(&mut self, changes: BlockHeaderChanges) { - self.stage(changes) - } - - fn write(&mut self) -> FutureResult<'_, (), Self::Error> { - Box::pin(self.write()) - } - - fn height_of<'a>( - &'a mut self, - hash: &'a BlockHash, - ) -> FutureResult<'a, Option, Self::Error> { - Box::pin(self.height_of(hash)) - } - - fn hash_at(&mut self, height: u32) -> FutureResult<'_, Option, Self::Error> { - Box::pin(self.hash_at(height)) - } - - fn header_at(&mut self, height: u32) -> FutureResult<'_, Option
, Self::Error> { - Box::pin(self.header_at(height)) - } -} - #[cfg(test)] mod tests { use crate::chain::IndexedHeader; diff --git a/src/db/traits.rs b/src/db/traits.rs index 635495d6..4297ce58 100644 --- a/src/db/traits.rs +++ b/src/db/traits.rs @@ -1,41 +1,9 @@ use std::fmt::Debug; -use std::ops::RangeBounds; -use std::{collections::BTreeMap, fmt::Display}; - -use bitcoin::{block::Header, BlockHash}; +use std::fmt::Display; use crate::prelude::FutureResult; -use super::{BlockHeaderChanges, PersistedPeer}; - -/// Methods required to persist the chain of block headers. -pub trait HeaderStore: Debug + Send + Sync { - /// Errors that may occur within a [`HeaderStore`]. - type Error: Debug + Display; - /// Load the headers of the canonical chain for the specified range. - fn load<'a>( - &'a mut self, - range: impl RangeBounds + Send + Sync + 'a, - ) -> FutureResult<'a, BTreeMap, Self::Error>; - - /// Stage changes to the chain to be written in the future. - fn stage(&mut self, changes: BlockHeaderChanges); - - /// Commit the changes by writing them to disk. - fn write(&mut self) -> FutureResult<'_, (), Self::Error>; - - /// Return the height of a block hash in the database, if it exists. - fn height_of<'a>( - &'a mut self, - hash: &'a BlockHash, - ) -> FutureResult<'a, Option, Self::Error>; - - /// Return the hash at the height in the database, if it exists. - fn hash_at(&mut self, height: u32) -> FutureResult<'_, Option, Self::Error>; - - /// Return the header at the height in the database, if it exists. - fn header_at(&mut self, height: u32) -> FutureResult<'_, Option
, Self::Error>; -} +use super::PersistedPeer; /// Methods that define a list of peers on the Bitcoin P2P network. pub trait PeerStore: Debug + Send + Sync { @@ -54,7 +22,6 @@ pub trait PeerStore: Debug + Send + Sync { #[cfg(test)] mod test { use super::*; - use std::convert::Infallible; /// Errors for the [`PeerStore`](crate) of unit type. #[derive(Debug)] @@ -94,50 +61,4 @@ mod test { Box::pin(do_num_unbanned()) } } - - impl HeaderStore for () { - type Error = Infallible; - fn load<'a>( - &'a mut self, - _range: impl RangeBounds + Send + Sync + 'a, - ) -> FutureResult<'a, BTreeMap, Self::Error> { - async fn do_load() -> Result, Infallible> { - Ok(BTreeMap::new()) - } - Box::pin(do_load()) - } - - fn stage(&mut self, _changes: BlockHeaderChanges) {} - - fn write(&mut self) -> FutureResult<'_, (), Self::Error> { - async fn do_write() -> Result<(), Infallible> { - Ok(()) - } - Box::pin(do_write()) - } - - fn height_of<'a>( - &'a mut self, - _block_hash: &'a BlockHash, - ) -> FutureResult<'a, Option, Self::Error> { - async fn do_height_of() -> Result, Infallible> { - Ok(None) - } - Box::pin(do_height_of()) - } - - fn hash_at(&mut self, _height: u32) -> FutureResult<'_, Option, Self::Error> { - async fn do_hast_at() -> Result, Infallible> { - Ok(None) - } - Box::pin(do_hast_at()) - } - - fn header_at(&mut self, _height: u32) -> FutureResult<'_, Option
, Self::Error> { - async fn do_header_at() -> Result, Infallible> { - Ok(None) - } - Box::pin(do_header_at()) - } - } } diff --git a/src/error.rs b/src/error.rs index a2ed98bc..9e6111cf 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,38 +1,29 @@ use std::fmt::{Debug, Display}; -use crate::impl_sourceless_error; +use crate::{db::error::SqlHeaderStoreError, impl_sourceless_error}; /// Errors that prevent the node from running. #[derive(Debug)] -pub enum NodeError { - /// The persistence layer experienced a critical error. - HeaderDatabase(HeaderPersistenceError), +pub enum NodeError { /// The persistence layer experienced a critical error. PeerDatabase(PeerManagerError

), } -impl core::fmt::Display for NodeError { +impl core::fmt::Display for NodeError

{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - NodeError::HeaderDatabase(e) => write!(f, "block headers: {e}"), NodeError::PeerDatabase(e) => write!(f, "peer manager: {e}"), } } } -impl std::error::Error for NodeError { +impl std::error::Error for NodeError

{ fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { None } } -impl From> for NodeError { - fn from(value: HeaderPersistenceError) -> Self { - NodeError::HeaderDatabase(value) - } -} - -impl From> for NodeError { +impl From> for NodeError

{ fn from(value: PeerManagerError

) -> Self { NodeError::PeerDatabase(value) } @@ -69,7 +60,7 @@ impl From

for PeerManagerError

{ /// Errors with the block header representation that prevent the node from operating. #[derive(Debug)] -pub enum HeaderPersistenceError { +pub enum HeaderPersistenceError { /// The block headers do not point to each other in a list. HeadersDoNotLink, /// Some predefined checkpoint does not match. @@ -77,10 +68,10 @@ pub enum HeaderPersistenceError { /// A user tried to retrieve headers too far in the past for what is in their database. CannotLocateHistory, /// A database error. - Database(H), + Database(SqlHeaderStoreError), } -impl core::fmt::Display for HeaderPersistenceError { +impl core::fmt::Display for HeaderPersistenceError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { HeaderPersistenceError::HeadersDoNotLink => write!(f, "the headers loaded from persistence do not link together."), @@ -91,13 +82,9 @@ impl core::fmt::Display for HeaderPersistenceError { } } -impl std::error::Error for HeaderPersistenceError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - None - } -} +impl_sourceless_error!(HeaderPersistenceError); -/// Errors occuring when the client is talking to the node. +/// Errors occurring when the client is talking to the node. #[derive(Debug)] pub enum ClientError { /// The channel to the node was likely closed and dropped from memory. @@ -116,17 +103,12 @@ impl core::fmt::Display for ClientError { impl_sourceless_error!(ClientError); -/// Errors occuring when the client is fetching headers from the node. +/// Errors occurring when the client is fetching headers from the node. #[derive(Debug)] pub enum FetchHeaderError { /// The channel to the node was likely closed and dropped from memory. /// This implies the node is not running. SendError, - /// The database operation failed while attempting to find the header. - DatabaseOptFailed { - /// The message from the backend describing the failure. - error: String, - }, /// The channel to the client was likely closed by the node and dropped from memory. RecvError, /// The header at the requested height does not yet exist. @@ -139,12 +121,6 @@ impl core::fmt::Display for FetchHeaderError { FetchHeaderError::SendError => { write!(f, "the receiver of this message was dropped from memory.") } - FetchHeaderError::DatabaseOptFailed { error } => { - write!( - f, - "the database operation failed while attempting to find the header: {error}" - ) - } FetchHeaderError::RecvError => write!( f, "the channel to the client was likely closed by the node and dropped from memory." @@ -158,17 +134,12 @@ impl core::fmt::Display for FetchHeaderError { impl_sourceless_error!(FetchHeaderError); -/// Errors occuring when the client is fetching blocks from the node. +/// Errors occurring when the client is fetching blocks from the node. #[derive(Debug)] pub enum FetchBlockError { /// The channel to the node was likely closed and dropped from memory. /// This implies the node is not running. SendError, - /// The database operation failed while attempting to find the header. - DatabaseOptFailed { - /// The message from the backend describing the failure. - error: String, - }, /// The channel to the client was likely closed by the node and dropped from memory. RecvError, /// The hash is not a member of the chain of most work. @@ -181,12 +152,6 @@ impl core::fmt::Display for FetchBlockError { FetchBlockError::SendError => { write!(f, "the receiver of this message was dropped from memory.") } - FetchBlockError::DatabaseOptFailed { error } => { - write!( - f, - "the database operation failed while attempting to find the header: {error}" - ) - } FetchBlockError::RecvError => write!( f, "the channel to the client was likely closed by the node and dropped from memory." diff --git a/src/lib.rs b/src/lib.rs index 09d78b04..a1472eae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -88,7 +88,7 @@ pub use chain::checkpoints::HeaderCheckpoint; pub use db::sqlite::{headers::SqliteHeaderDb, peers::SqlitePeerDb}; #[doc(inline)] -pub use db::traits::{HeaderStore, PeerStore}; +pub use db::traits::PeerStore; #[doc(inline)] pub use tokio::sync::mpsc::Receiver; @@ -154,9 +154,7 @@ impl IndexedFilter { /// Does the filter contain a positive match for any of the provided scripts pub fn contains_any<'a>(&'a self, scripts: impl Iterator) -> bool { - self.filter - .contains_any(scripts) - .expect("vec reader is infallible") + self.filter.contains_any(scripts) } /// Consume the index and get underlying block filter. diff --git a/src/messages.rs b/src/messages.rs index fa50d305..6bcc709d 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -1,14 +1,15 @@ -use std::{collections::BTreeMap, ops::Range, time::Duration}; +use std::{collections::BTreeMap, time::Duration}; use bitcoin::{block::Header, p2p::message_network::RejectReason, BlockHash, FeeRate, Wtxid}; +use crate::db::BlockHeaderChanges; use crate::IndexedFilter; use crate::{ chain::{checkpoints::HeaderCheckpoint, IndexedHeader}, IndexedBlock, TrustedPeer, TxBroadcast, }; -use super::error::{FetchBlockError, FetchHeaderError}; +use super::error::FetchBlockError; /// Informational messages emitted by a node #[derive(Debug, Clone)] @@ -63,12 +64,7 @@ pub enum Event { /// The node is fully synced, having scanned the requested range. FiltersSynced(SyncUpdate), /// Blocks were reorganized out of the chain. - BlocksDisconnected { - /// Blocks that were accepted to the chain of most work in ascending order by height. - accepted: Vec, - /// Blocks that were disconnected from the chain of most work in ascending order by height. - disconnected: Vec, - }, + Chain(BlockHeaderChanges), /// A compact block filter with associated height and block hash. IndexedFilter(IndexedFilter), } @@ -158,10 +154,6 @@ pub(crate) enum ClientMessage { SetDuration(Duration), /// Add another known peer to connect to. AddPeer(TrustedPeer), - /// Request a header from a specified height. - GetHeader(ClientRequest>), - /// Request a range of headers. - GetHeaderBatch(ClientRequest, Result, FetchHeaderError>>), /// Request the broadcast minimum fee rate. GetBroadcastMinFeeRate(ClientRequest<(), FeeRate>), /// Send an empty message to see if the node is running. diff --git a/src/node.rs b/src/node.rs index 5bc26984..7bfdac57 100644 --- a/src/node.rs +++ b/src/node.rs @@ -26,11 +26,11 @@ use crate::{ block_queue::{BlockQueue, BlockRecipient, ProcessBlockResponse}, chain::Chain, checkpoints::HeaderCheckpoint, - error::{CFilterSyncError, HeaderSyncError}, + error::HeaderSyncError, CFHeaderChanges, FilterCheck, HeaderChainChanges, HeightMonitor, }, - db::traits::{HeaderStore, PeerStore}, - error::{FetchBlockError, FetchHeaderError}, + db::traits::PeerStore, + error::FetchBlockError, network::{peer_map::PeerMap, LastBlockMonitor, PeerId}, IndexedBlock, NodeState, TxBroadcast, TxBroadcastPolicy, }; @@ -51,9 +51,9 @@ type PeerRequirement = usize; /// A compact block filter node. Nodes download Bitcoin block headers, block filters, and blocks to send relevant events to a client. #[derive(Debug)] -pub struct Node { +pub struct Node { state: NodeState, - chain: Chain, + chain: Chain, peer_map: PeerMap

, required_peers: PeerRequirement, dialog: Arc

, @@ -62,13 +62,8 @@ pub struct Node { peer_recv: Receiver, } -impl Node { - pub(crate) fn new( - network: Network, - config: NodeConfig, - peer_store: P, - header_store: H, - ) -> (Self, Client) { +impl Node

{ + pub(crate) fn new(network: Network, config: NodeConfig, peer_store: P) -> (Self, Client) { let NodeConfig { required_peers, white_list, @@ -110,7 +105,6 @@ impl Node { header_checkpoint, Arc::clone(&dialog), height_monitor, - header_store, required_peers, ); ( @@ -133,13 +127,12 @@ impl Node { /// # Errors /// /// A node will cease running if a fatal error is encountered with either the [`PeerStore`] or [`HeaderStore`]. - pub async fn run(mut self) -> Result<(), NodeError> { + pub async fn run(mut self) -> Result<(), NodeError> { crate::debug!("Starting node"); crate::debug!(format!( "Configured connection requirement: {} peers", self.required_peers )); - self.fetch_headers().await?; let mut last_block = LastBlockMonitor::new(); let mut interval = tokio::time::interval(LOOP_TIMEOUT); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -246,22 +239,6 @@ impl Node { ClientMessage::AddPeer(peer) => { self.peer_map.add_trusted_peer(peer); }, - ClientMessage::GetHeader(request) => { - let (height, oneshot) = request.into_values(); - let header_opt = self.chain.fetch_header(height).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() }).and_then(|opt| opt.ok_or(FetchHeaderError::UnknownHeight)); - let send_result = oneshot.send(header_opt); - if send_result.is_err() { - self.dialog.send_warning(Warning::ChannelDropped); - }; - }, - ClientMessage::GetHeaderBatch(request) => { - let (range, oneshot) = request.into_values(); - let range_opt = self.chain.fetch_header_range(range).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() }); - let send_result = oneshot.send(range_opt); - if send_result.is_err() { - self.dialog.send_warning(Warning::ChannelDropped); - }; - }, ClientMessage::GetBroadcastMinFeeRate(request) => { let (_, oneshot) = request.into_values(); let fee_rate = self.peer_map.broadcast_min(); @@ -280,7 +257,7 @@ impl Node { } // Connect to a new peer if we are not connected to enough - async fn dispatch(&mut self) -> Result<(), NodeError> { + async fn dispatch(&mut self) -> Result<(), NodeError> { self.peer_map.clean().await; let live = self.peer_map.live(); let required = self.next_required_peers(); @@ -401,7 +378,7 @@ impl Node { &mut self, nonce: PeerId, version_message: VersionMessage, - ) -> Result> { + ) -> Result> { if version_message.version < WTXID_VERSION { return Ok(MainThreadMessage::Disconnect); } @@ -543,13 +520,8 @@ impl Node { self.dialog.send_warning(Warning::UnexpectedSyncError { warning: format!("Compact filter syncing encountered an error: {e}"), }); - match e { - CFilterSyncError::Filter(_) => Some(MainThreadMessage::Disconnect), - _ => { - self.peer_map.ban(peer_id).await; - Some(MainThreadMessage::Disconnect) - } - } + self.peer_map.ban(peer_id).await; + Some(MainThreadMessage::Disconnect) } } } @@ -668,13 +640,4 @@ impl Node { } } } - - // When the application starts, fetch any headers we know about from the database. - async fn fetch_headers(&mut self) -> Result<(), NodeError> { - crate::debug!("Attempting to load headers from the database"); - self.chain - .load_headers() - .await - .map_err(NodeError::HeaderDatabase) - } } diff --git a/tests/core.rs b/tests/core.rs index ec5bd135..767bcbe3 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -5,9 +5,7 @@ use std::{ }; use bip157::{ - chain::checkpoints::HeaderCheckpoint, client::Client, lookup_host, node::Node, Address, - BlockHash, Event, Info, ServiceFlags, SqliteHeaderDb, SqlitePeerDb, Transaction, TrustedPeer, - Warning, + chain::checkpoints::HeaderCheckpoint, client::Client, db::BlockHeaderChanges, lookup_host, node::Node, Address, BlockHash, Event, Info, ServiceFlags, SqlitePeerDb, Transaction, TrustedPeer, Warning }; use bitcoin::{ absolute, @@ -52,7 +50,7 @@ fn new_node( socket_addr: SocketAddrV4, tempdir_path: PathBuf, checkpoint: Option, -) -> (Node, Client) { +) -> (Node, Client) { let host = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port())); let mut trusted: TrustedPeer = host.into(); trusted.set_services(ServiceFlags::P2P_V2); @@ -151,10 +149,10 @@ async fn live_reorg() { // Make sure the reorg was caught while let Some(message) = channel.recv().await { match message { - bip157::messages::Event::BlocksDisconnected { + Event::Chain(BlockHeaderChanges::Reorganized { accepted: _, - disconnected: blocks, - } => { + reorganized: blocks, + }) => { assert_eq!(blocks.len(), 1); assert_eq!(blocks.first().unwrap().header.block_hash(), old_best); assert_eq!(old_height as u32, blocks.first().unwrap().height); @@ -193,18 +191,16 @@ async fn live_reorg_additional_sync() { // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); - let fetched_header = requester.get_header(10).await.unwrap(); - assert_eq!(old_best, fetched_header.block_hash()); invalidate_block(rpc, &best).await; mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the reorg was caught while let Some(message) = channel.recv().await { match message { - bip157::messages::Event::BlocksDisconnected { + Event::Chain(BlockHeaderChanges::Reorganized { accepted: _, - disconnected: blocks, - } => { + reorganized: blocks, + }) => { assert_eq!(blocks.len(), 1); assert_eq!(blocks.first().unwrap().header.block_hash(), old_best); assert_eq!(old_height as u32, blocks.first().unwrap().height); @@ -242,10 +238,7 @@ async fn various_client_methods() { } = client; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); sync_assert(&best, &mut channel).await; - let batch = requester.get_header_range(10_000..10_002).await.unwrap(); - assert!(batch.is_empty()); let _ = requester.broadcast_min_feerate().await.unwrap(); - let _ = requester.get_header(3).await.unwrap(); assert!(requester.is_running()); requester.shutdown().unwrap(); rpc.stop().unwrap(); @@ -270,8 +263,6 @@ async fn stop_reorg_resync() { } = client; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); sync_assert(&best, &mut channel).await; - let batch = requester.get_header_range(0..10).await.unwrap(); - assert!(!batch.is_empty()); requester.shutdown().unwrap(); // Reorganize the blocks let old_best = best; @@ -292,10 +283,10 @@ async fn stop_reorg_resync() { // Make sure the reorganization is caught after a cold start while let Some(message) = channel.recv().await { match message { - bip157::messages::Event::BlocksDisconnected { + Event::Chain(BlockHeaderChanges::Reorganized { accepted: _, - disconnected: blocks, - } => { + reorganized: blocks, + }) => { assert_eq!(blocks.len(), 1); assert_eq!(blocks.first().unwrap().header.block_hash(), old_best); assert_eq!(old_height as u32, blocks.first().unwrap().height); @@ -370,10 +361,10 @@ async fn stop_reorg_two_resync() { let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); while let Some(message) = channel.recv().await { match message { - bip157::messages::Event::BlocksDisconnected { + Event::Chain(BlockHeaderChanges::Reorganized { accepted: _, - disconnected: blocks, - } => { + reorganized: blocks, + }) => { assert_eq!(blocks.len(), 2); assert_eq!(blocks.last().unwrap().header.block_hash(), old_best); assert_eq!(old_height as u32, blocks.last().unwrap().height); @@ -450,10 +441,10 @@ async fn stop_reorg_start_on_orphan() { // Ensure SQL is able to catch the fork by loading in headers from the database while let Some(message) = channel.recv().await { match message { - bip157::messages::Event::BlocksDisconnected { + Event::Chain(BlockHeaderChanges::Reorganized { accepted: _, - disconnected: blocks, - } => { + reorganized: blocks, + }) => { assert_eq!(blocks.len(), 1); assert_eq!(blocks.first().unwrap().header.block_hash(), old_best); assert_eq!(old_height as u32, blocks.first().unwrap().height);