Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions desktop/windows/electron-builder.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ asarUnpack:
# koffi loads its native .node at runtime, resolved relative to its own package
# dir — it must live outside the asar archive or the foreground monitor fails.
- node_modules/koffi/**
# kgWorker.js is loaded via new Worker(path) which bypasses Electron's asar
# virtual-fs patch — it must be a real file on disk.
- out/main/kgWorker.js
win:
executableName: omi-windows
target:
Expand Down
14 changes: 13 additions & 1 deletion desktop/windows/electron.vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,19 @@ import { defineConfig } from 'electron-vite'
import react from '@vitejs/plugin-react'

export default defineConfig({
main: {},
main: {
build: {
rollupOptions: {
input: {
index: resolve('src/main/index.ts'),
// Second entry so vite emits out/main/kgWorker.js alongside index.js.
// The worker file must be a separate bundle (not inlined) because
// new Worker(path) needs a real file — it can't load from the main bundle.
kgWorker: resolve('src/main/ipc/kgWorker.ts')
}
}
}
},
preload: {},
renderer: {
// Pin the dev server to a fixed port so the renderer's origin
Expand Down
16 changes: 9 additions & 7 deletions desktop/windows/src/main/ipc/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ function get(): Database.Database {
// never reads or writes the user's real omi.db.
const file = process.env.OMI_DB_PATH ?? join(app.getPath('userData'), 'omi.db')
db = new Database(file)
// For the throwaway bench DB only, relax durability so seeding ~7k rows isn't
// dominated by a per-insert fsync (otherwise it swamps the startup measurement).
if (process.env.OMI_DB_PATH) {
db.pragma('journal_mode = WAL')
db.pragma('synchronous = NORMAL')
}
// WAL mode: allows reads on the main thread to proceed concurrently while the
// KG write worker holds the write lock. NORMAL sync is crash-safe in WAL mode
// (may lose the last committed transaction on OS power-loss; acceptable for
// this derived cache). Previously bench-only; now unconditional.
db.pragma('journal_mode = WAL')
db.pragma('synchronous = NORMAL')
// Migrate away the incompatible local_kg_* schema from the parked KG experiment.
dropIfMissingColumn(db, 'local_kg_nodes', 'summary')
dropIfMissingColumn(db, 'local_kg_edges', 'id')
Expand Down Expand Up @@ -531,7 +531,9 @@ export function queryKgNodes(q: string, limit = 12): LocalKnowledgeGraph {
aliases: parseJsonArray(r.aliasesJson),
sourceRefs: parseJsonArray(r.sourceRefs)
}))
if (nodes.length === 0) return { nodes: [], edges: [] }
if (nodes.length === 0) {
return { nodes: [], edges: [] }
}
const ids = nodes.map((n) => n.id)
const placeholders = ids.map(() => '?').join(',')
const edges = d
Expand Down
125 changes: 117 additions & 8 deletions desktop/windows/src/main/ipc/kg.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,132 @@
import { ipcMain } from 'electron'
import { app, ipcMain } from 'electron'
import { join } from 'path'
import { Worker } from 'worker_threads'
import {
execSafeSelect,
getFileIndexDigest,
getLocalKGStatus,
queryKgNodes,
replaceLocalGraph,
searchIndexedFiles
} from './db'
import { guardSelect } from '../../shared/sqlGuard'
import type { LocalKnowledgeGraph } from '../../shared/types'

// All local-knowledge-graph IPC. Kept in this dedicated module so registration
// is a single append in index.ts (conflict discipline with the concurrent
// integrations/Settings work).
// ---------------------------------------------------------------------------
// KG write worker
//
// Writes run in a worker_thread so the Electron main thread stays free for
// IPC during the synchronous DELETE+INSERT transaction.
//
// Lifecycle:
// - Worker is created lazily on the first kg:saveGraph call.
// - At most one write runs at a time; subsequent kg:saveGraph calls are
// coalesced — only the latest pending graph is kept.
// - Reads (queryNodes / status) run on the main thread via WAL mode.
// - kgSnapshot caches the last successfully written graph so empty-query
// reads skip SQLite entirely.
// ---------------------------------------------------------------------------

let worker: Worker | null = null
let workerBusy = false
let pendingGraph: LocalKnowledgeGraph | null = null
let lastDispatched: LocalKnowledgeGraph | null = null
let kgSnapshot: LocalKnowledgeGraph | null = null

function dbPath(): string {
return process.env.OMI_DB_PATH ?? join(app.getPath('userData'), 'omi.db')
}

function workerScriptPath(): string {
// Packaged builds: kgWorker.js is unpacked from the asar (see electron-builder.yml).
// Dev: vite emits kgWorker.js into out/main/ alongside index.js.
if (app.isPackaged) {
return join(process.resourcesPath, 'app.asar.unpacked', 'out', 'main', 'kgWorker.js')
}
return join(__dirname, 'kgWorker.js')
}

function ensureWorker(): Worker {
if (worker) return worker
worker = new Worker(workerScriptPath(), { workerData: { dbPath: dbPath() } })
worker.on('message', (msg: { type: string; ms?: number; message?: string }) => {
if (msg.type === 'done') {
kgSnapshot = lastDispatched
} else if (msg.type === 'error') {
console.error('[kg:worker] saveGraph error:', msg.message)
}
workerBusy = false
flushPending()
})
worker.on('error', (err) => {
console.error('[kg:worker] crash:', err.message)
worker = null
workerBusy = false
flushPending()
})
return worker
}

function flushPending(): void {
if (pendingGraph !== null) {
const next = pendingGraph
pendingGraph = null
dispatch(next)
}
}

function dispatch(graph: LocalKnowledgeGraph): void {
workerBusy = true
lastDispatched = graph
try {
ensureWorker().postMessage({ type: 'replace', nodes: graph.nodes, edges: graph.edges })
} catch (err) {
// Worker construction failed (e.g. missing kgWorker.js in packaged build).
// Reset state so future saves can retry rather than being silently dropped.
console.error('[kg:worker] failed to dispatch:', (err as Error).message)
worker = null
workerBusy = false
flushPending()
}
}
Comment on lines +77 to +90

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Write queue deadlocks if new Worker() throws

workerBusy is set to true on line 78 before ensureWorker() is called. If new Worker(workerScriptPath(), ...) throws synchronously — for example because kgWorker.js is missing from app.asar.unpacked in a misconfigured packaged build, or the path resolution fails — the exception propagates out of dispatch() uncaught, leaving workerBusy = true permanently. Every subsequent kg:saveGraph call then writes to pendingGraph and returns, but flushPending() is never called because workerBusy never clears. All future graph saves are silently dropped for the lifetime of the session.


function enqueueGraph(graph: LocalKnowledgeGraph): void {
if (workerBusy) {
pendingGraph = graph
return
}
dispatch(graph)
}

// ---------------------------------------------------------------------------
// IPC handlers
// ---------------------------------------------------------------------------

export function registerKgHandlers(): void {
ipcMain.handle('kg:fileIndexDigest', async () => getFileIndexDigest())
ipcMain.handle('kg:saveGraph', async (_e, graph: LocalKnowledgeGraph) => replaceLocalGraph(graph))
ipcMain.handle('kg:status', async () => getLocalKGStatus())
ipcMain.handle('kg:queryNodes', async (_e, q: string, limit?: number) => queryKgNodes(q, limit))

// Offloaded to worker — returns immediately, write completes asynchronously.
ipcMain.handle('kg:saveGraph', (_e, graph: LocalKnowledgeGraph) => {
enqueueGraph(graph)
})
Comment on lines +107 to +110

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 kg:saveGraph changed from awaitable to fire-and-forget

Previously ipcRenderer.invoke('kg:saveGraph', graph) resolved only after the SQLite transaction committed, letting callers safely read back their own writes. Now it resolves immediately. Any renderer code that does await invoke('kg:saveGraph', ...) followed by invoke('kg:queryNodes', '') will read the pre-save state — neither the DB nor kgSnapshot has been updated yet. Worth auditing the renderer call-sites to ensure no query immediately follows a save.


ipcMain.handle('kg:status', () => getLocalKGStatus())

ipcMain.handle('kg:queryNodes', (_e, q: string, limit?: number) => {
// Resolve cap once so snapshot and DB paths always return the same count.
const cap = limit ?? 80
if (q === '' && kgSnapshot !== null) {
// Hot path: serve from in-memory snapshot, no SQLite access required.
const nodes = kgSnapshot.nodes
.slice()
.sort((a, b) => b.createdAt - a.createdAt)
.slice(0, cap)
const idSet = new Set(nodes.map((n) => n.id))
const edges = kgSnapshot.edges.filter((e) => idSet.has(e.sourceId) || idSet.has(e.targetId))
return { nodes, edges }
Comment on lines +117 to +125

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Default node limit differs between snapshot hot-path (80) and DB fallback (12)

When limit is not passed and q === '', the snapshot path caps at 80 nodes (limit ?? 80), but queryKgNodes(q, limit) with limit = undefined falls through to the DB's default of 12 (the parameter default in db.ts). Before kgSnapshot is populated (session start, before the first successful write), callers receive at most 12 nodes; after the first save completes they silently jump to 80. Chat-agent calls that rely on the "recent nodes" fallback get inconsistent context window sizes depending on whether the worker has completed its first write.

}
return queryKgNodes(q, cap)
})

ipcMain.handle('kg:searchFiles', async (_e, q: string, fileType?: string, limit?: number) =>
searchIndexedFiles(q, fileType, limit)
)
Expand Down
72 changes: 72 additions & 0 deletions desktop/windows/src/main/ipc/kgWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* KG write worker — runs in a Node.js worker_thread so the Electron main
* thread stays free for IPC during the synchronous SQLite replace transaction.
*
* Protocol (parentPort messages):
* Receive: { type: 'replace'; nodes: KgNode[]; edges: KgEdge[] }
* Send: { type: 'done'; ms: number }
* { type: 'error'; message: string }
*
* workerData: { dbPath: string }
*/
import { parentPort, workerData } from 'worker_threads'
import Database from 'better-sqlite3'

const d = new Database((workerData as { dbPath: string }).dbPath)
// WAL: readers on the main thread are not blocked while we hold the write lock.
d.pragma('journal_mode = WAL')
d.pragma('synchronous = NORMAL')

// Prepare all statements once at startup.
const insertNode = d.prepare(
`INSERT OR REPLACE INTO local_kg_nodes
(id, label, node_type, summary, source, created_at, aliases_json, source_refs)
VALUES (@id, @label, @nodeType, @summary, @source, @createdAt, @aliasesJson, @sourceRefs)`
)
const insertEdge = d.prepare(
`INSERT OR REPLACE INTO local_kg_edges (id, source_id, target_id, label, created_at)
VALUES (@id, @sourceId, @targetId, @label, @createdAt)`
)
const deleteEdges = d.prepare('DELETE FROM local_kg_edges')
const deleteNodes = d.prepare('DELETE FROM local_kg_nodes')

type KgNode = {
id: string
label: string
nodeType: string
summary: string
source: string
createdAt: number
aliases?: string[]
sourceRefs?: string[]
}
type KgEdge = { id: string; sourceId: string; targetId: string; label: string; createdAt: number }

const doReplace = d.transaction((nodes: KgNode[], edges: KgEdge[]) => {
deleteEdges.run()
deleteNodes.run()
for (const n of nodes) {
insertNode.run({
id: n.id,
label: n.label,
nodeType: n.nodeType,
summary: n.summary,
source: n.source,
createdAt: n.createdAt,
aliasesJson: n.aliases?.length ? JSON.stringify(n.aliases) : null,
sourceRefs: n.sourceRefs?.length ? JSON.stringify(n.sourceRefs) : null
})
}
for (const e of edges) insertEdge.run(e)
})

parentPort!.on('message', (msg: { type: string; nodes: KgNode[]; edges: KgEdge[] }) => {
if (msg.type !== 'replace') return
const t0 = performance.now()
try {
doReplace(msg.nodes, msg.edges)
parentPort!.postMessage({ type: 'done', ms: Math.round(performance.now() - t0) })
} catch (err) {
parentPort!.postMessage({ type: 'error', message: (err as Error).message })
}
})