diff --git a/llm-access-store/src/duckdb.rs b/llm-access-store/src/duckdb.rs index 662fe68..3c3e955 100644 --- a/llm-access-store/src/duckdb.rs +++ b/llm-access-store/src/duckdb.rs @@ -519,6 +519,26 @@ struct TieredDuckDbUsageState { active_has_rows: bool, active_writer: Option, detail_store: Option>, + /// Serializes the append write path against retention's active-segment + /// rollover/discard. A retention cycle must not delete or roll the active + /// segment while an append holds its writer across the insert `.await` — + /// otherwise the in-flight writer is orphaned onto a deleted/rolled segment + /// and its rows (and all subsequent appends via the stale writer) are lost. + write_gate: Arc>, + /// Test-only deterministic interleaving hook: when set, an append parks + /// (still holding `write_gate`) right after acquiring the gate, letting a + /// test observe whether retention is serialized behind it. + #[cfg(test)] + append_seam: Option, +} + +/// Test-only one-shot handshake used to park an in-flight append at a known +/// point while it holds `write_gate`: the append signals `reached`, then awaits `proceed`. +#[cfg(test)] +#[derive(Debug)] +struct AppendSeam { + reached: tokio::sync::oneshot::Sender<()>, + proceed: tokio::sync::oneshot::Receiver<()>, } #[cfg(feature = "duckdb-runtime")] diff --git a/llm-access-store/src/duckdb/append.rs b/llm-access-store/src/duckdb/append.rs index 62d9ff4..f993d3d 100644 --- a/llm-access-store/src/duckdb/append.rs +++ b/llm-access-store/src/duckdb/append.rs @@ -126,6 +126,31 @@ pub async fn append_usage_events_to_tiered( catalog_backend: &Arc, rows: &[UsageEventRow], ) -> anyhow::Result<()> { + // Serialize against retention's active-segment rollover/discard (see + // `TieredDuckDbUsageState::write_gate`): hold the gate for the whole append + // so a concurrent retention cycle cannot delete/roll the active segment + // while this append holds its writer across the insert `.await` — which + // would orphan the writer onto a deleted segment and lose its rows. 
+ let write_gate = { + let state = state + .lock() + .map_err(|_| anyhow!("tiered duckdb state lock poisoned"))?; + Arc::clone(&state.write_gate) + }; + let _write_guard = write_gate.lock_owned().await; + #[cfg(test)] + { + let seam = { + let mut state = state + .lock() + .map_err(|_| anyhow!("tiered duckdb state lock poisoned"))?; + state.append_seam.take() + }; + if let Some(seam) = seam { + let _ = seam.reached.send(()); + let _ = seam.proceed.await; + } + } let connection_config_snapshot = connection_config_snapshot(connection_config); let mut writer = { let mut state = state diff --git a/llm-access-store/src/duckdb/repository.rs b/llm-access-store/src/duckdb/repository.rs index 7cc42e8..2fb5eec 100644 --- a/llm-access-store/src/duckdb/repository.rs +++ b/llm-access-store/src/duckdb/repository.rs @@ -173,6 +173,9 @@ impl DuckDbUsageRepository { active_has_rows, active_writer: None, detail_store, + write_gate: Arc::new(tokio::sync::Mutex::new(())), + #[cfg(test)] + append_seam: None, })), connection_config, catalog_backend, diff --git a/llm-access-store/src/duckdb/retention.rs b/llm-access-store/src/duckdb/retention.rs index 9db9c21..d78d12a 100644 --- a/llm-access-store/src/duckdb/retention.rs +++ b/llm-access-store/src/duckdb/retention.rs @@ -38,12 +38,27 @@ pub async fn prune_tiered_usage_analytics( retention_days: u64, ) -> anyhow::Result { let cutoff_ms = usage_analytics_retention_cutoff_ms(now_ms, retention_days); - let mut deleted_files = rollover_expired_active_segment( - config, - state, - connection_config_snapshot(connection_config), - cutoff_ms, - )?; + let mut deleted_files = { + // Serialize the active-segment rollover/discard against the append write + // path (see `TieredDuckDbUsageState::write_gate`): an append must not + // hold an in-flight writer for the active segment while this cycle + // deletes/rolls it. 
Only this rollover touches the active segment; the + // archived/detail cleanup below operates on other files and stays + // ungated so it never stalls appends. + let write_gate = { + let state = state + .lock() + .map_err(|_| anyhow!("tiered duckdb state lock poisoned"))?; + Arc::clone(&state.write_gate) + }; + let _write_guard = write_gate.lock_owned().await; + rollover_expired_active_segment( + config, + state, + connection_config_snapshot(connection_config), + cutoff_ms, + )? + }; let expired_segments = delete_expired_segments_from_catalog(catalog_backend, cutoff_ms)?; for segment in &expired_segments { deleted_files = diff --git a/llm-access-store/src/duckdb/tests.rs b/llm-access-store/src/duckdb/tests.rs index 2ae266d..2fc061f 100644 --- a/llm-access-store/src/duckdb/tests.rs +++ b/llm-access-store/src/duckdb/tests.rs @@ -3065,3 +3065,78 @@ fn duckdb_initialization_drops_legacy_usage_art_indexes() { std::fs::remove_dir_all(&root).expect("cleanup duckdb test directory"); } + +#[cfg(feature = "duckdb-runtime")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn duckdb_tiered_append_serializes_retention_via_write_gate() { + let root = std::env::temp_dir() + .join(format!("llm-access-duckdb-test-{}-tiered-write-gate", std::process::id())); + let _ = std::fs::remove_dir_all(&root); + std::fs::create_dir_all(&root).expect("create tiered duckdb test directory"); + // Large rollover threshold so the append's own size-based rollover never + // fires — this test only exercises append-vs-retention serialization. + let repo = std::sync::Arc::new( + super::DuckDbUsageRepository::open_tiered(super::TieredDuckDbUsageConfig { + active_dir: root.join("active"), + archive_dir: root.join("archive"), + rollover_bytes: 1 << 30, + details_dir: Some(details_store_dir(&root)), + }) + .expect("open tiered duckdb usage db"), + ); + + // Install the seam so the spawned append parks — still holding the write + // gate — right after acquiring it. 
+ let (reached_tx, reached_rx) = tokio::sync::oneshot::channel::<()>(); + let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>(); + match repo.inner.as_ref() { + super::DuckDbUsageRepositoryInner::Tiered { + state, .. + } => { + let mut state = state.lock().expect("lock tiered state"); + state.append_seam = Some(super::AppendSeam { + reached: reached_tx, + proceed: proceed_rx, + }); + }, + _ => panic!("expected tiered repository"), + } + + let append_repo = std::sync::Arc::clone(&repo); + let append_task = tokio::spawn(async move { + let mut event = test_usage_event(); + event.event_id = "write-gate-inflight".to_string(); + append_repo + .append_usage_event(&event) + .await + .expect("append parks at seam then completes"); + }); + + // Wait until the append is parked while holding the gate. + reached_rx.await.expect("append reached the seam"); + + // Retention must block on the write gate while the append holds it. + let blocked = tokio::time::timeout( + std::time::Duration::from_millis(300), + repo.prune_usage_analytics(1_700_000_000_000, 30), + ) + .await; + assert!(blocked.is_err(), "retention must block on the write gate while an append holds it"); + + // Release the parked append; it finishes and drops the gate. + proceed_tx.send(()).expect("signal append to proceed"); + append_task.await.expect("append task joined"); + + // With the gate released, retention must no longer block — and must itself + // succeed, so the inner prune result is checked as well as the timeout. + let unblocked = tokio::time::timeout( + std::time::Duration::from_secs(5), + repo.prune_usage_analytics(1_700_000_000_000, 30), + ) + .await; + unblocked + .expect("retention must proceed once the append releases the gate") + .expect("retention must succeed once the append releases the gate"); + + std::fs::remove_dir_all(&root).expect("cleanup tiered duckdb test directory"); +}