Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,42 @@ entrypoint = "MemoryService"
binding = "TAROTSCRIPT"
service = "tarotscript-worker"
```

## Chat WebSocket Durable Object

The `/chat/ws` route uses a Durable Object binding named `CHAT_SESSION` to keep terminal and browser chat sessions attached to one ordered stream.

```toml
[[durable_objects.bindings]]
name = "CHAT_SESSION"
class_name = "ChatSession"

[[migrations]]
tag = "v1-chat-session"
new_sqlite_classes = ["ChatSession"]
```

The route is protected by the same `AEGIS_TOKEN` as the HTTP API. Connect with `wss://<host>/chat/ws?token=<AEGIS_TOKEN>` and request the `aegis-chat` subprotocol.

Client frames. `eventId` is optional but recommended for reconnect/replay deduplication:

```json
{ "type": "message", "text": "What changed today?", "conversationId": "optional-uuid", "eventId": "optional-client-event-id" }
```

Server frames:

```json
{ "type": "history", "conversationId": null, "messages": [] }
{ "type": "start", "conversationId": "uuid" }
{ "type": "delta", "text": "partial text" }
{ "type": "done", "conversationId": "uuid", "metadata": { "id": "message-id" } }
{ "type": "error", "error": "message" }
```

Fresh databases created from `schema.sql` include the required `conversations.user_id` column. Existing deployments should add it before enabling `/chat/ws`:

```sql
ALTER TABLE conversations ADD COLUMN user_id TEXT NOT NULL DEFAULT 'operator';
CREATE INDEX IF NOT EXISTS idx_conversations_user_updated ON conversations(user_id, updated_at);
```
2 changes: 2 additions & 0 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ npx wrangler d1 create my-agent

This prints a `database_id` — paste it into `wrangler.toml` under `[[d1_databases]]`.

The example Wrangler config also includes the `CHAT_SESSION` Durable Object binding and SQLite-backed migration required by `/chat/ws`.

Then run the schema migration:

```bash
Expand Down
3 changes: 3 additions & 0 deletions web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,16 @@
"./routes/health": "./src/routes/health.ts",
"./routes/sessions": "./src/routes/sessions.ts",
"./routes/messages": "./src/routes/messages.ts",
"./routes/chat-ws": "./src/routes/chat-ws.ts",
"./routes/conversations": "./src/routes/conversations.ts",
"./routes/feedback": "./src/routes/feedback.ts",
"./routes/observability": "./src/routes/observability.ts",
"./routes/cc-tasks": "./src/routes/cc-tasks.ts",
"./routes/pages": "./src/routes/pages.ts",
"./routes/dynamic-tools": "./src/routes/dynamic-tools.ts",
"./adapters/voice": "./src/adapters/voice/cloudflare-agent.ts",
"./durable-objects/chat-session": "./src/durable-objects/chat-session.ts",
"./durable-objects/chat-session-auth": "./src/durable-objects/chat-session-auth.ts",
"./schema-enums": "./src/schema-enums.ts",
"./contracts/goal": "./src/contracts/goal.contract.ts",
"./contracts/agenda-item": "./src/contracts/agenda-item.contract.ts",
Expand Down
2 changes: 2 additions & 0 deletions web/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ CREATE TABLE IF NOT EXISTS conversations (
id TEXT PRIMARY KEY,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
user_id TEXT NOT NULL DEFAULT 'operator',
title TEXT
);

Expand Down Expand Up @@ -200,6 +201,7 @@ CREATE TABLE IF NOT EXISTS operator_log (

CREATE INDEX IF NOT EXISTS idx_messages_conversation ON messages(conversation_id);
CREATE INDEX IF NOT EXISTS idx_messages_created ON messages(created_at);
CREATE INDEX IF NOT EXISTS idx_conversations_user_updated ON conversations(user_id, updated_at);
CREATE INDEX IF NOT EXISTS idx_memory_topic ON memory_entries(topic);
CREATE INDEX IF NOT EXISTS idx_memory_dedup ON memory_entries(topic, fact_hash);
CREATE INDEX IF NOT EXISTS idx_memory_expires ON memory_entries(expires_at);
Expand Down
5 changes: 5 additions & 0 deletions web/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { observability } from './routes/observability.js';
import { pages } from './routes/pages.js';
import { ccTasks } from './routes/cc-tasks.js';
import { messages } from './routes/messages.js';
import { chatWs } from './routes/chat-ws.js';
import { dynamicToolsRoutes } from './routes/dynamic-tools.js';

// ─── Scheduled Task Plugin ──────────────────────────────────
Expand Down Expand Up @@ -213,6 +214,7 @@ export function createAegisApp(config: AegisAppConfig): AegisApp {
app.route('/', pages);
app.route('/', ccTasks);
app.route('/', messages);
app.route('/', chatWs);
app.route('/', dynamicToolsRoutes);

// ── Extension routes ──
Expand Down Expand Up @@ -268,6 +270,7 @@ export function createAegisApp(config: AegisAppConfig): AegisApp {
pages,
ccTasks,
messages,
chatWs,
dynamicTools: dynamicToolsRoutes,
};

Expand All @@ -288,6 +291,8 @@ export type {
MessageMetadata,
} from './types.js';

export { ChatSession } from './durable-objects/chat-session.js';

export type { EdgeEnv } from './kernel/dispatch.js';

export type {
Expand Down
20 changes: 20 additions & 0 deletions web/src/durable-objects/chat-session-auth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;

export type ConversationOwnership = 'owned' | 'not_owned' | 'not_found';

export function isValidConversationId(id: string): boolean {
return UUID_RE.test(id);
}

export async function verifyConversationOwnership(
db: D1Database,
conversationId: string,
userId: string,
): Promise<ConversationOwnership> {
const row = await db.prepare(
'SELECT user_id FROM conversations WHERE id = ?',
).bind(conversationId).first<{ user_id: string | null }>();

if (!row) return 'not_found';
return (row.user_id ?? 'operator') === userId ? 'owned' : 'not_owned';
}
239 changes: 239 additions & 0 deletions web/src/durable-objects/chat-session.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
import type { Env, MessageMetadata } from '../types.js';
import { buildEdgeEnv } from '../edge-env.js';
import { createIntent, dispatchStream } from '../kernel/dispatch.js';
import type { DispatchResult } from '../kernel/types.js';
import { isValidConversationId, verifyConversationOwnership } from './chat-session-auth.js';
import { z } from 'zod';

const MessageFrameSchema = z.object({
type: z.literal('message'),
text: z.string().trim().min(1),
conversationId: z.string().optional(),
eventId: z.string().trim().min(1).optional(),
}).passthrough();

type StoredMessage = {
id: string;
role: 'user' | 'assistant';
content: string;
metadata: string | null;
created_at: string;
};

export class ChatSession implements DurableObject {
constructor(
private readonly state: DurableObjectState,
private readonly env: Env,
) {}

async fetch(request: Request): Promise<Response> {
if (request.headers.get('Upgrade')?.toLowerCase() !== 'websocket') {
return new Response('Expected WebSocket upgrade', { status: 426 });
}

const pair = new WebSocketPair();
const client = pair[0];
const server = pair[1];
const url = new URL(request.url);
const userId = this.userId();
const conversationId = url.searchParams.get('conversationId');

this.state.acceptWebSocket(server, ['aegis-chat']);
this.state.waitUntil(this.sendHistory(server, conversationId, userId));

const headers = new Headers();
if (request.headers.get('Sec-WebSocket-Protocol')?.split(',').map((p) => p.trim()).includes('aegis-chat')) {
headers.set('Sec-WebSocket-Protocol', 'aegis-chat');
}

return new Response(null, {
status: 101,
webSocket: client,
headers,
});
}

async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
let payload: unknown;
try {
payload = JSON.parse(typeof message === 'string' ? message : new TextDecoder().decode(message));
} catch {
this.send(ws, { type: 'error', error: 'Invalid JSON frame' });
return;
}

if (!isRecord(payload) || payload.type !== 'message') return;

const parsed = MessageFrameSchema.safeParse(payload);
if (!parsed.success) {
this.send(ws, { type: 'error', error: 'Invalid message frame' });
return;
}

const eventId = parsed.data.eventId ?? crypto.randomUUID();
const eventClaimed = await this.claimEvent(eventId);
if (!eventClaimed) {
this.send(ws, { type: 'error', error: 'duplicate event' });
return;
}

const text = parsed.data.text;
const userId = this.userId();
const conversationId = await this.resolveConversationId(ws, parsed.data.conversationId, userId, text);
if (!conversationId) return;

const userMessageId = crypto.randomUUID();
await this.env.DB.prepare(
'INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, ?, ?)',
).bind(userMessageId, conversationId, 'user', text).run();

this.send(ws, { type: 'start', conversationId });

try {
const edgeEnv = buildEdgeEnv(this.env);
const intent = createIntent(conversationId, text);
const result = await dispatchStream(intent, edgeEnv, (delta) => {
this.send(ws, { type: 'delta', text: delta });
});

const assistantMessageId = crypto.randomUUID();
const metadata = buildMessageMetadata(result);

await this.env.DB.prepare(
'INSERT INTO messages (id, conversation_id, role, content, metadata) VALUES (?, ?, ?, ?, ?)',
).bind(assistantMessageId, conversationId, 'assistant', result.text, JSON.stringify(metadata)).run();

await this.env.DB.prepare(
"UPDATE conversations SET updated_at = datetime('now') WHERE id = ?",
).bind(conversationId).run();

this.send(ws, { type: 'done', conversationId, metadata: { id: assistantMessageId, ...metadata } });
} catch (err) {
const error = err instanceof Error ? err.message : String(err);
this.send(ws, { type: 'error', error });
}
}

private async resolveConversationId(
ws: WebSocket,
rawConversationId: unknown,
userId: string,
firstMessage: string,
): Promise<string | null> {
if (rawConversationId !== undefined && typeof rawConversationId !== 'string') {
this.send(ws, { type: 'error', error: 'conversationId must be a UUID string' });
return null;
}

const conversationId = rawConversationId ?? crypto.randomUUID();
if (!isValidConversationId(conversationId)) {
this.send(ws, { type: 'error', error: 'conversationId must be a UUID string' });
return null;
}

const ownership = await verifyConversationOwnership(this.env.DB, conversationId, userId);
if (ownership === 'not_owned') {
this.send(ws, { type: 'error', error: 'conversation not found' });
return null;
}
if (ownership === 'owned') return conversationId;

await this.env.DB.prepare(
'INSERT INTO conversations (id, title, user_id) VALUES (?, ?, ?)',
).bind(conversationId, firstMessage.slice(0, 100), userId).run();
return conversationId;
}

private async claimEvent(eventId: string): Promise<boolean> {
const existing = await this.env.DB.prepare(
'SELECT event_id FROM web_events WHERE event_id = ?',
).bind(eventId).first();
if (existing) return false;

await this.env.DB.prepare(
'INSERT INTO web_events (event_id) VALUES (?)',
).bind(eventId).run();
return true;
}

private async sendHistory(ws: WebSocket, conversationId: string | null, userId: string): Promise<void> {
if (!conversationId) {
this.send(ws, { type: 'history', conversationId: null, messages: [] });
return;
}
if (!isValidConversationId(conversationId)) {
this.send(ws, { type: 'error', error: 'conversationId must be a UUID string' });
return;
}

const ownership = await verifyConversationOwnership(this.env.DB, conversationId, userId);
if (ownership === 'not_owned') {
this.send(ws, { type: 'error', error: 'conversation not found' });
return;
}
if (ownership === 'not_found') {
this.send(ws, { type: 'history', conversationId, messages: [] });
return;
}

const rows = await this.env.DB.prepare(
`SELECT m.id, m.role, m.content, m.metadata, m.created_at
FROM messages m
JOIN conversations c ON c.id = m.conversation_id
WHERE m.conversation_id = ? AND c.user_id = ?
ORDER BY m.created_at ASC`,
).bind(conversationId, userId).all<StoredMessage>();

this.send(ws, {
type: 'history',
conversationId,
messages: rows.results.map((message) => ({
...message,
metadata: parseMetadata(message.metadata),
})),
});
}

private userId(): string {
return this.state.id.name ?? 'operator';
}

private send(ws: WebSocket, frame: Record<string, unknown>): void {
try {
ws.send(JSON.stringify(frame));
} catch {
// Ignore writes after the client disconnects.
}
}
}

function buildMessageMetadata(result: DispatchResult): MessageMetadata {
return {
classification: result.classification,
executor: result.executor,
procHit: result.procedureHit,
latencyMs: result.latency_ms,
cost: result.cost,
confidence: result.confidence,
reclassified: result.reclassified,
probeResult: result.probeResult,
grounded: result.grounded,
sources: result.sources,
unknowns: result.unknowns,
searched: result.searched,
unverifiedClaims: result.unverified_claims,
};
}

function parseMetadata(value: string | null): MessageMetadata | null {
if (!value) return null;
try {
return JSON.parse(value) as MessageMetadata;
} catch {
return null;
}
}

function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null;
}
Loading