Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions openspec/changes/bound-prediction-fanout/tasks.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
## 1. Config: concurrency + retry tunables

- [ ] 1.1 Add `maxInflightInfers` to `services/prediction-worker/src/config.ts` under `triton` via `configField()` (env `TRITON_MAX_INFLIGHT_INFERS`, env-wins, conservative positive-int default).
- [ ] 1.2 Add retry tunables (max attempts, base backoff ms) — either in `config.ts` (`triton`) or as documented `triton-client` defaults — read through typed config, not `process.env`.
- [ ] 1.3 Update `config.test.ts` to cover the new fields' defaults and override parsing.
- [x] 1.1 Add `maxInflightInfers` to `services/prediction-worker/src/config.ts` under `triton` via `configField()` (env `TRITON_MAX_INFLIGHT_INFERS`, env-wins, conservative positive-int default).
- [x] 1.2 Add retry tunables (max attempts, base backoff ms) — either in `config.ts` (`triton`) or as documented `triton-client` defaults — read through typed config, not `process.env`.
- [x] 1.3 Update `config.test.ts` to cover the new fields' defaults and override parsing.

## 2. Bound the fan-out with a shared semaphore

- [ ] 2.1 Add a minimal async semaphore utility (acquire returns a release handle; FIFO) — colocate in prediction-worker or `@protifer/shared` if reusable.
- [ ] 2.2 Construct one semaphore in `index.ts` sized by `config.triton.maxInflightInfers` and inject it into the processor/`dispatchAll` so it is process-wide (shared by all `WORKER_CONCURRENCY` jobs).
- [ ] 2.3 In `dispatch.ts`, acquire a permit around each `triton.modelInfer` call and release it in a `finally` (released on success, throw, and timeout alike).
- [ ] 2.4 Unit tests in `dispatch.test.ts`: (a) concurrent in-flight calls never exceed the limit across multiple simultaneous `dispatchAll` invocations; (b) a thrown `modelInfer` releases its permit (no leak); (c) excess calls wait rather than open immediately.
- [x] 2.1 Add a minimal async semaphore utility (acquire returns a release handle; FIFO) — colocate in prediction-worker or `@protifer/shared` if reusable.
- [x] 2.2 Construct one semaphore in `index.ts` sized by `config.triton.maxInflightInfers` and inject it into the processor/`dispatchAll` so it is process-wide (shared by all `WORKER_CONCURRENCY` jobs).
- [x] 2.3 In `dispatch.ts`, acquire a permit around each `triton.modelInfer` call and release it in a `finally` (released on success, throw, and timeout alike).
- [x] 2.4 Unit tests in `dispatch.test.ts`: (a) concurrent in-flight calls never exceed the limit across multiple simultaneous `dispatchAll` invocations; (b) a thrown `modelInfer` releases its permit (no leak); (c) excess calls wait rather than open immediately.

## 3. Transient transport retry in the client

- [ ] 3.1 In `packages/triton-client/src/client.ts`, wrap `modelInfer` with a bounded jittered retry firing only on the transient transport classes (`UNAVAILABLE`; transport-signature `INTERNAL` — bandwidth/parse/connection), never on `INVALID_ARGUMENT`/`NOT_FOUND`/`DEADLINE_EXCEEDED`.
- [ ] 3.2 Ensure the retry sits _inside_ the caller's held permit (retry loop in the client call, permit held by `dispatch.ts`) so retries do not widen concurrency.
- [ ] 3.3 Unit tests in `client.test.ts`: retries on transient classes up to the cap; no retry on deterministic/deadline classes; success-after-retry returns the response; exhausted retries surface the original classified error.
- [x] 3.1 In `packages/triton-client/src/client.ts`, wrap `modelInfer` with a bounded jittered retry firing only on the transient transport classes (`UNAVAILABLE`; transport-signature `INTERNAL` — bandwidth/parse/connection), never on `INVALID_ARGUMENT`/`NOT_FOUND`/`DEADLINE_EXCEEDED`.
- [x] 3.2 Ensure the retry sits _inside_ the caller's held permit (retry loop in the client call, permit held by `dispatch.ts`) so retries do not widen concurrency.
- [x] 3.3 Unit tests in `client.test.ts`: retries on transient classes up to the cap; no retry on deterministic/deadline classes; success-after-retry returns the response; exhausted retries surface the original classified error.

## 4. Channel keepalive

- [ ] 4.1 Add conservative `grpc.keepalive_time_ms` / `grpc.keepalive_timeout_ms` / `grpc.keepalive_permit_without_calls` options to the channel in `client.ts`, documented to avoid tripping Triton's server-side enforcement.
- [ ] 4.2 Confirm existing `client.test.ts` / `mock-server.test.ts` still pass with the new channel options.
- [x] 4.1 Add conservative `grpc.keepalive_time_ms` / `grpc.keepalive_timeout_ms` / `grpc.keepalive_permit_without_calls` options to the channel in `client.ts`, documented to avoid tripping Triton's server-side enforcement.
- [x] 4.2 Confirm existing `client.test.ts` / `mock-server.test.ts` still pass with the new channel options.

## 5. Verification

- [ ] 5.1 Run repo gates: `bun run typecheck`, `bun run lint`, `bun run format`, `bun run test`.
- [ ] 5.2 Run `bun run test:int` (stack up) to exercise the bounded fan-out against the mock/real Triton path.
- [x] 5.1 Run repo gates: `bun run typecheck`, `bun run lint`, `bun run format`, `bun run test`.
- [x] 5.2 Run `bun run test:int` (stack up) to exercise the bounded fan-out against the mock/real Triton path.
- [ ] 5.3 Load verification: on a real load run confirm no `Connection dropped` / `Bandwidth exhausted or memory limit exceeded` storm, the GPU is busy during prediction (not idle), prediction jobs complete, and BullMQ whole-job retries drop sharply.
- [ ] 5.4 Tune `TRITON_MAX_INFLIGHT_INFERS` upward until Triton is well-utilized without reintroducing transport errors; record the chosen value and rationale (deploy runbook).
166 changes: 166 additions & 0 deletions packages/triton-client/src/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,169 @@ describe('modelInfer deadline enforcement', () => {
expect(response.model_name).toBe('test')
}, 10_000)
})

type Step = 'ok' | { code: number; details: string }

/** Start a gRPC server that walks `plan` per modelInfer call (last step repeats). */
async function startFlakyTritonServer(plan: Step[]): Promise<{
stop(): void
port: number
calls(): number
}> {
const proto = getPackageDef()
const server = new grpc.Server()
let calls = 0

server.addService(proto.inference.GRPCInferenceService.service, {
serverReady: (
_: grpc.ServerUnaryCall<unknown, unknown>,
cb: grpc.sendUnaryData<object>,
) => {
cb(null, { ready: true })
},
modelReady: (
_: grpc.ServerUnaryCall<unknown, unknown>,
cb: grpc.sendUnaryData<object>,
) => {
cb(null, { ready: true })
},
modelInfer: (
_: grpc.ServerUnaryCall<unknown, unknown>,
cb: grpc.sendUnaryData<object>,
) => {
const step = plan[Math.min(calls, plan.length - 1)] ?? 'ok'
calls++
if (step === 'ok') {
cb(null, { model_name: 'test', outputs: [], raw_output_contents: [] })
} else {
cb(
{ code: step.code, details: step.details } as grpc.ServiceError,
null,
)
}
},
})

return new Promise((resolve, reject) => {
server.bindAsync(
'0.0.0.0:0',
grpc.ServerCredentials.createInsecure(),
(err: Error | null, boundPort: number) => {
if (err) {
reject(err)
return
}
resolve({
stop: () => {
server.forceShutdown()
},
port: boundPort,
calls: () => calls,
})
},
)
})
}

const FAST_RETRY = { maxAttempts: 3, baseBackoffMs: 1 }

describe('modelInfer transient-transport retry', () => {
const cleanups: Array<() => void> = []

afterAll(() => {
for (const fn of cleanups) fn()
})

async function makeClient(plan: Step[]) {
const srv = await startFlakyTritonServer(plan)
const client = createTritonClient(`localhost:${srv.port.toString()}`)
cleanups.push(() => {
srv.stop()
client.close()
})
return { srv, client }
}

const REQ = { model_name: 'test', inputs: [], outputs: [] }

it('retries on UNAVAILABLE and succeeds after a transient drop', async () => {
const { srv, client } = await makeClient([
{ code: grpc.status.UNAVAILABLE, details: 'Connection dropped' },
{ code: grpc.status.UNAVAILABLE, details: 'Connection dropped' },
'ok',
])
const resp = await client.modelInfer(REQ, { retry: FAST_RETRY })
expect(resp.model_name).toBe('test')
expect(srv.calls()).toBe(3)
}, 10_000)

it('retries on transport-class INTERNAL (bandwidth exhausted)', async () => {
const { srv, client } = await makeClient([
{
code: grpc.status.INTERNAL,
details: 'Bandwidth exhausted or memory limit exceeded',
},
'ok',
])
const resp = await client.modelInfer(REQ, { retry: FAST_RETRY })
expect(resp.model_name).toBe('test')
expect(srv.calls()).toBe(2)
}, 10_000)

it('does not retry a genuine server INTERNAL', async () => {
const { srv, client } = await makeClient([
{
code: grpc.status.INTERNAL,
details: 'internal model assertion failed',
},
'ok',
])
await expect(
client.modelInfer(REQ, { retry: FAST_RETRY }),
).rejects.toMatchObject({ code: grpc.status.INTERNAL })
expect(srv.calls()).toBe(1)
}, 10_000)

it('does not retry INVALID_ARGUMENT', async () => {
const { srv, client } = await makeClient([
{ code: grpc.status.INVALID_ARGUMENT, details: 'bad shape' },
'ok',
])
await expect(
client.modelInfer(REQ, { retry: FAST_RETRY }),
).rejects.toMatchObject({ code: grpc.status.INVALID_ARGUMENT })
expect(srv.calls()).toBe(1)
}, 10_000)

it('does not retry NOT_FOUND', async () => {
const { srv, client } = await makeClient([
{ code: grpc.status.NOT_FOUND, details: 'no model' },
'ok',
])
await expect(
client.modelInfer(REQ, { retry: FAST_RETRY }),
).rejects.toMatchObject({ code: grpc.status.NOT_FOUND })
expect(srv.calls()).toBe(1)
}, 10_000)

it('does not retry DEADLINE_EXCEEDED (maps to TritonTimeoutError)', async () => {
const { srv, client } = await makeClient([
{ code: grpc.status.DEADLINE_EXCEEDED, details: 'too slow' },
'ok',
])
await expect(client.modelInfer(REQ, { retry: FAST_RETRY })).rejects.toThrow(
TritonTimeoutError,
)
expect(srv.calls()).toBe(1)
}, 10_000)

it('surfaces the classified error once retries are exhausted', async () => {
const { srv, client } = await makeClient([
{ code: grpc.status.UNAVAILABLE, details: 'Connection dropped' },
])
await expect(
client.modelInfer(REQ, { retry: { maxAttempts: 2, baseBackoffMs: 1 } }),
).rejects.toMatchObject({ code: grpc.status.UNAVAILABLE })
expect(srv.calls()).toBe(2)
}, 10_000)
})
121 changes: 96 additions & 25 deletions packages/triton-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,43 @@ export const TRITON_MAX_MESSAGE_BYTES = 64 * 1024 * 1024

export const DEFAULT_DEADLINE_MS = 60_000

export const DEFAULT_RETRY_MAX_ATTEMPTS = 3
export const DEFAULT_RETRY_BASE_BACKOFF_MS = 100

const TRANSIENT_INTERNAL_RE =
/bandwidth exhausted|memory limit exceeded|failed parsing|connection|rst_stream|stream reset/i

function isTransientTransportError(err: unknown): boolean {
if (err instanceof TritonTimeoutError) return false
if (
err !== null &&
typeof err === 'object' &&
'code' in err &&
typeof (err as { code: unknown }).code === 'number'
) {
const code = (err as { code: number }).code
if (code === (grpc.status.UNAVAILABLE as number)) return true
if (code === (grpc.status.INTERNAL as number)) {
const detail = (
(err as { details?: string }).details ??
(err as { message?: string }).message ??
''
).toLowerCase()
return TRANSIENT_INTERNAL_RE.test(detail)
}
}
return false
}

function retryBackoffMs(baseBackoffMs: number, attempt: number): number {
const exp = baseBackoffMs * 2 ** (attempt - 1)
return Math.round(exp / 2 + Math.random() * (exp / 2))
}

function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms))
}

/**
* Thrown when a gRPC call to Triton exceeds the configured deadline.
* Maps to gRPC status code 4 (DEADLINE_EXCEEDED).
Expand Down Expand Up @@ -67,9 +104,17 @@ export interface InferResponse {
raw_output_contents: Buffer[]
}

export interface ModelInferRetryOptions {
/** Total attempts including the first. Values ≤1 disable retry. */
maxAttempts: number
baseBackoffMs: number
}

export interface ModelInferOptions {
/** Milliseconds before the gRPC call is cancelled. Defaults to DEFAULT_DEADLINE_MS (60 000). */
deadlineMs?: number
/** Bounded jittered retry on transient transport errors. Defaults to the DEFAULT_RETRY_* constants. */
retry?: ModelInferRetryOptions
}

export interface TritonClient {
Expand Down Expand Up @@ -127,37 +172,63 @@ export function createTritonClient(url: string): TritonClient {
'grpc.enable_retries': 0,
'grpc.max_receive_message_length': TRITON_MAX_MESSAGE_BYTES,
'grpc.max_send_message_length': TRITON_MAX_MESSAGE_BYTES,
// Keepalive pings only while calls are in flight (permit_without_calls: 0)
// so a half-open connection is detected mid-burst without tripping Triton's
// server-side min-ping-interval enforcement (ENHANCE_YOUR_CALM).
'grpc.keepalive_time_ms': 30_000,
'grpc.keepalive_timeout_ms': 10_000,
'grpc.keepalive_permit_without_calls': 0,
},
)

function callOnce(
request: InferRequest,
deadlineMs: number,
): Promise<InferResponse> {
return new Promise((resolve, reject) => {
const deadline = new Date(Date.now() + deadlineMs)
stub.modelInfer(
request,
{ deadline },
(err: grpc.ServiceError | null, response: InferResponse) => {
if (err) {
if (err.code === grpc.status.DEADLINE_EXCEEDED) {
reject(
new TritonTimeoutError(
`Triton modelInfer timed out after ${deadlineMs.toString()} ms`,
deadlineMs,
),
)
} else {
reject(err)
}
} else {
resolve(response)
}
},
)
})
}

return {
modelInfer(
async modelInfer(
request: InferRequest,
{ deadlineMs = DEFAULT_DEADLINE_MS }: ModelInferOptions = {},
{ deadlineMs = DEFAULT_DEADLINE_MS, retry }: ModelInferOptions = {},
): Promise<InferResponse> {
return new Promise((resolve, reject) => {
const deadline = new Date(Date.now() + deadlineMs)
stub.modelInfer(
request,
{ deadline },
(err: grpc.ServiceError | null, response: InferResponse) => {
if (err) {
if (err.code === grpc.status.DEADLINE_EXCEEDED) {
reject(
new TritonTimeoutError(
`Triton modelInfer timed out after ${deadlineMs.toString()} ms`,
deadlineMs,
),
)
} else {
reject(err)
}
} else {
resolve(response)
}
},
)
})
const maxAttempts = retry?.maxAttempts ?? DEFAULT_RETRY_MAX_ATTEMPTS
const baseBackoffMs =
retry?.baseBackoffMs ?? DEFAULT_RETRY_BASE_BACKOFF_MS
let attempt = 0
for (;;) {
attempt++
try {
return await callOnce(request, deadlineMs)
} catch (err) {
if (attempt >= maxAttempts || !isTransientTransportError(err))
throw err
await sleep(retryBackoffMs(baseBackoffMs, attempt))
}
}
},

serverReady(): Promise<boolean> {
Expand Down
3 changes: 3 additions & 0 deletions packages/triton-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ export {
createTritonClient,
TritonTimeoutError,
DEFAULT_DEADLINE_MS,
DEFAULT_RETRY_MAX_ATTEMPTS,
DEFAULT_RETRY_BASE_BACKOFF_MS,
} from './client.ts'
export type {
TritonClient,
Expand All @@ -10,6 +12,7 @@ export type {
TensorInput,
TensorOutput,
ModelInferOptions,
ModelInferRetryOptions,
} from './client.ts'
export * from './constants.ts'
export * from './float16.ts'
Expand Down
Loading
Loading