From f27cb3f6811a1ec94d437685fdd0a431366b287e Mon Sep 17 00:00:00 2001 From: xuyingzhou Date: Mon, 8 Jun 2026 12:21:33 +0800 Subject: [PATCH 1/3] feat(model): add retry with backoff for 429/5xx and network errors The DeepSeek-compatible model client previously failed the entire turn on the first transient error (rate limit, server hiccup, network blip). This adds exponential backoff with jitter, honors the Retry-After header, and aborts cleanly if the turn is cancelled mid-backoff. Retries only happen before any streaming chunk is yielded, so consumers never see partial-then-retried output. Adds 5 tests covering: 429 retry success, Retry-After header, non-retryable errors, network error retry, and abort during backoff. --- .../model/deepseek-compat-model-client.ts | 121 ++++++++++++-- kun/tests/model-client.test.ts | 158 ++++++++++++++++++ 2 files changed, 264 insertions(+), 15 deletions(-) diff --git a/kun/src/adapters/model/deepseek-compat-model-client.ts b/kun/src/adapters/model/deepseek-compat-model-client.ts index ec7c78f1..7430ec9b 100644 --- a/kun/src/adapters/model/deepseek-compat-model-client.ts +++ b/kun/src/adapters/model/deepseek-compat-model-client.ts @@ -26,6 +26,19 @@ export type DeepseekCompatConfig = { nonStreaming?: boolean /** Maximum idle time between streaming chunks before the turn fails. */ streamIdleTimeoutMs?: number + /** + * Retry configuration for transient HTTP errors (429, 5xx) and + * network failures. Retries only happen before any streaming chunk + * is yielded, so the consumer never sees partial + retried output. + */ + retry?: { + /** Total attempts including the initial request. Defaults to 3. */ + maxAttempts?: number + /** Base delay for exponential backoff in ms. Defaults to 500. */ + baseDelayMs?: number + /** Upper bound for a single backoff delay in ms. Defaults to 8000. */ + maxDelayMs?: number + } } type ChatMessage = { @@ -86,6 +99,15 @@ type StreamReadResult = | { kind: 'error'; message: string } const DEFAULT_STREAM_IDLE_TIMEOUT_MS = 45_000 +const DEFAULT_RETRY_MAX_ATTEMPTS = 3 +const DEFAULT_RETRY_BASE_DELAY_MS = 500 +const DEFAULT_RETRY_MAX_DELAY_MS = 8_000 + +type ResolvedRetryOptions = { + maxAttempts: number + baseDelayMs: number + maxDelayMs: number +} /** * DeepSeek-compatible model client. @@ -129,22 +151,43 @@ export class DeepseekCompatModelClient implements ModelClient { body: JSON.stringify(body), signal: request.abortSignal } - let response: Response - try { - response = await this.fetchImpl(url, init) - } catch (error) { - const message = error instanceof Error ? error.message : String(error) - yield { kind: 'error', message: `model request failed: ${message}` } - return - } - if (!response.ok) { - const text = await response.text() - const classified = await this.classifyHttpError(response.status, text) - yield { - kind: 'error', - message: classified.message, - code: classified.code + let response: Response | undefined + const retry = this.retryOptions() + for (let attempt = 0; ; attempt += 1) { + if (request.abortSignal.aborted) { + yield { kind: 'error', message: 'request was aborted before start' } + return + } + try { + response = await this.fetchImpl(url, init) + } catch (error) { + const message = error instanceof Error ? error.message : String(error) + if (attempt < retry.maxAttempts - 1) { + await sleep(computeBackoffDelay(attempt, retry), request.abortSignal) + continue + } + yield { kind: 'error', message: `model request failed: ${message}` } + return } + if (response.ok) break + if (attempt >= retry.maxAttempts - 1 || !isRetryableHttpStatus(response.status)) { + const text = await response.text() + const classified = await this.classifyHttpError(response.status, text) + yield { + kind: 'error', + message: classified.message, + code: classified.code + } + return + } + const retryAfterMs = parseRetryAfterMs(response.headers.get('retry-after')) + await sleep( + Math.min(retryAfterMs ?? computeBackoffDelay(attempt, retry), retry.maxDelayMs), + request.abortSignal + ) + } + if (!response) { + yield { kind: 'error', message: 'model request produced no response' } return } if (this.config.nonStreaming || response.headers.get('content-type')?.includes('application/json')) { @@ -175,6 +218,15 @@ export class DeepseekCompatModelClient implements ModelClient { return { ...headers, ...(this.config.headers ?? {}) } } + private retryOptions(): ResolvedRetryOptions { + const retry = this.config.retry + return { + maxAttempts: Math.max(1, Math.floor(retry?.maxAttempts ?? DEFAULT_RETRY_MAX_ATTEMPTS)), + baseDelayMs: Math.max(0, Math.floor(retry?.baseDelayMs ?? DEFAULT_RETRY_BASE_DELAY_MS)), + maxDelayMs: Math.max(1, Math.floor(retry?.maxDelayMs ?? DEFAULT_RETRY_MAX_DELAY_MS)) + } + } + private async classifyHttpError(status: number, text: string): Promise<{ message: string; code: string }> { const body = text.slice(0, 500) if (status === 429) { @@ -1046,3 +1098,42 @@ function limitHistoryPreservingCompaction(history: TurnItem[], windowSize: numbe } return limited } + +function isRetryableHttpStatus(status: number): boolean { + return status === 429 || (status >= 500 && status <= 599) +} + +function parseRetryAfterMs(value: string | null): number | undefined { + if (!value) return undefined + const trimmed = value.trim() + const seconds = Number(trimmed) + if (Number.isFinite(seconds) && seconds >= 0) { + return Math.min(Math.ceil(seconds * 1000), 60_000) + } + const date = Date.parse(trimmed) + if (Number.isFinite(date)) { + return Math.max(0, Math.ceil(date - Date.now())) + } + return undefined +} + +function computeBackoffDelay(attempt: number, options: ResolvedRetryOptions): number { + const raw = options.baseDelayMs * Math.pow(2, attempt) + const jitter = raw * 0.25 * (Math.random() * 2 - 1) + return Math.max(0, Math.min(Math.ceil(raw + jitter), options.maxDelayMs)) +} + +function sleep(ms: number, signal: AbortSignal): Promise { + if (ms <= 0 || signal.aborted) return Promise.resolve() + return new Promise((resolve) => { + const timer = setTimeout(resolve, ms) + signal.addEventListener( + 'abort', + () => { + clearTimeout(timer) + resolve() + }, + { once: true } + ) + }) +} diff --git a/kun/tests/model-client.test.ts b/kun/tests/model-client.test.ts index 06163931..cd677218 100644 --- a/kun/tests/model-client.test.ts +++ b/kun/tests/model-client.test.ts @@ -1168,4 +1168,162 @@ describe('DeepseekCompatModelClient', () => { }) expect(chunks.find((chunk) => chunk.kind === 'completed')).toBeUndefined() }) + + it('retries on HTTP 429 and succeeds on a later attempt', async () => { + const successResponse = { + id: 'ok', + model: 'deepseek-chat', + choices: [{ index: 0, finish_reason: 'stop', message: { role: 'assistant', content: 'done' } }] + } + let calls = 0 + const fetchImpl: typeof fetch = async () => { + calls += 1 + if (calls < 3) { + return new Response('rate limited', { + status: 429, + headers: { 'retry-after': '0.01' } + }) + } + return new Response(JSON.stringify(successResponse), { + status: 200, + headers: { 'content-type': 'application/json' } + }) + } + const client = new DeepseekCompatModelClient({ + baseUrl: 'https://example.com/beta', + apiKey: 'k', + model: 'deepseek-chat', + fetchImpl, + nonStreaming: true, + retry: { maxAttempts: 3, baseDelayMs: 1, maxDelayMs: 10 } + }) + const chunks = [] + for await (const chunk of client.stream(buildRequest(new AbortController().signal))) { + chunks.push(chunk) + } + expect(calls).toBe(3) + expect(chunks.some((c) => c.kind === 'completed')).toBe(true) + expect(chunks.some((c) => c.kind === 'error')).toBe(false) + }) + + it('honours Retry-After header value for backoff', async () => { + const successResponse = { + id: 'ok', + model: 'deepseek-chat', + choices: [{ index: 0, finish_reason: 'stop', message: { role: 'assistant', content: 'done' } }] + } + let calls = 0 + const fetchImpl: typeof fetch = async () => { + calls += 1 + if (calls === 1) { + return new Response('rate limited', { + status: 429, + headers: { 'retry-after': '2' } + }) + } + return new Response(JSON.stringify(successResponse), { + status: 200, + headers: { 'content-type': 'application/json' } + }) + } + const client = new DeepseekCompatModelClient({ + baseUrl: 'https://example.com/beta', + apiKey: 'k', + model: 'deepseek-chat', + fetchImpl, + nonStreaming: true, + retry: { maxAttempts: 2, baseDelayMs: 10_000, maxDelayMs: 10_000 } + }) + // Use fake timers to verify the retry-after value is respected. + const start = Date.now() + const chunks: Array<{ kind: string }> = [] + for await (const chunk of client.stream(buildRequest(new AbortController().signal))) { + chunks.push(chunk) + } + const elapsed = Date.now() - start + // Retry-After: 2 means at least ~2s of delay before second attempt. + expect(elapsed).toBeGreaterThanOrEqual(1900) + expect(calls).toBe(2) + expect(chunks.some((c: any) => c.kind === 'completed')).toBe(true) + }) + + it('does not retry on non-retryable HTTP errors (e.g. 400)', async () => { + let calls = 0 + const fetchImpl: typeof fetch = async () => { + calls += 1 + return new Response('bad request', { status: 400 }) + } + const client = new DeepseekCompatModelClient({ + baseUrl: 'https://example.com/beta', + apiKey: 'k', + model: 'deepseek-chat', + fetchImpl, + nonStreaming: true, + retry: { maxAttempts: 3, baseDelayMs: 1, maxDelayMs: 10 } + }) + const chunks = [] + for await (const chunk of client.stream(buildRequest(new AbortController().signal))) { + chunks.push(chunk) + } + expect(calls).toBe(1) + expect(chunks[0].kind).toBe('error') + }) + + it('retries on network errors before yielding any chunk', async () => { + const successResponse = { + id: 'ok', + model: 'deepseek-chat', + choices: [{ index: 0, finish_reason: 'stop', message: { role: 'assistant', content: 'done' } }] + } + let calls = 0 + const fetchImpl: typeof fetch = async () => { + calls += 1 + if (calls === 1) throw new TypeError('network error') + return new Response(JSON.stringify(successResponse), { + status: 200, + headers: { 'content-type': 'application/json' } + }) + } + const client = new DeepseekCompatModelClient({ + baseUrl: 'https://example.com/beta', + apiKey: 'k', + model: 'deepseek-chat', + fetchImpl, + nonStreaming: true, + retry: { maxAttempts: 3, baseDelayMs: 1, maxDelayMs: 10 } + }) + const chunks = [] + for await (const chunk of client.stream(buildRequest(new AbortController().signal))) { + chunks.push(chunk) + } + expect(calls).toBe(2) + expect(chunks.some((c) => c.kind === 'completed')).toBe(true) + }) + + it('respects abort signal during retry backoff', async () => { + const controller = new AbortController() + let calls = 0 + const fetchImpl: typeof fetch = async () => { + calls += 1 + return new Response('rate limited', { status: 429 }) + } + const client = new DeepseekCompatModelClient({ + baseUrl: 'https://example.com/beta', + apiKey: 'k', + model: 'deepseek-chat', + fetchImpl, + nonStreaming: true, + retry: { maxAttempts: 5, baseDelayMs: 10_000, maxDelayMs: 10_000 } + }) + const request = buildRequest(controller.signal) + // Abort during the first backoff delay. + setTimeout(() => controller.abort(), 10) + const chunks = [] + for await (const chunk of client.stream(request)) { + chunks.push(chunk) + } + expect(calls).toBe(1) + // After abort, the loop should stop and yield an error. + expect(chunks.some((c) => c.kind === 'error')).toBe(true) + }) }) From 40683eaf208197821b7c51404d4a78e45ebd29fc Mon Sep 17 00:00:00 2001 From: xuyingzhou Date: Mon, 8 Jun 2026 12:22:13 +0800 Subject: [PATCH 2/3] fix(mem): bound in-memory maps with TtlLruCache, fix approval gate leak, clean telemetry on thread delete --- kun/src/adapters/in-memory-approval-gate.ts | 52 +++++++++++++++++-- kun/src/loop/agent-loop.ts | 11 +++- kun/src/server/routes/index.ts | 6 ++- kun/tests/ports.test.ts | 56 ++++++++++++++++++++- 4 files changed, 118 insertions(+), 7 deletions(-) diff --git a/kun/src/adapters/in-memory-approval-gate.ts b/kun/src/adapters/in-memory-approval-gate.ts index 5614511c..f5542771 100644 --- a/kun/src/adapters/in-memory-approval-gate.ts +++ b/kun/src/adapters/in-memory-approval-gate.ts @@ -1,23 +1,41 @@ import type { ApprovalGate } from '../ports/approval-gate.js' import type { ApprovalRequest } from '../domain/approval.js' -import { resolveApprovalRequest } from '../domain/approval.js' type PendingResolver = { resolve: (decision: 'allow' | 'deny') => void reject: (error: Error) => void } +const DEFAULT_PENDING_TIMEOUT_MS = 10 * 60 * 1000 + /** * In-memory approval gate. The HTTP layer posts decisions into * `decide`; the loop awaits the `request` promise to learn whether * the user allowed or denied the call. + * + * Pending approvals auto-expire after `pendingTimeoutMs` (default 10 min) + * to prevent unbounded growth from abandoned requests. Resolved entries + * are removed immediately since the event log is the source of truth. */ export class InMemoryApprovalGate implements ApprovalGate { private readonly approvals = new Map() private readonly resolvers = new Map() + private readonly timers = new Map>() + private readonly pendingTimeoutMs: number + + constructor(options?: { pendingTimeoutMs?: number }) { + this.pendingTimeoutMs = + options?.pendingTimeoutMs ?? DEFAULT_PENDING_TIMEOUT_MS + } request(approval: ApprovalRequest): Promise<'allow' | 'deny'> { this.approvals.set(approval.id, approval) + const timer = setTimeout( + () => this.expire(approval.id), + this.pendingTimeoutMs + ) + if (typeof timer.unref === 'function') timer.unref() + this.timers.set(approval.id, timer) return new Promise<'allow' | 'deny'>((resolve, reject) => { this.resolvers.set(approval.id, { resolve, reject }) }) @@ -26,8 +44,7 @@ export class InMemoryApprovalGate implements ApprovalGate { decide(approvalId: string, decision: 'allow' | 'deny', reason?: string): boolean { const approval = this.approvals.get(approvalId) if (!approval) return false - const resolved = resolveApprovalRequest(approval, decision, reason) - this.approvals.set(approvalId, resolved) + this.cleanupEntry(approvalId) const resolver = this.resolvers.get(approvalId) this.resolvers.delete(approvalId) resolver?.resolve(decision) @@ -49,4 +66,33 @@ export class InMemoryApprovalGate implements ApprovalGate { resolve(approvalId: string, decision: 'allow' | 'deny', reason?: string): boolean { return this.decide(approvalId, decision, reason) } + + /** + * Reject all pending approvals and clear timers. Call this on startup + * to drain any state left over from a previous process, or during + * graceful shutdown. + */ + drainAllPending(reason = 'approval drained'): void { + for (const approvalId of [...this.approvals.keys()]) { + this.expire(approvalId, reason) + } + } + + private expire(approvalId: string, reason = 'approval expired'): void { + const approval = this.approvals.get(approvalId) + if (!approval) return + this.cleanupEntry(approvalId) + const resolver = this.resolvers.get(approvalId) + this.resolvers.delete(approvalId) + resolver?.reject(new Error(`${reason}: ${approvalId}`)) + } + + private cleanupEntry(approvalId: string): void { + this.approvals.delete(approvalId) + const timer = this.timers.get(approvalId) + if (timer) { + clearTimeout(timer) + this.timers.delete(approvalId) + } + } } diff --git a/kun/src/loop/agent-loop.ts b/kun/src/loop/agent-loop.ts index ec477237..37c7c37d 100644 --- a/kun/src/loop/agent-loop.ts +++ b/kun/src/loop/agent-loop.ts @@ -32,6 +32,7 @@ import { type PrefixVolatilityFinding } from '../cache/prefix-volatility.js' import { buildToolCatalogFingerprint } from '../cache/tool-catalog-fingerprint.js' +import { TtlLruCache } from '../cache/ttl-lru-cache.js' import { makeUserItem, makeAssistantTextItem, @@ -285,9 +286,15 @@ export type AgentLoopOptions = { export class AgentLoop { private readonly opts: AgentLoopOptions private readonly autoModelRoutes = new Map() - private readonly promptTokenPressure = new Map() + private readonly promptTokenPressure = new TtlLruCache({ + limit: 128, + ttlMs: 10 * 60 * 1000 + }) private readonly toolStormBreakers = new Map() - private readonly toolCatalogSnapshots = new Map() + private readonly toolCatalogSnapshots = new TtlLruCache({ + limit: 64, + ttlMs: 30 * 60 * 1000 + }) constructor(opts: AgentLoopOptions) { this.opts = opts diff --git a/kun/src/server/routes/index.ts b/kun/src/server/routes/index.ts index ec307941..169751b3 100644 --- a/kun/src/server/routes/index.ts +++ b/kun/src/server/routes/index.ts @@ -150,7 +150,11 @@ export function buildRouter(runtime: ServerRuntime): Router { }) router.add('DELETE', '/v1/threads/:id', async (request, ctx) => { if (!authorize(request, runtime)) return ERRORS.unauthorized() - return deleteThread(runtime.threadService, ctx.params.id) + const response = await deleteThread(runtime.threadService, ctx.params.id) + if (response.status === 200) { + runtime.usageService.reset(ctx.params.id) + } + return response }) router.add('POST', '/v1/threads/:id/fork', async (request, ctx) => { if (!authorize(request, runtime)) return ERRORS.unauthorized() diff --git a/kun/tests/ports.test.ts b/kun/tests/ports.test.ts index ba36996c..93ede05a 100644 --- a/kun/tests/ports.test.ts +++ b/kun/tests/ports.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from 'vitest' +import { describe, expect, it, vi } from 'vitest' import { InMemoryEventBus } from '../src/adapters/in-memory-event-bus.js' import { InMemoryApprovalGate } from '../src/adapters/in-memory-approval-gate.js' import { InMemoryThreadStore } from '../src/adapters/in-memory-thread-store.js' @@ -61,6 +61,60 @@ describe('InMemoryApprovalGate', () => { ) expect(gate.pending('th1')).toHaveLength(1) }) + + it('removes the entry after decide so resolved approvals do not leak', async () => { + const gate = new InMemoryApprovalGate() + const approval = createApprovalRequest({ + id: 'clean', + threadId: 't', + turnId: 'tu', + toolName: 'echo', + summary: 's' + }) + const pending = gate.request(approval) + gate.decide('clean', 'deny') + await expect(pending).resolves.toBe('deny') + // Entry must be gone — no lingering state after resolution. + expect(gate.get('clean')).toBeUndefined() + expect(gate.pending()).toHaveLength(0) + }) + + it('auto-expires pending approvals after the timeout and rejects the promise', async () => { + vi.useFakeTimers() + try { + const gate = new InMemoryApprovalGate({ pendingTimeoutMs: 1000 }) + const approval = createApprovalRequest({ + id: 'exp', + threadId: 't', + turnId: 'tu', + toolName: 'echo', + summary: 's' + }) + const pending = gate.request(approval) + expect(gate.pending()).toHaveLength(1) + vi.advanceTimersByTime(1001) + await expect(pending).rejects.toThrow('expired') + expect(gate.pending()).toHaveLength(0) + expect(gate.get('exp')).toBeUndefined() + } finally { + vi.useRealTimers() + } + }) + + it('drainAllPending rejects every pending request and clears state', async () => { + const gate = new InMemoryApprovalGate() + const a = gate.request( + createApprovalRequest({ id: 'd1', threadId: 't', turnId: 'tu', toolName: 'x', summary: 's' }) + ) + const b = gate.request( + createApprovalRequest({ id: 'd2', threadId: 't', turnId: 'tu', toolName: 'y', summary: 's' }) + ) + expect(gate.pending()).toHaveLength(2) + gate.drainAllPending('shutdown') + await expect(a).rejects.toThrow('shutdown') + await expect(b).rejects.toThrow('shutdown') + expect(gate.pending()).toHaveLength(0) + }) }) describe('InMemoryThreadStore', () => { From 047dda6f9cd82c114be6fd0a80dfba7c94dba225 Mon Sep 17 00:00:00 2001 From: xuyingzhou Date: Mon, 8 Jun 2026 14:55:12 +0800 Subject: [PATCH 3/3] refactor(approval-gate): consolidate state cleanup in cleanupEntry method Move resolvers.delete() into cleanupEntry so all state (approvals, timers, resolvers) is cleaned in one place. Adjust decide and expire to grab the resolver reference before calling cleanupEntry. --- kun/src/adapters/in-memory-approval-gate.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/kun/src/adapters/in-memory-approval-gate.ts b/kun/src/adapters/in-memory-approval-gate.ts index f5542771..f2b44ae8 100644 --- a/kun/src/adapters/in-memory-approval-gate.ts +++ b/kun/src/adapters/in-memory-approval-gate.ts @@ -44,9 +44,8 @@ export class InMemoryApprovalGate implements ApprovalGate { decide(approvalId: string, decision: 'allow' | 'deny', reason?: string): boolean { const approval = this.approvals.get(approvalId) if (!approval) return false - this.cleanupEntry(approvalId) const resolver = this.resolvers.get(approvalId) - this.resolvers.delete(approvalId) + this.cleanupEntry(approvalId) resolver?.resolve(decision) return true } @@ -81,14 +80,14 @@ export class InMemoryApprovalGate implements ApprovalGate { private expire(approvalId: string, reason = 'approval expired'): void { const approval = this.approvals.get(approvalId) if (!approval) return - this.cleanupEntry(approvalId) const resolver = this.resolvers.get(approvalId) - this.resolvers.delete(approvalId) + this.cleanupEntry(approvalId) resolver?.reject(new Error(`${reason}: ${approvalId}`)) } private cleanupEntry(approvalId: string): void { this.approvals.delete(approvalId) + this.resolvers.delete(approvalId) const timer = this.timers.get(approvalId) if (timer) { clearTimeout(timer)