Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions example/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ async fn main() {
tracing::info!("Last block average fee rate: {:#}", avg_fee_rate);
break;
},
Event::BlocksDisconnected { accepted: _, disconnected: _} => {
tracing::warn!("Some blocks were reorganized")
},
Event::IndexedFilter(_) => (),
_ => (),
}
}
Expand Down
25 changes: 3 additions & 22 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,19 @@

use bitcoin::Network;

use super::{client::Client, config::NodeConfig, node::Node};

Check failure on line 6 in src/builder.rs

View workflow job for this annotation

GitHub Actions / features

unused imports: `client::Client` and `node::Node`
use crate::chain::checkpoints::HeaderCheckpoint;
#[cfg(feature = "rusqlite")]
use crate::db::error::SqlInitializationError;
#[cfg(feature = "rusqlite")]
use crate::db::sqlite::{headers::SqliteHeaderDb, peers::SqlitePeerDb};
use crate::db::sqlite::peers::SqlitePeerDb;
use crate::network::dns::{DnsResolver, DNS_RESOLVER_PORT};
use crate::network::ConnectionType;
use crate::{
chain::checkpoints::HeaderCheckpoint,
db::traits::{HeaderStore, PeerStore},
};
use crate::{PeerStoreSizeConfig, TrustedPeer};

#[cfg(feature = "rusqlite")]
/// The default node returned from the [`Builder`].
pub type NodeDefault = Node<SqliteHeaderDb, SqlitePeerDb>;
pub type NodeDefault = Node<SqlitePeerDb>;

const MIN_PEERS: u8 = 1;
const MAX_PEERS: u8 = 15;
Expand Down Expand Up @@ -171,26 +168,10 @@
#[cfg(feature = "rusqlite")]
pub fn build(&mut self) -> Result<(NodeDefault, Client), SqlInitializationError> {
let peer_store = SqlitePeerDb::new(self.network, self.config.data_path.clone())?;
let header_store = SqliteHeaderDb::new(self.network, self.config.data_path.clone())?;
Ok(Node::new(
self.network,
core::mem::take(&mut self.config),
peer_store,
header_store,
))
}

/// Consume the node builder by using custom database implementations, receiving a [`Node`] and [`Client`].
pub fn build_with_databases<H: HeaderStore + 'static, P: PeerStore + 'static>(
&mut self,
peer_store: P,
header_store: H,
) -> (Node<H, P>, Client) {
Node::new(
self.network,
core::mem::take(&mut self.config),
peer_store,
header_store,
)
}
}
159 changes: 14 additions & 145 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
extern crate alloc;
use std::{collections::BTreeMap, ops::Range, sync::Arc};
use std::{collections::BTreeMap, sync::Arc};

use bitcoin::{
block::Header,
Expand All @@ -14,40 +14,30 @@ use super::{
error::{CFHeaderSyncError, CFilterSyncError, HeaderSyncError},
graph::{AcceptHeaderChanges, BlockTree, HeaderRejection},
CFHeaderChanges, Filter, FilterCheck, FilterHeaderRequest, FilterRequest, FilterRequestState,
HeaderChainChanges, HeightExt, HeightMonitor, PeerId,
};
use crate::IndexedFilter;
use crate::{
chain::header_batch::HeadersBatch,
db::{traits::HeaderStore, BlockHeaderChanges},
dialog::Dialog,
error::HeaderPersistenceError,
messages::{Event, Warning},
Info, Progress,
HeaderChainChanges, HeightMonitor, PeerId,
};
use crate::{chain::header_batch::HeadersBatch, dialog::Dialog, messages::Event, Info, Progress};
use crate::{db::BlockHeaderChanges, IndexedFilter};

const REORG_LOOKBACK: u32 = 7;
const FILTER_BASIC: u8 = 0x00;
const CF_HEADER_BATCH_SIZE: u32 = 1_999;
const FILTER_BATCH_SIZE: u32 = 999;

#[derive(Debug)]
pub(crate) struct Chain<H: HeaderStore> {
pub(crate) struct Chain {
pub(crate) header_chain: BlockTree,
request_state: FilterRequestState,
network: Network,
db: H,
heights: Arc<Mutex<HeightMonitor>>,
dialog: Arc<Dialog>,
}

impl<H: HeaderStore> Chain<H> {
impl Chain {
pub(crate) fn new(
network: Network,
anchor: Option<HeaderCheckpoint>,
dialog: Arc<Dialog>,
height_monitor: Arc<Mutex<HeightMonitor>>,
db: H,
quorum_required: u8,
) -> Self {
let header_chain = match anchor {
Expand All @@ -58,7 +48,6 @@ impl<H: HeaderStore> Chain<H> {
header_chain,
request_state: FilterRequestState::new(quorum_required),
network,
db,
heights: height_monitor,
dialog,
}
Expand All @@ -84,81 +73,6 @@ impl<H: HeaderStore> Chain<H> {
}
}

// Load in headers, ideally allowing the difficulty adjustment to be audited and
// reorganizations to be handled gracefully.
pub(crate) async fn load_headers(&mut self) -> Result<(), HeaderPersistenceError<H::Error>> {
// The original height the user requested a scan after
let scan_height = self.header_chain.height();
// The header relevant to compute the next adjustment
let last_adjustment = scan_height.last_epoch_start(self.network);
// Seven blocks ago
let reorg = scan_height.saturating_sub(REORG_LOOKBACK);
// To handle adjustments and reorgs, we would have the minimum of each of these heights
let min_interesting_height = last_adjustment.min(reorg);
let max_interesting_height = last_adjustment.max(reorg);
// Get the maximum of the two interesting heights. In case the minimum is not available
if let Some(header) = self
.db
.header_at(max_interesting_height)
.await
.ok()
.flatten()
{
self.header_chain =
BlockTree::from_header(max_interesting_height, header, self.network);
}
// If this succeeds, both reorgs and difficulty adjustments can be handled gracefully.
if let Some(header) = self
.db
.header_at(min_interesting_height)
.await
.ok()
.flatten()
{
self.header_chain =
BlockTree::from_header(min_interesting_height, header, self.network);
}
// Now that the block tree is updated to the appropriate start, load in the rest of
// the history from this point onward. This is either: from the user start height,
// from the last difficulty adjustment, or seven blocks ago, depending on what the
// header store was able to provide.
let loaded_headers = self
.db
.load(self.header_chain.height().increment()..)
.await
.map_err(HeaderPersistenceError::Database)?;
for (height, header) in loaded_headers {
let apply_header_changes = self.header_chain.accept_header(header);
match apply_header_changes {
AcceptHeaderChanges::Accepted { connected_at } => {
if height.ne(&connected_at.height) {
self.dialog.send_warning(Warning::CorruptedHeaders);
return Err(HeaderPersistenceError::HeadersDoNotLink);
}
}
AcceptHeaderChanges::Rejected(reject_reason) => match reject_reason {
HeaderRejection::UnknownPrevHash(_) => {
return Err(HeaderPersistenceError::CannotLocateHistory);
}
HeaderRejection::InvalidPow {
expected: _,
got: _,
} => {
crate::debug!(
"Unexpected invalid proof of work when importing a block header"
);
}
},
_ => (),
}
}
// Because the user requested a scan after the `scan_height`, the filters below this point
// may be assumed as checked. Note that in a reorg, filters below this height may still be
// retrieved, as this only considers the canonical chain as checked.
self.header_chain.assume_checked_to(scan_height);
Ok(())
}

// Sync the chain with headers from a peer, adjusting to reorgs if needed
pub(crate) async fn sync_chain(
&mut self,
Expand All @@ -182,7 +96,8 @@ impl<H: HeaderStore> Chain<H> {
connected_at.height,
connected_at.header.block_hash()
));
self.db.stage(BlockHeaderChanges::Connected(connected_at));
self.dialog
.send_event(Event::Chain(BlockHeaderChanges::Connected(connected_at)));
}
AcceptHeaderChanges::Duplicate => (),
AcceptHeaderChanges::ExtendedFork { connected_at } => {
Expand All @@ -201,15 +116,11 @@ impl<H: HeaderStore> Chain<H> {
.map(|index| index.header.block_hash())
.collect();
reorged_hashes = Some(removed_hashes);
self.db.stage(BlockHeaderChanges::Reorganized {
accepted: accepted.clone(),
reorganized: disconnected.clone(),
});
let disconnected_event = Event::BlocksDisconnected {
let changes = BlockHeaderChanges::Reorganized {
accepted,
disconnected,
reorganized: disconnected,
};
self.dialog.send_event(disconnected_event);
self.dialog.send_event(Event::Chain(changes));
}
AcceptHeaderChanges::Rejected(rejected_header) => match rejected_header {
HeaderRejection::InvalidPow {
Expand All @@ -223,11 +134,6 @@ impl<H: HeaderStore> Chain<H> {
},
}
}
if let Err(e) = self.db.write().await {
self.dialog.send_warning(Warning::FailedPersistence {
warning: format!("Could not save headers to disk: {e}"),
});
}
match reorged_hashes {
Some(reorgs) => {
self.clear_compact_filter_queue();
Expand Down Expand Up @@ -471,42 +377,6 @@ impl<H: HeaderStore> Chain<H> {
self.header_chain.filters_synced()
}

// Fetch a header from the cache or disk.
pub(crate) async fn fetch_header(
&mut self,
height: u32,
) -> Result<Option<Header>, HeaderPersistenceError<H::Error>> {
match self.header_chain.header_at_height(height) {
Some(header) => Ok(Some(header)),
None => {
let header_opt = self.db.header_at(height).await;
if header_opt.is_err() {
self.dialog
.send_warning(Warning::FailedPersistence {
warning: format!(
"Unexpected error fetching a header from the header store at height {height}"
),
});
}
header_opt.map_err(HeaderPersistenceError::Database)
}
}
}

pub(crate) async fn fetch_header_range(
&mut self,
range: Range<u32>,
) -> Result<BTreeMap<u32, Header>, HeaderPersistenceError<H::Error>> {
let range_opt = self.db.load(range).await;
if range_opt.is_err() {
self.dialog.send_warning(Warning::FailedPersistence {
warning: "Unexpected error fetching a range of headers from the header store"
.to_string(),
});
}
range_opt.map_err(HeaderPersistenceError::Database)
}

// Reset the compact filter queue because we received a new block
pub(crate) fn clear_compact_filter_queue(&mut self) {
self.request_state.agreement_state.reset_agreements();
Expand Down Expand Up @@ -554,7 +424,7 @@ mod tests {
block::Header,
consensus::deserialize,
p2p::message_filter::{CFHeaders, CFilter},
BlockHash, FilterHash, FilterHeader,
BlockHash, FilterHash, FilterHeader, Network,
};
use corepc_node::serde_json;
use tokio::sync::Mutex;
Expand All @@ -573,16 +443,15 @@ mod tests {
anchor: HeaderCheckpoint,
height_monitor: Arc<Mutex<HeightMonitor>>,
peers: u8,
) -> Chain<()> {
) -> Chain {
let (info_tx, _) = tokio::sync::mpsc::channel::<Info>(1);
let (warn_tx, _) = tokio::sync::mpsc::unbounded_channel::<Warning>();
let (event_tx, _) = tokio::sync::mpsc::unbounded_channel::<Event>();
Chain::new(
bitcoin::Network::Regtest,
Network::Regtest,
Some(anchor),
Arc::new(Dialog::new(info_tx, warn_tx, event_tx)),
height_monitor,
(),
peers,
)
}
Expand Down
22 changes: 0 additions & 22 deletions src/chain/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ pub(crate) enum HeaderSyncError {
MiscalculatedDifficulty,
InvalidBits,
FloatingHeaders,
DbError,
}

impl Display for HeaderSyncError {
Expand All @@ -38,7 +37,6 @@ impl Display for HeaderSyncError {
f,
"the peer sent us a chain that does not connect to any header of ours."
),
HeaderSyncError::DbError => write!(f, "the database could not load a fork."),
HeaderSyncError::InvalidBits => write!(
f,
"the target work does not adhere to basic transition requirements."
Expand Down Expand Up @@ -99,7 +97,6 @@ pub enum CFilterSyncError {
UnrequestedStophash,
UnknownFilterHash,
MisalignedFilterHash,
Filter(FilterError),
}

impl core::fmt::Display for CFilterSyncError {
Expand All @@ -119,31 +116,12 @@ impl core::fmt::Display for CFilterSyncError {
f,
"the filter hash from our header chain and this filter hash do not match."
),
CFilterSyncError::Filter(_) => write!(
f,
"the filter experienced an IO error checking for Script inclusions."
),
}
}
}

impl_sourceless_error!(CFilterSyncError);

#[derive(Debug)]
pub enum FilterError {
IORead,
}

impl core::fmt::Display for FilterError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FilterError::IORead => write!(f, "unable to read from the filter contents buffer."),
}
}
}

impl_sourceless_error!(FilterError);

#[derive(Debug)]
pub(crate) enum BlockScanError {
NoBlockHash,
Expand Down
Loading
Loading