Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions packages/core/src/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,27 @@ export function initSchema(db: Database.Database): void {
db.prepare('INSERT OR REPLACE INTO schema_version (version) VALUES (?)').run(43);
}

// v44: thread identity (resolution-as-fan-out slice 1). Adds
// memories.thread_id (correlation id minted by the engine's threads
// registry) + memories.supersede_reason (audit trail for the supersede
// action). Index lives here, not SCHEMA_SQL, per the v42 boot-order rule.
if (currentVersion < 44 && v40Applied) {
const memoryColumns = new Set(
(db.prepare(`PRAGMA table_info(memories)`).all() as Array<{ name: string }>).map((row) => row.name)
);

if (!memoryColumns.has('thread_id')) {
db.exec(`ALTER TABLE memories ADD COLUMN thread_id TEXT`);
}
if (!memoryColumns.has('supersede_reason')) {
db.exec(`ALTER TABLE memories ADD COLUMN supersede_reason TEXT`);
}

db.exec(`CREATE INDEX IF NOT EXISTS idx_memories_thread_id ON memories(thread_id)`);

db.prepare('INSERT OR REPLACE INTO schema_version (version) VALUES (?)').run(44);
}

// Only stamp SCHEMA_VERSION at the end if every migration ran. Dry-run
// skips v40 → leave schema_version at 39 so the next non-dry-run boot
// re-enters the v40 branch.
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// =============================================================================

/** Current schema version - bump when schema changes */
export const SCHEMA_VERSION = 43;
export const SCHEMA_VERSION = 44;

/** State database filename */
export const STATE_DB_FILENAME = 'state.db';
Expand Down Expand Up @@ -391,13 +391,16 @@ CREATE TABLE IF NOT EXISTS memories (
ttl_days INTEGER,
superseded_by INTEGER REFERENCES memories(id),
visibility TEXT NOT NULL DEFAULT 'shared',
owner_scope TEXT NOT NULL DEFAULT 'global'
owner_scope TEXT NOT NULL DEFAULT 'global',
thread_id TEXT,
supersede_reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_memories_key ON memories(key);
CREATE INDEX IF NOT EXISTS idx_memories_entity ON memories(entity);
CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type);
-- idx_memories_key_owner_scope (UNIQUE) and idx_memories_owner_scope are created
-- by the v42 migration in migrations.ts, after that migration adds owner_scope.
-- idx_memories_thread_id is created by the v44 migration, after it adds thread_id.

CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
key, value,
Expand Down
118 changes: 109 additions & 9 deletions packages/mcp-server/src/core/write/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ export interface Memory {
superseded_by: number | null;
visibility: string;
owner_scope: string;
thread_id: string | null;
supersede_reason: string | null;
}

export interface StoreMemoryOptions {
Expand All @@ -45,6 +47,23 @@ export interface StoreMemoryOptions {
agent_id?: string;
session_id?: string;
visibility?: 'shared' | 'private';
thread_id?: string;
}

export interface SupersedeMemoryOptions {
/** Supersede every current memory carrying this thread correlation id. */
thread_id?: string;
/** Supersede the single current memory under this key (caller's scope). */
key?: string;
/** Audit reason recorded on each superseded row (e.g. "thread-resolved"). */
reason?: string;
agent_id?: string;
}

export interface SupersedeMemoryResult {
superseded: Array<{ id: number; key: string; owner_scope: string }>;
/** Rows matched but already superseded — idempotent no-op count. */
already_superseded: number;
}

export interface SearchMemoryOptions {
Expand Down Expand Up @@ -222,6 +241,7 @@ export function storeMemory(
agent_id,
session_id,
visibility = 'shared',
thread_id,
} = options;
const ownerScope = resolveOwnerScope(agent_id, visibility);

Expand All @@ -237,40 +257,53 @@ export function storeMemory(

// Check if memory with this key already exists
const existing = stateDb.db.prepare(
'SELECT id FROM memories WHERE key = ? AND owner_scope = ?'
).get(key, ownerScope) as { id: number } | undefined;
'SELECT id, superseded_by FROM memories WHERE key = ? AND owner_scope = ?'
).get(key, ownerScope) as { id: number; superseded_by: number | null } | undefined;

if (existing) {
// Upsert: update existing memory
// Upsert: update existing memory.
//
// Supersession is PRESERVED, never reset (thread-identity slice 1).
// The previous shape set `superseded_by = NULL` here, which meant any
// routine same-key re-store (cron jobs idempotently re-storing facts)
// silently resurrected a tombstoned memory — the upsert-resurrection
// bug. A superseded fact stays superseded; explicit revival is
// forget-then-store. thread_id is updated only when the caller provides
// one, so re-stores without thread context don't strip the correlation id.
stateDb.db.prepare(`
UPDATE memories SET
value = ?, memory_type = ?, entity = ?, entities_json = ?,
source_agent_id = ?, source_session_id = ?,
confidence = ?, updated_at = ?, accessed_at = ?,
ttl_days = ?, visibility = ?, owner_scope = ?, superseded_by = NULL
ttl_days = ?, visibility = ?, owner_scope = ?,
thread_id = COALESCE(?, thread_id)
WHERE key = ? AND owner_scope = ?
`).run(
value, type, entity ?? null, entitiesJson,
agent_id ?? null, session_id ?? null,
confidence, now, now,
ttl_days ?? null, visibility, ownerScope, key, ownerScope,
ttl_days ?? null, visibility, ownerScope,
thread_id ?? null,
key, ownerScope,
);
} else {
// Insert new memory
stateDb.db.prepare(`
INSERT INTO memories (key, value, memory_type, entity, entities_json,
source_agent_id, source_session_id, confidence,
created_at, updated_at, accessed_at, ttl_days, visibility, owner_scope)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
created_at, updated_at, accessed_at, ttl_days, visibility, owner_scope, thread_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`).run(
key, value, type, entity ?? null, entitiesJson,
agent_id ?? null, session_id ?? null, confidence,
now, now, now, ttl_days ?? null, visibility, ownerScope,
now, now, now, ttl_days ?? null, visibility, ownerScope, thread_id ?? null,
);
}

// Private memories must not leak into shared graph-derived signals.
if (visibility === 'shared') {
// A superseded row stays out of the graph: its edges were removed at
// supersede time and a re-store must not resurrect them either.
if (visibility === 'shared' && !(existing && existing.superseded_by !== null)) {
updateGraphSignals(stateDb, key, detectedEntities);
}

Expand Down Expand Up @@ -423,6 +456,73 @@ export function forgetMemory(
return true;
}

/**
* Supersede memories — the resolution-as-fan-out consumer (slice 1).
*
* Marks current memories as superseded (tombstoned, retained for audit)
* without deleting them. Targets either every current row carrying a
* thread correlation id, or the single current row under a key in the
* caller's scope. Idempotent: already-superseded rows are counted, not
* an error, and a repeat call is a no-op.
*
* superseded_by is set to the row's own id — the self-pointer is the
* tombstone-without-successor marker (every read path already filters
* `superseded_by IS NULL`, so the fact disappears from get/search/list/
* brief the moment this commits). supersede_reason carries the audit why.
*
* Graph edges for shared/global rows are removed (mirrors forgetMemory) so
* stale memory:{key} edges don't outlive the fact.
*/
export function supersedeMemories(
stateDb: StateDb,
options: SupersedeMemoryOptions,
): SupersedeMemoryResult {
const { thread_id, key, reason, agent_id } = options;

if (!thread_id && !key) {
throw new Error('supersede requires thread_id or key');
}

const conditions: string[] = [];
const params: unknown[] = [];

if (thread_id) {
conditions.push('thread_id = ?');
params.push(thread_id);
}
if (key) {
conditions.push('key = ?');
params.push(key);
}
// Scope rule (council round 1): a caller may only close facts it can see —
// global rows plus its own agent scope. Never another agent's private facts.
applyVisibilityFilter(conditions, params, agent_id);

const matched = stateDb.db.prepare(
`SELECT id, key, owner_scope, visibility, superseded_by FROM memories WHERE ${conditions.join(' AND ')}`
).all(...params) as Array<Pick<Memory, 'id' | 'key' | 'owner_scope' | 'visibility' | 'superseded_by'>>;

const current = matched.filter((m) => m.superseded_by === null);
const alreadySuperseded = matched.length - current.length;

const now = Date.now();
const stamp = stateDb.db.prepare(
'UPDATE memories SET superseded_by = id, supersede_reason = ?, updated_at = ? WHERE id = ? AND superseded_by IS NULL'
);

for (const m of current) {
stamp.run(reason ?? null, now, m.id);
if (m.visibility === 'shared' && m.owner_scope === GLOBAL_OWNER_SCOPE) {
removeGraphSignals(stateDb, m.key);
}
}

return {
superseded: current.map((m) => ({ id: m.id, key: m.key, owner_scope: m.owner_scope })),
already_superseded: alreadySuperseded,
};
}

// =============================================================================
// SESSION SUMMARIES
// =============================================================================
Expand Down
Loading
Loading