Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 49 additions & 4 deletions kun/src/adapters/in-memory-approval-gate.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,41 @@
import type { ApprovalGate } from '../ports/approval-gate.js'
import type { ApprovalRequest } from '../domain/approval.js'
import { resolveApprovalRequest } from '../domain/approval.js'

type PendingResolver = {
resolve: (decision: 'allow' | 'deny') => void
reject: (error: Error) => void
}

const DEFAULT_PENDING_TIMEOUT_MS = 10 * 60 * 1000

/**
* In-memory approval gate. The HTTP layer posts decisions into
* `decide`; the loop awaits the `request` promise to learn whether
* the user allowed or denied the call.
*
* Pending approvals auto-expire after `pendingTimeoutMs` (default 10 min)
* to prevent unbounded growth from abandoned requests. Resolved entries
* are removed immediately since the event log is the source of truth.
*/
export class InMemoryApprovalGate implements ApprovalGate {
private readonly approvals = new Map<string, ApprovalRequest>()
private readonly resolvers = new Map<string, PendingResolver>()
private readonly timers = new Map<string, ReturnType<typeof setTimeout>>()
private readonly pendingTimeoutMs: number

constructor(options?: { pendingTimeoutMs?: number }) {
this.pendingTimeoutMs =
options?.pendingTimeoutMs ?? DEFAULT_PENDING_TIMEOUT_MS
}

request(approval: ApprovalRequest): Promise<'allow' | 'deny'> {
this.approvals.set(approval.id, approval)
const timer = setTimeout(
() => this.expire(approval.id),
this.pendingTimeoutMs
)
if (typeof timer.unref === 'function') timer.unref()
this.timers.set(approval.id, timer)
return new Promise<'allow' | 'deny'>((resolve, reject) => {
this.resolvers.set(approval.id, { resolve, reject })
})
Expand All @@ -26,10 +44,8 @@ export class InMemoryApprovalGate implements ApprovalGate {
decide(approvalId: string, decision: 'allow' | 'deny', reason?: string): boolean {
const approval = this.approvals.get(approvalId)
if (!approval) return false
const resolved = resolveApprovalRequest(approval, decision, reason)
this.approvals.set(approvalId, resolved)
const resolver = this.resolvers.get(approvalId)
this.resolvers.delete(approvalId)
this.cleanupEntry(approvalId)
resolver?.resolve(decision)
return true
}
Expand All @@ -49,4 +65,33 @@ export class InMemoryApprovalGate implements ApprovalGate {
resolve(approvalId: string, decision: 'allow' | 'deny', reason?: string): boolean {
return this.decide(approvalId, decision, reason)
}

/**
* Reject all pending approvals and clear timers. Call this on startup
* to drain any state left over from a previous process, or during
* graceful shutdown.
*/
drainAllPending(reason = 'approval drained'): void {
for (const approvalId of [...this.approvals.keys()]) {
this.expire(approvalId, reason)
}
}

private expire(approvalId: string, reason = 'approval expired'): void {
const approval = this.approvals.get(approvalId)
if (!approval) return
const resolver = this.resolvers.get(approvalId)
this.cleanupEntry(approvalId)
resolver?.reject(new Error(`${reason}: ${approvalId}`))
}

private cleanupEntry(approvalId: string): void {
this.approvals.delete(approvalId)
this.resolvers.delete(approvalId)
const timer = this.timers.get(approvalId)
if (timer) {
clearTimeout(timer)
this.timers.delete(approvalId)
}
}
}
121 changes: 106 additions & 15 deletions kun/src/adapters/model/deepseek-compat-model-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ export type DeepseekCompatConfig = {
nonStreaming?: boolean
/** Maximum idle time between streaming chunks before the turn fails. */
streamIdleTimeoutMs?: number
/**
* Retry configuration for transient HTTP errors (429, 5xx) and
* network failures. Retries only happen before any streaming chunk
* is yielded, so the consumer never sees partial + retried output.
*/
retry?: {
/** Total attempts including the initial request. Defaults to 3. */
maxAttempts?: number
/** Base delay for exponential backoff in ms. Defaults to 500. */
baseDelayMs?: number
/** Upper bound for a single backoff delay in ms. Defaults to 8000. */
maxDelayMs?: number
}
}

type ChatMessage = {
Expand Down Expand Up @@ -86,6 +99,15 @@ type StreamReadResult =
| { kind: 'error'; message: string }

const DEFAULT_STREAM_IDLE_TIMEOUT_MS = 45_000
const DEFAULT_RETRY_MAX_ATTEMPTS = 3
const DEFAULT_RETRY_BASE_DELAY_MS = 500
const DEFAULT_RETRY_MAX_DELAY_MS = 8_000

type ResolvedRetryOptions = {
maxAttempts: number
baseDelayMs: number
maxDelayMs: number
}

/**
* DeepSeek-compatible model client.
Expand Down Expand Up @@ -129,22 +151,43 @@ export class DeepseekCompatModelClient implements ModelClient {
body: JSON.stringify(body),
signal: request.abortSignal
}
let response: Response
try {
response = await this.fetchImpl(url, init)
} catch (error) {
const message = error instanceof Error ? error.message : String(error)
yield { kind: 'error', message: `model request failed: ${message}` }
return
}
if (!response.ok) {
const text = await response.text()
const classified = await this.classifyHttpError(response.status, text)
yield {
kind: 'error',
message: classified.message,
code: classified.code
let response: Response | undefined
const retry = this.retryOptions()
for (let attempt = 0; ; attempt += 1) {
if (request.abortSignal.aborted) {
yield { kind: 'error', message: 'request was aborted before start' }
return
}
try {
response = await this.fetchImpl(url, init)
} catch (error) {
const message = error instanceof Error ? error.message : String(error)
if (attempt < retry.maxAttempts - 1) {
await sleep(computeBackoffDelay(attempt, retry), request.abortSignal)
continue
}
yield { kind: 'error', message: `model request failed: ${message}` }
return
}
if (response.ok) break
if (attempt >= retry.maxAttempts - 1 || !isRetryableHttpStatus(response.status)) {
const text = await response.text()
const classified = await this.classifyHttpError(response.status, text)
yield {
kind: 'error',
message: classified.message,
code: classified.code
}
return
}
const retryAfterMs = parseRetryAfterMs(response.headers.get('retry-after'))
await sleep(
Math.min(retryAfterMs ?? computeBackoffDelay(attempt, retry), retry.maxDelayMs),
request.abortSignal
)
}
if (!response) {
yield { kind: 'error', message: 'model request produced no response' }
return
}
if (this.config.nonStreaming || response.headers.get('content-type')?.includes('application/json')) {
Expand Down Expand Up @@ -175,6 +218,15 @@ export class DeepseekCompatModelClient implements ModelClient {
return { ...headers, ...(this.config.headers ?? {}) }
}

private retryOptions(): ResolvedRetryOptions {
const retry = this.config.retry
return {
maxAttempts: Math.max(1, Math.floor(retry?.maxAttempts ?? DEFAULT_RETRY_MAX_ATTEMPTS)),
baseDelayMs: Math.max(0, Math.floor(retry?.baseDelayMs ?? DEFAULT_RETRY_BASE_DELAY_MS)),
maxDelayMs: Math.max(1, Math.floor(retry?.maxDelayMs ?? DEFAULT_RETRY_MAX_DELAY_MS))
}
}

private async classifyHttpError(status: number, text: string): Promise<{ message: string; code: string }> {
const body = text.slice(0, 500)
if (status === 429) {
Expand Down Expand Up @@ -1046,3 +1098,42 @@ function limitHistoryPreservingCompaction(history: TurnItem[], windowSize: numbe
}
return limited
}

function isRetryableHttpStatus(status: number): boolean {
return status === 429 || (status >= 500 && status <= 599)
}

function parseRetryAfterMs(value: string | null): number | undefined {
if (!value) return undefined
const trimmed = value.trim()
const seconds = Number(trimmed)
if (Number.isFinite(seconds) && seconds >= 0) {
return Math.min(Math.ceil(seconds * 1000), 60_000)
}
const date = Date.parse(trimmed)
if (Number.isFinite(date)) {
return Math.max(0, Math.ceil(date - Date.now()))
}
return undefined
}

function computeBackoffDelay(attempt: number, options: ResolvedRetryOptions): number {
const raw = options.baseDelayMs * Math.pow(2, attempt)
const jitter = raw * 0.25 * (Math.random() * 2 - 1)
return Math.max(0, Math.min(Math.ceil(raw + jitter), options.maxDelayMs))
}

function sleep(ms: number, signal: AbortSignal): Promise<void> {
if (ms <= 0 || signal.aborted) return Promise.resolve()
return new Promise((resolve) => {
const timer = setTimeout(resolve, ms)
signal.addEventListener(
'abort',
() => {
clearTimeout(timer)
resolve()
},
{ once: true }
)
})
}
11 changes: 9 additions & 2 deletions kun/src/loop/agent-loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
type PrefixVolatilityFinding
} from '../cache/prefix-volatility.js'
import { buildToolCatalogFingerprint } from '../cache/tool-catalog-fingerprint.js'
import { TtlLruCache } from '../cache/ttl-lru-cache.js'
import {
makeUserItem,
makeAssistantTextItem,
Expand Down Expand Up @@ -285,9 +286,15 @@ export type AgentLoopOptions = {
export class AgentLoop {
private readonly opts: AgentLoopOptions
private readonly autoModelRoutes = new Map<string, AutoModelRouteSelection>()
private readonly promptTokenPressure = new Map<string, { model: string; promptTokens: number }>()
private readonly promptTokenPressure = new TtlLruCache<string, { model: string; promptTokens: number }>({
limit: 128,
ttlMs: 10 * 60 * 1000
})
private readonly toolStormBreakers = new Map<string, ToolStormBreaker>()
private readonly toolCatalogSnapshots = new Map<string, ToolCatalogSnapshot>()
private readonly toolCatalogSnapshots = new TtlLruCache<string, ToolCatalogSnapshot>({
limit: 64,
ttlMs: 30 * 60 * 1000
})

constructor(opts: AgentLoopOptions) {
this.opts = opts
Expand Down
6 changes: 5 additions & 1 deletion kun/src/server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ export function buildRouter(runtime: ServerRuntime): Router {
})
router.add('DELETE', '/v1/threads/:id', async (request, ctx) => {
if (!authorize(request, runtime)) return ERRORS.unauthorized()
return deleteThread(runtime.threadService, ctx.params.id)
const response = await deleteThread(runtime.threadService, ctx.params.id)
if (response.status === 200) {
runtime.usageService.reset(ctx.params.id)
}
return response
})
router.add('POST', '/v1/threads/:id/fork', async (request, ctx) => {
if (!authorize(request, runtime)) return ERRORS.unauthorized()
Expand Down
Loading