Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8,780 changes: 141 additions & 8,639 deletions llm-access-store/src/duckdb.rs

Large diffs are not rendered by default.

202 changes: 202 additions & 0 deletions llm-access-store/src/duckdb/append.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
//! Usage-event append path into the tiered store, key-rollup
//! aggregation, and detail publish.

use std::{
collections::{BTreeMap, HashSet},
fs,
path::Path,
sync::{Arc, Mutex},
};

use anyhow::{anyhow, Context};
use llm_access_core::usage::UsageEvent;

use super::{
connection::connection_config_snapshot,
retention::{duckdb_wal_path, rollover_active_segment},
DuckDbUsageRepository, PersistentUsageWriter, SharedDuckDbUsageConnectionConfig,
TieredDuckDbUsageConfig, TieredDuckDbUsageState, TieredUsageCatalogBackend, UsageEventRow,
};
use crate::KeyUsageRollupSummary;

#[cfg(feature = "duckdb-runtime")]
pub fn key_usage_rollups_from_path(path: &Path) -> anyhow::Result<Vec<KeyUsageRollupSummary>> {
let conn = DuckDbUsageRepository::open_read_only_conn(path)?;
key_usage_rollups_from_conn(&conn)
}
#[cfg(feature = "duckdb-runtime")]
fn key_usage_rollups_from_conn(
conn: &duckdb::Connection,
) -> anyhow::Result<Vec<KeyUsageRollupSummary>> {
let mut stmt = conn
.prepare(
"SELECT
key_id,
CAST(COALESCE(sum(input_uncached_tokens), 0) AS BIGINT),
CAST(COALESCE(sum(input_cached_tokens), 0) AS BIGINT),
CAST(COALESCE(sum(output_tokens), 0) AS BIGINT),
CAST(COALESCE(sum(billable_tokens), 0) AS BIGINT),
CAST(COALESCE(sum(COALESCE(try_cast(credit_usage AS DOUBLE), 0)), 0) AS VARCHAR),
CAST(COALESCE(sum(CASE WHEN credit_usage_missing THEN 1 ELSE 0 END), 0) AS BIGINT),
max(created_at_ms)
FROM usage_events
GROUP BY key_id",
)
.context("prepare duckdb key usage rollup query")?;
let rows = stmt
.query_map([], |row| {
Ok(KeyUsageRollupSummary {
key_id: row.get(0)?,
input_uncached_tokens: row.get(1)?,
input_cached_tokens: row.get(2)?,
output_tokens: row.get(3)?,
billable_tokens: row.get(4)?,
credit_total: row.get(5)?,
credit_missing_events: row.get(6)?,
last_used_at_ms: row.get(7)?,
})
})
.context("query duckdb key usage rollups")?;
rows.collect::<Result<Vec<_>, _>>()
.context("collect duckdb key usage rollups")
}
#[cfg(feature = "duckdb-runtime")]
pub fn key_usage_rollups_from_tiered(
_config: &TieredDuckDbUsageConfig,
state: &Mutex<TieredDuckDbUsageState>,
catalog_backend: &TieredUsageCatalogBackend,
) -> anyhow::Result<Vec<KeyUsageRollupSummary>> {
let mut combined = BTreeMap::<String, KeyUsageRollupSummary>::new();
{
let state = state
.lock()
.map_err(|_| anyhow!("tiered duckdb state lock poisoned"))?;
let conn = DuckDbUsageRepository::open_read_only_conn(&state.active_path)?;
for rollup in key_usage_rollups_from_conn(&conn)? {
merge_key_rollup(&mut combined, rollup);
}
}
for rollup in catalog_backend.archived_key_usage_rollups()? {
merge_key_rollup(&mut combined, rollup);
}
Ok(combined.into_values().collect())
}
#[cfg(feature = "duckdb-runtime")]
pub fn merge_key_rollup(
combined: &mut BTreeMap<String, KeyUsageRollupSummary>,
rollup: KeyUsageRollupSummary,
) {
let entry = combined
.entry(rollup.key_id.clone())
.or_insert_with(|| KeyUsageRollupSummary {
key_id: rollup.key_id.clone(),
input_uncached_tokens: 0,
input_cached_tokens: 0,
output_tokens: 0,
billable_tokens: 0,
credit_total: "0".to_string(),
credit_missing_events: 0,
last_used_at_ms: None,
});
entry.input_uncached_tokens = entry
.input_uncached_tokens
.saturating_add(rollup.input_uncached_tokens);
entry.input_cached_tokens = entry
.input_cached_tokens
.saturating_add(rollup.input_cached_tokens);
entry.output_tokens = entry.output_tokens.saturating_add(rollup.output_tokens);
entry.billable_tokens = entry.billable_tokens.saturating_add(rollup.billable_tokens);
let current_credit = entry.credit_total.parse::<f64>().unwrap_or(0.0);
let added_credit = rollup.credit_total.parse::<f64>().unwrap_or(0.0);
entry.credit_total = (current_credit + added_credit).to_string();
entry.credit_missing_events = entry
.credit_missing_events
.saturating_add(rollup.credit_missing_events);
entry.last_used_at_ms = match (entry.last_used_at_ms, rollup.last_used_at_ms) {
(Some(left), Some(right)) => Some(left.max(right)),
(None, Some(right)) => Some(right),
(left, None) => left,
};
}
#[cfg(feature = "duckdb-runtime")]
pub async fn append_usage_events_to_tiered(
config: &TieredDuckDbUsageConfig,
state: &Mutex<TieredDuckDbUsageState>,
connection_config: &SharedDuckDbUsageConnectionConfig,
catalog_backend: &Arc<TieredUsageCatalogBackend>,
rows: &[UsageEventRow],
) -> anyhow::Result<()> {
let connection_config_snapshot = connection_config_snapshot(connection_config);
Comment on lines +122 to +129
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Adding an early return when rows is empty prevents unnecessary locking, writer reopening, and setting active_has_rows = true (which could trigger empty rollovers).

pub async fn append_usage_events_to_tiered(
    config: &TieredDuckDbUsageConfig,
    state: &Mutex<TieredDuckDbUsageState>,
    connection_config: &SharedDuckDbUsageConnectionConfig,
    catalog_backend: &Arc<TieredUsageCatalogBackend>,
    rows: &[UsageEventRow],
) -> anyhow::Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let connection_config_snapshot = connection_config_snapshot(connection_config);
References
  1. For functions performing database queries or cache invalidations based on a list of IDs, return early if the input list is empty to prevent redundant database round-trips and cache operations.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferring. Unlike a pure no-op early return, this changes the empty-input semantics: the current path still locks, may roll over, calls insert_usage_events(&[]), and sets active_has_rows = true — the early return skips that. Every caller already guards is_empty() upstream so it's effectively unreachable, and folding a semantics change into a verbatim structural move would break this PR's behavior-preservation guarantee. Deferred to the perf/cleanup follow-up.

let mut writer = {
let mut state = state
.lock()
.map_err(|_| anyhow!("tiered duckdb state lock poisoned"))?;
if state.active_has_rows
&& active_segment_disk_bytes(&state.active_path) >= config.rollover_bytes.max(1)
{
rollover_active_segment(
config,
&mut state,
connection_config_snapshot,
Arc::clone(catalog_backend),
)?;
}
let should_reopen = state
.active_writer
.as_ref()
.map(|writer| writer.connection_config != connection_config_snapshot)
.unwrap_or(true);
if should_reopen {
state.active_writer = Some(PersistentUsageWriter::open(
&state.active_path,
connection_config_snapshot,
state.detail_store.clone(),
)?);
}
state
.active_writer
.take()
.ok_or_else(|| anyhow!("tiered active writer missing after initialization"))?
};
writer.writer.insert_usage_events(rows).await?;
let mut state = state
.lock()
.map_err(|_| anyhow!("tiered duckdb state lock poisoned"))?;
state.active_has_rows = true;
state.active_writer = Some(writer);
Comment on lines +130 to +166
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Concurrency Bug: Race Condition and Database Lock Contention

There is a critical concurrency issue in append_usage_events_to_tiered due to the way the state mutex is locked and unlocked around the asynchronous insert_usage_events call.

  1. The Issue:

    • A task locks the state mutex, takes the active_writer out of the state (setting state.active_writer to None), and releases the lock.
    • While the task is awaiting writer.writer.insert_usage_events(rows).await, a concurrent task can enter append_usage_events_to_tiered and lock the state mutex.
    • Since state.active_writer is now None, the second task will evaluate should_reopen as true and attempt to open a new PersistentUsageWriter on the same active_path via PersistentUsageWriter::open.
    • Because DuckDB only allows a single writer connection to a database file at any given time, this second attempt will fail with a database locked error (or potentially cause database corruption if locking is bypassed).
  2. Recommendation:

    • To safely serialize writes without releasing the writer or attempting concurrent opens, consider using an asynchronous mutex (such as tokio::sync::Mutex) for the write path or the entire TieredDuckDbUsageState.
    • Alternatively, you can serialize all write operations through a message-passing channel to a single background worker task that exclusively owns and manages the PersistentUsageWriter connection.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed real, but out of scope here — deferring to a dedicated PR. The mechanism holds: state is a std::sync::Mutex, and the writer is take()n out with the lock dropped across writer.writer.insert_usage_events(rows).await, so a concurrent caller sees active_writer == Noneshould_reopen == truePersistentUsageWriter::open on the same single-writer DuckDB file → lock error.

This is pre-existing code moved verbatim in a behavior-preserving structural-split PR (it reads identically on master). The fix is a concurrency redesign (async Mutex on the write path, or a single-writer actor behind a channel) — a behavior change that warrants its own PR plus a concurrency-stress test. I'll trace the caller's concurrency model first to gauge real-world severity (the usage worker may already serialize flushes through one task).

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update — I traced the caller's concurrency model, and the race is confirmed reachable in production, retracting my earlier "the worker may serialize flushes through one task" hedge.

The tiered repo's only writers are the usage worker's two loops, and the llm-access-usage-worker bin runs them on two separate OS threads, each with its own current_thread runtime:

  • import/relay loop (calls append_usage_events_ownedappend_usage_events_to_tiered)
  • maintenance loop (calls prune_usage_analyticsprune_tiered_usage_analytics)

Both hold Arc::clone of the same DuckDbUsageRepository(Tiered) (Arc::new(duckdb)primary; then maintenance.duckdb_usage = primary.usage_repository() = Arc::clone). (The API path writes to JournalUsageEventSink, not the tiered repo, so it isn't a third writer.)

The real trigger is append-vs-maintenance, not the append-vs-append double-open in the original comment: during append's insert_usage_events(...).await window (writer take()n out, the std::sync::Mutex dropped), the maintenance thread locks the state, sets active_writer = None, rolls over / discards the expired active segment and deletes its files (retention.rs has 5× active_writer = None). Append then returns and restores Some(writer) pointing at a segment that's been rolled away / deleted → lost writes / corruption.

Fix lands in a dedicated follow-up PR once this structural move merges (it edits the just-moved append.rs/retention.rs), with a reproducing concurrency test. Direction: an async write-gate (tokio::sync::Mutex) serializing the append and maintenance mutating paths against each other, keeping the short std::sync::Mutex critical sections and the concurrent read/query path intact.

if active_segment_disk_bytes(&state.active_path) >= config.rollover_bytes.max(1) {
rollover_active_segment(
config,
&mut state,
connection_config_snapshot,
Arc::clone(catalog_backend),
)?;
}
Ok(())
}
#[cfg(feature = "duckdb-runtime")]
pub async fn publish_pending_segment_details_if_configured(
config: &TieredDuckDbUsageConfig,
pending_path: &Path,
) -> anyhow::Result<()> {
let _ = (config, pending_path);
Ok(())
}
#[cfg(feature = "duckdb-runtime")]
pub fn dedupe_usage_events_owned(events: Vec<UsageEvent>) -> Vec<UsageEvent> {
let mut seen = HashSet::new();
let mut deduped = Vec::with_capacity(events.len());
for event in events {
if seen.insert(event.event_id.clone()) {
deduped.push(event);
}
}
deduped
}
#[cfg(feature = "duckdb-runtime")]
fn active_segment_disk_bytes(path: &Path) -> u64 {
fs::metadata(path).map(|meta| meta.len()).unwrap_or(0)
+ fs::metadata(duckdb_wal_path(path))
.map(|meta| meta.len())
.unwrap_or(0)
}
Loading
Loading