diff --git a/kun/src/adapters/in-memory-approval-gate.ts b/kun/src/adapters/in-memory-approval-gate.ts index 5614511c..f2b44ae8 100644 --- a/kun/src/adapters/in-memory-approval-gate.ts +++ b/kun/src/adapters/in-memory-approval-gate.ts @@ -1,23 +1,41 @@ import type { ApprovalGate } from '../ports/approval-gate.js' import type { ApprovalRequest } from '../domain/approval.js' -import { resolveApprovalRequest } from '../domain/approval.js' type PendingResolver = { resolve: (decision: 'allow' | 'deny') => void reject: (error: Error) => void } +const DEFAULT_PENDING_TIMEOUT_MS = 10 * 60 * 1000 + /** * In-memory approval gate. The HTTP layer posts decisions into * `decide`; the loop awaits the `request` promise to learn whether * the user allowed or denied the call. + * + * Pending approvals auto-expire after `pendingTimeoutMs` (default 10 min) + * to prevent unbounded growth from abandoned requests. Resolved entries + * are removed immediately since the event log is the source of truth. */ export class InMemoryApprovalGate implements ApprovalGate { private readonly approvals = new Map() private readonly resolvers = new Map() + private readonly timers = new Map>() + private readonly pendingTimeoutMs: number + + constructor(options?: { pendingTimeoutMs?: number }) { + this.pendingTimeoutMs = + options?.pendingTimeoutMs ?? DEFAULT_PENDING_TIMEOUT_MS + } request(approval: ApprovalRequest): Promise<'allow' | 'deny'> { this.approvals.set(approval.id, approval) + const timer = setTimeout( + () => this.expire(approval.id), + this.pendingTimeoutMs + ) + if (typeof timer.unref === 'function') timer.unref() + this.timers.set(approval.id, timer) return new Promise<'allow' | 'deny'>((resolve, reject) => { this.resolvers.set(approval.id, { resolve, reject }) }) @@ -26,10 +44,8 @@ export class InMemoryApprovalGate implements ApprovalGate { decide(approvalId: string, decision: 'allow' | 'deny', reason?: string): boolean { const approval = this.approvals.get(approvalId) if (!approval) return false - const resolved = resolveApprovalRequest(approval, decision, reason) - this.approvals.set(approvalId, resolved) const resolver = this.resolvers.get(approvalId) - this.resolvers.delete(approvalId) + this.cleanupEntry(approvalId) resolver?.resolve(decision) return true } @@ -49,4 +65,33 @@ export class InMemoryApprovalGate implements ApprovalGate { resolve(approvalId: string, decision: 'allow' | 'deny', reason?: string): boolean { return this.decide(approvalId, decision, reason) } + + /** + * Reject all pending approvals and clear timers. Call this on startup + * to drain any state left over from a previous process, or during + * graceful shutdown. + */ + drainAllPending(reason = 'approval drained'): void { + for (const approvalId of [...this.approvals.keys()]) { + this.expire(approvalId, reason) + } + } + + private expire(approvalId: string, reason = 'approval expired'): void { + const approval = this.approvals.get(approvalId) + if (!approval) return + const resolver = this.resolvers.get(approvalId) + this.cleanupEntry(approvalId) + resolver?.reject(new Error(`${reason}: ${approvalId}`)) + } + + private cleanupEntry(approvalId: string): void { + this.approvals.delete(approvalId) + this.resolvers.delete(approvalId) + const timer = this.timers.get(approvalId) + if (timer) { + clearTimeout(timer) + this.timers.delete(approvalId) + } + } } diff --git a/kun/src/adapters/model/deepseek-compat-model-client.ts b/kun/src/adapters/model/deepseek-compat-model-client.ts index ec7c78f1..7430ec9b 100644 --- a/kun/src/adapters/model/deepseek-compat-model-client.ts +++ b/kun/src/adapters/model/deepseek-compat-model-client.ts @@ -26,6 +26,19 @@ export type DeepseekCompatConfig = { nonStreaming?: boolean /** Maximum idle time between streaming chunks before the turn fails. */ streamIdleTimeoutMs?: number + /** + * Retry configuration for transient HTTP errors (429, 5xx) and + * network failures. Retries only happen before any streaming chunk + * is yielded, so the consumer never sees partial + retried output. + */ + retry?: { + /** Total attempts including the initial request. Defaults to 3. */ + maxAttempts?: number + /** Base delay for exponential backoff in ms. Defaults to 500. */ + baseDelayMs?: number + /** Upper bound for a single backoff delay in ms. Defaults to 8000. */ + maxDelayMs?: number + } } type ChatMessage = { @@ -86,6 +99,15 @@ type StreamReadResult = | { kind: 'error'; message: string } const DEFAULT_STREAM_IDLE_TIMEOUT_MS = 45_000 +const DEFAULT_RETRY_MAX_ATTEMPTS = 3 +const DEFAULT_RETRY_BASE_DELAY_MS = 500 +const DEFAULT_RETRY_MAX_DELAY_MS = 8_000 + +type ResolvedRetryOptions = { + maxAttempts: number + baseDelayMs: number + maxDelayMs: number +} /** * DeepSeek-compatible model client. @@ -129,22 +151,43 @@ export class DeepseekCompatModelClient implements ModelClient { body: JSON.stringify(body), signal: request.abortSignal } - let response: Response - try { - response = await this.fetchImpl(url, init) - } catch (error) { - const message = error instanceof Error ? error.message : String(error) - yield { kind: 'error', message: `model request failed: ${message}` } - return - } - if (!response.ok) { - const text = await response.text() - const classified = await this.classifyHttpError(response.status, text) - yield { - kind: 'error', - message: classified.message, - code: classified.code + let response: Response | undefined + const retry = this.retryOptions() + for (let attempt = 0; ; attempt += 1) { + if (request.abortSignal.aborted) { + yield { kind: 'error', message: 'request was aborted before start' } + return + } + try { + response = await this.fetchImpl(url, init) + } catch (error) { + const message = error instanceof Error ? error.message : String(error) + if (attempt < retry.maxAttempts - 1) { + await sleep(computeBackoffDelay(attempt, retry), request.abortSignal) + continue + } + yield { kind: 'error', message: `model request failed: ${message}` } + return } + if (response.ok) break + if (attempt >= retry.maxAttempts - 1 || !isRetryableHttpStatus(response.status)) { + const text = await response.text() + const classified = await this.classifyHttpError(response.status, text) + yield { + kind: 'error', + message: classified.message, + code: classified.code + } + return + } + const retryAfterMs = parseRetryAfterMs(response.headers.get('retry-after')) + await sleep( + Math.min(retryAfterMs ?? computeBackoffDelay(attempt, retry), retry.maxDelayMs), + request.abortSignal + ) + } + if (!response) { + yield { kind: 'error', message: 'model request produced no response' } return } if (this.config.nonStreaming || response.headers.get('content-type')?.includes('application/json')) { @@ -175,6 +218,15 @@ export class DeepseekCompatModelClient implements ModelClient { return { ...headers, ...(this.config.headers ?? {}) } } + private retryOptions(): ResolvedRetryOptions { + const retry = this.config.retry + return { + maxAttempts: Math.max(1, Math.floor(retry?.maxAttempts ?? DEFAULT_RETRY_MAX_ATTEMPTS)), + baseDelayMs: Math.max(0, Math.floor(retry?.baseDelayMs ?? DEFAULT_RETRY_BASE_DELAY_MS)), + maxDelayMs: Math.max(1, Math.floor(retry?.maxDelayMs ?? DEFAULT_RETRY_MAX_DELAY_MS)) + } + } + private async classifyHttpError(status: number, text: string): Promise<{ message: string; code: string }> { const body = text.slice(0, 500) if (status === 429) { @@ -1046,3 +1098,42 @@ function limitHistoryPreservingCompaction(history: TurnItem[], windowSize: numbe } return limited } + +function isRetryableHttpStatus(status: number): boolean { + return status === 429 || (status >= 500 && status <= 599) +} + +function parseRetryAfterMs(value: string | null): number | undefined { + if (!value) return undefined + const trimmed = value.trim() + const seconds = Number(trimmed) + if (Number.isFinite(seconds) && seconds >= 0) { + return Math.min(Math.ceil(seconds * 1000), 60_000) + } + const date = Date.parse(trimmed) + if (Number.isFinite(date)) { + return Math.max(0, Math.ceil(date - Date.now())) + } + return undefined +} + +function computeBackoffDelay(attempt: number, options: ResolvedRetryOptions): number { + const raw = options.baseDelayMs * Math.pow(2, attempt) + const jitter = raw * 0.25 * (Math.random() * 2 - 1) + return Math.max(0, Math.min(Math.ceil(raw + jitter), options.maxDelayMs)) +} + +function sleep(ms: number, signal: AbortSignal): Promise { + if (ms <= 0 || signal.aborted) return Promise.resolve() + return new Promise((resolve) => { + const timer = setTimeout(resolve, ms) + signal.addEventListener( + 'abort', + () => { + clearTimeout(timer) + resolve() + }, + { once: true } + ) + }) +} diff --git a/kun/src/loop/agent-loop.ts b/kun/src/loop/agent-loop.ts index ec477237..37c7c37d 100644 --- a/kun/src/loop/agent-loop.ts +++ b/kun/src/loop/agent-loop.ts @@ -32,6 +32,7 @@ import { type PrefixVolatilityFinding } from '../cache/prefix-volatility.js' import { buildToolCatalogFingerprint } from '../cache/tool-catalog-fingerprint.js' +import { TtlLruCache } from '../cache/ttl-lru-cache.js' import { makeUserItem, makeAssistantTextItem, @@ -285,9 +286,15 @@ export type AgentLoopOptions = { export class AgentLoop { private readonly opts: AgentLoopOptions private readonly autoModelRoutes = new Map() - private readonly promptTokenPressure = new Map() + private readonly promptTokenPressure = new TtlLruCache({ + limit: 128, + ttlMs: 10 * 60 * 1000 + }) private readonly toolStormBreakers = new Map() - private readonly toolCatalogSnapshots = new Map() + private readonly toolCatalogSnapshots = new TtlLruCache({ + limit: 64, + ttlMs: 30 * 60 * 1000 + }) constructor(opts: AgentLoopOptions) { this.opts = opts diff --git a/kun/src/server/routes/index.ts b/kun/src/server/routes/index.ts index ec307941..169751b3 100644 --- a/kun/src/server/routes/index.ts +++ b/kun/src/server/routes/index.ts @@ -150,7 +150,11 @@ export function buildRouter(runtime: ServerRuntime): Router { }) router.add('DELETE', '/v1/threads/:id', async (request, ctx) => { if (!authorize(request, runtime)) return ERRORS.unauthorized() - return deleteThread(runtime.threadService, ctx.params.id) + const response = await deleteThread(runtime.threadService, ctx.params.id) + if (response.status === 200) { + runtime.usageService.reset(ctx.params.id) + } + return response }) router.add('POST', '/v1/threads/:id/fork', async (request, ctx) => { if (!authorize(request, runtime)) return ERRORS.unauthorized() diff --git a/kun/tests/model-client.test.ts b/kun/tests/model-client.test.ts index 06163931..cd677218 100644 --- a/kun/tests/model-client.test.ts +++ b/kun/tests/model-client.test.ts @@ -1168,4 +1168,162 @@ describe('DeepseekCompatModelClient', () => { }) expect(chunks.find((chunk) => chunk.kind === 'completed')).toBeUndefined() }) + + it('retries on HTTP 429 and succeeds on a later attempt', async () => { + const successResponse = { + id: 'ok', + model: 'deepseek-chat', + choices: [{ index: 0, finish_reason: 'stop', message: { role: 'assistant', content: 'done' } }] + } + let calls = 0 + const fetchImpl: typeof fetch = async () => { + calls += 1 + if (calls < 3) { + return new Response('rate limited', { + status: 429, + headers: { 'retry-after': '0.01' } + }) + } + return new Response(JSON.stringify(successResponse), { + status: 200, + headers: { 'content-type': 'application/json' } + }) + } + const client = new DeepseekCompatModelClient({ + baseUrl: 'https://example.com/beta', + apiKey: 'k', + model: 'deepseek-chat', + fetchImpl, + nonStreaming: true, + retry: { maxAttempts: 3, baseDelayMs: 1, maxDelayMs: 10 } + }) + const chunks = [] + for await (const chunk of client.stream(buildRequest(new AbortController().signal))) { + chunks.push(chunk) + } + expect(calls).toBe(3) + expect(chunks.some((c) => c.kind === 'completed')).toBe(true) + expect(chunks.some((c) => c.kind === 'error')).toBe(false) + }) + + it('honours Retry-After header value for backoff', async () => { + const successResponse = { + id: 'ok', + model: 'deepseek-chat', + choices: [{ index: 0, finish_reason: 'stop', message: { role: 'assistant', content: 'done' } }] + } + let calls = 0 + const fetchImpl: typeof fetch = async () => { + calls += 1 + if (calls === 1) { + return new Response('rate limited', { + status: 429, + headers: { 'retry-after': '2' } + }) + } + return new Response(JSON.stringify(successResponse), { + status: 200, + headers: { 'content-type': 'application/json' } + }) + } + const client = new DeepseekCompatModelClient({ + baseUrl: 'https://example.com/beta', + apiKey: 'k', + model: 'deepseek-chat', + fetchImpl, + nonStreaming: true, + retry: { maxAttempts: 2, baseDelayMs: 10_000, maxDelayMs: 10_000 } + }) + // Use fake timers to verify the retry-after value is respected. + const start = Date.now() + const chunks: Array<{ kind: string }> = [] + for await (const chunk of client.stream(buildRequest(new AbortController().signal))) { + chunks.push(chunk) + } + const elapsed = Date.now() - start + // Retry-After: 2 means at least ~2s of delay before second attempt. + expect(elapsed).toBeGreaterThanOrEqual(1900) + expect(calls).toBe(2) + expect(chunks.some((c: any) => c.kind === 'completed')).toBe(true) + }) + + it('does not retry on non-retryable HTTP errors (e.g. 400)', async () => { + let calls = 0 + const fetchImpl: typeof fetch = async () => { + calls += 1 + return new Response('bad request', { status: 400 }) + } + const client = new DeepseekCompatModelClient({ + baseUrl: 'https://example.com/beta', + apiKey: 'k', + model: 'deepseek-chat', + fetchImpl, + nonStreaming: true, + retry: { maxAttempts: 3, baseDelayMs: 1, maxDelayMs: 10 } + }) + const chunks = [] + for await (const chunk of client.stream(buildRequest(new AbortController().signal))) { + chunks.push(chunk) + } + expect(calls).toBe(1) + expect(chunks[0].kind).toBe('error') + }) + + it('retries on network errors before yielding any chunk', async () => { + const successResponse = { + id: 'ok', + model: 'deepseek-chat', + choices: [{ index: 0, finish_reason: 'stop', message: { role: 'assistant', content: 'done' } }] + } + let calls = 0 + const fetchImpl: typeof fetch = async () => { + calls += 1 + if (calls === 1) throw new TypeError('network error') + return new Response(JSON.stringify(successResponse), { + status: 200, + headers: { 'content-type': 'application/json' } + }) + } + const client = new DeepseekCompatModelClient({ + baseUrl: 'https://example.com/beta', + apiKey: 'k', + model: 'deepseek-chat', + fetchImpl, + nonStreaming: true, + retry: { maxAttempts: 3, baseDelayMs: 1, maxDelayMs: 10 } + }) + const chunks = [] + for await (const chunk of client.stream(buildRequest(new AbortController().signal))) { + chunks.push(chunk) + } + expect(calls).toBe(2) + expect(chunks.some((c) => c.kind === 'completed')).toBe(true) + }) + + it('respects abort signal during retry backoff', async () => { + const controller = new AbortController() + let calls = 0 + const fetchImpl: typeof fetch = async () => { + calls += 1 + return new Response('rate limited', { status: 429 }) + } + const client = new DeepseekCompatModelClient({ + baseUrl: 'https://example.com/beta', + apiKey: 'k', + model: 'deepseek-chat', + fetchImpl, + nonStreaming: true, + retry: { maxAttempts: 5, baseDelayMs: 10_000, maxDelayMs: 10_000 } + }) + const request = buildRequest(controller.signal) + // Abort during the first backoff delay. + setTimeout(() => controller.abort(), 10) + const chunks = [] + for await (const chunk of client.stream(request)) { + chunks.push(chunk) + } + expect(calls).toBe(1) + // After abort, the loop should stop and yield an error. + expect(chunks.some((c) => c.kind === 'error')).toBe(true) + }) }) diff --git a/kun/tests/ports.test.ts b/kun/tests/ports.test.ts index ba36996c..93ede05a 100644 --- a/kun/tests/ports.test.ts +++ b/kun/tests/ports.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from 'vitest' +import { describe, expect, it, vi } from 'vitest' import { InMemoryEventBus } from '../src/adapters/in-memory-event-bus.js' import { InMemoryApprovalGate } from '../src/adapters/in-memory-approval-gate.js' import { InMemoryThreadStore } from '../src/adapters/in-memory-thread-store.js' @@ -61,6 +61,60 @@ describe('InMemoryApprovalGate', () => { ) expect(gate.pending('th1')).toHaveLength(1) }) + + it('removes the entry after decide so resolved approvals do not leak', async () => { + const gate = new InMemoryApprovalGate() + const approval = createApprovalRequest({ + id: 'clean', + threadId: 't', + turnId: 'tu', + toolName: 'echo', + summary: 's' + }) + const pending = gate.request(approval) + gate.decide('clean', 'deny') + await expect(pending).resolves.toBe('deny') + // Entry must be gone — no lingering state after resolution. + expect(gate.get('clean')).toBeUndefined() + expect(gate.pending()).toHaveLength(0) + }) + + it('auto-expires pending approvals after the timeout and rejects the promise', async () => { + vi.useFakeTimers() + try { + const gate = new InMemoryApprovalGate({ pendingTimeoutMs: 1000 }) + const approval = createApprovalRequest({ + id: 'exp', + threadId: 't', + turnId: 'tu', + toolName: 'echo', + summary: 's' + }) + const pending = gate.request(approval) + expect(gate.pending()).toHaveLength(1) + vi.advanceTimersByTime(1001) + await expect(pending).rejects.toThrow('expired') + expect(gate.pending()).toHaveLength(0) + expect(gate.get('exp')).toBeUndefined() + } finally { + vi.useRealTimers() + } + }) + + it('drainAllPending rejects every pending request and clears state', async () => { + const gate = new InMemoryApprovalGate() + const a = gate.request( + createApprovalRequest({ id: 'd1', threadId: 't', turnId: 'tu', toolName: 'x', summary: 's' }) + ) + const b = gate.request( + createApprovalRequest({ id: 'd2', threadId: 't', turnId: 'tu', toolName: 'y', summary: 's' }) + ) + expect(gate.pending()).toHaveLength(2) + gate.drainAllPending('shutdown') + await expect(a).rejects.toThrow('shutdown') + await expect(b).rejects.toThrow('shutdown') + expect(gate.pending()).toHaveLength(0) + }) }) describe('InMemoryThreadStore', () => {