Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions lance/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,12 @@ pub fn read_segment_zero_copy(
// Try zero-copy path first (mmap on Linux)
// Uses Arc-based mmap sharing - no data copy, only refcount increment
#[cfg(target_os = "linux")]
if reader.supports_zero_copy() {
if let Some(data) = reader.slice_bytes(offset, to_read) {
let next_offset = offset + to_read as u64;
lnc_metrics::increment_zero_copy_sends();
return Ok((data, next_offset));
}
if reader.supports_zero_copy()
&& let Some(data) = reader.slice_bytes(offset, to_read)
{
let next_offset = offset + to_read as u64;
lnc_metrics::increment_zero_copy_sends();
return Ok((data, next_offset));
}

// Fallback: read into buffer
Expand Down
37 changes: 17 additions & 20 deletions lance/src/server/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,12 +580,11 @@ where
// metadata visibility during leadership churn / follower catch-up windows.
let should_forward_to_leader =
command_handlers::is_topic_metadata_operation(command) || command == ControlCommand::Fetch;
if should_forward_to_leader {
if let Some(action) =
if should_forward_to_leader
&& let Some(action) =
try_forward_control_to_leader(ctx, command, buffer, consumed, read_offset).await
{
return action;
}
{
return action;
}

// Handle locally
Expand Down Expand Up @@ -899,22 +898,20 @@ where
{
// Topic metadata commands are leader-authoritative to avoid stale
// follower registry responses during startup/election churn.
if command_handlers::is_topic_metadata_operation(command) {
if let Some(coord) = cluster {
if !coord.is_leader_authoritative() {
if coord.is_leader() {
lnc_metrics::increment_cluster_elected_not_ready_rejects();
return send_error(
stream,
"FORWARD_FAILED: Leader not ready (apply/metadata catch-up)",
)
.await;
}

return send_not_leader_error(stream, coord.leader_addr().map(|a| a.to_string()))
.await;
}
if command_handlers::is_topic_metadata_operation(command)
&& let Some(coord) = cluster
&& !coord.is_leader_authoritative()
{
if coord.is_leader() {
lnc_metrics::increment_cluster_elected_not_ready_rejects();
return send_error(
stream,
"FORWARD_FAILED: Leader not ready (apply/metadata catch-up)",
)
.await;
}

return send_not_leader_error(stream, coord.leader_addr().map(|a| a.to_string())).await;
}

// Build context for handlers
Expand Down
99 changes: 50 additions & 49 deletions lance/src/server/ingestion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,41 +174,41 @@ pub fn write_replicated_data_enriched(
return lnc_io::SegmentWriter::open(&segment_path);
}

if leader_write_offset > 0 {
if let Some(closed_path) = find_closed_segment_variant(topic_dir, segment_name) {
if !segment_path.exists() {
match std::fs::rename(&closed_path, &segment_path) {
Ok(()) => {
tracing::debug!(
target: "lance::ingestion",
segment = %segment_name,
from = %closed_path.display(),
to = %segment_path.display(),
"Renamed closed follower segment to leader canonical name"
);
return lnc_io::SegmentWriter::open(&segment_path);
},
Err(e) => {
tracing::warn!(
target: "lance::ingestion",
segment = %segment_name,
from = %closed_path.display(),
to = %segment_path.display(),
error = %e,
"Failed to canonicalize closed follower segment name, reopening variant"
);
},
}
if leader_write_offset > 0
&& let Some(closed_path) = find_closed_segment_variant(topic_dir, segment_name)
{
if !segment_path.exists() {
match std::fs::rename(&closed_path, &segment_path) {
Ok(()) => {
tracing::debug!(
target: "lance::ingestion",
segment = %segment_name,
from = %closed_path.display(),
to = %segment_path.display(),
"Renamed closed follower segment to leader canonical name"
);
return lnc_io::SegmentWriter::open(&segment_path);
},
Err(e) => {
tracing::warn!(
target: "lance::ingestion",
segment = %segment_name,
from = %closed_path.display(),
to = %segment_path.display(),
error = %e,
"Failed to canonicalize closed follower segment name, reopening variant"
);
},
}

tracing::debug!(
target: "lance::ingestion",
segment = %segment_name,
closed_segment = %closed_path.display(),
"Reopening closed follower segment for replay"
);
return lnc_io::SegmentWriter::open(&closed_path);
}

tracing::debug!(
target: "lance::ingestion",
segment = %segment_name,
closed_segment = %closed_path.display(),
"Reopening closed follower segment for replay"
);
return lnc_io::SegmentWriter::open(&closed_path);
}

lnc_io::SegmentWriter::create_named(topic_dir, segment_name)
Expand Down Expand Up @@ -258,23 +258,24 @@ pub fn write_replicated_data_enriched(
};

// Verify the segment name matches what the leader expects
if let Some(current_name) = topic_writer.writer.filename() {
if current_name != entry.segment_name && !entry.flags.new_segment() {
tracing::warn!(
target: "lance::ingestion",
topic_id,
expected = %entry.segment_name,
actual = %current_name,
"Segment name mismatch — follower may need resync"
);
if let Some(current_name) = topic_writer.writer.filename()
&& current_name != entry.segment_name
&& !entry.flags.new_segment()
{
tracing::warn!(
target: "lance::ingestion",
topic_id,
expected = %entry.segment_name,
actual = %current_name,
"Segment name mismatch — follower may need resync"
);

// Recover in-place by switching to the leader-dictated segment stream.
// Without this, followers can stay pinned to a stale writer key and repeatedly
// fail append catch-up for the same divergent segment lineage.
topic_writer.writer =
open_or_create_leader_segment(&topic_dir, &entry.segment_name, entry.write_offset)?;
topic_writer.index_builder.clear();
}
// Recover in-place by switching to the leader-dictated segment stream.
// Without this, followers can stay pinned to a stale writer key and repeatedly
// fail append catch-up for the same divergent segment lineage.
topic_writer.writer =
open_or_create_leader_segment(&topic_dir, &entry.segment_name, entry.write_offset)?;
topic_writer.index_builder.clear();
}

// Check for Raft conflict even with cached writer (leader may have rolled back),
Expand Down
87 changes: 42 additions & 45 deletions lance/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,10 +673,10 @@ pub async fn run(
if let Ok(entries) = std::fs::read_dir(&local_data_dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.extension().is_some_and(|ext| ext == "lnc") {
if let Ok(meta) = std::fs::metadata(&path) {
total += meta.len();
}
if path.extension().is_some_and(|ext| ext == "lnc")
&& let Ok(meta) = std::fs::metadata(&path)
{
total += meta.len();
}
}
}
Expand Down Expand Up @@ -736,47 +736,44 @@ pub async fn run(

// Proactively request bulk resync from current leader to recover
// any missed data-plane events from channel lag.
if !resync_actor.is_active() {
if let Some(leader_id) = event_coord.leader_id().await {
if leader_id != event_config.node_id {
let throttled = last_resync_leader == Some(leader_id)
&& last_resync_attempt
.is_some_and(|t| t.elapsed() < resync_min_interval);
if throttled {
debug!(
target: "lance::resync",
skipped = n,
leader_id,
min_retry_secs = resync_min_interval.as_secs(),
"Skipping lag-triggered resync (cooldown active)"
);
continue;
}
if !resync_actor.is_active()
&& let Some(leader_id) = event_coord.leader_id().await
&& leader_id != event_config.node_id
{
let throttled = last_resync_leader == Some(leader_id)
&& last_resync_attempt
.is_some_and(|t| t.elapsed() < resync_min_interval);
if throttled {
debug!(
target: "lance::resync",
skipped = n,
leader_id,
min_retry_secs = resync_min_interval.as_secs(),
"Skipping lag-triggered resync (cooldown active)"
);
continue;
}

if let Some(leader_repl_addr) =
event_coord.peer_addr(leader_id).await
{
warn!(
target: "lance::resync",
skipped = n,
leader_id,
leader_addr = %leader_repl_addr,
"Lagged apply receiver - initiating follower bulk resync"
);
last_resync_attempt = Some(Instant::now());
last_resync_leader = Some(leader_id);
if let Err(e) =
resync_actor.initiate_resync(leader_repl_addr, 0).await
{
warn!(
target: "lance::resync",
skipped = n,
error = %e,
"Bulk resync after lagged receiver failed"
);
resync_actor.reset();
}
}
if let Some(leader_repl_addr) = event_coord.peer_addr(leader_id).await {
warn!(
target: "lance::resync",
skipped = n,
leader_id,
leader_addr = %leader_repl_addr,
"Lagged apply receiver - initiating follower bulk resync"
);
last_resync_attempt = Some(Instant::now());
last_resync_leader = Some(leader_id);
if let Err(e) =
resync_actor.initiate_resync(leader_repl_addr, 0).await
{
warn!(
target: "lance::resync",
skipped = n,
error = %e,
"Bulk resync after lagged receiver failed"
);
resync_actor.reset();
}
}
}
Expand Down Expand Up @@ -1039,7 +1036,7 @@ pub async fn run(

// Re-resolve peer DNS every 30s (60 ticks × 500ms) to handle pod IP changes
dns_refresh_counter += 1;
if dns_refresh_counter % 60 == 0 {
if dns_refresh_counter.is_multiple_of(60) {
coord_for_watcher.refresh_peer_addresses().await;
}

Expand Down
58 changes: 29 additions & 29 deletions lance/src/server/multi_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,18 +280,18 @@ fn run_ingestion_actor_sync(
let write_done_tx = request.write_done_tx.take();

// WAL-first append
if let Some(ref mut w) = wal {
if let Err(e) = w.append(&payload) {
tracing::error!(
target: "lance::ingestion",
actor_id,
topic_id,
error = %e,
"WAL append failed — dropping request"
);
drop(write_done_tx);
continue;
}
if let Some(ref mut w) = wal
&& let Err(e) = w.append(&payload)
{
tracing::error!(
target: "lance::ingestion",
actor_id,
topic_id,
error = %e,
"WAL append failed — dropping request"
);
drop(write_done_tx);
continue;
}

match process_request_sync(
Expand Down Expand Up @@ -479,29 +479,29 @@ fn flush_and_signal_sync(
}

// Sync WAL before segment fsyncs (batched, not per-append)
if let Some(w) = wal {
if let Err(e) = w.sync() {
tracing::error!(
target: "lance::ingestion",
error = %e,
"WAL batch sync failed"
);
}
if let Some(w) = wal
&& let Err(e) = w.sync()
{
tracing::error!(
target: "lance::ingestion",
error = %e,
"WAL batch sync failed"
);
}
// If WAL is enabled and synced above, segment fsync is redundant on the ACK
// hot path. Crash recovery can replay WAL + committed Raft log to rebuild
// segment bytes, so we preserve durability while avoiding duplicate flush cost.
if wal.is_none() {
for topic_id in dirty_topics.iter() {
if let Some(tw) = topic_writers.get_mut(topic_id) {
if let Err(e) = tw.writer.fsync() {
tracing::error!(
target: "lance::ingestion",
topic_id,
error = %e,
"Batch fsync failed"
);
}
if let Some(tw) = topic_writers.get_mut(topic_id)
&& let Err(e) = tw.writer.fsync()
{
tracing::error!(
target: "lance::ingestion",
topic_id,
error = %e,
"Batch fsync failed"
);
}
}
}
Expand Down
Loading
Loading