Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions llm-access-store/src/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,26 @@ struct TieredDuckDbUsageState {
active_has_rows: bool,
active_writer: Option<PersistentUsageWriter>,
detail_store: Option<Arc<UsageEventDetailStore>>,
/// Serializes the append write path against retention's active-segment
/// rollover/discard. A retention cycle must not delete or roll the active
/// segment while an append holds its writer across the insert `.await` —
/// otherwise the in-flight writer is orphaned onto a deleted/rolled segment
/// and its rows (and all subsequent appends via the stale writer) are lost.
write_gate: Arc<tokio::sync::Mutex<()>>,
/// Test-only deterministic interleaving hook: when set, an append parks
/// (still holding `write_gate`) right after acquiring the gate, letting a
/// test observe whether retention is serialized behind it.
#[cfg(test)]
append_seam: Option<AppendSeam>,
}

/// Test-only one-shot handshake used to park an in-flight append at a known
/// point while it holds the tiered `write_gate`.
///
/// The append path takes the seam out of the tiered state right after it
/// acquires the gate, so an installed seam parks at most one append. Both
/// channel results are ignored on the append side, so dropping either peer
/// end releases the append instead of failing it.
#[cfg(test)]
#[derive(Debug)]
struct AppendSeam {
/// Fired by the append once it holds `write_gate`; the test awaits the
/// paired receiver to learn that the append is parked.
reached: tokio::sync::oneshot::Sender<()>,
/// Awaited by the append while it still holds `write_gate`; the test sends
/// on the paired sender to let the append continue.
proceed: tokio::sync::oneshot::Receiver<()>,
}

#[cfg(feature = "duckdb-runtime")]
Expand Down
25 changes: 25 additions & 0 deletions llm-access-store/src/duckdb/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,31 @@ pub async fn append_usage_events_to_tiered(
catalog_backend: &Arc<TieredUsageCatalogBackend>,
rows: &[UsageEventRow],
) -> anyhow::Result<()> {
// Serialize against retention's active-segment rollover/discard (see
// `TieredDuckDbUsageState::write_gate`): hold the gate for the whole append
// so a concurrent retention cycle cannot delete/roll the active segment
// while this append holds its writer across the insert `.await` — which
// would orphan the writer onto a deleted segment and lose its rows.
let write_gate = {
let state = state
.lock()
.map_err(|_| anyhow!("tiered duckdb state lock poisoned"))?;
Arc::clone(&state.write_gate)
};
let _write_guard = write_gate.lock_owned().await;
#[cfg(test)]
{
let seam = {
let mut state = state
.lock()
.map_err(|_| anyhow!("tiered duckdb state lock poisoned"))?;
state.append_seam.take()
};
if let Some(seam) = seam {
let _ = seam.reached.send(());
let _ = seam.proceed.await;
}
}
let connection_config_snapshot = connection_config_snapshot(connection_config);
let mut writer = {
let mut state = state
Expand Down
3 changes: 3 additions & 0 deletions llm-access-store/src/duckdb/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ impl DuckDbUsageRepository {
active_has_rows,
active_writer: None,
detail_store,
write_gate: Arc::new(tokio::sync::Mutex::new(())),
#[cfg(test)]
append_seam: None,
})),
connection_config,
catalog_backend,
Expand Down
27 changes: 21 additions & 6 deletions llm-access-store/src/duckdb/retention.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,27 @@ pub async fn prune_tiered_usage_analytics(
retention_days: u64,
) -> anyhow::Result<UsageAnalyticsPruneReport> {
let cutoff_ms = usage_analytics_retention_cutoff_ms(now_ms, retention_days);
let mut deleted_files = rollover_expired_active_segment(
config,
state,
connection_config_snapshot(connection_config),
cutoff_ms,
)?;
let mut deleted_files = {
// Serialize the active-segment rollover/discard against the append write
// path (see `TieredDuckDbUsageState::write_gate`): an append must not
// hold an in-flight writer for the active segment while this cycle
// deletes/rolls it. Only this rollover touches the active segment; the
// archived/detail cleanup below operates on other files and stays
// ungated so it never stalls appends.
let write_gate = {
let state = state
.lock()
.map_err(|_| anyhow!("tiered duckdb state lock poisoned"))?;
Arc::clone(&state.write_gate)
};
let _write_guard = write_gate.lock_owned().await;
rollover_expired_active_segment(
config,
state,
connection_config_snapshot(connection_config),
cutoff_ms,
)?
};
let expired_segments = delete_expired_segments_from_catalog(catalog_backend, cutoff_ms)?;
for segment in &expired_segments {
deleted_files =
Expand Down
72 changes: 72 additions & 0 deletions llm-access-store/src/duckdb/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3065,3 +3065,75 @@ fn duckdb_initialization_drops_legacy_usage_art_indexes() {

std::fs::remove_dir_all(&root).expect("cleanup duckdb test directory");
}

/// Regression test for append-vs-retention serialization on the tiered
/// `write_gate`.
///
/// An append is parked through `AppendSeam` right after it acquires the gate.
/// While it is parked, a retention cycle must not complete; once the append is
/// released and finishes, a retention cycle must both get past the gate and
/// return successfully.
#[cfg(feature = "duckdb-runtime")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn duckdb_tiered_append_serializes_retention_via_write_gate() {
    let root = std::env::temp_dir()
        .join(format!("llm-access-duckdb-test-{}-tiered-write-gate", std::process::id()));
    let _ = std::fs::remove_dir_all(&root);
    std::fs::create_dir_all(&root).expect("create tiered duckdb test directory");
    // Large rollover threshold so the append's own size-based rollover never
    // fires — this test only exercises append-vs-retention serialization.
    let repo = std::sync::Arc::new(
        super::DuckDbUsageRepository::open_tiered(super::TieredDuckDbUsageConfig {
            active_dir: root.join("active"),
            archive_dir: root.join("archive"),
            rollover_bytes: 1 << 30,
            details_dir: Some(details_store_dir(&root)),
        })
        .expect("open tiered duckdb usage db"),
    );

    // Install the seam so the spawned append parks — still holding the write
    // gate — right after acquiring it.
    let (reached_tx, reached_rx) = tokio::sync::oneshot::channel::<()>();
    let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>();
    match repo.inner.as_ref() {
        super::DuckDbUsageRepositoryInner::Tiered {
            state, ..
        } => {
            let mut state = state.lock().expect("lock tiered state");
            state.append_seam = Some(super::AppendSeam {
                reached: reached_tx,
                proceed: proceed_rx,
            });
        },
        _ => panic!("expected tiered repository"),
    }

    let append_repo = std::sync::Arc::clone(&repo);
    let append_task = tokio::spawn(async move {
        let mut event = test_usage_event();
        event.event_id = "write-gate-inflight".to_string();
        append_repo
            .append_usage_event(&event)
            .await
            .expect("append parks at seam then completes");
    });

    // Wait until the append is parked while holding the gate.
    reached_rx.await.expect("append reached the seam");

    // Retention must block on the write gate while the append holds it. The
    // timed-out future is dropped here, which abandons that retention attempt.
    let blocked = tokio::time::timeout(
        std::time::Duration::from_millis(300),
        repo.prune_usage_analytics(1_700_000_000_000, 30),
    )
    .await;
    assert!(blocked.is_err(), "retention must block on the write gate while an append holds it");

    // Release the parked append; it finishes and drops the gate.
    proceed_tx.send(()).expect("signal append to proceed");
    append_task.await.expect("append task joined");

    // With the gate released, retention must no longer block. Unwrap the
    // timeout layer first, then the retention result itself: checking only the
    // timeout would let a retention cycle that fails after acquiring the gate
    // pass unnoticed.
    let prune_result = tokio::time::timeout(
        std::time::Duration::from_secs(5),
        repo.prune_usage_analytics(1_700_000_000_000, 30),
    )
    .await
    .expect("retention must proceed once the append releases the gate");
    prune_result.expect("retention must succeed once the append releases the gate");

    std::fs::remove_dir_all(&root).expect("cleanup tiered duckdb test directory");
}
Loading