Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions overlay/src/flood/inv_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,35 @@ use std::num::NonZeroUsize;
/// Default capacity for tracking TX sources
pub const INV_TRACKER_CAPACITY: usize = 100_000;

#[derive(Debug)]
struct InvSource {
peers: Vec<PeerId>,
next_idx: usize,
}

impl InvSource {
fn new() -> Self {
InvSource {
peers: Vec::new(),
next_idx: 0,
}
}

fn next_peer(&mut self) -> Option<PeerId> {
if self.peers.is_empty() {
return None;
}
let peer = self.peers[self.next_idx % self.peers.len()];
self.next_idx += 1;
Some(peer)
}
}

/// Tracks which peers have advertised which transactions
#[derive(Debug)]
pub struct InvTracker {
/// TX hash -> ordered list of peers who INV'd it
sources: LruCache<[u8; 32], Vec<PeerId>>,
/// TX hash -> next peer index for round-robin GETDATA
next_idx: LruCache<[u8; 32], usize>,
/// TX hash -> (ordered list of peers who INV'd it, peer index)
sources: LruCache<[u8; 32], InvSource>,
}

impl InvTracker {
Expand All @@ -28,42 +50,30 @@ impl InvTracker {
fn with_capacity(capacity: usize) -> Self {
InvTracker {
sources: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
next_idx: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
}
}

/// Record that a peer has advertised a TX. Returns true if this is the first INV for this TX.
pub fn record_source(&mut self, hash: [u8; 32], peer: PeerId) -> bool {
let is_first = !self.sources.contains(&hash);

let sources = self.sources.get_or_insert_mut(hash, Vec::new);
let sources = self.sources.get_or_insert_mut(hash, InvSource::new);
// Avoid duplicates
if !sources.contains(&peer) {
sources.push(peer);
if !sources.peers.contains(&peer) {
sources.peers.push(peer);
}

is_first
}

/// Get the next peer to request from (round-robin)
pub fn get_next_peer(&mut self, hash: &[u8; 32]) -> Option<PeerId> {
let sources = self.sources.get(hash)?;
if sources.is_empty() {
return None;
}

let idx = *self.next_idx.get_or_insert(*hash, || 0);
let peer = sources[idx % sources.len()];

// Advance for next call
self.next_idx.put(*hash, idx + 1);

Some(peer)
self.sources.get_mut(hash)?.next_peer()
}

/// Peek at sources without updating LRU (for read-only access)
pub fn peek_sources(&self, hash: &[u8; 32]) -> Option<&Vec<PeerId>> {
self.sources.peek(hash)
self.sources.peek(hash).map(|source| &source.peers)
}
}

Expand Down
4 changes: 3 additions & 1 deletion overlay/src/flood/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl Mempool {
}

/// Remove transactions that are too old.
pub fn evict_expired(&mut self) {
pub fn evict_expired(&mut self) -> usize {
let now = Instant::now();
let to_remove: Vec<TxHash> = self
.by_hash
Expand All @@ -210,9 +210,11 @@ impl Mempool {
.map(|tx| tx.hash)
.collect();

let count = to_remove.len();
for hash in to_remove {
self.remove(&hash);
}
count
}

/// Current number of transactions.
Expand Down
12 changes: 8 additions & 4 deletions overlay/src/flood/pending_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,22 @@ impl PendingRequests {

/// Process timeouts: returns (retry_list, give_up_list)
/// - retry_list: hashes that timed out but haven't given up
/// - give_up_list: hashes that should be abandoned
pub fn process_timeouts(&self) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
/// - give_up_list: hashes that have been abandoned
pub fn process_timeouts(&mut self) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
let mut retry = Vec::new();
let mut give_up = Vec::new();

for (hash, req) in &self.requests {
self.requests.retain(|hash, req| {
if req.should_give_up(self.total_timeout) {
give_up.push(*hash);
false
} else if req.is_timed_out(self.peer_timeout) {
retry.push(*hash);
true
} else {
true
}
}
});

(retry, give_up)
}
Expand Down
111 changes: 105 additions & 6 deletions overlay/src/flood/txset.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! TX set cache for peer request/response handling.

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};

/// 32-byte hash
pub type Hash256 = [u8; 32];
Expand All @@ -20,6 +20,8 @@ pub struct CachedTxSet {
pub struct TxSetCache {
/// TX sets by hash
by_hash: HashMap<Hash256, CachedTxSet>,
/// Order of hashes for eviction
order: VecDeque<Hash256>,
/// Max cache size
max_size: usize,
}
Expand All @@ -28,19 +30,24 @@ impl TxSetCache {
pub fn new(max_size: usize) -> Self {
Self {
by_hash: HashMap::new(),
order: VecDeque::with_capacity(max_size),
max_size,
}
}

/// Insert a TX set into the cache.
pub fn insert(&mut self, tx_set: CachedTxSet) {
if self.by_hash.len() >= self.max_size {
// Evict oldest (simple strategy - just remove one)
if let Some(&hash) = self.by_hash.keys().next() {
self.by_hash.remove(&hash);
let hash = tx_set.hash;
if self.by_hash.insert(tx_set.hash, tx_set).is_none() {
if self.by_hash.len() > self.max_size {
// Evict oldest; technically, this makes the minimum cache size 1, but that's fine
// for our use case
if let Some(to_evict) = self.order.pop_front() {
self.by_hash.remove(&to_evict);
}
}
self.order.push_back(hash);
}
self.by_hash.insert(tx_set.hash, tx_set);
}

/// Get a TX set by hash.
Expand All @@ -50,6 +57,13 @@ impl TxSetCache {

/// Remove TX sets for ledgers before the given sequence.
pub fn evict_before(&mut self, ledger_seq: u32) {
self.order.retain(|hash| {
if let Some(tx_set) = self.by_hash.get(hash) {
tx_set.ledger_seq >= ledger_seq
} else {
false
}
});
self.by_hash.retain(|_, v| v.ledger_seq >= ledger_seq);
}

Expand Down Expand Up @@ -129,6 +143,14 @@ mod tests {
});

assert_eq!(cache.len(), 2, "Cache should stay at capacity");
assert!(
cache.get(&[1u8; 32]).is_none(),
"Oldest inserted item should be evicted"
);
assert!(
cache.get(&[2u8; 32]).is_some(),
"Second-oldest item should remain"
);
assert!(
cache.get(&[3u8; 32]).is_some(),
"New item should be present"
Expand Down Expand Up @@ -157,4 +179,81 @@ mod tests {
assert_eq!(retrieved.ledger_seq, 200, "Should have newer data");
assert_eq!(retrieved.xdr, vec![4, 5, 6]);
}

#[test]
fn test_cache_overwrite_when_full_keeps_size() {
let mut cache = TxSetCache::new(1); // Cache holds exactly one entry

cache.insert(CachedTxSet {
hash: [1u8; 32],
xdr: vec![1, 2, 3],
ledger_seq: 100,
});
assert_eq!(cache.len(), 1, "Cache is full");

// Overwrite the only (existing) hash while the cache is full - must
// not evict the entry or shrink the cache below capacity.
cache.insert(CachedTxSet {
hash: [1u8; 32],
xdr: vec![4, 5, 6],
ledger_seq: 102,
});

assert_eq!(cache.len(), 1, "Overwrite should not shrink the cache");
let retrieved = cache
.get(&[1u8; 32])
.expect("Overwritten item should remain present");
assert_eq!(retrieved.ledger_seq, 102, "Should have newer data");
assert_eq!(retrieved.xdr, vec![4, 5, 6]);
}

#[test]
fn test_overwrite_then_evict_before_keeps_order_in_sync() {
let mut cache = TxSetCache::new(2);

// Insert, then overwrite the same hash with a newer ledger_seq.
cache.insert(CachedTxSet {
hash: [1u8; 32],
xdr: vec![1, 2, 3],
ledger_seq: 100,
});
cache.insert(CachedTxSet {
hash: [1u8; 32],
xdr: vec![4, 5, 6],
ledger_seq: 200,
});

// The entry's effective ledger_seq is now 200, so evicting before 150
// must NOT remove it.
cache.evict_before(150);
assert!(
cache.get(&[1u8; 32]).is_some(),
"Overwritten entry (seq 200) must survive evict_before(150)"
);
assert_eq!(cache.len(), 1);

// `order` must still track the surviving entry: filling the cache and
// overflowing it should evict hash [1u8; 32] as the oldest, leaving the
// cache exactly at capacity (not one over).
cache.insert(CachedTxSet {
hash: [2u8; 32],
xdr: vec![],
ledger_seq: 300,
});
cache.insert(CachedTxSet {
hash: [3u8; 32],
xdr: vec![],
ledger_seq: 400,
});

assert_eq!(
cache.len(),
2,
"Cache must stay at capacity; eviction tracking must not have leaked"
);
assert!(
cache.get(&[1u8; 32]).is_none(),
"Oldest entry should have been evicted by the capacity path"
);
}
}
6 changes: 5 additions & 1 deletion overlay/src/integrated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ impl Overlay {
for hash in tx_hashes {
mempool.remove(&hash);
}
info!("Removed {} TXs from mempool", count);
let expired = mempool.evict_expired();
info!(
"Removed {} (requested) + {} (expired) TXs from mempool",
count, expired
);
// Signal completion if caller is waiting
if let Some(tx) = reply {
let _ = tx.send(()).await;
Expand Down
2 changes: 1 addition & 1 deletion overlay/src/libp2p_overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2060,7 +2060,7 @@ async fn inv_getdata_housekeeping_task(state: Arc<SharedState>) {

// 2. Handle GETDATA timeouts
let (to_retry, gave_up) = {
let pending = state.pending_getdata.write().await;
let mut pending = state.pending_getdata.write().await;
pending.process_timeouts()
};

Expand Down
Loading