Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions web/src/api/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { useEffect } from 'react';
import type { SessionEvent } from './types';

export function useEventSource(
url: string,
onMessage: (ev: SessionEvent) => void,
enabled: boolean = true,
) {
useEffect(() => {
if (!enabled) return;
const es = new EventSource(url);
es.onmessage = (ev) => {
try {
const data = JSON.parse(ev.data) as SessionEvent;
onMessage(data);
} catch {
// skip malformed
}
};
return () => es.close();
// onMessage intentionally omitted from deps to avoid reconnect on every render
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [url, enabled]);
}
28 changes: 28 additions & 0 deletions web/src/api/ws.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { useEffect } from 'react';
import type { SessionEvent } from './types';

export function useWebSocket(
url: string,
onMessage: (ev: SessionEvent) => void,
enabled: boolean = true,
) {
useEffect(() => {
if (!enabled) return;
const ws = new WebSocket(url);
ws.onmessage = (ev) => {
try {
const data = JSON.parse(ev.data) as SessionEvent;
onMessage(data);
} catch {
// skip malformed
}
};
return () => {
if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
ws.close();
}
};
// onMessage intentionally omitted from deps to avoid reconnect on every render
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [url, enabled]);
}
104 changes: 104 additions & 0 deletions web/src/state/sessionReducer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import type {
Session, AgentRun, ToolCall, AgentDefinition,
SessionEvent, SessionFullBundle,
} from '@/api/types';

export interface SessionState {
session: Session | null;
agentsRun: AgentRun[];
toolCalls: ToolCall[];
events: SessionEvent[];
agentDefinitions: Record<string, AgentDefinition>;
vmSeq: number;
}

export const initialSessionState: SessionState = {
session: null,
agentsRun: [],
toolCalls: [],
events: [],
agentDefinitions: {},
vmSeq: 0,
};

type Action =
| { type: 'bootstrap'; bundle: SessionFullBundle }
| { type: 'event'; event: SessionEvent };

export function sessionReducer(state: SessionState, action: Action): SessionState {
switch (action.type) {
case 'bootstrap':
return {
session: action.bundle.session,
agentsRun: action.bundle.agents_run,
toolCalls: action.bundle.tool_calls,
events: action.bundle.events,
agentDefinitions: action.bundle.agent_definitions,
vmSeq: action.bundle.vm_seq,
};

case 'event': {
const ev = action.event;
if (ev.seq <= state.vmSeq) return state; // drop dupes / out-of-order
const events = [...state.events, ev];
let session = state.session;
let agentsRun = state.agentsRun;
let toolCalls = state.toolCalls;

switch (ev.kind) {
case 'agent_finished': {
const p = ev.payload as Partial<AgentRun>;
agentsRun = [...agentsRun, {
agent: p.agent ?? '',
started_at: p.started_at ?? '',
ended_at: p.ended_at ?? ev.ts,
summary: p.summary ?? '',
confidence: p.confidence ?? null,
confidence_rationale: p.confidence_rationale ?? null,
signal: p.signal ?? null,
}];
break;
}
case 'tool_invoked': {
const p = ev.payload as Partial<ToolCall>;
toolCalls = [...toolCalls, {
agent: p.agent ?? '',
tool: p.tool ?? '',
args: p.args ?? {},
result: p.result ?? null,
ts: ev.ts,
risk: p.risk ?? null,
status: 'executed',
approver: null,
approved_at: null,
approval_rationale: null,
}];
break;
}
case 'approval_pending': {
const p = ev.payload as Partial<ToolCall>;
toolCalls = [...toolCalls, {
agent: p.agent ?? '',
tool: p.tool ?? '',
args: p.args ?? {},
result: null,
ts: ev.ts,
risk: p.risk ?? null,
status: 'pending_approval',
approver: null,
approved_at: null,
approval_rationale: null,
}];
break;
}
case 'status_changed': {
const p = ev.payload as { status?: Session['status'] };
if (session && p.status) session = { ...session, status: p.status };
break;
}
}

return { ...state, events, session, agentsRun, toolCalls, vmSeq: ev.seq };
}
}
}
44 changes: 44 additions & 0 deletions web/tests/_helpers/MockEventSource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
type Listener = (ev: MessageEvent) => void;

export class MockEventSource {
url: string;
readyState: number = 0; // CONNECTING
onopen: ((ev: Event) => void) | null = null;
onmessage: Listener | null = null;
onerror: ((ev: Event) => void) | null = null;
private listeners: Map<string, Listener[]> = new Map();
private static instances: MockEventSource[] = [];

constructor(url: string) {
this.url = url;
MockEventSource.instances.push(this);
setTimeout(() => {
this.readyState = 1; // OPEN
this.onopen?.(new Event('open'));
}, 0);
}

addEventListener(type: string, listener: Listener) {
const list = this.listeners.get(type) ?? [];
list.push(listener);
this.listeners.set(type, list);
}

emit(data: string) {
const ev = new MessageEvent('message', { data });
this.onmessage?.(ev);
this.listeners.get('message')?.forEach((l) => l(ev));
}

close() {
this.readyState = 2;
}

static lastInstance(): MockEventSource | undefined {
return MockEventSource.instances[MockEventSource.instances.length - 1];
}

static reset() {
MockEventSource.instances = [];
}
}
55 changes: 55 additions & 0 deletions web/tests/_helpers/MockWebSocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
type Listener = (ev: MessageEvent) => void;

export class MockWebSocket {
static readonly CONNECTING = 0;
static readonly OPEN = 1;
static readonly CLOSING = 2;
static readonly CLOSED = 3;

url: string;
readyState: number = MockWebSocket.CONNECTING;
onopen: ((ev: Event) => void) | null = null;
onmessage: Listener | null = null;
onerror: ((ev: Event) => void) | null = null;
onclose: ((ev: CloseEvent) => void) | null = null;
private listeners: Map<string, Listener[]> = new Map();
private static instances: MockWebSocket[] = [];

constructor(url: string) {
this.url = url;
MockWebSocket.instances.push(this);
setTimeout(() => {
this.readyState = MockWebSocket.OPEN;
this.onopen?.(new Event('open'));
}, 0);
}

addEventListener(type: string, listener: Listener) {
const list = this.listeners.get(type) ?? [];
list.push(listener);
this.listeners.set(type, list);
}

emit(data: string) {
const ev = new MessageEvent('message', { data });
this.onmessage?.(ev);
this.listeners.get('message')?.forEach((l) => l(ev));
}

send(_data: string) {
// no-op in mock
}

close() {
this.readyState = MockWebSocket.CLOSED;
this.onclose?.(new CloseEvent('close'));
}

static lastInstance(): MockWebSocket | undefined {
return MockWebSocket.instances[MockWebSocket.instances.length - 1];
}

static reset() {
MockWebSocket.instances = [];
}
}
89 changes: 89 additions & 0 deletions web/tests/unit/sessionReducer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { describe, it, expect } from 'vitest';
import { sessionReducer, initialSessionState } from '@/state/sessionReducer';
import type { SessionFullBundle, SessionEvent } from '@/api/types';

const baseBundle: SessionFullBundle = {
session: {
id: 'SES-1', status: 'in_progress',
created_at: 'x', updated_at: 'x', deleted_at: null,
agents_run: [], tool_calls: [], findings: {},
pending_intervention: null, user_inputs: [],
parent_session_id: null, dedup_rationale: null,
extra_fields: {}, version: 1,
},
agents_run: [],
tool_calls: [],
events: [],
agent_definitions: {},
vm_seq: 0,
};

describe('sessionReducer', () => {
describe('action: bootstrap', () => {
it('replaces full state with bundle', () => {
const next = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle });
expect(next.vmSeq).toBe(0);
expect(next.session?.id).toBe('SES-1');
});
});

describe('action: event', () => {
it('drops events with seq <= vmSeq (idempotent)', () => {
const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: { ...baseBundle, vm_seq: 5 } });
const stale: SessionEvent = { seq: 3, kind: 'agent_started', payload: {}, ts: 'x' };
const next = sessionReducer(state, { type: 'event', event: stale });
expect(next).toBe(state); // no change
});

it('appends events with seq > vmSeq and bumps vmSeq', () => {
const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle });
const fresh: SessionEvent = { seq: 1, kind: 'agent_started', payload: { agent: 'intake' }, ts: 'x' };
const next = sessionReducer(state, { type: 'event', event: fresh });
expect(next.vmSeq).toBe(1);
expect(next.events).toHaveLength(1);
});

it('event "agent_finished" appends an AgentRun', () => {
const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle });
const finished: SessionEvent = {
seq: 1, kind: 'agent_finished', ts: 'x',
payload: { agent: 'intake', summary: 'done', confidence: 0.9 },
};
const next = sessionReducer(state, { type: 'event', event: finished });
expect(next.agentsRun).toHaveLength(1);
expect(next.agentsRun[0]?.agent).toBe('intake');
});

it('event "tool_invoked" appends a ToolCall with status="executed"', () => {
const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle });
const tool: SessionEvent = {
seq: 1, kind: 'tool_invoked', ts: 'x',
payload: { agent: 'triage', tool: 'obs:get_logs', args: {}, result: 'ok' },
};
const next = sessionReducer(state, { type: 'event', event: tool });
expect(next.toolCalls).toHaveLength(1);
expect(next.toolCalls[0]?.status).toBe('executed');
});

it('event "approval_pending" inserts a pending tool call', () => {
const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle });
const ev: SessionEvent = {
seq: 1, kind: 'approval_pending', ts: 'x',
payload: { agent: 'investigate', tool: 'rem:propose_fix', args: {}, risk: 'high' },
};
const next = sessionReducer(state, { type: 'event', event: ev });
expect(next.toolCalls[0]?.status).toBe('pending_approval');
expect(next.toolCalls[0]?.risk).toBe('high');
});

it('event "status_changed" updates session.status', () => {
const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle });
const ev: SessionEvent = {
seq: 1, kind: 'status_changed', ts: 'x',
payload: { status: 'resolved' },
};
const next = sessionReducer(state, { type: 'event', event: ev });
expect(next.session?.status).toBe('resolved');
});
});
});
30 changes: 30 additions & 0 deletions web/tests/unit/sse.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { renderHook, act } from '@testing-library/react';
import { useEventSource } from '@/api/sse';
import { MockEventSource } from '../_helpers/MockEventSource';

describe('useEventSource', () => {
beforeEach(() => {
MockEventSource.reset();
// @ts-expect-error global override
global.EventSource = MockEventSource;
});

it('opens an EventSource at the given URL', async () => {
renderHook(() => useEventSource('/api/v1/sessions/SES-1/events', () => {}));
await Promise.resolve();
expect(MockEventSource.lastInstance()?.url).toBe('/api/v1/sessions/SES-1/events');
});

it('calls onMessage with parsed JSON payload per data: line', async () => {
const onMessage = vi.fn();
renderHook(() => useEventSource('/x', onMessage));
await Promise.resolve();
act(() => {
MockEventSource.lastInstance()!.emit('{"seq":1,"kind":"agent_started","payload":{},"ts":"x"}');
});
expect(onMessage).toHaveBeenCalledWith(
expect.objectContaining({ seq: 1, kind: 'agent_started' }),
);
});
});
Loading
Loading