Skip to content

refactor(llm-access-store): split duckdb.rs into concern submodules#16

Merged
acking-you merged 2 commits into
masterfrom
refactor/split-store-duckdb
May 31, 2026
Merged

refactor(llm-access-store): split duckdb.rs into concern submodules#16
acking-you merged 2 commits into
masterfrom
refactor/split-store-duckdb

Conversation

@acking-you
Copy link
Copy Markdown
Owner

What

Split the ~9.3k-line llm-access-store/src/duckdb.rs into a thin facade + 12 concern submodules. Unlike postgres.rs (a stateful PostgresControlRepository God-object, split across #14/#15), this file is a free-function DAG over private-field types, so the split needs ~no public-API churn.

mod.rs: 9253 → 755 lines.

New submodules (duckdb/)

module carries
sql insert/compact/select-expr/filter SQL builders
writer usage-event write path: detail rows/blobs/store + hot/persistent writers + insert executors
connection connection/config helpers, runtime-limit formatting, target init
catalog tiered usage-catalog backend (Postgres + in-memory test catalog) + catalog↔usage query translation
segment segment lifecycle: path layout, active-segment selection, async sealing/compaction, archive finalize + publish
append append path into the tiered store, key-rollup aggregation, detail publish
retention prune/rollover/discard expired segments, orphan cleanup, WAL/checkpoint helpers
query read path: list/get/totals across active+archived tiers, page planning, chart points, row decoders
filter_options usage filter-option discovery across tiers
metrics usage-metrics accumulation (summary/group/latency) + snapshot assembly
util small pure helpers (time, gzip, hashing, numeric casts)
repository DuckDbUsageRepository inherent impl + UsageEventSink / UsageAnalyticsStore trait impls

What stays in the facade

All structs/enums/type-aliases/consts/statics + the 4 ungated items + the one test-only fn (initialize_tiered_catalog). Submodules read those facade-private items as ancestor-privates (the "fat facade" — 417 private struct fields stay put, zero field-visibility change). The 45 tests move verbatim into duckdb/tests.rs via #[cfg(test)] mod tests;.

Visibility

Moved free fns become pub only where referenced from another region — 82 of 167 (incl. cross-module bare calls and super::<fn> test references; the rest stay private). Moved inherent-impl methods become pub(super) only where a foreign region calls them — 36; trait-impl methods keep the trait's visibility (never bumped). initialize_duckdb_target_path is re-exported from the facade to preserve crate::duckdb:: callers.

cfg note

Every mod X; decl is #[cfg(feature = "duckdb-runtime")]-gated, so feature-off the submodules vanish — preserving master's exact feature-off state. (master is not feature-off-clean: the ungated tiered_pending_dir references gated types. CI only ever builds default features — default → duckdb-prebuilt → duckdb-runtime — so the feature is always ON; this split changes nothing there.)

Behavior-preserving

Structural move only — fn/method bodies byte-identical.

  • Identical top-level name multiset (366) vs pre-split (verified by diff: 0 dropped, 0 added).
  • Same 45 #[test]/#[tokio::test].

Verification

  • cargo clippy -p llm-access-store --all-targets -- -D warnings → clean
  • cargo test -p llm-access-store → 67 passed
  • cargo build -p llm-access (reverse dep) → ok
  • rustfmt on the 14 changed files only; deps/lance/deps/lancedb untouched

🤖 Generated with Claude Code

Split the ~9.3k-line duckdb.rs into a thin facade + 12 concern submodules.
Unlike postgres (a stateful God-object), this file is a free-function DAG
over private-field types, so the split needs ~no public-API churn.

- 167 gated free fns + 16 gated impls move into duckdb/{sql, writer,
  connection, catalog, segment, append, retention, query, filter_options,
  metrics, util, repository}.
- All structs/enums/type-aliases/consts/statics + the 4 ungated items + the
  one test-only fn stay in the mod.rs facade; submodules read those
  facade-private items as ancestor-privates (the "fat facade", no field-vis
  change). facade 9253 -> 755 lines.
- The 45 tests move verbatim into duckdb/tests.rs (`#[cfg(test)] mod tests;`).

Behavior-preserving structural move: identical top-level name multiset
(366), same 45 tests. Moved free fns become `pub` only where referenced
from another region (82 of 167); moved INHERENT-impl methods become
`pub(super)` only where a foreign region calls them (36) -- trait-impl
methods keep the trait's visibility. `initialize_duckdb_target_path` is
re-exported from the facade to preserve `crate::duckdb::` callers.

cfg note: every `mod X;` decl is `#[cfg(feature = "duckdb-runtime")]`-gated
so feature-off the submodules vanish, preserving master's exact feature-off
state (master is not feature-off-clean: the ungated `tiered_pending_dir`
references gated types -- CI only ever builds default features, so the
feature is always ON).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@acking-you
Copy link
Copy Markdown
Owner Author

/gemini review

@acking-you
Copy link
Copy Markdown
Owner Author

@codex review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements a DuckDB-based usage-event storage system, including tiered storage, key-rollup aggregation, catalog management, metrics accumulation, and retention pruning. The review feedback highlights a critical concurrency race condition in tiered event appending that could cause database lock contention, redundant cloning and deduplication on the write path, several instances of blocking synchronous I/O operations inside asynchronous functions, potential intermediate memory allocations during gzip/gunzip serialization, and silent thread spawning failures when sealing segments.

Comment on lines +130 to +166
let mut writer = {
let mut state = state
.lock()
.map_err(|_| anyhow!("tiered duckdb state lock poisoned"))?;
if state.active_has_rows
&& active_segment_disk_bytes(&state.active_path) >= config.rollover_bytes.max(1)
{
rollover_active_segment(
config,
&mut state,
connection_config_snapshot,
Arc::clone(catalog_backend),
)?;
}
let should_reopen = state
.active_writer
.as_ref()
.map(|writer| writer.connection_config != connection_config_snapshot)
.unwrap_or(true);
if should_reopen {
state.active_writer = Some(PersistentUsageWriter::open(
&state.active_path,
connection_config_snapshot,
state.detail_store.clone(),
)?);
}
state
.active_writer
.take()
.ok_or_else(|| anyhow!("tiered active writer missing after initialization"))?
};
writer.writer.insert_usage_events(rows).await?;
let mut state = state
.lock()
.map_err(|_| anyhow!("tiered duckdb state lock poisoned"))?;
state.active_has_rows = true;
state.active_writer = Some(writer);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Concurrency Bug: Race Condition and Database Lock Contention

There is a critical concurrency issue in append_usage_events_to_tiered due to the way the state mutex is locked and unlocked around the asynchronous insert_usage_events call.

  1. The Issue:

    • A task locks the state mutex, takes the active_writer out of the state (setting state.active_writer to None), and releases the lock.
    • While the task is awaiting writer.writer.insert_usage_events(rows).await, a concurrent task can enter append_usage_events_to_tiered and lock the state mutex.
    • Since state.active_writer is now None, the second task will evaluate should_reopen as true and attempt to open a new PersistentUsageWriter on the same active_path via PersistentUsageWriter::open.
    • Because DuckDB only allows a single writer connection to a database file at any given time, this second attempt will fail with a database locked error (or potentially cause database corruption if locking is bypassed).
  2. Recommendation:

    • To safely serialize writes without releasing the writer or attempting concurrent opens, consider using an asynchronous mutex (such as tokio::sync::Mutex) for the write path or the entire TieredDuckDbUsageState.
    • Alternatively, you can serialize all write operations through a message-passing channel to a single background worker task that exclusively owns and manages the PersistentUsageWriter connection.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed real, but out of scope here — deferring to a dedicated PR. The mechanism holds: state is a std::sync::Mutex, and the writer is take()n out with the lock dropped across writer.writer.insert_usage_events(rows).await, so a concurrent caller sees active_writer == Noneshould_reopen == truePersistentUsageWriter::open on the same single-writer DuckDB file → lock error.

This is pre-existing code moved verbatim in a behavior-preserving structural-split PR (it reads identically on master). The fix is a concurrency redesign (async Mutex on the write path, or a single-writer actor behind a channel) — a behavior change that warrants its own PR plus a concurrency-stress test. I'll trace the caller's concurrency model first to gauge real-world severity (the usage worker may already serialize flushes through one task).

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update — I traced the caller's concurrency model, and the race is confirmed reachable in production, retracting my earlier "the worker may serialize flushes through one task" hedge.

The tiered repo's only writers are the usage worker's two loops, and the llm-access-usage-worker bin runs them on two separate OS threads, each with its own current_thread runtime:

  • import/relay loop (calls append_usage_events_ownedappend_usage_events_to_tiered)
  • maintenance loop (calls prune_usage_analyticsprune_tiered_usage_analytics)

Both hold Arc::clone of the same DuckDbUsageRepository(Tiered) (Arc::new(duckdb)primary; then maintenance.duckdb_usage = primary.usage_repository() = Arc::clone). (The API path writes to JournalUsageEventSink, not the tiered repo, so it isn't a third writer.)

The real trigger is append-vs-maintenance, not the append-vs-append double-open in the original comment: during append's insert_usage_events(...).await window (writer take()n out, the std::sync::Mutex dropped), the maintenance thread locks the state, sets active_writer = None, rolls over / discards the expired active segment and deletes its files (retention.rs has 5× active_writer = None). Append then returns and restores Some(writer) pointing at a segment that's been rolled away / deleted → lost writes / corruption.

Fix lands in a dedicated follow-up PR once this structural move merges (it edits the just-moved append.rs/retention.rs), with a reproducing concurrency test. Direction: an async write-gate (tokio::sync::Mutex) serializing the append and maintenance mutating paths against each other, keeping the short std::sync::Mutex critical sections and the concurrent read/query path intact.

Comment on lines +278 to +285
pub async fn append_usage_events_if_new(&self, events: &[UsageEvent]) -> anyhow::Result<usize> {
let deduped = dedupe_usage_events_owned(events.to_vec());
if deduped.is_empty() {
return Ok(0);
}
UsageEventSink::append_usage_events(self, &deduped).await?;
Ok(deduped.len())
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Performance: Redundant Cloning and Deduplication on Write Path

In append_usage_events_if_new, the events are cloned and deduplicated, and then passed to UsageEventSink::append_usage_events(self, &deduped).

However, append_usage_events clones the slice again to call append_usage_events_owned, which then runs the deduplication algorithm a second time. This results in double allocation, double cloning, and redundant CPU work on a hot write path.

Recommendation:
Call UsageEventSink::append_usage_events_owned directly with the already-deduplicated Vec to bypass the redundant clone and deduplication step.

Suggested change
pub async fn append_usage_events_if_new(&self, events: &[UsageEvent]) -> anyhow::Result<usize> {
let deduped = dedupe_usage_events_owned(events.to_vec());
if deduped.is_empty() {
return Ok(0);
}
UsageEventSink::append_usage_events(self, &deduped).await?;
Ok(deduped.len())
}
pub async fn append_usage_events_if_new(&self, events: &[UsageEvent]) -> anyhow::Result<usize> {
let deduped = dedupe_usage_events_owned(events.to_vec());
let len = deduped.len();
if len == 0 {
return Ok(0);
}
UsageEventSink::append_usage_events_owned(self, deduped).await?;
Ok(len)
}
References
  1. For functions performing database queries or cache invalidations based on a list of IDs, return early if the input list is empty to prevent redundant database round-trips and cache operations.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied in 6a393d8. One nuance on the framing: append_usage_events_owned still runs the inner dedup (a cheap no-op on already-unique event_ids, order-preserving), so the win is the eliminated .to_vec() clone, not a second dedup pass. Behavior is byte-identical (same rows, same return value). Verified: clippy -D warnings + the append-path integration tests (duckdb_repository_persists_usage_event_batches, ..._ignores_segment_local_duplicates) ran locally and pass.

Comment on lines +181 to +221
pub(super) async fn get_row_for_ref(
&self,
event_id: &str,
detail_ref: &UsageEventDetailObjectRef,
) -> anyhow::Result<Option<UsageEventDetailRow>> {
let pack_path = self.root_dir.join(&detail_ref.relative_path);
let mut file = match fs::File::open(&pack_path) {
Ok(file) => file,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => {
return Err(err).with_context(|| {
format!("failed to open usage detail pack `{}`", pack_path.display())
})
},
};
let range_len = detail_ref
.byte_range
.end
.checked_sub(detail_ref.byte_range.start)
.ok_or_else(|| anyhow!("usage detail pack byte range is invalid"))?;
let mut bytes =
vec![0_u8; usize::try_from(range_len).context("detail byte range too large")?];
file.seek(SeekFrom::Start(detail_ref.byte_range.start))
.with_context(|| {
format!("failed to seek usage detail pack `{}`", pack_path.display())
})?;
file.read_exact(&mut bytes).with_context(|| {
format!("failed to read usage detail pack `{}`", pack_path.display())
})?;
let actual_sha = sha256_hex(&bytes);
if actual_sha != detail_ref.sha256 {
return Err(anyhow!(
"usage detail pack member hash mismatch for event `{event_id}` in `{}`",
pack_path.display()
));
}
let blob: UsageEventDetailBlob = gunzip_json_bytes(&bytes).with_context(|| {
format!("failed to decode usage detail pack member `{}`", pack_path.display())
})?;
Ok(Some(blob.into_detail_row(event_id.to_string())))
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Performance: Blocking I/O in Async Function

The get_row_for_ref function is async, but it performs synchronous blocking file I/O operations (fs::File::open, file.seek, and file.read_exact). Calling blocking operations directly inside an asynchronous context can block the Tokio executor thread, leading to latency spikes and reduced throughput under load.

Recommendation:
Wrap the blocking file operations in tokio::task::spawn_blocking to offload them to Tokio's blocking thread pool.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferring. Offloading blocking I/O to spawn_blocking is sound async hygiene, but it's a behavior change, not a no-op: it moves the work to a different thread, changes panic→JoinError and cancellation semantics, and adds 'static/Send constraints that require restructuring the borrows. This is verbatim-moved pre-existing code in a behavior-preserving structural-split PR — the right home is a dedicated async-hygiene PR that handles the whole blocking-I/O read/write path consistently.

Comment on lines +165 to +179
async fn put_pack(&self, pack: UsageEventDetailPackWrite) -> anyhow::Result<()> {
let pack_path = self.root_dir.join(&pack.relative_path);
if let Some(parent) = pack_path.parent() {
fs::create_dir_all(parent).with_context(|| {
format!(
"failed to create usage detail pack parent directory `{}`",
parent.display()
)
})?;
}
fs::write(&pack_path, pack.bytes).with_context(|| {
format!("failed to write usage detail pack `{}`", pack_path.display())
})?;
Ok(())
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Performance: Blocking I/O in Async Function

The put_pack function is async, but it performs synchronous blocking file I/O operations (fs::create_dir_all and fs::write). This can block the Tokio executor thread.

Recommendation:
Wrap these blocking file operations in tokio::task::spawn_blocking to offload them to the blocking thread pool.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferring. Offloading blocking I/O to spawn_blocking is sound async hygiene, but it's a behavior change, not a no-op: it moves the work to a different thread, changes panic→JoinError and cancellation semantics, and adds 'static/Send constraints that require restructuring the borrows. This is verbatim-moved pre-existing code in a behavior-preserving structural-split PR — the right home is a dedicated async-hygiene PR that handles the whole blocking-I/O read/write path consistently.

Comment on lines +32 to +65
pub async fn prune_tiered_usage_analytics(
config: &TieredDuckDbUsageConfig,
state: &Mutex<TieredDuckDbUsageState>,
connection_config: &SharedDuckDbUsageConnectionConfig,
catalog_backend: &TieredUsageCatalogBackend,
now_ms: i64,
retention_days: u64,
) -> anyhow::Result<UsageAnalyticsPruneReport> {
let cutoff_ms = usage_analytics_retention_cutoff_ms(now_ms, retention_days);
let mut deleted_files = rollover_expired_active_segment(
config,
state,
connection_config_snapshot(connection_config),
cutoff_ms,
)?;
let expired_segments = delete_expired_segments_from_catalog(catalog_backend, cutoff_ms)?;
for segment in &expired_segments {
deleted_files =
deleted_files.saturating_add(remove_duckdb_segment_files(&segment.archive_path)?);
if let Some(parent) = segment.archive_path.parent() {
let _ = prune_empty_directories_up_to(&config.archive_dir, parent);
}
}
let deleted_orphan_files = prune_orphan_archived_duckdb_files(config, catalog_backend)?;
let (deleted_detail_files, deleted_detail_dirs) =
prune_expired_detail_day_buckets(config, cutoff_ms)?;
Ok(UsageAnalyticsPruneReport {
deleted_segments: expired_segments.len(),
deleted_files,
deleted_orphan_files,
deleted_detail_files,
deleted_detail_dirs,
})
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Performance: Heavy Blocking I/O in Async Function

The prune_tiered_usage_analytics function is async, but it consists entirely of synchronous, heavy blocking I/O operations (such as recursive directory traversal, file deletions, and database queries). Calling this directly in an async context will block the Tokio runtime thread.

Recommendation:
Since all operations inside this function are synchronous, wrap the entire body of prune_tiered_usage_analytics in tokio::task::spawn_blocking to ensure it runs on the dedicated blocking thread pool.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferring. Offloading blocking I/O to spawn_blocking is sound async hygiene, but it's a behavior change, not a no-op: it moves the work to a different thread, changes panic→JoinError and cancellation semantics, and adds 'static/Send constraints that require restructuring the borrows. This is verbatim-moved pre-existing code in a behavior-preserving structural-split PR — the right home is a dedicated async-hygiene PR that handles the whole blocking-I/O read/write path consistently.

Comment on lines +41 to +48
pub fn gzip_json_bytes<T: serde::Serialize>(value: &T) -> anyhow::Result<Vec<u8>> {
let json = serde_json::to_vec(value).context("serialize usage detail json")?;
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder
.write_all(&json)
.context("write gzip usage detail payload")?;
encoder.finish().context("finish gzip usage detail payload")
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Performance: Avoid Intermediate Allocation during Gzip Serialization

Currently, gzip_json_bytes serializes the entire object into an intermediate Vec<u8> using serde_json::to_vec before writing it to the GzEncoder. For large payloads (such as LLM request/response bodies), this causes unnecessary memory overhead.

Recommendation:
Serialize directly into the GzEncoder using serde_json::to_writer to avoid the intermediate buffer allocation.

pub fn gzip_json_bytes<T: serde::Serialize>(value: &T) -> anyhow::Result<Vec<u8>> {
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    serde_json::to_writer(&mut encoder, value).context("serialize and gzip usage detail json")?;
    encoder.finish().context("finish gzip usage detail payload")
}

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferring. The recovered JSON would be identical, but the compressed bytes here are content-addressed: writer.rs computes compressed_sha = sha256_hex(&encoded) and stores/verifies it. I can't prove serde_json::to_writer into the encoder yields byte-identical deflate output vs the current to_vec + write_all (deflate block-splitting can differ under different write chunking), so this isn't obviously safe to fold into a behavior-preserving move. Good candidate for a dedicated perf PR that adds a byte-equality assertion across the two forms.

Comment on lines +50 to +57
pub fn gunzip_json_bytes<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> anyhow::Result<T> {
let mut decoder = GzDecoder::new(bytes);
let mut json = Vec::new();
decoder
.read_to_end(&mut json)
.context("gunzip usage detail payload")?;
serde_json::from_slice(&json).context("deserialize usage detail json")
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Performance: Avoid Intermediate Allocation during Gunzip Deserialization

Currently, gunzip_json_bytes reads the entire uncompressed payload into an intermediate Vec<u8> before deserializing it. This causes unnecessary memory allocation.

Recommendation:
Deserialize directly from the GzDecoder using serde_json::from_reader to avoid the intermediate buffer allocation.

pub fn gunzip_json_bytes<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> anyhow::Result<T> {
    let decoder = GzDecoder::new(bytes);
    serde_json::from_reader(decoder).context("deserialize gunzipped usage detail json")
}

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not applying — the premise is inverted for this case. serde_json's own guidance is that from_reader over an unbuffered reader (which GzDecoder<&[u8]> is) does many tiny reads and is slower than reading to a slice and using from_slice. The current read_to_end + from_slice is the recommended pattern, so this would be a regression, not a win. (Also verbatim-moved code in a structural-split PR.)

Comment on lines +293 to +295
let _ = thread::Builder::new()
.name("llm-access-duckdb-sealer".to_string())
.spawn(move || {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Reliability: Silent Thread Spawning Failures

Using let _ = thread::Builder::new().spawn(...) silently ignores any errors if spawning the background thread fails (e.g., due to system resource exhaustion). If thread spawning fails, the segment will never be sealed or archived, leading to data accumulation and potential data loss, with no error logged.

Recommendation:
Check the result of spawn and log any errors using tracing::error!.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferring. Logging spawn failures is a real reliability win, but emitting a new log line is a new observable side effect — not behavior-preserving — and this is verbatim-moved code in a structural-split PR. Good candidate for a follow-up reliability PR covering both let _ = ...spawn() sites.

@chatgpt-codex-connector
Copy link
Copy Markdown

Codex Review: Didn't find any major issues. Keep it up!

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements a DuckDB-based tiered storage and analytics backend for usage events, including components for appending, querying, cataloging, retention pruning, and metrics accumulation. The review feedback focuses on robustness improvements, specifically suggesting an early return in append_usage_events_to_tiered when the rows slice is empty, checking for directory existence before clearing stale compacting files, and properly handling potential errors when spawning the background segment sealer thread.

Comment on lines +122 to +129
pub async fn append_usage_events_to_tiered(
config: &TieredDuckDbUsageConfig,
state: &Mutex<TieredDuckDbUsageState>,
connection_config: &SharedDuckDbUsageConnectionConfig,
catalog_backend: &Arc<TieredUsageCatalogBackend>,
rows: &[UsageEventRow],
) -> anyhow::Result<()> {
let connection_config_snapshot = connection_config_snapshot(connection_config);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Adding an early return when rows is empty prevents unnecessary locking, writer reopening, and setting active_has_rows = true (which could trigger empty rollovers).

pub async fn append_usage_events_to_tiered(
    config: &TieredDuckDbUsageConfig,
    state: &Mutex<TieredDuckDbUsageState>,
    connection_config: &SharedDuckDbUsageConnectionConfig,
    catalog_backend: &Arc<TieredUsageCatalogBackend>,
    rows: &[UsageEventRow],
) -> anyhow::Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let connection_config_snapshot = connection_config_snapshot(connection_config);
References
  1. For functions performing database queries or cache invalidations based on a list of IDs, return early if the input list is empty to prevent redundant database round-trips and cache operations.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferring. Unlike a pure no-op early return, this changes the empty-input semantics: the current path still locks, may roll over, calls insert_usage_events(&[]), and sets active_has_rows = true — the early return skips that. Every caller already guards is_empty() upstream so it's effectively unreachable, and folding a semantics change into a verbatim structural move would break this PR's behavior-preservation guarantee. Deferred to the perf/cleanup follow-up.

Comment on lines +120 to +122
pub fn clear_stale_compacting_files(config: &TieredDuckDbUsageConfig) -> anyhow::Result<()> {
let compacting_dir = tiered_compacting_dir(config);
for entry in fs::read_dir(&compacting_dir).with_context(|| {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Adding an existence check for compacting_dir makes the function robust against missing directories, aligning with the defensive patterns used elsewhere in the codebase.

pub fn clear_stale_compacting_files(config: &TieredDuckDbUsageConfig) -> anyhow::Result<()> {
    let compacting_dir = tiered_compacting_dir(config);
    if !compacting_dir.exists() {
        return Ok(());
    }
    for entry in fs::read_dir(&compacting_dir).with_context(|| {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferring. This changes the error contract: today a missing compacting dir makes read_dir return an Err that propagates; the suggested guard returns Ok(()) instead. Reasonable hardening, but a behavior change on verbatim-moved code — out of scope for a structural-split PR. Deferred to the cleanup follow-up.

Comment on lines +286 to +331
pub fn spawn_segment_sealer(
config: TieredDuckDbUsageConfig,
catalog_backend: Arc<TieredUsageCatalogBackend>,
pending_path: PathBuf,
segment_id: String,
connection_config: SharedDuckDbUsageConnectionConfig,
) {
let _ = thread::Builder::new()
.name("llm-access-duckdb-sealer".to_string())
.spawn(move || {
let Ok(_sealer_guard) = TIERED_SEGMENT_SEALER_LOCK.lock() else {
eprintln!(
"failed to archive llm-access duckdb segment `{segment_id}` from `{}`: sealer \
lock poisoned",
pending_path.display()
);
return;
};
let mut last_err = None;
for attempt in 0..5 {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build tiered segment sealer runtime");
match runtime.block_on(publish_pending_segment_async(
&config,
catalog_backend.as_ref(),
&pending_path,
&segment_id,
connection_config_snapshot(&connection_config),
)) {
Ok(()) => return,
Err(err) => {
last_err = Some(err);
thread::sleep(Duration::from_millis(250 * (attempt + 1)));
},
}
}
if let Some(err) = last_err {
eprintln!(
"failed to archive llm-access duckdb segment `{segment_id}` from `{}`: {err:#}",
pending_path.display()
);
}
});
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Spawning a background thread with thread::Builder::new().spawn() can fail under resource exhaustion. Handling the potential error instead of silently ignoring it with let _ = ensures visibility into background task failures.

pub fn spawn_segment_sealer(
    config: TieredDuckDbUsageConfig,
    catalog_backend: Arc<TieredUsageCatalogBackend>,
    pending_path: PathBuf,
    segment_id: String,
    connection_config: SharedDuckDbUsageConnectionConfig,
) {
    if let Err(err) = thread::Builder::new()
        .name("llm-access-duckdb-sealer".to_string())
        .spawn(move || {
            let Ok(_sealer_guard) = TIERED_SEGMENT_SEALER_LOCK.lock() else {
                eprintln!(
                    "failed to archive llm-access duckdb segment `{segment_id}` from `{}`: sealer \
                     lock poisoned",
                    pending_path.display()
                );
                return;
            };
            let mut last_err = None;
            for attempt in 0..5 {
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("build tiered segment sealer runtime");
                match runtime.block_on(publish_pending_segment_async(
                    &config,
                    catalog_backend.as_ref(),
                    &pending_path,
                    &segment_id,
                    connection_config_snapshot(&connection_config),
                )) {
                    Ok(()) => return,
                    Err(err) => {
                        last_err = Some(err);
                        thread::sleep(Duration::from_millis(250 * (attempt + 1)));
                    },
                }
            }
            if let Some(err) = last_err {
                eprintln!(
                    "failed to archive llm-access duckdb segment `{segment_id}` from `{}`: {err:#}",
                    pending_path.display()
                );
            }
        })
    {
        eprintln!("failed to spawn segment sealer thread for `{segment_id}`: {err}");
    }
}

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferring. Logging spawn failures is a real reliability win, but emitting a new log line is a new observable side effect — not behavior-preserving — and this is verbatim-moved code in a structural-split PR. Good candidate for a follow-up reliability PR covering both let _ = ...spawn() sites.

…path

append_usage_events_if_new already dedups into an owned Vec, then called
UsageEventSink::append_usage_events(&deduped), which clones the slice again
to reach append_usage_events_owned. Call append_usage_events_owned directly
with the owned Vec to drop that clone. Dedup is idempotent (unique
event_ids, order-preserving) so the inner dedup in _owned is a no-op and the
produced rows/return value are identical.

From PR #16 review (gemini). Verified: clippy -D warnings + 67 tests
(incl. the append-path integration tests) green locally.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@acking-you acking-you merged commit 72e9ceb into master May 31, 2026
3 checks passed
acking-you added a commit that referenced this pull request Jun 1, 2026
* fix(llm-access-store): address deferred duckdb review nits

Apply the genuinely-actionable findings deferred from the duckdb split (#16)
review, now that the structural moves have landed:

- segment.rs: `spawn_segment_sealer` swallowed `thread::Builder::spawn`
  failures via `let _ = ...`. On spawn failure (e.g. resource exhaustion) the
  segment would never seal/archive with no diagnostic. Capture the result and
  log it (matching the closure's existing eprintln! idiom).
- append.rs: `append_usage_events_to_tiered` now returns early on empty `rows`
  before taking the write gate — a no-op empty append must not reopen the
  writer or flip `active_has_rows` (which could trigger an empty rollover).
  Callers already guard upstream, so this is defensive.
- util.rs: `gzip_json_bytes` serializes directly into the `GzEncoder` via
  `serde_json::to_writer`, dropping the intermediate `Vec<u8>` (and the now
  unused `std::io::Write` import). Behavior-preserving: detail-pack bytes are
  sha'd and verified per-blob against their own stored bytes, never compared
  across writers — confirmed by the detail round-trip integration tests.

Deliberately NOT changed (declined with reasons):
- The suggested `clear_stale_compacting_files` missing-dir guard: its sole
  caller creates the compacting dir first, so the guarded condition is
  unreachable.
- `gunzip_json_bytes` -> `from_reader`: serde_json's own guidance is that
  `from_reader` over an unbuffered reader is slower than read-to-slice +
  `from_slice`; current code is the recommended form.

Verification:
- cargo clippy -p llm-access-store --all-targets -- -D warnings -> clean
- cargo test -p llm-access-store -> 68 passed
- cargo build -p llm-access (reverse dep) -> ok
- rustfmt on the 3 changed files only; deps/lance, deps/lancedb untouched

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(llm-access-store): buffer gzip_json_bytes writes to drop CPU regression

CR on PR #22: `serde_json::to_writer` straight into an unbuffered
`GzEncoder` emits many tiny writes, each forcing the compressor to run —
a CPU regression versus the bulk write it replaced.

Wrap the encoder in a `BufWriter` so serde's small writes are batched
into ~8KB chunks before reaching the compressor, while still avoiding
the intermediate uncompressed `Vec<u8>` (the PR's original intent).

Drain the buffer via `into_inner()` rather than `flush()`: `flush()`
propagates to `GzEncoder::flush()` and injects a DEFLATE sync-flush that
changes the output bytes, whereas `into_inner()` only writes out the
buffered bytes. The encoder therefore sees the same byte stream as a
single bulk write — output stays byte-identical to master (verified
against the `to_vec`+`write_all` path across empty/small/8KB-overflow
payloads).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant