diff --git a/lance/src/consumer.rs b/lance/src/consumer.rs index b88b394..91204af 100644 --- a/lance/src/consumer.rs +++ b/lance/src/consumer.rs @@ -245,12 +245,12 @@ pub fn read_segment_zero_copy( // Try zero-copy path first (mmap on Linux) // Uses Arc-based mmap sharing - no data copy, only refcount increment #[cfg(target_os = "linux")] - if reader.supports_zero_copy() { - if let Some(data) = reader.slice_bytes(offset, to_read) { - let next_offset = offset + to_read as u64; - lnc_metrics::increment_zero_copy_sends(); - return Ok((data, next_offset)); - } + if reader.supports_zero_copy() + && let Some(data) = reader.slice_bytes(offset, to_read) + { + let next_offset = offset + to_read as u64; + lnc_metrics::increment_zero_copy_sends(); + return Ok((data, next_offset)); } // Fallback: read into buffer diff --git a/lance/src/server/connection.rs b/lance/src/server/connection.rs index 12725a4..bfe9548 100644 --- a/lance/src/server/connection.rs +++ b/lance/src/server/connection.rs @@ -580,12 +580,11 @@ where // metadata visibility during leadership churn / follower catch-up windows. let should_forward_to_leader = command_handlers::is_topic_metadata_operation(command) || command == ControlCommand::Fetch; - if should_forward_to_leader { - if let Some(action) = + if should_forward_to_leader + && let Some(action) = try_forward_control_to_leader(ctx, command, buffer, consumed, read_offset).await - { - return action; - } + { + return action; } // Handle locally @@ -899,22 +898,20 @@ where { // Topic metadata commands are leader-authoritative to avoid stale // follower registry responses during startup/election churn. - if command_handlers::is_topic_metadata_operation(command) { - if let Some(coord) = cluster { - if !coord.is_leader_authoritative() { - if coord.is_leader() { - lnc_metrics::increment_cluster_elected_not_ready_rejects(); - return send_error( - stream, - "FORWARD_FAILED: Leader not ready (apply/metadata catch-up)", - ) - .await; - } - - return send_not_leader_error(stream, coord.leader_addr().map(|a| a.to_string())) - .await; - } + if command_handlers::is_topic_metadata_operation(command) + && let Some(coord) = cluster + && !coord.is_leader_authoritative() + { + if coord.is_leader() { + lnc_metrics::increment_cluster_elected_not_ready_rejects(); + return send_error( + stream, + "FORWARD_FAILED: Leader not ready (apply/metadata catch-up)", + ) + .await; } + + return send_not_leader_error(stream, coord.leader_addr().map(|a| a.to_string())).await; } // Build context for handlers diff --git a/lance/src/server/ingestion.rs b/lance/src/server/ingestion.rs index 4934b1c..9cb3a67 100644 --- a/lance/src/server/ingestion.rs +++ b/lance/src/server/ingestion.rs @@ -174,41 +174,41 @@ pub fn write_replicated_data_enriched( return lnc_io::SegmentWriter::open(&segment_path); } - if leader_write_offset > 0 { - if let Some(closed_path) = find_closed_segment_variant(topic_dir, segment_name) { - if !segment_path.exists() { - match std::fs::rename(&closed_path, &segment_path) { - Ok(()) => { - tracing::debug!( - target: "lance::ingestion", - segment = %segment_name, - from = %closed_path.display(), - to = %segment_path.display(), - "Renamed closed follower segment to leader canonical name" - ); - return lnc_io::SegmentWriter::open(&segment_path); - }, - Err(e) => { - tracing::warn!( - target: "lance::ingestion", - segment = %segment_name, - from = %closed_path.display(), - to = %segment_path.display(), - error = %e, - "Failed to canonicalize closed follower segment name, reopening variant" - ); - }, - } + if leader_write_offset > 0 + && let Some(closed_path) = find_closed_segment_variant(topic_dir, segment_name) + { + if !segment_path.exists() { + match std::fs::rename(&closed_path, &segment_path) { + Ok(()) => { + tracing::debug!( + target: "lance::ingestion", + segment = %segment_name, + from = %closed_path.display(), + to = %segment_path.display(), + "Renamed closed follower segment to leader canonical name" + ); + return lnc_io::SegmentWriter::open(&segment_path); + }, + Err(e) => { + tracing::warn!( + target: "lance::ingestion", + segment = %segment_name, + from = %closed_path.display(), + to = %segment_path.display(), + error = %e, + "Failed to canonicalize closed follower segment name, reopening variant" + ); + }, } - - tracing::debug!( - target: "lance::ingestion", - segment = %segment_name, - closed_segment = %closed_path.display(), - "Reopening closed follower segment for replay" - ); - return lnc_io::SegmentWriter::open(&closed_path); } + + tracing::debug!( + target: "lance::ingestion", + segment = %segment_name, + closed_segment = %closed_path.display(), + "Reopening closed follower segment for replay" + ); + return lnc_io::SegmentWriter::open(&closed_path); } lnc_io::SegmentWriter::create_named(topic_dir, segment_name) @@ -258,23 +258,24 @@ pub fn write_replicated_data_enriched( }; // Verify the segment name matches what the leader expects - if let Some(current_name) = topic_writer.writer.filename() { - if current_name != entry.segment_name && !entry.flags.new_segment() { - tracing::warn!( - target: "lance::ingestion", - topic_id, - expected = %entry.segment_name, - actual = %current_name, - "Segment name mismatch — follower may need resync" - ); + if let Some(current_name) = topic_writer.writer.filename() + && current_name != entry.segment_name + && !entry.flags.new_segment() + { + tracing::warn!( + target: "lance::ingestion", + topic_id, + expected = %entry.segment_name, + actual = %current_name, + "Segment name mismatch — follower may need resync" + ); - // Recover in-place by switching to the leader-dictated segment stream. - // Without this, followers can stay pinned to a stale writer key and repeatedly - // fail append catch-up for the same divergent segment lineage. - topic_writer.writer = - open_or_create_leader_segment(&topic_dir, &entry.segment_name, entry.write_offset)?; - topic_writer.index_builder.clear(); - } + // Recover in-place by switching to the leader-dictated segment stream. + // Without this, followers can stay pinned to a stale writer key and repeatedly + // fail append catch-up for the same divergent segment lineage. + topic_writer.writer = + open_or_create_leader_segment(&topic_dir, &entry.segment_name, entry.write_offset)?; + topic_writer.index_builder.clear(); } // Check for Raft conflict even with cached writer (leader may have rolled back), diff --git a/lance/src/server/mod.rs b/lance/src/server/mod.rs index cc18dce..c48286b 100644 --- a/lance/src/server/mod.rs +++ b/lance/src/server/mod.rs @@ -673,10 +673,10 @@ pub async fn run( if let Ok(entries) = std::fs::read_dir(&local_data_dir) { for entry in entries.flatten() { let path = entry.path(); - if path.extension().is_some_and(|ext| ext == "lnc") { - if let Ok(meta) = std::fs::metadata(&path) { - total += meta.len(); - } + if path.extension().is_some_and(|ext| ext == "lnc") + && let Ok(meta) = std::fs::metadata(&path) + { + total += meta.len(); } } } @@ -736,47 +736,44 @@ pub async fn run( // Proactively request bulk resync from current leader to recover // any missed data-plane events from channel lag. - if !resync_actor.is_active() { - if let Some(leader_id) = event_coord.leader_id().await { - if leader_id != event_config.node_id { - let throttled = last_resync_leader == Some(leader_id) - && last_resync_attempt - .is_some_and(|t| t.elapsed() < resync_min_interval); - if throttled { - debug!( - target: "lance::resync", - skipped = n, - leader_id, - min_retry_secs = resync_min_interval.as_secs(), - "Skipping lag-triggered resync (cooldown active)" - ); - continue; - } + if !resync_actor.is_active() + && let Some(leader_id) = event_coord.leader_id().await + && leader_id != event_config.node_id + { + let throttled = last_resync_leader == Some(leader_id) + && last_resync_attempt + .is_some_and(|t| t.elapsed() < resync_min_interval); + if throttled { + debug!( + target: "lance::resync", + skipped = n, + leader_id, + min_retry_secs = resync_min_interval.as_secs(), + "Skipping lag-triggered resync (cooldown active)" + ); + continue; + } - if let Some(leader_repl_addr) = - event_coord.peer_addr(leader_id).await - { - warn!( - target: "lance::resync", - skipped = n, - leader_id, - leader_addr = %leader_repl_addr, - "Lagged apply receiver - initiating follower bulk resync" - ); - last_resync_attempt = Some(Instant::now()); - last_resync_leader = Some(leader_id); - if let Err(e) = - resync_actor.initiate_resync(leader_repl_addr, 0).await - { - warn!( - target: "lance::resync", - skipped = n, - error = %e, - "Bulk resync after lagged receiver failed" - ); - resync_actor.reset(); - } - } + if let Some(leader_repl_addr) = event_coord.peer_addr(leader_id).await { + warn!( + target: "lance::resync", + skipped = n, + leader_id, + leader_addr = %leader_repl_addr, + "Lagged apply receiver - initiating follower bulk resync" + ); + last_resync_attempt = Some(Instant::now()); + last_resync_leader = Some(leader_id); + if let Err(e) = + resync_actor.initiate_resync(leader_repl_addr, 0).await + { + warn!( + target: "lance::resync", + skipped = n, + error = %e, + "Bulk resync after lagged receiver failed" + ); + resync_actor.reset(); } } } @@ -1039,7 +1036,7 @@ pub async fn run( // Re-resolve peer DNS every 30s (60 ticks × 500ms) to handle pod IP changes dns_refresh_counter += 1; - if dns_refresh_counter % 60 == 0 { + if dns_refresh_counter.is_multiple_of(60) { coord_for_watcher.refresh_peer_addresses().await; } diff --git a/lance/src/server/multi_actor.rs b/lance/src/server/multi_actor.rs index 3af1631..2df6db7 100644 --- a/lance/src/server/multi_actor.rs +++ b/lance/src/server/multi_actor.rs @@ -280,18 +280,18 @@ fn run_ingestion_actor_sync( let write_done_tx = request.write_done_tx.take(); // WAL-first append - if let Some(ref mut w) = wal { - if let Err(e) = w.append(&payload) { - tracing::error!( - target: "lance::ingestion", - actor_id, - topic_id, - error = %e, - "WAL append failed — dropping request" - ); - drop(write_done_tx); - continue; - } + if let Some(ref mut w) = wal + && let Err(e) = w.append(&payload) + { + tracing::error!( + target: "lance::ingestion", + actor_id, + topic_id, + error = %e, + "WAL append failed — dropping request" + ); + drop(write_done_tx); + continue; } match process_request_sync( @@ -479,29 +479,29 @@ fn flush_and_signal_sync( } // Sync WAL before segment fsyncs (batched, not per-append) - if let Some(w) = wal { - if let Err(e) = w.sync() { - tracing::error!( - target: "lance::ingestion", - error = %e, - "WAL batch sync failed" - ); - } + if let Some(w) = wal + && let Err(e) = w.sync() + { + tracing::error!( + target: "lance::ingestion", + error = %e, + "WAL batch sync failed" + ); } // If WAL is enabled and synced above, segment fsync is redundant on the ACK // hot path. Crash recovery can replay WAL + committed Raft log to rebuild // segment bytes, so we preserve durability while avoiding duplicate flush cost. if wal.is_none() { for topic_id in dirty_topics.iter() { - if let Some(tw) = topic_writers.get_mut(topic_id) { - if let Err(e) = tw.writer.fsync() { - tracing::error!( - target: "lance::ingestion", - topic_id, - error = %e, - "Batch fsync failed" - ); - } + if let Some(tw) = topic_writers.get_mut(topic_id) + && let Err(e) = tw.writer.fsync() + { + tracing::error!( + target: "lance::ingestion", + topic_id, + error = %e, + "Batch fsync failed" + ); } } } diff --git a/lance/src/server/recovery.rs b/lance/src/server/recovery.rs index 657e1c9..5745371 100644 --- a/lance/src/server/recovery.rs +++ b/lance/src/server/recovery.rs @@ -37,25 +37,24 @@ fn cleanup_empty_segments_recursive(dir: &std::path::Path) -> Result<()> { continue; } - if path.extension().is_some_and(|ext| ext == "lnc") { - if let Ok(metadata) = std::fs::metadata(&path) { - if metadata.len() == 0 { - if let Err(e) = std::fs::remove_file(&path) { - debug!( - target: "lance::server", - path = %path.display(), - error = %e, - "Failed to remove empty segment" - ); - } else { - removed_count += 1; - debug!( - target: "lance::server", - path = %path.display(), - "Removed empty segment file" - ); - } - } + if path.extension().is_some_and(|ext| ext == "lnc") + && let Ok(metadata) = std::fs::metadata(&path) + && metadata.len() == 0 + { + if let Err(e) = std::fs::remove_file(&path) { + debug!( + target: "lance::server", + path = %path.display(), + error = %e, + "Failed to remove empty segment" + ); + } else { + removed_count += 1; + debug!( + target: "lance::server", + path = %path.display(), + "Removed empty segment file" + ); } } } diff --git a/lance/src/server/retention.rs b/lance/src/server/retention.rs index e7456ea..a0d2f5c 100644 --- a/lance/src/server/retention.rs +++ b/lance/src/server/retention.rs @@ -297,10 +297,10 @@ fn collect_segments(topic_dir: &Path) -> Result> { let entry = entry?; let path = entry.path(); - if path.extension().is_some_and(|ext| ext == "lnc") { - if let Some(info) = SegmentInfo::from_path(&path) { - segments.push(info); - } + if path.extension().is_some_and(|ext| ext == "lnc") + && let Some(info) = SegmentInfo::from_path(&path) + { + segments.push(info); } } diff --git a/lance/src/server/writer.rs b/lance/src/server/writer.rs index fb9d822..68cce95 100644 --- a/lance/src/server/writer.rs +++ b/lance/src/server/writer.rs @@ -98,14 +98,12 @@ pub fn find_next_segment_index(segments_dir: &std::path::Path) -> Result { let entry = entry?; let path = entry.path(); - if path.extension().is_some_and(|ext| ext == "lnc") { - if let Some(filename) = path.file_stem().and_then(|s| s.to_str()) { - if let Some(index_str) = filename.split('_').next() { - if let Ok(index) = index_str.parse::() { - max_index = Some(max_index.map_or(index, |cur: u64| cur.max(index))); - } - } - } + if path.extension().is_some_and(|ext| ext == "lnc") + && let Some(filename) = path.file_stem().and_then(|s| s.to_str()) + && let Some(index_str) = filename.split('_').next() + && let Ok(index) = index_str.parse::() + { + max_index = Some(max_index.map_or(index, |cur: u64| cur.max(index))); } } } diff --git a/lance/src/subscription.rs b/lance/src/subscription.rs index 00824e7..511a5fa 100644 --- a/lance/src/subscription.rs +++ b/lance/src/subscription.rs @@ -143,13 +143,13 @@ impl SubscriptionManager { } // Also update the subscription's current offset if it exists - if let Ok(mut subs) = self.subscriptions.write() { - if let Some(info) = subs.get_mut(&key) { - if offset > info.current_offset { - info.current_offset = offset; - } - info.touch(); + if let Ok(mut subs) = self.subscriptions.write() + && let Some(info) = subs.get_mut(&key) + { + if offset > info.current_offset { + info.current_offset = offset; } + info.touch(); } debug!( diff --git a/lance/src/topic.rs b/lance/src/topic.rs index 218b315..c580a4a 100644 --- a/lance/src/topic.rs +++ b/lance/src/topic.rs @@ -338,31 +338,31 @@ impl TopicRegistry { // Reconcile stale mappings after election churn / partial ops: // if this name currently points at a different ID, remove old ID row. - if let Some(existing_id_for_name) = by_name.get(name).copied() { - if existing_id_for_name != id { - by_id.remove(&existing_id_for_name); - warn!( - target: "lance::topic", - topic_name = %name, - old_topic_id = existing_id_for_name, - new_topic_id = id, - "Replacing stale replicated topic mapping by name" - ); - } + if let Some(existing_id_for_name) = by_name.get(name).copied() + && existing_id_for_name != id + { + by_id.remove(&existing_id_for_name); + warn!( + target: "lance::topic", + topic_name = %name, + old_topic_id = existing_id_for_name, + new_topic_id = id, + "Replacing stale replicated topic mapping by name" + ); } // If this ID was previously associated to another name, remove that name mapping. - if let Some(existing_meta_for_id) = by_id.get(&id) { - if existing_meta_for_id.name != name { - by_name.remove(&existing_meta_for_id.name); - warn!( - target: "lance::topic", - topic_id = id, - old_topic_name = %existing_meta_for_id.name, - new_topic_name = %name, - "Replacing stale replicated topic mapping by id" - ); - } + if let Some(existing_meta_for_id) = by_id.get(&id) + && existing_meta_for_id.name != name + { + by_name.remove(&existing_meta_for_id.name); + warn!( + target: "lance::topic", + topic_id = id, + old_topic_name = %existing_meta_for_id.name, + new_topic_name = %name, + "Replacing stale replicated topic mapping by id" + ); } by_name.insert(name.to_string(), id); @@ -413,13 +413,13 @@ impl TopicRegistry { .get_topic_by_id(id) .ok_or(TopicIdentityError::UnknownTopic)?; - if let Some(expected) = expected_epoch { - if metadata.topic_epoch != expected { - return Err(TopicIdentityError::StaleEpoch { - expected, - actual: metadata.topic_epoch, - }); - } + if let Some(expected) = expected_epoch + && metadata.topic_epoch != expected + { + return Err(TopicIdentityError::StaleEpoch { + expected, + actual: metadata.topic_epoch, + }); } Ok(metadata) @@ -512,11 +512,11 @@ impl TopicRegistry { if !metadata_path.exists() { continue; } - if let Ok(meta) = TopicMetadata::load(&metadata_path) { - if meta.id == id { - found_metadata = Some(meta); - break; - } + if let Ok(meta) = TopicMetadata::load(&metadata_path) + && meta.id == id + { + found_metadata = Some(meta); + break; } } @@ -547,15 +547,15 @@ impl TopicRegistry { }, }; - if let Some(existing_id_for_name) = by_name.get(&metadata.name).copied() { - if existing_id_for_name != metadata.id { - by_id.remove(&existing_id_for_name); - } + if let Some(existing_id_for_name) = by_name.get(&metadata.name).copied() + && existing_id_for_name != metadata.id + { + by_id.remove(&existing_id_for_name); } - if let Some(existing_meta_for_id) = by_id.get(&metadata.id) { - if existing_meta_for_id.name != metadata.name { - by_name.remove(&existing_meta_for_id.name); - } + if let Some(existing_meta_for_id) = by_id.get(&metadata.id) + && existing_meta_for_id.name != metadata.name + { + by_name.remove(&existing_meta_for_id.name); } by_name.insert(metadata.name.clone(), metadata.id); @@ -602,10 +602,10 @@ impl TopicRegistry { /// numeric ID for backwards compatibility with legacy data. pub fn get_topic_dir(&self, topic_id: u32) -> PathBuf { // Resolve topic name from the in-memory registry - if let Ok(by_id) = self.topics_by_id.read() { - if let Some(meta) = by_id.get(&topic_id) { - return self.data_dir.join("segments").join(&meta.name); - } + if let Ok(by_id) = self.topics_by_id.read() + && let Some(meta) = by_id.get(&topic_id) + { + return self.data_dir.join("segments").join(&meta.name); } // Fallback: topic not yet registered (shouldn't happen in normal // operation but keeps callers from panicking during startup races). @@ -954,17 +954,16 @@ impl TopicRegistry { let index_path = topic_dir.join("sparse.idx"); // Touch the index file to update modification time - if index_path.exists() { - if let Ok(file) = std::fs::OpenOptions::new().write(true).open(&index_path) { - if file.sync_all().is_ok() { - flushed_count += 1; - debug!( - target: "lance::topic", - topic_id = topic_id, - "Flushed sparse index" - ); - } - } + if index_path.exists() + && let Ok(file) = std::fs::OpenOptions::new().write(true).open(&index_path) + && file.sync_all().is_ok() + { + flushed_count += 1; + debug!( + target: "lance::topic", + topic_id = topic_id, + "Flushed sparse index" + ); } } diff --git a/lnc-bench/src/main.rs b/lnc-bench/src/main.rs index f89d9c9..ed6bdf2 100644 --- a/lnc-bench/src/main.rs +++ b/lnc-bench/src/main.rs @@ -554,7 +554,7 @@ async fn producer_task( Err(e) => { let prev = m.total_errors.fetch_add(1, Ordering::Relaxed); // Log first error and every 10000th to avoid spam - if prev == 0 || prev % 10000 == 0 { + if prev == 0 || prev.is_multiple_of(10000) { warn!("send error (count={}): {}", prev + 1, e); } if matches!(e, lnc_client::ClientError::ServerBackpressure) { @@ -564,7 +564,7 @@ async fn producer_task( }, Err(_) => { let prev = m.total_errors.fetch_add(1, Ordering::Relaxed); - if prev == 0 || prev % 10000 == 0 { + if prev == 0 || prev.is_multiple_of(10000) { warn!( "send timeout after {}s (count={})", PRODUCE_TIMEOUT_SECS, diff --git a/lnc-chaos/src/main.rs b/lnc-chaos/src/main.rs index 74ed8b4..627017c 100644 --- a/lnc-chaos/src/main.rs +++ b/lnc-chaos/src/main.rs @@ -527,7 +527,7 @@ async fn consumer_task( Ok(Some(result)) if !result.end_of_stream => { seek_fetches += 1; consecutive_idle = 0; - if seek_fetches % 100 == 0 { + if seek_fetches.is_multiple_of(100) { info!( offset = consumer.current_offset(), seek_fetches, "[{label}] Still seeking to end of existing data..." @@ -1062,10 +1062,10 @@ async fn reset_infrastructure( ]) .output() .await; - if let Ok(o) = out { - if o.status.success() { - info!("║ Deleted service/{svc}"); - } + if let Ok(o) = out + && o.status.success() + { + info!("║ Deleted service/{svc}"); } } } diff --git a/lnc-client/src/error.rs b/lnc-client/src/error.rs index 18c4d34..a7efc65 100644 --- a/lnc-client/src/error.rs +++ b/lnc-client/src/error.rs @@ -91,10 +91,10 @@ pub fn parse_not_leader_error(msg: &str) -> Option> { } // Parse "NOT_LEADER: redirect to X.X.X.X:PORT" - if let Some(addr_str) = msg.strip_prefix("NOT_LEADER: redirect to ") { - if let Ok(addr) = addr_str.trim().parse::() { - return Some(Some(addr)); - } + if let Some(addr_str) = msg.strip_prefix("NOT_LEADER: redirect to ") + && let Ok(addr) = addr_str.trim().parse::() + { + return Some(Some(addr)); } Some(None) diff --git a/lnc-client/src/grouped.rs b/lnc-client/src/grouped.rs index d53cc30..e22779d 100644 --- a/lnc-client/src/grouped.rs +++ b/lnc-client/src/grouped.rs @@ -681,11 +681,12 @@ impl GroupCoordinator { for (worker, old_topics) in existing { if assignments.contains_key(worker) { for topic in old_topics { - if topic_set.contains(topic) && !assigned.contains(topic) { - if let Some(worker_topics) = assignments.get_mut(worker) { - worker_topics.push(*topic); - assigned.insert(*topic); - } + if topic_set.contains(topic) + && !assigned.contains(topic) + && let Some(worker_topics) = assignments.get_mut(worker) + { + worker_topics.push(*topic); + assigned.insert(*topic); } } } @@ -705,10 +706,10 @@ impl GroupCoordinator { .min_by_key(|(_, topics)| topics.len()) .map(|(w, _)| w.clone()); - if let Some(worker) = min_worker { - if let Some(worker_topics) = assignments.get_mut(&worker) { - worker_topics.push(topic); - } + if let Some(worker) = min_worker + && let Some(worker_topics) = assignments.get_mut(&worker) + { + worker_topics.push(topic); } } diff --git a/lnc-client/src/offset.rs b/lnc-client/src/offset.rs index 3f784b8..1eea218 100644 --- a/lnc-client/src/offset.rs +++ b/lnc-client/src/offset.rs @@ -189,14 +189,14 @@ impl LockFileOffsetStore { for entry in entries.flatten() { let path = entry.path(); - if let Some(name) = path.file_name().and_then(|n| n.to_str()) { - if name.ends_with(".offset") { - // Parse topic-{id}-consumer-{id}.offset - if let Some((topic_id, consumer_id)) = Self::parse_offset_filename(name) { - if let Ok(offset) = Self::read_offset_file(&path) { - cache.insert((topic_id, consumer_id), offset); - } - } + if let Some(name) = path.file_name().and_then(|n| n.to_str()) + && name.ends_with(".offset") + { + // Parse topic-{id}-consumer-{id}.offset + if let Some((topic_id, consumer_id)) = Self::parse_offset_filename(name) + && let Ok(offset) = Self::read_offset_file(&path) + { + cache.insert((topic_id, consumer_id), offset); } } } @@ -457,10 +457,10 @@ impl OffsetStore for HookedOffsetStore { let offset = self.inner.load(topic_id, consumer_id)?; // Track for previous_offset in commits - if let Some(off) = offset { - if let Ok(mut prev) = self.previous_offsets.write() { - prev.insert((topic_id, consumer_id), off); - } + if let Some(off) = offset + && let Ok(mut prev) = self.previous_offsets.write() + { + prev.insert((topic_id, consumer_id), off); } Ok(offset) diff --git a/lnc-client/src/producer.rs b/lnc-client/src/producer.rs index 48ae51b..196431b 100644 --- a/lnc-client/src/producer.rs +++ b/lnc-client/src/producer.rs @@ -603,10 +603,10 @@ impl Producer { batches.remove(&topic_id) }; - if let Some(batch) = batch { - if !batch.is_empty() { - self.send_batch(batch).await?; - } + if let Some(batch) = batch + && !batch.is_empty() + { + self.send_batch(batch).await?; } Ok(()) @@ -738,10 +738,10 @@ impl Producer { batches_write.remove(&topic_id) }; - if let Some(batch) = batch { - if !batch.is_empty() { - Self::send_batch_static(client, metrics, batch).await?; - } + if let Some(batch) = batch + && !batch.is_empty() + { + Self::send_batch_static(client, metrics, batch).await?; } } diff --git a/lnc-client/src/standalone.rs b/lnc-client/src/standalone.rs index 90cdeec..b91c0d1 100644 --- a/lnc-client/src/standalone.rs +++ b/lnc-client/src/standalone.rs @@ -304,12 +304,12 @@ impl StandaloneConsumer { let mut client = LanceClient::connect(client_config).await?; // Resolve topic name to ID when the caller used the name-based constructor. - if config.topic_id == 0 { - if let Some(ref name) = config.topic_name.clone() { - validate_topic_name(name)?; - let topic_info = client.create_topic(name).await?; - config.topic_id = topic_info.id; - } + if config.topic_id == 0 + && let Some(ref name) = config.topic_name.clone() + { + validate_topic_name(name)?; + let topic_info = client.create_topic(name).await?; + config.topic_id = topic_info.id; } Self::from_client(client, config).await @@ -505,14 +505,14 @@ impl StandaloneConsumer { /// Check and perform auto-commit if interval has elapsed async fn maybe_auto_commit(&mut self) -> Result<()> { - if let Some(interval) = self.config.auto_commit_interval { - if self.last_commit_time.elapsed() >= interval { - if self.pending_offset > self.committed_offset { - self.commit().await?; - } else { - // Update time even if no commit needed - self.last_commit_time = Instant::now(); - } + if let Some(interval) = self.config.auto_commit_interval + && self.last_commit_time.elapsed() >= interval + { + if self.pending_offset > self.committed_offset { + self.commit().await?; + } else { + // Update time even if no commit needed + self.last_commit_time = Instant::now(); } } Ok(()) diff --git a/lnc-core/src/backpressure.rs b/lnc-core/src/backpressure.rs index 25d5195..dc234f3 100644 --- a/lnc-core/src/backpressure.rs +++ b/lnc-core/src/backpressure.rs @@ -119,10 +119,10 @@ impl BackpressureMonitor { for line in content.lines() { if line.starts_with("MemTotal:") { let parts: Vec<&str> = line.split_whitespace().collect(); - if parts.len() >= 2 { - if let Ok(kb) = parts[1].parse::() { - return kb * 1024; - } + if parts.len() >= 2 + && let Ok(kb) = parts[1].parse::() + { + return kb * 1024; } } } diff --git a/lnc-core/src/buffer.rs b/lnc-core/src/buffer.rs index 9faf7e2..43048e0 100644 --- a/lnc-core/src/buffer.rs +++ b/lnc-core/src/buffer.rs @@ -359,7 +359,7 @@ mod tests { #[test] fn test_aligned_buffer_creation() { let buffer = AlignedBuffer::new(8192).unwrap(); - assert!(buffer.as_ptr() as usize % PAGE_SIZE == 0); + assert!((buffer.as_ptr() as usize).is_multiple_of(PAGE_SIZE)); assert!(buffer.capacity() >= 8192); } diff --git a/lnc-index/src/sparse.rs b/lnc-index/src/sparse.rs index 3fa0983..24df070 100644 --- a/lnc-index/src/sparse.rs +++ b/lnc-index/src/sparse.rs @@ -123,7 +123,7 @@ impl SparseIndexWriter { } pub fn maybe_add_entry(&mut self, sort_key: SortKey, byte_offset: u64) -> bool { - if self.record_count % self.interval == 0 { + if self.record_count.is_multiple_of(self.interval) { self.entries.push(IndexEntry::new(sort_key, byte_offset)); self.record_count += 1; true diff --git a/lnc-io/src/segment.rs b/lnc-io/src/segment.rs index 9b7d1cd..d95f7e3 100644 --- a/lnc-io/src/segment.rs +++ b/lnc-io/src/segment.rs @@ -1109,19 +1109,19 @@ pub fn close_unclosed_segments(segments_dir: &Path) -> Result> { continue; } - if path.extension().is_some_and(|ext| ext == "lnc") { - if let Some(filename) = path.file_stem().and_then(|s| s.to_str()) { - // Active segments don't have '-' in the name - if !filename.contains('-') { - // Use current timestamp as end timestamp for unclosed segments - let end_timestamp = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_nanos() as u64) - .unwrap_or(0); - - let new_path = rename_to_closed_segment(&path, end_timestamp)?; - closed.push(new_path); - } + if path.extension().is_some_and(|ext| ext == "lnc") + && let Some(filename) = path.file_stem().and_then(|s| s.to_str()) + { + // Active segments don't have '-' in the name + if !filename.contains('-') { + // Use current timestamp as end timestamp for unclosed segments + let end_timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + + let new_path = rename_to_closed_segment(&path, end_timestamp)?; + closed.push(new_path); } } } diff --git a/lnc-io/src/tier.rs b/lnc-io/src/tier.rs index 6e11609..b5572c1 100644 --- a/lnc-io/src/tier.rs +++ b/lnc-io/src/tier.rs @@ -160,22 +160,22 @@ impl TieredStorageManager { // Ensure tier directories exist fs::create_dir_all(&config.hot.path)?; - if let Some(ref warm) = config.warm { - if warm.enabled { - fs::create_dir_all(&warm.path)?; - } + if let Some(ref warm) = config.warm + && warm.enabled + { + fs::create_dir_all(&warm.path)?; } - if let Some(ref cold) = config.cold { - if cold.enabled { - fs::create_dir_all(&cold.path)?; - } + if let Some(ref cold) = config.cold + && cold.enabled + { + fs::create_dir_all(&cold.path)?; } - if let Some(ref archive) = config.archive { - if archive.enabled { - fs::create_dir_all(&archive.path)?; - } + if let Some(ref archive) = config.archive + && archive.enabled + { + fs::create_dir_all(&archive.path)?; } Ok(Self { config }) @@ -278,10 +278,10 @@ impl TieredStorageManager { }; // Check age threshold - if let Some(max_age) = tier_config.max_age { - if segment.age > max_age { - return true; - } + if let Some(max_age) = tier_config.max_age + && segment.age > max_age + { + return true; } false diff --git a/lnc-io/src/uring.rs b/lnc-io/src/uring.rs index deec163..05ea410 100644 --- a/lnc-io/src/uring.rs +++ b/lnc-io/src/uring.rs @@ -2002,15 +2002,15 @@ impl AsyncIoPoller { cpu_affinity: Option, ) -> std::thread::JoinHandle<()> { std::thread::spawn(move || { - if let Some(cpu) = cpu_affinity { - if let Err(e) = lnc_core::pin_thread_to_cpu(cpu) { - tracing::warn!( - target: "lance::io", - cpu, - error = %e, - "Failed to pin poller thread to CPU" - ); - } + if let Some(cpu) = cpu_affinity + && let Err(e) = lnc_core::pin_thread_to_cpu(cpu) + { + tracing::warn!( + target: "lance::io", + cpu, + error = %e, + "Failed to pin poller thread to CPU" + ); } self.run(poll_interval_us); diff --git a/lnc-io/src/wal.rs b/lnc-io/src/wal.rs index 43bbd35..52b9cbd 100644 --- a/lnc-io/src/wal.rs +++ b/lnc-io/src/wal.rs @@ -79,12 +79,11 @@ impl Wal { if let Ok(entries) = std::fs::read_dir(&self.config.dir) { for entry in entries.flatten() { - if let Some(name) = entry.file_name().to_str() { - if let Some(id_str) = name.strip_suffix(".wal") { - if let Ok(id) = id_str.parse::() { - max_segment_id = max_segment_id.max(id); - } - } + if let Some(name) = entry.file_name().to_str() + && let Some(id_str) = name.strip_suffix(".wal") + && let Ok(id) = id_str.parse::() + { + max_segment_id = max_segment_id.max(id); } } } diff --git a/lnc-replication/src/actor.rs b/lnc-replication/src/actor.rs index e012393..6033323 100644 --- a/lnc-replication/src/actor.rs +++ b/lnc-replication/src/actor.rs @@ -162,12 +162,11 @@ impl ReplicationActor { } pub fn update_follower_health(&mut self, node_id: u16, latency: Duration) { - if let Some(follower) = self.followers.get_mut(&node_id) { - if let Some(new_status) = follower.record_latency(latency) { - if new_status == FollowerStatus::Evicted || new_status == FollowerStatus::Healthy { - self.recalculate_quorum(); - } - } + if let Some(follower) = self.followers.get_mut(&node_id) + && let Some(new_status) = follower.record_latency(latency) + && (new_status == FollowerStatus::Evicted || new_status == FollowerStatus::Healthy) + { + self.recalculate_quorum(); } } diff --git a/lnc-replication/src/cluster.rs b/lnc-replication/src/cluster.rs index 51c88f0..1d0145b 100644 --- a/lnc-replication/src/cluster.rs +++ b/lnc-replication/src/cluster.rs @@ -356,16 +356,16 @@ impl ClusterCoordinator { // CONTROL PLANE DECOUPLING: Use try_write to avoid blocking control plane // heartbeats/elections. If contended, skip index update - heartbeat will fix it. - if response.term <= leader_term { - if let Ok(mut raft_guard) = raft.try_write() { - if response.success { - let _ = raft_guard.update_match_index(peer_id, response.match_index); - } else { - let _ = raft_guard.backoff_next_index(peer_id, response.match_index); - } + if response.term <= leader_term + && let Ok(mut raft_guard) = raft.try_write() + { + if response.success { + let _ = raft_guard.update_match_index(peer_id, response.match_index); + } else { + let _ = raft_guard.backoff_next_index(peer_id, response.match_index); } - // If try_write fails, heartbeat fanout will eventually update indices } + // If try_write fails, heartbeat fanout will eventually update indices Ok(response) } @@ -594,14 +594,14 @@ impl ClusterCoordinator { } }; - if let Some(handle) = finished { - if let Err(e) = handle.await { - warn!( - target: "lance::cluster", - error = %e, - "Election round task terminated unexpectedly" - ); - } + if let Some(handle) = finished + && let Err(e) = handle.await + { + warn!( + target: "lance::cluster", + error = %e, + "Election round task terminated unexpectedly" + ); } } @@ -903,11 +903,11 @@ impl ClusterCoordinator { } // Re-resolve DNS - if let Ok(mut addrs) = tokio::net::lookup_host(&host_port).await { - if let Some(addr) = addrs.next() { - // add_peer now upserts — updates address if changed - self.peers.add_peer(node_id, addr).await; - } + if let Ok(mut addrs) = tokio::net::lookup_host(&host_port).await + && let Some(addr) = addrs.next() + { + // add_peer now upserts — updates address if changed + self.peers.add_peer(node_id, addr).await; } } } @@ -981,25 +981,24 @@ impl ClusterCoordinator { if *peer_id == my_node_id { continue; } - if data_plane_ids.contains(peer_id) { - if let Some(control_addr) = self.peers.get_peer_addr(*peer_id).await { - let expected_dp_addr = SocketAddr::new(control_addr.ip(), 1995); - if let Some(current_dp_addr) = - self.data_plane_manager.get_follower_addr(*peer_id).await - { - if current_dp_addr != expected_dp_addr { - info!( - target: "lance::cluster", - node_id = peer_id, - old_addr = %current_dp_addr, - new_addr = %expected_dp_addr, - "Updating stale data plane address for follower" - ); - self.data_plane_manager - .update_follower_address(*peer_id, expected_dp_addr) - .await; - } - } + if data_plane_ids.contains(peer_id) + && let Some(control_addr) = self.peers.get_peer_addr(*peer_id).await + { + let expected_dp_addr = SocketAddr::new(control_addr.ip(), 1995); + if let Some(current_dp_addr) = + self.data_plane_manager.get_follower_addr(*peer_id).await + && current_dp_addr != expected_dp_addr + { + info!( + target: "lance::cluster", + node_id = peer_id, + old_addr = %current_dp_addr, + new_addr = %expected_dp_addr, + "Updating stale data plane address for follower" + ); + self.data_plane_manager + .update_follower_address(*peer_id, expected_dp_addr) + .await; } } } @@ -1634,17 +1633,17 @@ impl ClusterCoordinator { }; // Emit to broadcast channel if we have an event - if let Some(evt) = event { - if self.event_tx.send(evt).is_err() { - // No active receivers — stop applying to avoid unbounded work. - // The Applier will retry on the next commit_notify signal. - trace!( - target: "lance::apply", - index = entry.index, - "No active event subscribers, pausing apply loop" - ); - break; - } + if let Some(evt) = event + && self.event_tx.send(evt).is_err() + { + // No active receivers — stop applying to avoid unbounded work. + // The Applier will retry on the next commit_notify signal. + trace!( + target: "lance::apply", + index = entry.index, + "No active event subscribers, pausing apply loop" + ); + break; } // Advance last_applied entry-by-entry for crash safety. @@ -2260,15 +2259,15 @@ impl ClusterCoordinator { Ok(response) => { // CONTROL PLANE DECOUPLING: Use try_write to avoid blocking // other heartbeat tasks. If contended, skip - next tick will fix. - if response.term <= leader_term { - if let Ok(mut raft_guard) = raft.try_write() { - if response.success { - let _ = raft_guard - .update_match_index(peer_id, response.match_index); - } else { - let _ = raft_guard - .backoff_next_index(peer_id, response.match_index); - } + if response.term <= leader_term + && let Ok(mut raft_guard) = raft.try_write() + { + if response.success { + let _ = raft_guard + .update_match_index(peer_id, response.match_index); + } else { + let _ = raft_guard + .backoff_next_index(peer_id, response.match_index); } } Ok(response) @@ -2772,34 +2771,34 @@ async fn handle_peer_connection( // IMPORTANT: We only persist when Raft accepted the append. Persisting // rejected requests (stale term / mismatched prev_log) can incorrectly // truncate follower logs and violate committed-entry safety. - if resp.success && !req.entries.is_empty() { - if let Err(e) = persist_append_entries_blocking( + if resp.success + && !req.entries.is_empty() + && let Err(e) = persist_append_entries_blocking( log_store.clone(), req.prev_log_index, req.prev_log_term, req.entries.clone(), ) .await - { - warn!( - target: "lance::cluster", - error = %e, - "Failed to persist accepted append entries" - ); - // Do not acknowledge success if durability failed. - resp.success = false; - // On conflict/mismatch we must report the last index *before* the - // rejected prev_log so leaders can backtrack next_index. Returning - // old_last_log_index here can pin retries to the same mismatched - // prev_log_index and exhaust catch-up budget. - resp.match_index = req.prev_log_index.saturating_sub(1); - let mut raft_guard = raft.write().await; - raft_guard.rollback_failed_append( - old_last_log_index, - old_last_log_term, - old_commit, - ); - } + { + warn!( + target: "lance::cluster", + error = %e, + "Failed to persist accepted append entries" + ); + // Do not acknowledge success if durability failed. + resp.success = false; + // On conflict/mismatch we must report the last index *before* the + // rejected prev_log so leaders can backtrack next_index. Returning + // old_last_log_index here can pin retries to the same mismatched + // prev_log_index and exhaust catch-up budget. + resp.match_index = req.prev_log_index.saturating_sub(1); + let mut raft_guard = raft.write().await; + raft_guard.rollback_failed_append( + old_last_log_index, + old_last_log_term, + old_commit, + ); } // If commit_index advanced, wake the Apply Loop immediately diff --git a/lnc-replication/src/discovery.rs b/lnc-replication/src/discovery.rs index c1589a0..8f6285d 100644 --- a/lnc-replication/src/discovery.rs +++ b/lnc-replication/src/discovery.rs @@ -432,16 +432,16 @@ pub fn resolve_node_id() -> Option { } // Priority 2: Parse from hostname (StatefulSet pattern: lance-0, lance-1) - if let Ok(hostname) = std::env::var("HOSTNAME") { - if let Some(id) = parse_node_id_from_hostname(&hostname) { - tracing::info!( - target: "lance::discovery", - node_id = id, - hostname = %hostname, - "Node ID resolved from HOSTNAME env-var" - ); - return Some(id); - } + if let Ok(hostname) = std::env::var("HOSTNAME") + && let Some(id) = parse_node_id_from_hostname(&hostname) + { + tracing::info!( + target: "lance::discovery", + node_id = id, + hostname = %hostname, + "Node ID resolved from HOSTNAME env-var" + ); + return Some(id); } // Priority 3: Cannot determine @@ -465,17 +465,16 @@ pub fn validate_node_id_consistency(configured_node_id: u16) -> Result<(), Strin } // Check hostname-derived ID - if let Ok(hostname) = std::env::var("HOSTNAME") { - if let Some(hostname_id) = parse_node_id_from_hostname(&hostname) { - if hostname_id != configured_node_id { - return Err(format!( - "Node ID mismatch: configured node_id={} but hostname '{}' implies node_id={}. \ + if let Ok(hostname) = std::env::var("HOSTNAME") + && let Some(hostname_id) = parse_node_id_from_hostname(&hostname) + && hostname_id != configured_node_id + { + return Err(format!( + "Node ID mismatch: configured node_id={} but hostname '{}' implies node_id={}. \ This will corrupt Raft persistent state. Either fix the StatefulSet ordinal \ or set LANCE_NODE_ID={} to override.", - configured_node_id, hostname, hostname_id, configured_node_id - )); - } - } + configured_node_id, hostname, hostname_id, configured_node_id + )); } Ok(()) diff --git a/lnc-replication/src/peer.rs b/lnc-replication/src/peer.rs index 367da63..e0e86cc 100644 --- a/lnc-replication/src/peer.rs +++ b/lnc-replication/src/peer.rs @@ -669,15 +669,16 @@ impl PeerManager { for (peer_id, conn) in peers { let mut conn = conn.lock().await; - if !conn.is_connected() && conn.should_reconnect() { - if let Err(e) = conn.connect().await { - warn!( - target: "lance::replication", - peer_id, - error = %e, - "Failed to connect to peer" - ); - } + if !conn.is_connected() + && conn.should_reconnect() + && let Err(e) = conn.connect().await + { + warn!( + target: "lance::replication", + peer_id, + error = %e, + "Failed to connect to peer" + ); } } } diff --git a/lnc-replication/src/quorum.rs b/lnc-replication/src/quorum.rs index cccd348..beced3f 100644 --- a/lnc-replication/src/quorum.rs +++ b/lnc-replication/src/quorum.rs @@ -368,18 +368,18 @@ impl AsyncQuorumManager { pub async fn record_nack(&self, write_id: u64, node_id: u16) { let mut shard = self.pending.shard_for(write_id).write().await; - if let Some(write) = shard.get_mut(&write_id) { - if let Some(result) = write.tracker.record_nack() { - tracing::warn!( - target: "lance::replication", - write_id, - node_id, - "Quorum failed - too many NACKs" - ); - - if let Some(write) = shard.remove(&write_id) { - let _ = write.result_tx.send(result); - } + if let Some(write) = shard.get_mut(&write_id) + && let Some(result) = write.tracker.record_nack() + { + tracing::warn!( + target: "lance::replication", + write_id, + node_id, + "Quorum failed - too many NACKs" + ); + + if let Some(write) = shard.remove(&write_id) { + let _ = write.result_tx.send(result); } } } diff --git a/lnc-replication/src/resync.rs b/lnc-replication/src/resync.rs index f0ed5c1..f8968d4 100644 --- a/lnc-replication/src/resync.rs +++ b/lnc-replication/src/resync.rs @@ -1029,18 +1029,16 @@ impl ResyncActor { } // Bandwidth throttling - if self.config.max_bandwidth_bytes_per_sec > 0 { - if let Some(ref progress) = self.progress { - if progress.transfer_rate_bps > self.config.max_bandwidth_bytes_per_sec as f64 { - let overshoot = progress.transfer_rate_bps - - self.config.max_bandwidth_bytes_per_sec as f64; - let throttle_ms = (overshoot - / self.config.max_bandwidth_bytes_per_sec as f64 - * 100.0) as u64; - if throttle_ms > 0 { - tokio::time::sleep(Duration::from_millis(throttle_ms.min(1000))).await; - } - } + if self.config.max_bandwidth_bytes_per_sec > 0 + && let Some(ref progress) = self.progress + && progress.transfer_rate_bps > self.config.max_bandwidth_bytes_per_sec as f64 + { + let overshoot = + progress.transfer_rate_bps - self.config.max_bandwidth_bytes_per_sec as f64; + let throttle_ms = + (overshoot / self.config.max_bandwidth_bytes_per_sec as f64 * 100.0) as u64; + if throttle_ms > 0 { + tokio::time::sleep(Duration::from_millis(throttle_ms.min(1000))).await; } } } diff --git a/lnc-replication/src/segment.rs b/lnc-replication/src/segment.rs index 8a94631..000e387 100644 --- a/lnc-replication/src/segment.rs +++ b/lnc-replication/src/segment.rs @@ -721,13 +721,12 @@ impl SegmentManager { /// Find segment containing the given index pub fn find_segment(&self, index: u64) -> Option<(usize, usize)> { for (seg_idx, segment) in self.segments.iter().enumerate() { - if index >= segment.start_index() { - if let Some(end_idx) = segment.end_index() { - if index <= end_idx { - let local_offset = (index - segment.start_index()) as usize; - return Some((seg_idx, local_offset)); - } - } + if index >= segment.start_index() + && let Some(end_idx) = segment.end_index() + && index <= end_idx + { + let local_offset = (index - segment.start_index()) as usize; + return Some((seg_idx, local_offset)); } } None @@ -787,10 +786,10 @@ impl SegmentManager { // Find segments that are entirely before to_index for (idx, segment) in self.segments.iter().enumerate() { - if let Some(end_idx) = segment.end_index() { - if end_idx < to_index { - segments_to_remove.push(idx); - } + if let Some(end_idx) = segment.end_index() + && end_idx < to_index + { + segments_to_remove.push(idx); } } @@ -886,10 +885,10 @@ impl SegmentManager { // Update active segment index if we removed it if Some(idx) == self.active_segment_idx { self.active_segment_idx = None; - } else if let Some(active_idx) = self.active_segment_idx { - if idx < active_idx { - self.active_segment_idx = Some(active_idx - 1); - } + } else if let Some(active_idx) = self.active_segment_idx + && idx < active_idx + { + self.active_segment_idx = Some(active_idx - 1); } } }