diff --git a/src/runtime/watch-pipeline-ingest.ts b/src/runtime/watch-pipeline-ingest.ts index 05b6a8e..18bd5b8 100644 --- a/src/runtime/watch-pipeline-ingest.ts +++ b/src/runtime/watch-pipeline-ingest.ts @@ -133,6 +133,7 @@ export function createWatchIngest(options: WatchIngestOptions): WatchIngest { poll_interval_ms: cfg.watch.poll_interval_ms, on_checkpoint: (ts) => { db.setState('file_watcher.checkpoint_ts', String(ts)) + db.flush() }, }) diff --git a/src/runtime/watch-pipeline-processor.ts b/src/runtime/watch-pipeline-processor.ts index adc51fc..1b36d77 100644 --- a/src/runtime/watch-pipeline-processor.ts +++ b/src/runtime/watch-pipeline-processor.ts @@ -67,6 +67,7 @@ export function createWatchPipelineProcessor(ingest: WatchIngest): WatchPipeline try { await db.insert(enriched) db.setState('events.last_ts', String(enriched.timestamp)) + db.flush() stats.stored++ } catch (err) { stats.errors++ diff --git a/src/sync/sync-engine.ts b/src/sync/sync-engine.ts index 0860aa1..f16c797 100644 --- a/src/sync/sync-engine.ts +++ b/src/sync/sync-engine.ts @@ -48,6 +48,11 @@ export class SyncEngine { } try { + // Commit any pending pipeline transaction so sync operates on a clean + // connection state. This is an intentional cross-subsystem commit: + // each pipeline event is an independent unit, so force-committing + // partial pipeline work is safe and prevents write-lock contention. + this.options.db.flush() const cfg = this.options.config.sync if (!cfg?.server_url) { result.errors.push('sync.server_url not configured')