From cddb3b9fe75ba4dc17840105b266782bcf80b5d3 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Sat, 16 May 2026 04:21:13 +0000 Subject: [PATCH 1/3] feat(web): useEventSource hook + MockEventSource test helper --- web/src/api/sse.ts | 24 +++++++++++++++ web/tests/_helpers/MockEventSource.ts | 44 +++++++++++++++++++++++++++ web/tests/unit/sse.test.ts | 30 ++++++++++++++++++ 3 files changed, 98 insertions(+) create mode 100644 web/src/api/sse.ts create mode 100644 web/tests/_helpers/MockEventSource.ts create mode 100644 web/tests/unit/sse.test.ts diff --git a/web/src/api/sse.ts b/web/src/api/sse.ts new file mode 100644 index 0000000..f67296a --- /dev/null +++ b/web/src/api/sse.ts @@ -0,0 +1,24 @@ +import { useEffect } from 'react'; +import type { SessionEvent } from './types'; + +export function useEventSource( + url: string, + onMessage: (ev: SessionEvent) => void, + enabled: boolean = true, +) { + useEffect(() => { + if (!enabled) return; + const es = new EventSource(url); + es.onmessage = (ev) => { + try { + const data = JSON.parse(ev.data) as SessionEvent; + onMessage(data); + } catch { + // skip malformed + } + }; + return () => es.close(); + // onMessage intentionally omitted from deps to avoid reconnect on every render + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [url, enabled]); +} diff --git a/web/tests/_helpers/MockEventSource.ts b/web/tests/_helpers/MockEventSource.ts new file mode 100644 index 0000000..27663f1 --- /dev/null +++ b/web/tests/_helpers/MockEventSource.ts @@ -0,0 +1,44 @@ +type Listener = (ev: MessageEvent) => void; + +export class MockEventSource { + url: string; + readyState: number = 0; // CONNECTING + onopen: ((ev: Event) => void) | null = null; + onmessage: Listener | null = null; + onerror: ((ev: Event) => void) | null = null; + private listeners: Map = new Map(); + private static instances: MockEventSource[] = []; + + constructor(url: string) { + this.url = url; + MockEventSource.instances.push(this); + setTimeout(() => { + this.readyState = 1; // OPEN + this.onopen?.(new Event('open')); + }, 0); + } + + addEventListener(type: string, listener: Listener) { + const list = this.listeners.get(type) ?? []; + list.push(listener); + this.listeners.set(type, list); + } + + emit(data: string) { + const ev = new MessageEvent('message', { data }); + this.onmessage?.(ev); + this.listeners.get('message')?.forEach((l) => l(ev)); + } + + close() { + this.readyState = 2; + } + + static lastInstance(): MockEventSource | undefined { + return MockEventSource.instances[MockEventSource.instances.length - 1]; + } + + static reset() { + MockEventSource.instances = []; + } +} diff --git a/web/tests/unit/sse.test.ts b/web/tests/unit/sse.test.ts new file mode 100644 index 0000000..4ca6bda --- /dev/null +++ b/web/tests/unit/sse.test.ts @@ -0,0 +1,30 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { renderHook, act } from '@testing-library/react'; +import { useEventSource } from '@/api/sse'; +import { MockEventSource } from '../_helpers/MockEventSource'; + +describe('useEventSource', () => { + beforeEach(() => { + MockEventSource.reset(); + // @ts-expect-error global override + global.EventSource = MockEventSource; + }); + + it('opens an EventSource at the given URL', async () => { + renderHook(() => useEventSource('/api/v1/sessions/SES-1/events', () => {})); + await Promise.resolve(); + expect(MockEventSource.lastInstance()?.url).toBe('/api/v1/sessions/SES-1/events'); + }); + + it('calls onMessage with parsed JSON payload per data: line', async () => { + const onMessage = vi.fn(); + renderHook(() => useEventSource('/x', onMessage)); + await Promise.resolve(); + act(() => { + MockEventSource.lastInstance()!.emit('{"seq":1,"kind":"agent_started","payload":{},"ts":"x"}'); + }); + expect(onMessage).toHaveBeenCalledWith( + expect.objectContaining({ seq: 1, kind: 'agent_started' }), + ); + }); +}); From 5940489c38978e013db586d2a3113b3e6a98fa87 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Sat, 16 May 2026 04:22:43 +0000 Subject: [PATCH 2/3] feat(web): useWebSocket fallback transport hook --- web/src/api/ws.ts | 28 +++++++++++++++ web/tests/_helpers/MockWebSocket.ts | 55 +++++++++++++++++++++++++++++ web/tests/unit/ws.test.ts | 46 ++++++++++++++++++++++++ 3 files changed, 129 insertions(+) create mode 100644 web/src/api/ws.ts create mode 100644 web/tests/_helpers/MockWebSocket.ts create mode 100644 web/tests/unit/ws.test.ts diff --git a/web/src/api/ws.ts b/web/src/api/ws.ts new file mode 100644 index 0000000..30b5201 --- /dev/null +++ b/web/src/api/ws.ts @@ -0,0 +1,28 @@ +import { useEffect } from 'react'; +import type { SessionEvent } from './types'; + +export function useWebSocket( + url: string, + onMessage: (ev: SessionEvent) => void, + enabled: boolean = true, +) { + useEffect(() => { + if (!enabled) return; + const ws = new WebSocket(url); + ws.onmessage = (ev) => { + try { + const data = JSON.parse(ev.data) as SessionEvent; + onMessage(data); + } catch { + // skip malformed + } + }; + return () => { + if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) { + ws.close(); + } + }; + // onMessage intentionally omitted from deps to avoid reconnect on every render + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [url, enabled]); +} diff --git a/web/tests/_helpers/MockWebSocket.ts b/web/tests/_helpers/MockWebSocket.ts new file mode 100644 index 0000000..bd7951c --- /dev/null +++ b/web/tests/_helpers/MockWebSocket.ts @@ -0,0 +1,55 @@ +type Listener = (ev: MessageEvent) => void; + +export class MockWebSocket { + static readonly CONNECTING = 0; + static readonly OPEN = 1; + static readonly CLOSING = 2; + static readonly CLOSED = 3; + + url: string; + readyState: number = MockWebSocket.CONNECTING; + onopen: ((ev: Event) => void) | null = null; + onmessage: Listener | null = null; + onerror: ((ev: Event) => void) | null = null; + onclose: ((ev: CloseEvent) => void) | null = null; + private listeners: Map = new Map(); + private static instances: MockWebSocket[] = []; + + constructor(url: string) { + this.url = url; + MockWebSocket.instances.push(this); + setTimeout(() => { + this.readyState = MockWebSocket.OPEN; + this.onopen?.(new Event('open')); + }, 0); + } + + addEventListener(type: string, listener: Listener) { + const list = this.listeners.get(type) ?? []; + list.push(listener); + this.listeners.set(type, list); + } + + emit(data: string) { + const ev = new MessageEvent('message', { data }); + this.onmessage?.(ev); + this.listeners.get('message')?.forEach((l) => l(ev)); + } + + send(_data: string) { + // no-op in mock + } + + close() { + this.readyState = MockWebSocket.CLOSED; + this.onclose?.(new CloseEvent('close')); + } + + static lastInstance(): MockWebSocket | undefined { + return MockWebSocket.instances[MockWebSocket.instances.length - 1]; + } + + static reset() { + MockWebSocket.instances = []; + } +} diff --git a/web/tests/unit/ws.test.ts b/web/tests/unit/ws.test.ts new file mode 100644 index 0000000..a737f9c --- /dev/null +++ b/web/tests/unit/ws.test.ts @@ -0,0 +1,46 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { renderHook, act } from '@testing-library/react'; +import { useWebSocket } from '@/api/ws'; +import { MockWebSocket } from '../_helpers/MockWebSocket'; + +describe('useWebSocket', () => { + beforeEach(() => { + MockWebSocket.reset(); + // @ts-expect-error global override + global.WebSocket = MockWebSocket; + }); + + it('opens a WebSocket at the given URL', async () => { + renderHook(() => useWebSocket('ws://test/api/v1/sessions/SES-1/ws', () => {})); + await Promise.resolve(); + expect(MockWebSocket.lastInstance()?.url).toBe('ws://test/api/v1/sessions/SES-1/ws'); + }); + + it('calls onMessage with parsed JSON payload per message event', async () => { + const onMessage = vi.fn(); + renderHook(() => useWebSocket('ws://x', onMessage)); + await Promise.resolve(); + act(() => { + MockWebSocket.lastInstance()!.emit('{"seq":2,"kind":"tool_invoked","payload":{},"ts":"y"}'); + }); + expect(onMessage).toHaveBeenCalledWith( + expect.objectContaining({ seq: 2, kind: 'tool_invoked' }), + ); + }); + + it('skips messages with malformed JSON', async () => { + const onMessage = vi.fn(); + renderHook(() => useWebSocket('ws://x', onMessage)); + await Promise.resolve(); + act(() => { + MockWebSocket.lastInstance()!.emit('not json'); + }); + expect(onMessage).not.toHaveBeenCalled(); + }); + + it('does not connect when enabled=false', async () => { + renderHook(() => useWebSocket('ws://x', () => {}, false)); + await Promise.resolve(); + expect(MockWebSocket.lastInstance()).toBeUndefined(); + }); +}); From 2ff92c4b46f170a0a66ec262c3a48a07f41473ee Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Sat, 16 May 2026 04:22:03 +0000 Subject: [PATCH 3/3] feat(web): sessionReducer pure delta-application with vm_seq watermark --- web/src/state/sessionReducer.ts | 104 ++++++++++++++++++++++++++ web/tests/unit/sessionReducer.test.ts | 89 ++++++++++++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 web/src/state/sessionReducer.ts create mode 100644 web/tests/unit/sessionReducer.test.ts diff --git a/web/src/state/sessionReducer.ts b/web/src/state/sessionReducer.ts new file mode 100644 index 0000000..9ffd1ab --- /dev/null +++ b/web/src/state/sessionReducer.ts @@ -0,0 +1,104 @@ +import type { + Session, AgentRun, ToolCall, AgentDefinition, + SessionEvent, SessionFullBundle, +} from '@/api/types'; + +export interface SessionState { + session: Session | null; + agentsRun: AgentRun[]; + toolCalls: ToolCall[]; + events: SessionEvent[]; + agentDefinitions: Record; + vmSeq: number; +} + +export const initialSessionState: SessionState = { + session: null, + agentsRun: [], + toolCalls: [], + events: [], + agentDefinitions: {}, + vmSeq: 0, +}; + +type Action = + | { type: 'bootstrap'; bundle: SessionFullBundle } + | { type: 'event'; event: SessionEvent }; + +export function sessionReducer(state: SessionState, action: Action): SessionState { + switch (action.type) { + case 'bootstrap': + return { + session: action.bundle.session, + agentsRun: action.bundle.agents_run, + toolCalls: action.bundle.tool_calls, + events: action.bundle.events, + agentDefinitions: action.bundle.agent_definitions, + vmSeq: action.bundle.vm_seq, + }; + + case 'event': { + const ev = action.event; + if (ev.seq <= state.vmSeq) return state; // drop dupes / out-of-order + const events = [...state.events, ev]; + let session = state.session; + let agentsRun = state.agentsRun; + let toolCalls = state.toolCalls; + + switch (ev.kind) { + case 'agent_finished': { + const p = ev.payload as Partial; + agentsRun = [...agentsRun, { + agent: p.agent ?? '', + started_at: p.started_at ?? '', + ended_at: p.ended_at ?? ev.ts, + summary: p.summary ?? '', + confidence: p.confidence ?? null, + confidence_rationale: p.confidence_rationale ?? null, + signal: p.signal ?? null, + }]; + break; + } + case 'tool_invoked': { + const p = ev.payload as Partial; + toolCalls = [...toolCalls, { + agent: p.agent ?? '', + tool: p.tool ?? '', + args: p.args ?? {}, + result: p.result ?? null, + ts: ev.ts, + risk: p.risk ?? null, + status: 'executed', + approver: null, + approved_at: null, + approval_rationale: null, + }]; + break; + } + case 'approval_pending': { + const p = ev.payload as Partial; + toolCalls = [...toolCalls, { + agent: p.agent ?? '', + tool: p.tool ?? '', + args: p.args ?? {}, + result: null, + ts: ev.ts, + risk: p.risk ?? null, + status: 'pending_approval', + approver: null, + approved_at: null, + approval_rationale: null, + }]; + break; + } + case 'status_changed': { + const p = ev.payload as { status?: Session['status'] }; + if (session && p.status) session = { ...session, status: p.status }; + break; + } + } + + return { ...state, events, session, agentsRun, toolCalls, vmSeq: ev.seq }; + } + } +} diff --git a/web/tests/unit/sessionReducer.test.ts b/web/tests/unit/sessionReducer.test.ts new file mode 100644 index 0000000..973a1cd --- /dev/null +++ b/web/tests/unit/sessionReducer.test.ts @@ -0,0 +1,89 @@ +import { describe, it, expect } from 'vitest'; +import { sessionReducer, initialSessionState } from '@/state/sessionReducer'; +import type { SessionFullBundle, SessionEvent } from '@/api/types'; + +const baseBundle: SessionFullBundle = { + session: { + id: 'SES-1', status: 'in_progress', + created_at: 'x', updated_at: 'x', deleted_at: null, + agents_run: [], tool_calls: [], findings: {}, + pending_intervention: null, user_inputs: [], + parent_session_id: null, dedup_rationale: null, + extra_fields: {}, version: 1, + }, + agents_run: [], + tool_calls: [], + events: [], + agent_definitions: {}, + vm_seq: 0, +}; + +describe('sessionReducer', () => { + describe('action: bootstrap', () => { + it('replaces full state with bundle', () => { + const next = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle }); + expect(next.vmSeq).toBe(0); + expect(next.session?.id).toBe('SES-1'); + }); + }); + + describe('action: event', () => { + it('drops events with seq <= vmSeq (idempotent)', () => { + const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: { ...baseBundle, vm_seq: 5 } }); + const stale: SessionEvent = { seq: 3, kind: 'agent_started', payload: {}, ts: 'x' }; + const next = sessionReducer(state, { type: 'event', event: stale }); + expect(next).toBe(state); // no change + }); + + it('appends events with seq > vmSeq and bumps vmSeq', () => { + const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle }); + const fresh: SessionEvent = { seq: 1, kind: 'agent_started', payload: { agent: 'intake' }, ts: 'x' }; + const next = sessionReducer(state, { type: 'event', event: fresh }); + expect(next.vmSeq).toBe(1); + expect(next.events).toHaveLength(1); + }); + + it('event "agent_finished" appends an AgentRun', () => { + const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle }); + const finished: SessionEvent = { + seq: 1, kind: 'agent_finished', ts: 'x', + payload: { agent: 'intake', summary: 'done', confidence: 0.9 }, + }; + const next = sessionReducer(state, { type: 'event', event: finished }); + expect(next.agentsRun).toHaveLength(1); + expect(next.agentsRun[0]?.agent).toBe('intake'); + }); + + it('event "tool_invoked" appends a ToolCall with status="executed"', () => { + const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle }); + const tool: SessionEvent = { + seq: 1, kind: 'tool_invoked', ts: 'x', + payload: { agent: 'triage', tool: 'obs:get_logs', args: {}, result: 'ok' }, + }; + const next = sessionReducer(state, { type: 'event', event: tool }); + expect(next.toolCalls).toHaveLength(1); + expect(next.toolCalls[0]?.status).toBe('executed'); + }); + + it('event "approval_pending" inserts a pending tool call', () => { + const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle }); + const ev: SessionEvent = { + seq: 1, kind: 'approval_pending', ts: 'x', + payload: { agent: 'investigate', tool: 'rem:propose_fix', args: {}, risk: 'high' }, + }; + const next = sessionReducer(state, { type: 'event', event: ev }); + expect(next.toolCalls[0]?.status).toBe('pending_approval'); + expect(next.toolCalls[0]?.risk).toBe('high'); + }); + + it('event "status_changed" updates session.status', () => { + const state = sessionReducer(initialSessionState, { type: 'bootstrap', bundle: baseBundle }); + const ev: SessionEvent = { + seq: 1, kind: 'status_changed', ts: 'x', + payload: { status: 'resolved' }, + }; + const next = sessionReducer(state, { type: 'event', event: ev }); + expect(next.session?.status).toBe('resolved'); + }); + }); +});