diff --git a/overlay/src/flood/inv_tracker.rs b/overlay/src/flood/inv_tracker.rs index 9735aa96a..a0edfe895 100644 --- a/overlay/src/flood/inv_tracker.rs +++ b/overlay/src/flood/inv_tracker.rs @@ -11,13 +11,35 @@ use std::num::NonZeroUsize; /// Default capacity for tracking TX sources pub const INV_TRACKER_CAPACITY: usize = 100_000; +#[derive(Debug)] +struct InvSource { + peers: Vec, + next_idx: usize, +} + +impl InvSource { + fn new() -> Self { + InvSource { + peers: Vec::new(), + next_idx: 0, + } + } + + fn next_peer(&mut self) -> Option { + if self.peers.is_empty() { + return None; + } + let peer = self.peers[self.next_idx % self.peers.len()]; + self.next_idx += 1; + Some(peer) + } +} + /// Tracks which peers have advertised which transactions #[derive(Debug)] pub struct InvTracker { - /// TX hash -> ordered list of peers who INV'd it - sources: LruCache<[u8; 32], Vec>, - /// TX hash -> next peer index for round-robin GETDATA - next_idx: LruCache<[u8; 32], usize>, + /// TX hash -> (ordered list of peers who INV'd it, peer index) + sources: LruCache<[u8; 32], InvSource>, } impl InvTracker { @@ -28,7 +50,6 @@ impl InvTracker { fn with_capacity(capacity: usize) -> Self { InvTracker { sources: LruCache::new(NonZeroUsize::new(capacity).unwrap()), - next_idx: LruCache::new(NonZeroUsize::new(capacity).unwrap()), } } @@ -36,10 +57,10 @@ impl InvTracker { pub fn record_source(&mut self, hash: [u8; 32], peer: PeerId) -> bool { let is_first = !self.sources.contains(&hash); - let sources = self.sources.get_or_insert_mut(hash, Vec::new); + let sources = self.sources.get_or_insert_mut(hash, InvSource::new); // Avoid duplicates - if !sources.contains(&peer) { - sources.push(peer); + if !sources.peers.contains(&peer) { + sources.peers.push(peer); } is_first @@ -47,23 +68,12 @@ impl InvTracker { /// Get the next peer to request from (round-robin) pub fn get_next_peer(&mut self, hash: &[u8; 32]) -> Option { - let sources = self.sources.get(hash)?; - if sources.is_empty() { - return None; - } - - let idx = *self.next_idx.get_or_insert(*hash, || 0); - let peer = sources[idx % sources.len()]; - - // Advance for next call - self.next_idx.put(*hash, idx + 1); - - Some(peer) + self.sources.get_mut(hash)?.next_peer() } /// Peek at sources without updating LRU (for read-only access) pub fn peek_sources(&self, hash: &[u8; 32]) -> Option<&Vec> { - self.sources.peek(hash) + self.sources.peek(hash).map(|source| &source.peers) } } diff --git a/overlay/src/flood/mempool.rs b/overlay/src/flood/mempool.rs index 99181e70e..a9fdc077a 100644 --- a/overlay/src/flood/mempool.rs +++ b/overlay/src/flood/mempool.rs @@ -201,7 +201,7 @@ impl Mempool { } /// Remove transactions that are too old. - pub fn evict_expired(&mut self) { + pub fn evict_expired(&mut self) -> usize { let now = Instant::now(); let to_remove: Vec = self .by_hash @@ -210,9 +210,11 @@ impl Mempool { .map(|tx| tx.hash) .collect(); + let count = to_remove.len(); for hash in to_remove { self.remove(&hash); } + count } /// Current number of transactions. diff --git a/overlay/src/flood/pending_requests.rs b/overlay/src/flood/pending_requests.rs index 2a7255d91..e96bc3f4b 100644 --- a/overlay/src/flood/pending_requests.rs +++ b/overlay/src/flood/pending_requests.rs @@ -98,18 +98,22 @@ impl PendingRequests { /// Process timeouts: returns (retry_list, give_up_list) /// - retry_list: hashes that timed out but haven't given up - /// - give_up_list: hashes that should be abandoned - pub fn process_timeouts(&self) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) { + /// - give_up_list: hashes that have been abandoned + pub fn process_timeouts(&mut self) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) { let mut retry = Vec::new(); let mut give_up = Vec::new(); - for (hash, req) in &self.requests { + self.requests.retain(|hash, req| { if req.should_give_up(self.total_timeout) { give_up.push(*hash); + false } else if req.is_timed_out(self.peer_timeout) { retry.push(*hash); + true + } else { + true } - } + }); (retry, give_up) } diff --git a/overlay/src/flood/txset.rs b/overlay/src/flood/txset.rs index 5acfab189..bd29a25a0 100644 --- a/overlay/src/flood/txset.rs +++ b/overlay/src/flood/txset.rs @@ -1,6 +1,6 @@ //! TX set cache for peer request/response handling. -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; /// 32-byte hash pub type Hash256 = [u8; 32]; @@ -20,6 +20,8 @@ pub struct CachedTxSet { pub struct TxSetCache { /// TX sets by hash by_hash: HashMap, + /// Order of hashes for eviction + order: VecDeque, /// Max cache size max_size: usize, } @@ -28,19 +30,24 @@ impl TxSetCache { pub fn new(max_size: usize) -> Self { Self { by_hash: HashMap::new(), + order: VecDeque::with_capacity(max_size), max_size, } } /// Insert a TX set into the cache. pub fn insert(&mut self, tx_set: CachedTxSet) { - if self.by_hash.len() >= self.max_size { - // Evict oldest (simple strategy - just remove one) - if let Some(&hash) = self.by_hash.keys().next() { - self.by_hash.remove(&hash); + let hash = tx_set.hash; + if self.by_hash.insert(tx_set.hash, tx_set).is_none() { + if self.by_hash.len() > self.max_size { + // Evict oldest; technically, this makes the minimum cache size 1, but that's fine + // for our use case + if let Some(to_evict) = self.order.pop_front() { + self.by_hash.remove(&to_evict); + } } + self.order.push_back(hash); } - self.by_hash.insert(tx_set.hash, tx_set); } /// Get a TX set by hash. @@ -50,6 +57,13 @@ impl TxSetCache { /// Remove TX sets for ledgers before the given sequence. pub fn evict_before(&mut self, ledger_seq: u32) { + self.order.retain(|hash| { + if let Some(tx_set) = self.by_hash.get(hash) { + tx_set.ledger_seq >= ledger_seq + } else { + false + } + }); self.by_hash.retain(|_, v| v.ledger_seq >= ledger_seq); } @@ -129,6 +143,14 @@ mod tests { }); assert_eq!(cache.len(), 2, "Cache should stay at capacity"); + assert!( + cache.get(&[1u8; 32]).is_none(), + "Oldest inserted item should be evicted" + ); + assert!( + cache.get(&[2u8; 32]).is_some(), + "Second-oldest item should remain" + ); assert!( cache.get(&[3u8; 32]).is_some(), "New item should be present" @@ -157,4 +179,81 @@ mod tests { assert_eq!(retrieved.ledger_seq, 200, "Should have newer data"); assert_eq!(retrieved.xdr, vec![4, 5, 6]); } + + #[test] + fn test_cache_overwrite_when_full_keeps_size() { + let mut cache = TxSetCache::new(1); // Cache holds exactly one entry + + cache.insert(CachedTxSet { + hash: [1u8; 32], + xdr: vec![1, 2, 3], + ledger_seq: 100, + }); + assert_eq!(cache.len(), 1, "Cache is full"); + + // Overwrite the only (existing) hash while the cache is full - must + // not evict the entry or shrink the cache below capacity. + cache.insert(CachedTxSet { + hash: [1u8; 32], + xdr: vec![4, 5, 6], + ledger_seq: 102, + }); + + assert_eq!(cache.len(), 1, "Overwrite should not shrink the cache"); + let retrieved = cache + .get(&[1u8; 32]) + .expect("Overwritten item should remain present"); + assert_eq!(retrieved.ledger_seq, 102, "Should have newer data"); + assert_eq!(retrieved.xdr, vec![4, 5, 6]); + } + + #[test] + fn test_overwrite_then_evict_before_keeps_order_in_sync() { + let mut cache = TxSetCache::new(2); + + // Insert, then overwrite the same hash with a newer ledger_seq. + cache.insert(CachedTxSet { + hash: [1u8; 32], + xdr: vec![1, 2, 3], + ledger_seq: 100, + }); + cache.insert(CachedTxSet { + hash: [1u8; 32], + xdr: vec![4, 5, 6], + ledger_seq: 200, + }); + + // The entry's effective ledger_seq is now 200, so evicting before 150 + // must NOT remove it. + cache.evict_before(150); + assert!( + cache.get(&[1u8; 32]).is_some(), + "Overwritten entry (seq 200) must survive evict_before(150)" + ); + assert_eq!(cache.len(), 1); + + // `order` must still track the surviving entry: filling the cache and + // overflowing it should evict hash [1u8; 32] as the oldest, leaving the + // cache exactly at capacity (not one over). + cache.insert(CachedTxSet { + hash: [2u8; 32], + xdr: vec![], + ledger_seq: 300, + }); + cache.insert(CachedTxSet { + hash: [3u8; 32], + xdr: vec![], + ledger_seq: 400, + }); + + assert_eq!( + cache.len(), + 2, + "Cache must stay at capacity; eviction tracking must not have leaked" + ); + assert!( + cache.get(&[1u8; 32]).is_none(), + "Oldest entry should have been evicted by the capacity path" + ); + } } diff --git a/overlay/src/integrated.rs b/overlay/src/integrated.rs index e690753c0..de1d3ef5c 100644 --- a/overlay/src/integrated.rs +++ b/overlay/src/integrated.rs @@ -121,7 +121,11 @@ impl Overlay { for hash in tx_hashes { mempool.remove(&hash); } - info!("Removed {} TXs from mempool", count); + let expired = mempool.evict_expired(); + info!( + "Removed {} (requested) + {} (expired) TXs from mempool", + count, expired + ); // Signal completion if caller is waiting if let Some(tx) = reply { let _ = tx.send(()).await; diff --git a/overlay/src/libp2p_overlay.rs b/overlay/src/libp2p_overlay.rs index 2ace73859..ecb4aa195 100644 --- a/overlay/src/libp2p_overlay.rs +++ b/overlay/src/libp2p_overlay.rs @@ -2060,7 +2060,7 @@ async fn inv_getdata_housekeeping_task(state: Arc) { // 2. Handle GETDATA timeouts let (to_retry, gave_up) = { - let pending = state.pending_getdata.write().await; + let mut pending = state.pending_getdata.write().await; pending.process_timeouts() };