From d0543e72b0c04a1f82ac23d3dabda9a10e019501 Mon Sep 17 00:00:00 2001
From: Karthik_Yeluripati
Date: Thu, 18 Jun 2026 07:37:56 -0400
Subject: [PATCH 1/2] perf(windows): move local KG writes off main thread
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The synchronous DELETE+INSERT transaction in replaceLocalGraph was running
on the Electron main thread, blocking all IPC for 1–3 s on large graphs.

Fix:
- kgWorker.ts: new worker_thread with its own WAL better-sqlite3
  connection; prepares all statements once at startup, runs the full
  replace transaction off the main thread, and posts `{type:'done',ms}`
  on success or `{type:'error',message}` on failure.
- kg.ts: lazy worker lifecycle with coalescing queue (only the latest
  pending graph is kept while a write is in flight); in-memory kgSnapshot
  cache so empty-query reads skip SQLite entirely once a write has
  completed; kg:saveGraph returns immediately after enqueueing the graph,
  and kg:status remains a main-thread call that no longer waits behind
  the write transaction.
- db.ts: WAL + NORMAL sync unconditional (was bench-only); main-thread
  reads are no longer blocked while the worker holds the write lock.
- electron.vite.config.ts: second rollupOptions.input entry emits
  out/main/kgWorker.js alongside out/main/index.js.
- electron-builder.yml: kgWorker.js added to asarUnpack so that
  new Worker(path) can load it from disk in packaged builds.

Co-Authored-By: Claude Sonnet 4.6 --- desktop/windows/electron-builder.yml | 3 + desktop/windows/electron.vite.config.ts | 14 ++- desktop/windows/src/main/ipc/db.ts | 16 ++-- desktop/windows/src/main/ipc/kg.ts | 115 +++++++++++++++++++++-- desktop/windows/src/main/ipc/kgWorker.ts | 72 ++++++++++++++ 5 files changed, 204 insertions(+), 16 deletions(-) create mode 100644 desktop/windows/src/main/ipc/kgWorker.ts diff --git a/desktop/windows/electron-builder.yml b/desktop/windows/electron-builder.yml index 1594c459b8..521d334b3a 100644 --- a/desktop/windows/electron-builder.yml +++ b/desktop/windows/electron-builder.yml @@ -19,6 +19,9 @@ asarUnpack: # koffi loads its native .node at runtime, resolved relative to its own package # dir — it must live outside the asar archive or the foreground monitor fails. - node_modules/koffi/** + # kgWorker.js is loaded via new Worker(path) which bypasses Electron's asar + # virtual-fs patch — it must be a real file on disk. + - out/main/kgWorker.js win: executableName: omi-windows target: diff --git a/desktop/windows/electron.vite.config.ts b/desktop/windows/electron.vite.config.ts index 93b74cf74d..529711ecad 100644 --- a/desktop/windows/electron.vite.config.ts +++ b/desktop/windows/electron.vite.config.ts @@ -3,7 +3,19 @@ import { defineConfig } from 'electron-vite' import react from '@vitejs/plugin-react' export default defineConfig({ - main: {}, + main: { + build: { + rollupOptions: { + input: { + index: resolve('src/main/index.ts'), + // Second entry so vite emits out/main/kgWorker.js alongside index.js. + // The worker file must be a separate bundle (not inlined) because + // new Worker(path) needs a real file — it can't load from the main bundle. 
+ kgWorker: resolve('src/main/ipc/kgWorker.ts') + } + } + } + }, preload: {}, renderer: { // Pin the dev server to a fixed port so the renderer's origin diff --git a/desktop/windows/src/main/ipc/db.ts b/desktop/windows/src/main/ipc/db.ts index 6d6d989b11..a0a6591273 100644 --- a/desktop/windows/src/main/ipc/db.ts +++ b/desktop/windows/src/main/ipc/db.ts @@ -68,12 +68,12 @@ function get(): Database.Database { // never reads or writes the user's real omi.db. const file = process.env.OMI_DB_PATH ?? join(app.getPath('userData'), 'omi.db') db = new Database(file) - // For the throwaway bench DB only, relax durability so seeding ~7k rows isn't - // dominated by a per-insert fsync (otherwise it swamps the startup measurement). - if (process.env.OMI_DB_PATH) { - db.pragma('journal_mode = WAL') - db.pragma('synchronous = NORMAL') - } + // WAL mode: allows reads on the main thread to proceed concurrently while the + // KG write worker holds the write lock. NORMAL sync is crash-safe in WAL mode + // (may lose the last committed transaction on OS power-loss; acceptable for + // this derived cache). Previously bench-only; now unconditional. + db.pragma('journal_mode = WAL') + db.pragma('synchronous = NORMAL') // Migrate away the incompatible local_kg_* schema from the parked KG experiment. 
dropIfMissingColumn(db, 'local_kg_nodes', 'summary') dropIfMissingColumn(db, 'local_kg_edges', 'id') @@ -531,7 +531,9 @@ export function queryKgNodes(q: string, limit = 12): LocalKnowledgeGraph { aliases: parseJsonArray(r.aliasesJson), sourceRefs: parseJsonArray(r.sourceRefs) })) - if (nodes.length === 0) return { nodes: [], edges: [] } + if (nodes.length === 0) { + return { nodes: [], edges: [] } + } const ids = nodes.map((n) => n.id) const placeholders = ids.map(() => '?').join(',') const edges = d diff --git a/desktop/windows/src/main/ipc/kg.ts b/desktop/windows/src/main/ipc/kg.ts index a2a9800d25..f02a135452 100644 --- a/desktop/windows/src/main/ipc/kg.ts +++ b/desktop/windows/src/main/ipc/kg.ts @@ -1,23 +1,122 @@ -import { ipcMain } from 'electron' +import { app, ipcMain } from 'electron' +import { join } from 'path' +import { Worker } from 'worker_threads' import { execSafeSelect, getFileIndexDigest, getLocalKGStatus, queryKgNodes, - replaceLocalGraph, searchIndexedFiles } from './db' import { guardSelect } from '../../shared/sqlGuard' import type { LocalKnowledgeGraph } from '../../shared/types' -// All local-knowledge-graph IPC. Kept in this dedicated module so registration -// is a single append in index.ts (conflict discipline with the concurrent -// integrations/Settings work). +// --------------------------------------------------------------------------- +// KG write worker +// +// Writes run in a worker_thread so the Electron main thread stays free for +// IPC during the synchronous DELETE+INSERT transaction. +// +// Lifecycle: +// - Worker is created lazily on the first kg:saveGraph call. +// - At most one write runs at a time; subsequent kg:saveGraph calls are +// coalesced — only the latest pending graph is kept. +// - Reads (queryNodes / status) run on the main thread via WAL mode. +// - kgSnapshot caches the last successfully written graph so empty-query +// reads skip SQLite entirely. 
+// --------------------------------------------------------------------------- + +let worker: Worker | null = null +let workerBusy = false +let pendingGraph: LocalKnowledgeGraph | null = null +let lastDispatched: LocalKnowledgeGraph | null = null +let kgSnapshot: LocalKnowledgeGraph | null = null + +function dbPath(): string { + return process.env.OMI_DB_PATH ?? join(app.getPath('userData'), 'omi.db') +} + +function workerScriptPath(): string { + // Packaged builds: kgWorker.js is unpacked from the asar (see electron-builder.yml). + // Dev: vite emits kgWorker.js into out/main/ alongside index.js. + if (app.isPackaged) { + return join(process.resourcesPath, 'app.asar.unpacked', 'out', 'main', 'kgWorker.js') + } + return join(__dirname, 'kgWorker.js') +} + +function ensureWorker(): Worker { + if (worker) return worker + worker = new Worker(workerScriptPath(), { workerData: { dbPath: dbPath() } }) + worker.on('message', (msg: { type: string; ms?: number; message?: string }) => { + if (msg.type === 'done') { + kgSnapshot = lastDispatched + } else if (msg.type === 'error') { + console.error('[kg:worker] saveGraph error:', msg.message) + } + workerBusy = false + flushPending() + }) + worker.on('error', (err) => { + console.error('[kg:worker] crash:', err.message) + worker = null + workerBusy = false + flushPending() + }) + return worker +} + +function flushPending(): void { + if (pendingGraph !== null) { + const next = pendingGraph + pendingGraph = null + dispatch(next) + } +} + +function dispatch(graph: LocalKnowledgeGraph): void { + workerBusy = true + lastDispatched = graph + ensureWorker().postMessage({ type: 'replace', nodes: graph.nodes, edges: graph.edges }) +} + +function enqueueGraph(graph: LocalKnowledgeGraph): void { + if (workerBusy) { + pendingGraph = graph + return + } + dispatch(graph) +} + +// --------------------------------------------------------------------------- +// IPC handlers +// 
--------------------------------------------------------------------------- + export function registerKgHandlers(): void { ipcMain.handle('kg:fileIndexDigest', async () => getFileIndexDigest()) - ipcMain.handle('kg:saveGraph', async (_e, graph: LocalKnowledgeGraph) => replaceLocalGraph(graph)) - ipcMain.handle('kg:status', async () => getLocalKGStatus()) - ipcMain.handle('kg:queryNodes', async (_e, q: string, limit?: number) => queryKgNodes(q, limit)) + + // Offloaded to worker — returns immediately, write completes asynchronously. + ipcMain.handle('kg:saveGraph', (_e, graph: LocalKnowledgeGraph) => { + enqueueGraph(graph) + }) + + ipcMain.handle('kg:status', () => getLocalKGStatus()) + + ipcMain.handle('kg:queryNodes', (_e, q: string, limit?: number) => { + if (q === '' && kgSnapshot !== null) { + // Hot path: serve from in-memory snapshot, no SQLite access required. + const cap = limit ?? 80 + const nodes = kgSnapshot.nodes + .slice() + .sort((a, b) => b.createdAt - a.createdAt) + .slice(0, cap) + const idSet = new Set(nodes.map((n) => n.id)) + const edges = kgSnapshot.edges.filter((e) => idSet.has(e.sourceId) || idSet.has(e.targetId)) + return { nodes, edges } + } + return queryKgNodes(q, limit) + }) + ipcMain.handle('kg:searchFiles', async (_e, q: string, fileType?: string, limit?: number) => searchIndexedFiles(q, fileType, limit) ) diff --git a/desktop/windows/src/main/ipc/kgWorker.ts b/desktop/windows/src/main/ipc/kgWorker.ts new file mode 100644 index 0000000000..0eceba32d4 --- /dev/null +++ b/desktop/windows/src/main/ipc/kgWorker.ts @@ -0,0 +1,72 @@ +/** + * KG write worker — runs in a Node.js worker_thread so the Electron main + * thread stays free for IPC during the synchronous SQLite replace transaction. 
+ * + * Protocol (parentPort messages): + * Receive: { type: 'replace'; nodes: KgNode[]; edges: KgEdge[] } + * Send: { type: 'done'; ms: number } + * { type: 'error'; message: string } + * + * workerData: { dbPath: string } + */ +import { parentPort, workerData } from 'worker_threads' +import Database from 'better-sqlite3' + +const d = new Database((workerData as { dbPath: string }).dbPath) +// WAL: readers on the main thread are not blocked while we hold the write lock. +d.pragma('journal_mode = WAL') +d.pragma('synchronous = NORMAL') + +// Prepare all statements once at startup. +const insertNode = d.prepare( + `INSERT OR REPLACE INTO local_kg_nodes + (id, label, node_type, summary, source, created_at, aliases_json, source_refs) + VALUES (@id, @label, @nodeType, @summary, @source, @createdAt, @aliasesJson, @sourceRefs)` +) +const insertEdge = d.prepare( + `INSERT OR REPLACE INTO local_kg_edges (id, source_id, target_id, label, created_at) + VALUES (@id, @sourceId, @targetId, @label, @createdAt)` +) +const deleteEdges = d.prepare('DELETE FROM local_kg_edges') +const deleteNodes = d.prepare('DELETE FROM local_kg_nodes') + +type KgNode = { + id: string + label: string + nodeType: string + summary: string + source: string + createdAt: number + aliases?: string[] + sourceRefs?: string[] +} +type KgEdge = { id: string; sourceId: string; targetId: string; label: string; createdAt: number } + +const doReplace = d.transaction((nodes: KgNode[], edges: KgEdge[]) => { + deleteEdges.run() + deleteNodes.run() + for (const n of nodes) { + insertNode.run({ + id: n.id, + label: n.label, + nodeType: n.nodeType, + summary: n.summary, + source: n.source, + createdAt: n.createdAt, + aliasesJson: n.aliases?.length ? JSON.stringify(n.aliases) : null, + sourceRefs: n.sourceRefs?.length ? 
JSON.stringify(n.sourceRefs) : null + }) + } + for (const e of edges) insertEdge.run(e) +}) + +parentPort!.on('message', (msg: { type: string; nodes: KgNode[]; edges: KgEdge[] }) => { + if (msg.type !== 'replace') return + const t0 = performance.now() + try { + doReplace(msg.nodes, msg.edges) + parentPort!.postMessage({ type: 'done', ms: Math.round(performance.now() - t0) }) + } catch (err) { + parentPort!.postMessage({ type: 'error', message: (err as Error).message }) + } +}) From 0394af6ace5fd3943fad6a31bcaa2ac6b56fbb1d Mon Sep 17 00:00:00 2001 From: Karthik_Yeluripati Date: Thu, 18 Jun 2026 08:24:09 -0400 Subject: [PATCH 2/2] fix(windows): address Greptile P1 review findings in kg.ts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - dispatch(): wrap ensureWorker().postMessage() in try-catch so a Worker construction failure (e.g. missing kgWorker.js in packaged build) resets workerBusy and retries via flushPending() instead of silently deadlocking all future kg:saveGraph calls for the session. - kg:queryNodes: resolve cap = limit ?? 80 once before the snapshot/DB branch so both paths return the same node count; previously the DB fallback used queryKgNodes(q, limit) which defaulted to 12, while the snapshot path defaulted to 80 — callers saw different context depending on whether the first worker write had completed. 
Co-Authored-By: Claude Sonnet 4.6 --- desktop/windows/src/main/ipc/kg.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/desktop/windows/src/main/ipc/kg.ts b/desktop/windows/src/main/ipc/kg.ts index f02a135452..dbbd55c51b 100644 --- a/desktop/windows/src/main/ipc/kg.ts +++ b/desktop/windows/src/main/ipc/kg.ts @@ -77,7 +77,16 @@ function flushPending(): void { function dispatch(graph: LocalKnowledgeGraph): void { workerBusy = true lastDispatched = graph - ensureWorker().postMessage({ type: 'replace', nodes: graph.nodes, edges: graph.edges }) + try { + ensureWorker().postMessage({ type: 'replace', nodes: graph.nodes, edges: graph.edges }) + } catch (err) { + // Worker construction failed (e.g. missing kgWorker.js in packaged build). + // Reset state so future saves can retry rather than being silently dropped. + console.error('[kg:worker] failed to dispatch:', (err as Error).message) + worker = null + workerBusy = false + flushPending() + } } function enqueueGraph(graph: LocalKnowledgeGraph): void { @@ -103,9 +112,10 @@ export function registerKgHandlers(): void { ipcMain.handle('kg:status', () => getLocalKGStatus()) ipcMain.handle('kg:queryNodes', (_e, q: string, limit?: number) => { + // Resolve cap once so snapshot and DB paths always return the same count. + const cap = limit ?? 80 if (q === '' && kgSnapshot !== null) { // Hot path: serve from in-memory snapshot, no SQLite access required. - const cap = limit ?? 80 const nodes = kgSnapshot.nodes .slice() .sort((a, b) => b.createdAt - a.createdAt) @@ -114,7 +124,7 @@ export function registerKgHandlers(): void { const edges = kgSnapshot.edges.filter((e) => idSet.has(e.sourceId) || idSet.has(e.targetId)) return { nodes, edges } } - return queryKgNodes(q, limit) + return queryKgNodes(q, cap) }) ipcMain.handle('kg:searchFiles', async (_e, q: string, fileType?: string, limit?: number) =>