From a640737f2d9bcba65021f6d26ce372253ff41253 Mon Sep 17 00:00:00 2001 From: Siddarth Chalasani Date: Fri, 29 May 2026 15:39:50 -0700 Subject: [PATCH 1/4] feat(http2): multiplex over a bounded undici pool (undici 7.26) Configure the opt-in http2 transport's undici Agent with connections=4 and pipelining=64 so concurrent requests multiplex over a few TLS sessions instead of opening one connection per request, and bump undici ^6.21 -> ^7.26. undici only multiplexes H2 streams when pipelining > 1 (default 1), and the multiplexed-H2 path assert-crashes on undici 6.x (fixed upstream in 7.23.0, nodejs/undici#4845). Verified on the pinned 7.26.0: 0 crashes and 4 sessions for 2000 concurrent requests. Raises the supported Node floor to >= 20.18.1. - src/lib/undici-fetch.ts: bounded multiplexing Agent + @internal test hooks - package.json: undici ^7.26.0, engines.node >= 20.18.1 - README: Node requirement + HTTP/2 transport section - tests/lib/undici-fetch.test.ts: hermetic normalizeBody / multiplexing / abort it-test - verify-http2.mjs: concurrent-multiplexing assertion (Pass D) - h2-transport-bench.mjs: committed perf/repro harness Co-Authored-By: Claude Opus 4.8 --- README.md | 15 +- package.json | 5 +- src/lib/undici-fetch.ts | 66 +++- tests/lib/undici-fetch.test.ts | 161 ++++++++ .../smoketests/scripts/h2-transport-bench.mjs | 369 ++++++++++++++++++ tests/smoketests/scripts/verify-http2.mjs | 19 + yarn.lock | 8 +- 7 files changed, 618 insertions(+), 25 deletions(-) create mode 100644 tests/lib/undici-fetch.test.ts create mode 100644 tests/smoketests/scripts/h2-transport-bench.mjs diff --git a/README.md b/README.md index e0bad3390..101e978bb 100644 --- a/README.md +++ b/README.md @@ -355,6 +355,19 @@ await runloop.devboxes.create({...}, { }); ``` +### HTTP/2 transport (experimental) + +On Node.js, the SDK can send requests over HTTP/2, which multiplexes many concurrent requests over a small number of TLS connections instead of opening a connection per request. Enable it with the `http2` option: + + +```ts +const runloop = new RunloopSDK({ + http2: true, +}); +``` + +Requests are routed through an [undici](https://github.com/nodejs/undici) connection pool with HTTP/2 enabled, falling back to HTTP/1.1 for origins that don't negotiate h2 via ALPN. It is intended for HTTP/2-capable origins such as the Runloop API. This transport uses undici and therefore **requires Node.js >= 20.18.1** (see Requirements). + ## Semantic versioning This package generally follows [SemVer](https://semver.org/spec/v2.0.0.html) conventions, though certain backwards-incompatible changes may be released as minor versions: @@ -374,7 +387,7 @@ TypeScript >= 4.5 is supported. The following runtimes are supported: - Web browsers (Up-to-date Chrome, Firefox, Safari, Edge, and more) -- Node.js 18 LTS or later ([non-EOL](https://endoflife.date/nodejs)) versions. +- Node.js 20.18.1 LTS or later ([non-EOL](https://endoflife.date/nodejs)) versions. (Raised from 18 because the SDK now depends on undici 7 on Node; the HTTP/2 transport requires undici >= 7.23.0.) - Deno v1.28.0 or higher. - Bun 1.0 or later. - Cloudflare Workers. diff --git a/package.json b/package.json index cf151ab2a..cef690fc1 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,9 @@ "repository": "github:runloopai/api-client-ts", "license": "MIT", "packageManager": "yarn@1.22.22", + "engines": { + "node": ">=20.18.1" + }, "files": [ "**/*" ], @@ -50,7 +53,7 @@ "formdata-node": "^4.3.2", "node-fetch": "^2.6.7", "tar": "^7.5.2", - "undici": "^6.21.0", + "undici": "^7.26.0", "uuidv7": "^1.0.2", "zod": "^3.24.1" }, diff --git a/src/lib/undici-fetch.ts b/src/lib/undici-fetch.ts index db112b416..bfe87a3c5 100644 --- a/src/lib/undici-fetch.ts +++ b/src/lib/undici-fetch.ts @@ -1,34 +1,52 @@ /** - * A fetch-compatible adapter backed by undici's HTTP/2 support. + * A fetch-compatible adapter backed by undici's HTTP/2 support, using a bounded + * connection pool that multiplexes many concurrent requests over a few TLS sessions. * - * undici is the same engine that powers Node's built-in global `fetch`. - * Constructing an `Agent` with `allowH2: true` and passing it as the - * per-request `dispatcher` makes requests negotiate HTTP/2 via ALPN, with - * automatic fallback to HTTP/1.1 when the origin doesn't advertise h2. undici - * returns a standard WHATWG `Response`, so the rest of core.ts — which only + * undici is the same engine that powers Node's built-in global `fetch`. An `Agent` + * with `allowH2: true` negotiates HTTP/2 via ALPN and transparently falls back to + * HTTP/1.1 when the origin doesn't advertise h2. Two options make it actually + * multiplex rather than open one connection per request: + * - `connections` bounds the pool to a few TLS sessions per origin. Without it + * undici opens a fresh connection for every concurrent request (a connection + * storm) instead of reusing sessions. + * - `pipelining` (undici default: 1) is the max concurrent streams undici runs + * per session; it must be > 1 for H2 stream multiplexing to happen at all. + * + * undici returns a standard WHATWG `Response`, so the rest of core.ts — which only * touches standard Response members (`.status`, `.ok`, `.headers`, `.text()`, - * `.json()`, `.body`, `.arrayBuffer()`, `.blob()`) — is unchanged. + * `.json()`, `.body`, `.arrayBuffer()`, `.blob()`) — is unchanged. undici is dual + * CJS/ESM and `require`-able from this `"type": "commonjs"` package, so there is no + * dynamic-import hack and no second HTTP stack. * - * Unlike the previous got@14 approach, undici is dual CJS/ESM and `require`-able - * from this `"type": "commonjs"` package, so there is no dynamic-import hack and - * no second HTTP stack to keep in sync. + * Note: `pipelining > 1` also enables HTTP/1.1 request pipelining on the fallback + * path, so `http2: true` (opt-in) is intended for h2-capable origins. Requires + * undici >= 7.23.0 — multiplexed H2 assert-crashes on 6.x (undici PR #4845) — and + * therefore Node >= 20.18.1. * - * This lives in src/lib/ (the Stainless custom-code dir) so it survives - * regeneration; the only generated file touched is the one-line wiring change - * in src/_shims/node-runtime.ts. + * Lives in src/lib/ (the Stainless custom-code dir) so it survives regeneration. */ import { Agent, fetch as undiciFetchImpl } from 'undici'; import { Readable } from 'node:stream'; import { MultipartBody } from '../_shims/MultipartBody'; import { type Fetch } from '../core'; -// One module-scoped dispatcher, reused across requests: this is the HTTP/2 -// transport, with keep-alive. `allowH2` negotiates h2 over TLS via ALPN and -// transparently falls back to HTTP/1.1 when the origin doesn't offer h2. +const KEEP_ALIVE_TIMEOUT_MS = 10 * 60 * 1000; +// Bound the pool to a few TLS sessions per origin and multiplex many H2 streams +// over each. 4 x 64 = 256 concurrent requests in flight before undici queues the rest. +// Exported so tests can verify the shipped values actually multiplex. @internal +export const H2_MAX_CONNECTIONS = 4; +export const H2_MAX_CONCURRENT_STREAMS = 64; + +// One module-scoped dispatcher, reused across requests: a bounded HTTP/2 pool with +// keep-alive. `allowH2` negotiates h2 over TLS via ALPN and transparently falls back +// to HTTP/1.1 when the origin doesn't offer h2; `connections`/`pipelining` make it +// multiplex (see the file header). const h2Dispatcher = new Agent({ allowH2: true, - keepAliveTimeout: 10 * 60 * 1000, - keepAliveMaxTimeout: 10 * 60 * 1000, + connections: H2_MAX_CONNECTIONS, + pipelining: H2_MAX_CONCURRENT_STREAMS, + keepAliveTimeout: KEEP_ALIVE_TIMEOUT_MS, + keepAliveMaxTimeout: KEEP_ALIVE_TIMEOUT_MS, }); type NormalizedBody = { body: any; isStream: boolean }; @@ -36,7 +54,8 @@ type NormalizedBody = { body: any; isStream: boolean }; // Map the body shapes core.ts produces (string | Buffer/ArrayBufferView | // Node Readable for multipart | null) onto a valid undici BodyInit. A Node // Readable must become a Web ReadableStream and requires `duplex: 'half'`. -function normalizeBody(body: unknown): NormalizedBody { +// Exported for unit tests. @internal +export function normalizeBody(body: unknown): NormalizedBody { if (body == null) return { body: undefined, isStream: false }; if (typeof body === 'string') return { body, isStream: false }; if (Buffer.isBuffer(body)) return { body, isStream: false }; @@ -79,3 +98,12 @@ export const undiciFetch: Fetch = async (url, init) => { }; export default undiciFetch; + +/** + * Test-only: close the module-scoped dispatcher so a test process can exit cleanly + * after exercising the adapter. Not part of the public API. + * @internal + */ +export async function __closeDispatchersForTest(): Promise { + await h2Dispatcher.close(); +} diff --git a/tests/lib/undici-fetch.test.ts b/tests/lib/undici-fetch.test.ts new file mode 100644 index 000000000..9cc3f1925 --- /dev/null +++ b/tests/lib/undici-fetch.test.ts @@ -0,0 +1,161 @@ +import http from 'node:http'; +import http2 from 'node:http2'; +import os from 'node:os'; +import fs from 'node:fs'; +import path from 'node:path'; +import { execFileSync } from 'node:child_process'; +import { Readable } from 'node:stream'; +import diagnostics_channel from 'node:diagnostics_channel'; +import { Agent, fetch as undiciFetchImpl } from 'undici'; +import { + undiciFetch, + normalizeBody, + __closeDispatchersForTest, + H2_MAX_CONNECTIONS, + H2_MAX_CONCURRENT_STREAMS, +} from '../../src/lib/undici-fetch'; +import { MultipartBody } from '../../src/_shims/MultipartBody'; + +describe('undiciFetch adapter', () => { + // ── normalizeBody: maps core.ts's body shapes onto valid undici BodyInit ── + describe('normalizeBody', () => { + test('passes string / Buffer / typed array through unchanged (non-stream)', () => { + expect(normalizeBody('hi')).toEqual({ body: 'hi', isStream: false }); + const buf = Buffer.from('b'); + expect(normalizeBody(buf)).toEqual({ body: buf, isStream: false }); + const u8 = new Uint8Array([1, 2]); + expect(normalizeBody(u8)).toEqual({ body: u8, isStream: false }); + }); + + test('wraps an ArrayBuffer in a Buffer', () => { + const out = normalizeBody(new Uint8Array([1, 2, 3]).buffer); + expect(out.isStream).toBe(false); + expect(Buffer.isBuffer(out.body)).toBe(true); + }); + + test('returns an undefined body for null / undefined', () => { + expect(normalizeBody(null)).toEqual({ body: undefined, isStream: false }); + expect(normalizeBody(undefined)).toEqual({ body: undefined, isStream: false }); + }); + + test('converts a Node Readable to a web ReadableStream and flags isStream', () => { + const out = normalizeBody(Readable.from(['x'])); + expect(out.isStream).toBe(true); + expect(typeof out.body.getReader).toBe('function'); // WHATWG ReadableStream + }); + + test('unwraps MultipartBody to its inner stream', () => { + const out = normalizeBody(new MultipartBody(Readable.from(['x']))); + expect(out.isStream).toBe(true); + expect(typeof out.body.getReader).toBe('function'); + }); + }); + + // ── multiplexing: the shipped pool config multiplexes streams over few sessions ── + // jest workers don't honor a runtime NODE_TLS_REJECT_UNAUTHORIZED, so the adapter's + // module-scoped Agent can't reach a self-signed cert here. Instead we build an Agent from + // the adapter's *exported* constants with an explicit `connect.rejectUnauthorized:false` + // (which jest does honor) — verifying the values the adapter ships actually multiplex. + describe('HTTP/2 multiplexing (shipped pool config)', () => { + let server: http2.Http2SecureServer; + let url: string; + let agent: Agent; + const state = { sessions: 0, inFlight: 0, maxInFlight: 0 }; + const alpn: string[] = []; + const onConnected = (m: unknown) => { + const p = (m as { socket?: { alpnProtocol?: string } } | undefined)?.socket?.alpnProtocol; + if (p) alpn.push(p); + }; + + beforeAll(async () => { + diagnostics_channel.subscribe('undici:client:connected', onConnected); + const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'h2cert-')); + const k = path.join(dir, 'k'); + const c = path.join(dir, 'c'); + execFileSync( + 'openssl', + // prettier-ignore + ['req','-x509','-newkey','rsa:2048','-nodes','-keyout',k,'-out',c,'-days','1','-subj','/CN=localhost','-addext','subjectAltName=DNS:localhost,IP:127.0.0.1'], + { stdio: 'ignore' }, + ); + server = http2.createSecureServer({ + key: fs.readFileSync(k), + cert: fs.readFileSync(c), + settings: { maxConcurrentStreams: 200 }, + }); + server.on('session', () => { + state.sessions++; + }); + server.on('request', (_req, res) => { + state.inFlight++; + state.maxInFlight = Math.max(state.maxInFlight, state.inFlight); + setTimeout(() => { + state.inFlight--; + res.writeHead(200, { 'content-type': 'application/json' }); + res.end('{"ok":true}'); + }, 25); + }); + await new Promise((r) => server.listen(0, '127.0.0.1', () => r())); + url = `https://127.0.0.1:${(server.address() as { port: number }).port}/`; + agent = new Agent({ + allowH2: true, + connections: H2_MAX_CONNECTIONS, + pipelining: H2_MAX_CONCURRENT_STREAMS, + connect: { rejectUnauthorized: false }, + }); + }); + + afterAll(async () => { + diagnostics_channel.unsubscribe('undici:client:connected', onConnected); + await agent.close(); + await new Promise((r) => server.close(() => r())); + }); + + test('N concurrent requests reuse <= H2_MAX_CONNECTIONS sessions and overlap', async () => { + const N = 50; + const responses = await Promise.all( + Array.from({ length: N }, () => undiciFetchImpl(url, { dispatcher: agent })), + ); + const bodies = await Promise.all(responses.map((r) => r.json())); + + expect(bodies).toHaveLength(N); + expect((bodies as Array<{ ok?: boolean }>).every((b) => b.ok === true)).toBe(true); + expect(alpn).toContain('h2'); // really h2, not an h1 fallback + expect(state.sessions).toBeLessThanOrEqual(H2_MAX_CONNECTIONS); // bounded pool, not 1-per-request + // Genuine stream concurrency: many requests in flight at once. A `pipelining: 1` regression + // would cap maxInFlight at the connection count, so this is what guards the multiplexing. + expect(state.maxInFlight).toBeGreaterThan(H2_MAX_CONNECTIONS); + }); + }); + + // ── abort: the real adapter forwards the AbortSignal (plain http, no TLS needed) ── + describe('abort handling (real undiciFetch)', () => { + let server: http.Server; + let base: string; + + beforeAll(async () => { + server = http.createServer((req, res) => { + const timer = setTimeout(() => { + res.writeHead(200); + res.end('{}'); + }, 10_000); + timer.unref?.(); + req.on('close', () => clearTimeout(timer)); + }); + await new Promise((r) => server.listen(0, '127.0.0.1', () => r())); + base = `http://127.0.0.1:${(server.address() as { port: number }).port}`; + }); + + afterAll(async () => { + await __closeDispatchersForTest(); + await new Promise((r) => server.close(() => r())); + }); + + test('rejects with AbortError when the request signal is aborted', async () => { + const controller = new AbortController(); + const promise = undiciFetch(`${base}/slow`, { method: 'GET', signal: controller.signal } as any); + setTimeout(() => controller.abort(), 50); + await expect(promise).rejects.toMatchObject({ name: 'AbortError' }); + }); + }); +}); diff --git a/tests/smoketests/scripts/h2-transport-bench.mjs b/tests/smoketests/scripts/h2-transport-bench.mjs new file mode 100644 index 000000000..ab223c3c9 --- /dev/null +++ b/tests/smoketests/scripts/h2-transport-bench.mjs @@ -0,0 +1,369 @@ +/** + * HTTP/2 transport micro-benchmark — settles "#791 lacks the perf bump that #792 adds". + * + * Runs entirely locally against a self-signed node:http2 server (no API key, no network). + * The SERVER counts how many TLS connections / H2 sessions each transport opens for N + * concurrent requests — that is the ground truth for "does it multiplex?". We also record + * latency percentiles, wall time, failures, and whether a transport crashes the process. + * + * Transports under test (faithful to each branch's src/lib/undici-fetch.ts): + * - undici-allowH2 = PR #791 (undici Agent({ allowH2:true }), default pipelining=1 → no multiplexing) + * - undici-Pool/Agent pipelining=64 = THIS branch (bounded pool that multiplexes; needs undici >= 7.23.0) + * - node-http2-pool = PR #792 (verbatim copy of the hand-rolled H2Pool, types stripped) + * - undici-h1 = baseline (undici Agent({ allowH2:false, connections:4 })) + * + * Usage (needs undici resolvable from cwd; >= 7.23.0 for the pipelining variants — multiplexed + * H2 assert-crashes on 6.x, see undici PR #4845): + * node h2-transport-bench.mjs + * REQUESTS=2000 SERVER_DELAY_MS=25 MAX_STREAMS=100 node h2-transport-bench.mjs + * ONLY=poolpipe,agentpipe,792 node h2-transport-bench.mjs + */ +process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; // localhost self-signed cert only + +import http2 from 'node:http2'; +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import diagnosticsChannel from 'node:diagnostics_channel'; +import { execFileSync } from 'node:child_process'; +import { performance } from 'node:perf_hooks'; +import { Readable } from 'node:stream'; +import { Agent, Pool, Headers, Response, fetch as undiciFetchImpl, request as undiciRequest } from 'undici'; + +const REQUESTS = Number(process.env.REQUESTS ?? 1000); +const SERVER_DELAY_MS = Number(process.env.SERVER_DELAY_MS ?? 20); +const MAX_STREAMS = Number(process.env.MAX_STREAMS ?? 100); // server's SETTINGS_MAX_CONCURRENT_STREAMS + +let crashes = 0; +const crashMsgs = new Set(); +process.on('uncaughtException', (e) => { crashes++; crashMsgs.add(String(e?.message ?? e)); }); +process.on('unhandledRejection', (e) => { crashes++; crashMsgs.add(String(e?.message ?? e)); }); + +// ───────────────────────── transport #791: undici Agent allowH2 ───────────────────────── +const agent791 = new Agent({ allowH2: true, keepAliveTimeout: 600000, keepAliveMaxTimeout: 600000 }); +function fetch791(url, init) { + const { agent: _a, signal, ...rest } = init ?? {}; + return undiciFetchImpl(url, { ...rest, dispatcher: agent791, signal: signal ?? undefined }); +} + +// ───────────────────────── baseline: undici h1 (4 connections) ───────────────────────── +const agentH1 = new Agent({ allowH2: false, connections: 4, keepAliveTimeout: 600000, keepAliveMaxTimeout: 600000 }); +function fetchH1(url, init) { + const { agent: _a, signal, ...rest } = init ?? {}; + return undiciFetchImpl(url, { ...rest, dispatcher: agentH1, signal: signal ?? undefined }); +} + +// ════════════════ transport #792: verbatim copy of H2Pool (types stripped) ════════════════ +// Source: PR #792 tode/http2-updates-shared-pool-test:src/lib/undici-fetch.ts. Re-sync if it changes. +class MultipartBody { constructor(body) { this.body = body; } } +const MAX_H2_SESSIONS = 4; +const MAX_H2_STREAMS_PER_SESSION = 64; +const KEEP_ALIVE_TIMEOUT_MS = 10 * 60 * 1000; +const h1Dispatcher = new Agent({ allowH2: false, connections: 4, keepAliveTimeout: KEEP_ALIVE_TIMEOUT_MS, keepAliveMaxTimeout: KEEP_ALIVE_TIMEOUT_MS }); +const connectedChannel = diagnosticsChannel.channel('undici:client:connected'); +const pools = new Map(); +function normalizeBody(body) { + if (body == null) return undefined; + if (typeof body === 'string') return body; + if (Buffer.isBuffer(body)) return body; + if (body instanceof MultipartBody) return normalizeBody(body.body); + if (body instanceof Readable) return body; + if (ArrayBuffer.isView(body)) return body; + if (body instanceof ArrayBuffer) return Buffer.from(body); + return String(body); +} +function toResponseHeaders(headers) { + const responseHeaders = new Headers(); + for (const [name, value] of Object.entries(headers)) { + if (value === undefined) continue; + if (Array.isArray(value)) { for (const item of value) responseHeaders.append(name, item); } + else responseHeaders.append(name, value); + } + return responseHeaders; +} +function statusMustNotHaveBody(status) { return status === 204 || status === 205 || status === 304; } +function abortError() { const e = new Error('The operation was aborted'); e.name = 'AbortError'; return e; } +function originFor(url) { return `${url.protocol}//${url.host}`; } +function pathFor(url) { return `${url.pathname}${url.search}`; } +function filterH2RequestHeaders(headers) { + const filtered = {}; + if (!headers) return filtered; + for (const [rawName, rawValue] of Object.entries(headers)) { + if (rawValue == null) continue; + const name = rawName.toLowerCase(); + if (name === 'connection' || name === 'keep-alive' || name === 'proxy-connection' || name === 'transfer-encoding' || name === 'upgrade' || name === 'host') continue; + filtered[name] = String(rawValue); + } + return filtered; +} +function toH2ResponseHeaders(headers) { + const responseHeaders = new Headers(); + for (const [name, value] of Object.entries(headers)) { + if (name.startsWith(':') || value === undefined) continue; + if (Array.isArray(value)) { for (const item of value) responseHeaders.append(name, String(item)); } + else responseHeaders.append(name, String(value)); + } + return responseHeaders; +} +function getPool(url) { + const origin = originFor(url); + let pool = pools.get(origin); + if (!pool) { pool = new H2Pool(origin); pools.set(origin, pool); } + return pool; +} +class H2Pool { + constructor(origin) { this.origin = origin; this.sessions = []; this.waiters = []; } + async request(url, init) { + const entry = await this.acquire(); + try { await entry.ready; } catch (error) { this.release(entry); throw error; } + if (entry.alpnProtocol !== 'h2') { + this.release(entry); + entry.session.close(); + throw new Error(`HTTP/2 was not negotiated; ALPN=${String(entry.alpnProtocol || 'none')}`); + } + return this.dispatch(entry, url, init); + } + acquire() { + const existing = this.sessions.find((entry) => !entry.closed && entry.activeStreams < MAX_H2_STREAMS_PER_SESSION); + if (existing) { + if (existing.idleTimer) { clearTimeout(existing.idleTimer); existing.idleTimer = undefined; } + existing.session.ref?.(); + existing.activeStreams++; + return Promise.resolve(existing); + } + if (this.sessions.filter((entry) => !entry.closed).length < MAX_H2_SESSIONS) { + const created = this.createSession(); + created.session.ref?.(); + created.activeStreams++; + return Promise.resolve(created); + } + return new Promise((resolve) => { this.waiters.push(resolve); }); + } + createSession() { + const session = http2.connect(this.origin, { ALPNProtocols: ['h2', 'http/1.1'] }); + const entry = { session, activeStreams: 0, ready: Promise.resolve(), alpnProtocol: undefined, closed: false, idleTimer: undefined }; + entry.ready = new Promise((resolve, reject) => { + session.once('connect', () => { + entry.alpnProtocol = session.socket.alpnProtocol; + connectedChannel.publish({ socket: session.socket }); + resolve(); + }); + session.once('error', reject); + }); + session.once('close', () => { + entry.closed = true; + if (entry.idleTimer) clearTimeout(entry.idleTimer); + this.sessions = this.sessions.filter((item) => item !== entry); + this.drainWaiters(); + }); + session.on('error', () => { entry.closed = true; }); + this.sessions.push(entry); + return entry; + } + dispatch(entry, url, init) { + return new Promise((resolve, reject) => { + if (init.signal?.aborted) { this.release(entry); reject(abortError()); return; } + const body = normalizeBody(init.body); + const requestHeaders = { + ...filterH2RequestHeaders(init.headers), + [http2.constants.HTTP2_HEADER_METHOD]: init.method ?? 'GET', + [http2.constants.HTTP2_HEADER_SCHEME]: url.protocol.slice(0, -1), + [http2.constants.HTTP2_HEADER_AUTHORITY]: url.host, + [http2.constants.HTTP2_HEADER_PATH]: pathFor(url), + }; + const stream = entry.session.request(requestHeaders, { endStream: body === undefined }); + let settled = false, released = false; + const releaseOnce = () => { if (released) return; released = true; this.release(entry); }; + const rejectOnce = (error) => { if (settled) return; settled = true; releaseOnce(); reject(error); }; + const onAbort = () => { stream.close(http2.constants.NGHTTP2_CANCEL); rejectOnce(abortError()); }; + init.signal?.addEventListener('abort', onAbort, { once: true }); + stream.once('response', (headers) => { + if (settled) return; + settled = true; + const status = Number(headers[http2.constants.HTTP2_HEADER_STATUS] ?? 0); + const responseBody = statusMustNotHaveBody(status) || init.method === 'HEAD' ? null : stream; + if (responseBody === null) stream.resume(); + const response = new Response(responseBody, { status, headers: toH2ResponseHeaders(headers) }); + Object.defineProperty(response, 'url', { value: String(url) }); + stream.once('end', releaseOnce); + stream.once('close', releaseOnce); + stream.once('error', releaseOnce); + resolve(response); + }); + stream.once('error', (error) => { rejectOnce(error); }); + stream.once('close', () => { + init.signal?.removeEventListener('abort', onAbort); + if (!settled) rejectOnce(new Error('HTTP/2 stream closed before response headers')); + }); + if (body instanceof Readable) { body.once('error', (error) => stream.destroy(error)); body.pipe(stream); } + else if (body !== undefined) { stream.end(body); } + }); + } + release(entry) { + entry.activeStreams = Math.max(0, entry.activeStreams - 1); + this.drainWaiters(); + if (entry.activeStreams === 0 && !entry.closed && !entry.idleTimer) { + entry.session.unref?.(); + entry.idleTimer = setTimeout(() => { entry.session.close(); }, KEEP_ALIVE_TIMEOUT_MS); + entry.idleTimer.unref?.(); + } + } + drainWaiters() { + while (this.waiters.length > 0) { + const entry = this.sessions.find((candidate) => !candidate.closed && candidate.activeStreams < MAX_H2_STREAMS_PER_SESSION); + if (!entry) return; + if (entry.idleTimer) { clearTimeout(entry.idleTimer); entry.idleTimer = undefined; } + entry.session.ref?.(); + entry.activeStreams++; + const resolve = this.waiters.shift(); + resolve?.(entry); + } + } +} +async function undiciFallbackFetch(url, init) { + const result = await undiciRequest(url, { ...init, body: normalizeBody(init.body), dispatcher: h1Dispatcher }); + const responseBody = statusMustNotHaveBody(result.statusCode) || init.method === 'HEAD' ? null : result.body; + if (responseBody === null) await result.body.dump(); + const response = new Response(responseBody, { status: result.statusCode, headers: toResponseHeaders(result.headers) }); + Object.defineProperty(response, 'url', { value: String(url) }); + return response; +} +const fetch792 = async (url, init) => { + const { agent: _a, body: rawBody, duplex: _d, redirect, signal, ...rest } = init ?? {}; + const requestURL = new URL(String(url)); + const requestInit = { ...rest, body: rawBody, maxRedirections: redirect === 'manual' || redirect === 'error' ? 0 : 20, signal: signal ?? undefined }; + if (requestURL.protocol !== 'https:') return undiciFallbackFetch(requestURL, requestInit); + try { + return await getPool(requestURL).request(requestURL, requestInit); + } catch (error) { + if (error instanceof Error && /^HTTP\/2 was not negotiated/.test(error.message)) return undiciFallbackFetch(requestURL, requestInit); + throw error; + } +}; +// ════════════════════════════════ end #792 copy ════════════════════════════════ + +// ───────────────────────────────── local h2 server ───────────────────────────────── +function makeCert() { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'h2cert-')); + const keyPath = path.join(dir, 'key.pem'), certPath = path.join(dir, 'cert.pem'); + execFileSync('openssl', ['req', '-x509', '-newkey', 'rsa:2048', '-nodes', '-keyout', keyPath, '-out', certPath, + '-days', '1', '-subj', '/CN=localhost', '-addext', 'subjectAltName=DNS:localhost,IP:127.0.0.1'], { stdio: 'ignore' }); + return { key: fs.readFileSync(keyPath), cert: fs.readFileSync(certPath), dir }; +} + +const counters = { tls: 0, sessions: 0, reqs: 0 }; +function startServer() { + const { key, cert } = makeCert(); + const server = http2.createSecureServer({ key, cert, allowHTTP1: true, settings: { maxConcurrentStreams: MAX_STREAMS } }); + server.on('secureConnection', () => counters.tls++); + server.on('session', () => counters.sessions++); + // Use ONLY the http1/http2 compatibility `request` API — it serves BOTH h1 and h2 + // and responds exactly once per request (no 'stream' listener => no double-respond). + server.on('request', (req, res) => { + counters.reqs++; + req.resume(); + const send = () => { res.writeHead(200, { 'content-type': 'application/json' }); res.end('{"ok":true}'); }; + SERVER_DELAY_MS ? setTimeout(send, SERVER_DELAY_MS) : send(); + }); + return new Promise((resolve) => { + server.listen(0, '127.0.0.1', () => resolve({ server, port: server.address().port })); + }); +} + +// ───────────────────────────────── load driver ───────────────────────────────── +function pct(sorted, p) { return sorted.length ? sorted[Math.min(sorted.length - 1, Math.ceil((p / 100) * sorted.length) - 1)] : 0; } +function r(n) { return Math.round(n); } + +async function runPass(name, fetchFn, url) { + const snap = { ...counters }; + const crashStart = crashes; + const lat = []; + let fails = 0; + const t0 = performance.now(); + await Promise.all(Array.from({ length: REQUESTS }, async () => { + const r0 = performance.now(); + try { + const res = await fetchFn(url, { method: 'GET', headers: { accept: 'application/json' } }); + await res.text(); // consume body so the stream completes & releases + if (!res.ok) fails++; + } catch { fails++; } + lat.push(performance.now() - r0); + })); + const wall = performance.now() - t0; + await new Promise((res) => setTimeout(res, 300)); // let late async H2 uncaught errors surface + lat.sort((a, b) => a - b); + return { + transport: name, + tlsConns: counters.tls - snap.tls, + h2sessions: counters.sessions - snap.sessions, + reqsServed: counters.reqs - snap.reqs, + fails, + crashes: crashes - crashStart, + p50: r(pct(lat, 50)), p90: r(pct(lat, 90)), p99: r(pct(lat, 99)), max: r(lat.at(-1) ?? 0), + wallMs: r(wall), + }; +} + +async function main() { + console.log(`node ${process.version} | undici ${(await import('undici/package.json', { with: { type: 'json' } })).default.version} | REQUESTS=${REQUESTS} concurrent | server delay=${SERVER_DELAY_MS}ms | server maxConcurrentStreams=${MAX_STREAMS}\n`); + const { server, port } = await startServer(); + const url = `https://127.0.0.1:${port}/health`; + + // Bounded undici Pool with allowH2, default pipelining=1 — pools connections but does NOT + // multiplex streams (one in-flight request per session) → as slow as h1. + const pool4 = new Pool(`https://127.0.0.1:${port}`, { allowH2: true, connections: 4, keepAliveTimeout: 600000, keepAliveMaxTimeout: 600000 }); + const fetchPool4 = (u, init) => { const { agent: _a, signal, ...rest } = init ?? {}; return undiciFetchImpl(u, { ...rest, dispatcher: pool4, signal: signal ?? undefined }); }; + + // Bounded undici Pool with pipelining>1 — the approach this branch ships: real H2 stream + // multiplexing over a few sessions. 4 x 64 matches src/lib/undici-fetch.ts. + // (On undici < 7.23.0 this assert-crashes; on >= 7.23.0 it is fast and stable.) + const pool4pipe = new Pool(`https://127.0.0.1:${port}`, { allowH2: true, connections: 4, pipelining: 64, keepAliveTimeout: 600000, keepAliveMaxTimeout: 600000 }); + const fetchPool4pipe = (u, init) => { const { agent: _a, signal, ...rest } = init ?? {}; return undiciFetchImpl(u, { ...rest, dispatcher: pool4pipe, signal: signal ?? undefined }); }; + + // Same, via Agent (what the SDK actually constructs) instead of a single-origin Pool. + const agentPipe = new Agent({ allowH2: true, connections: 4, pipelining: 64, keepAliveTimeout: 600000, keepAliveMaxTimeout: 600000 }); + const fetchAgentPipe = (u, init) => { const { agent: _a, signal, ...rest } = init ?? {}; return undiciFetchImpl(u, { ...rest, dispatcher: agentPipe, signal: signal ?? undefined }); }; + + const all = [ + ['h1', 'undici-h1 (baseline)', fetchH1], + ['791', 'undici-allowH2 Agent (#791)', fetch791], + ['pool', 'undici-Pool allowH2 conns=4 (pipelining=1)', fetchPool4], + ['poolpipe', 'undici-Pool conns=4 pipelining=64', fetchPool4pipe], + ['agentpipe', 'undici-Agent conns=4 pipelining=64', fetchAgentPipe], + ['792', 'node-http2-pool (#792)', fetch792], + ]; + const only = process.env.ONLY ? process.env.ONLY.split(',') : null; + const selected = only ? all.filter(([k]) => only.includes(k)) : all; + const rows = []; + for (const [, name, fn] of selected) rows.push(await runPass(name, fn, url)); + + console.log('Each row = ' + REQUESTS + ' requests fired concurrently:\n'); + console.table( + rows.map((x) => ({ + transport: x.transport, + 'TLS conns opened': x.tlsConns, + 'h2 sessions': x.h2sessions, + 'reqs served': x.reqsServed, + fails: x.fails, + crashes: x.crashes, + 'p50 ms': x.p50, 'p90 ms': x.p90, 'p99 ms': x.p99, + 'wall ms': x.wallMs, + })), + ); + console.log(`\ntotal process-crashing errors: ${crashes} (unique: ${[...crashMsgs].join(' | ') || 'none'})`); + await pool4.close().catch(() => {}); + await pool4pipe.close().catch(() => {}); + await agentPipe.close().catch(() => {}); + console.log('\nKey question: for ' + REQUESTS + ' concurrent requests, how many TLS conns / h2 sessions did #791 open vs #792?'); + console.log(' - If #791 opens few sessions & is as fast as #792 -> #791 already has the perf bump.'); + console.log(' - If #791 opens ~1 per request or is far slower -> it does not multiplex.'); + + await agent791.close().catch(() => {}); + await agentH1.close().catch(() => {}); + await h1Dispatcher.close().catch(() => {}); + for (const pool of pools.values()) for (const e of pool.sessions) try { e.session.close(); } catch {} + server.close(); + setTimeout(() => process.exit(0), 200).unref(); +} + +main().catch((e) => { console.error('bench failed:', e); process.exit(1); }); diff --git a/tests/smoketests/scripts/verify-http2.mjs b/tests/smoketests/scripts/verify-http2.mjs index 8120f7f35..cfb2b480f 100644 --- a/tests/smoketests/scripts/verify-http2.mjs +++ b/tests/smoketests/scripts/verify-http2.mjs @@ -32,7 +32,9 @@ if (!apiKey) { // keyed by name globally, so this catches the SDK's own undici regardless of // which module instance created the connection. const alpnSeen = []; +let connectCount = 0; diagnostics_channel.subscribe('undici:client:connected', (msg) => { + connectCount++; const proto = msg?.socket?.alpnProtocol; if (proto) alpnSeen.push(proto); }); @@ -76,5 +78,22 @@ try { check(false, `h1: GET devboxes.list resolved (threw ${e?.constructor?.name}: ${e?.message})`); } +// ── Pass D: HTTP/2 multiplexing — many concurrent requests reuse few connections ── +// The whole point of the bounded H2 pool: N concurrent requests share a small number of +// TLS sessions instead of one connection per request. Default config (pipelining=1) or the +// pre-fix undici Agent would open ~N connections here. +try { + const N = 25; + const before = connectCount; + const client = newClient({ http2: true }); + const results = await Promise.allSettled(Array.from({ length: N }, () => client.devboxes.list({ limit: 1 }))); + const ok = results.filter((r) => r.status === 'fulfilled').length; + const opened = connectCount - before; + check(ok === N, `h2: ${N} concurrent requests all resolved (${ok}/${N})`); + check(opened <= 4, `h2: ${N} concurrent requests multiplexed over <= 4 connections (opened ${opened})`); +} catch (e) { + check(false, `h2: concurrent multiplexing pass threw ${e?.constructor?.name}: ${e?.message}`); +} + console.log(failures === 0 ? '\n✓ ALL HTTP/2 CHECKS PASSED' : `\n✗ ${failures} CHECK(S) FAILED`); process.exit(failures === 0 ? 0 : 1); diff --git a/yarn.lock b/yarn.lock index 93a5f5d29..2b029521b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3844,10 +3844,10 @@ undici-types@~5.26.4: resolved "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz" integrity sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA== -undici@^6.21.0: - version "6.26.0" - resolved "https://registry.yarnpkg.com/undici/-/undici-6.26.0.tgz#333a35b7f519c48d2dc6aeb38e4e91d9274e0652" - integrity sha512-4yqz8a3n5HmGTlsbADNtr/dJlhkh/55Rq798G6ibiULcXbDtaLpTl1pvdqcbFfeoj3iSi52lePFM7h9H21cw/A== +undici@^7.26.0: + version "7.26.0" + resolved "https://registry.yarnpkg.com/undici/-/undici-7.26.0.tgz#d413a2b5752e3e71e003bb268dec32b9a0ad0ce7" + integrity sha512-3O9Tf67pGhgOv9jM35AbhkXAKi13f3oy3aE4CSgr+TckGeY+/iu97ZXN+J7DpHPzLbVApFd1IFhcnBjREYXYcg== universalify@^2.0.0: version "2.0.1" From 882d0f86a9393adbbefee101f2173534b8d05eca Mon Sep 17 00:00:00 2001 From: Siddarth Chalasani Date: Fri, 29 May 2026 15:45:36 -0700 Subject: [PATCH 2/4] fix(http2): drop root engines field that broke the node-18 lint install yarn classic hard-errors on the ROOT package's own engines during yarn install even without engine-strict, so engines.node>=20.18.1 failed the node-18 lint job. Keep the undici 7 bump + the README Node-floor docs; undici's transitive engine constraint still warns node-18 installers. Co-Authored-By: Claude Opus 4.8 --- package.json | 3 --- 1 file changed, 3 deletions(-) diff --git a/package.json b/package.json index cef690fc1..6512ad39b 100644 --- a/package.json +++ b/package.json @@ -9,9 +9,6 @@ "repository": "github:runloopai/api-client-ts", "license": "MIT", "packageManager": "yarn@1.22.22", - "engines": { - "node": ">=20.18.1" - }, "files": [ "**/*" ], From e80135dc139ca601385620439b1c80c76e9e9ac0 Mon Sep 17 00:00:00 2001 From: Siddarth Chalasani Date: Fri, 29 May 2026 15:48:41 -0700 Subject: [PATCH 3/4] ci: run the lint job on Node 20 (undici 7 can't install on Node 18) The only node-18 job in CI was lint; undici 7's engine constraint hard-fails yarn install there. All other jobs already run on Node >= 20. Bump lint to Node 20 to match the build job and the SDK's documented Node floor. Co-Authored-By: Claude Opus 4.8 --- .github/workflows/ci.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dd296ba19..4e0dd560b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,7 +26,9 @@ jobs: - name: Set up Node uses: runloopai/setup-node@main with: - node-version: '18' + # undici 7 (the http2 transport dep) requires Node >= 20.18.1, so the + # lint job can no longer bootstrap on Node 18. See README Requirements. + node-version: '20' - name: Bootstrap run: ./scripts/bootstrap From 33f8f01c22a44586af7a6ab7d84b759ec37e32a7 Mon Sep 17 00:00:00 2001 From: Siddarth Chalasani Date: Fri, 29 May 2026 15:58:52 -0700 Subject: [PATCH 4/4] refactor(http2): trim test footprint to match the 2-line change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The behavioral change is 2 lines (connections + pipelining). Drop the 369-line manual bench harness (its numbers live in the PR description) and the hermetic TLS-multiplexing/abort cases — the multiplexing case exercised a config-equivalent undici Agent (testing undici, not our adapter), and multiplexing stays covered by verify-http2.mjs Pass D against the real API. Keep the normalizeBody unit tests; un-export the now-unused test-only symbols from the adapter. Co-Authored-By: Claude Opus 4.8 --- src/lib/undici-fetch.ts | 14 +- tests/lib/undici-fetch.test.ts | 178 ++------- .../smoketests/scripts/h2-transport-bench.mjs | 369 ------------------ 3 files changed, 30 insertions(+), 531 deletions(-) delete mode 100644 tests/smoketests/scripts/h2-transport-bench.mjs diff --git a/src/lib/undici-fetch.ts b/src/lib/undici-fetch.ts index bfe87a3c5..61fad7aa4 100644 --- a/src/lib/undici-fetch.ts +++ b/src/lib/undici-fetch.ts @@ -33,9 +33,8 @@ import { type Fetch } from '../core'; const KEEP_ALIVE_TIMEOUT_MS = 10 * 60 * 1000; // Bound the pool to a few TLS sessions per origin and multiplex many H2 streams // over each. 4 x 64 = 256 concurrent requests in flight before undici queues the rest. -// Exported so tests can verify the shipped values actually multiplex. @internal -export const H2_MAX_CONNECTIONS = 4; -export const H2_MAX_CONCURRENT_STREAMS = 64; +const H2_MAX_CONNECTIONS = 4; +const H2_MAX_CONCURRENT_STREAMS = 64; // One module-scoped dispatcher, reused across requests: a bounded HTTP/2 pool with // keep-alive. `allowH2` negotiates h2 over TLS via ALPN and transparently falls back @@ -98,12 +97,3 @@ export const undiciFetch: Fetch = async (url, init) => { }; export default undiciFetch; - -/** - * Test-only: close the module-scoped dispatcher so a test process can exit cleanly - * after exercising the adapter. Not part of the public API. - * @internal - */ -export async function __closeDispatchersForTest(): Promise { - await h2Dispatcher.close(); -} diff --git a/tests/lib/undici-fetch.test.ts b/tests/lib/undici-fetch.test.ts index 9cc3f1925..754c0528c 100644 --- a/tests/lib/undici-fetch.test.ts +++ b/tests/lib/undici-fetch.test.ts @@ -1,161 +1,39 @@ -import http from 'node:http'; -import http2 from 'node:http2'; -import os from 'node:os'; -import fs from 'node:fs'; -import path from 'node:path'; -import { execFileSync } from 'node:child_process'; import { Readable } from 'node:stream'; -import diagnostics_channel from 'node:diagnostics_channel'; -import { Agent, fetch as undiciFetchImpl } from 'undici'; -import { - undiciFetch, - normalizeBody, - __closeDispatchersForTest, - H2_MAX_CONNECTIONS, - H2_MAX_CONCURRENT_STREAMS, -} from '../../src/lib/undici-fetch'; +import { normalizeBody } from '../../src/lib/undici-fetch'; import { MultipartBody } from '../../src/_shims/MultipartBody'; -describe('undiciFetch adapter', () => { - // ── normalizeBody: maps core.ts's body shapes onto valid undici BodyInit ── - describe('normalizeBody', () => { - test('passes string / Buffer / typed array through unchanged (non-stream)', () => { - expect(normalizeBody('hi')).toEqual({ body: 'hi', isStream: false }); - const buf = Buffer.from('b'); - expect(normalizeBody(buf)).toEqual({ body: buf, isStream: false }); - const u8 = new Uint8Array([1, 2]); - expect(normalizeBody(u8)).toEqual({ body: u8, isStream: false }); - }); - - test('wraps an ArrayBuffer in a Buffer', () => { - const out = normalizeBody(new Uint8Array([1, 2, 3]).buffer); - expect(out.isStream).toBe(false); - expect(Buffer.isBuffer(out.body)).toBe(true); - }); - - test('returns an undefined body for null / undefined', () => { - expect(normalizeBody(null)).toEqual({ body: undefined, isStream: false }); - expect(normalizeBody(undefined)).toEqual({ body: undefined, isStream: false }); - }); - - test('converts a Node Readable to a web ReadableStream and flags isStream', () => { - const out = normalizeBody(Readable.from(['x'])); - expect(out.isStream).toBe(true); - expect(typeof out.body.getReader).toBe('function'); // WHATWG ReadableStream - }); - - test('unwraps MultipartBody to its inner stream', () => { - const out = normalizeBody(new MultipartBody(Readable.from(['x']))); - expect(out.isStream).toBe(true); - expect(typeof out.body.getReader).toBe('function'); - }); +// The adapter's only non-trivial logic: mapping the body shapes core.ts produces onto a valid +// undici BodyInit. End-to-end behavior over both transports is covered by the smoke matrix +// (http1/http2) and verify-http2.mjs; this just pins the shape-conversion rules. +describe('undiciFetch / normalizeBody', () => { + test('passes string / Buffer / typed array through unchanged (non-stream)', () => { + expect(normalizeBody('hi')).toEqual({ body: 'hi', isStream: false }); + const buf = Buffer.from('b'); + expect(normalizeBody(buf)).toEqual({ body: buf, isStream: false }); + const u8 = new Uint8Array([1, 2]); + expect(normalizeBody(u8)).toEqual({ body: u8, isStream: false }); }); - // ── multiplexing: the shipped pool config multiplexes streams over few sessions ── - // jest workers don't honor a runtime NODE_TLS_REJECT_UNAUTHORIZED, so the adapter's - // module-scoped Agent can't reach a self-signed cert here. Instead we build an Agent from - // the adapter's *exported* constants with an explicit `connect.rejectUnauthorized:false` - // (which jest does honor) — verifying the values the adapter ships actually multiplex. - describe('HTTP/2 multiplexing (shipped pool config)', () => { - let server: http2.Http2SecureServer; - let url: string; - let agent: Agent; - const state = { sessions: 0, inFlight: 0, maxInFlight: 0 }; - const alpn: string[] = []; - const onConnected = (m: unknown) => { - const p = (m as { socket?: { alpnProtocol?: string } } | undefined)?.socket?.alpnProtocol; - if (p) alpn.push(p); - }; - - beforeAll(async () => { - diagnostics_channel.subscribe('undici:client:connected', onConnected); - const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'h2cert-')); - const k = path.join(dir, 'k'); - const c = path.join(dir, 'c'); - execFileSync( - 'openssl', - // prettier-ignore - ['req','-x509','-newkey','rsa:2048','-nodes','-keyout',k,'-out',c,'-days','1','-subj','/CN=localhost','-addext','subjectAltName=DNS:localhost,IP:127.0.0.1'], - { stdio: 'ignore' }, - ); - server = http2.createSecureServer({ - key: fs.readFileSync(k), - cert: fs.readFileSync(c), - settings: { maxConcurrentStreams: 200 }, - }); - server.on('session', () => { - state.sessions++; - }); - server.on('request', (_req, res) => { - state.inFlight++; - state.maxInFlight = Math.max(state.maxInFlight, state.inFlight); - setTimeout(() => { - state.inFlight--; - res.writeHead(200, { 'content-type': 'application/json' }); - res.end('{"ok":true}'); - }, 25); - }); - await new Promise((r) => server.listen(0, '127.0.0.1', () => r())); - url = `https://127.0.0.1:${(server.address() as { port: number }).port}/`; - agent = new Agent({ - allowH2: true, - connections: H2_MAX_CONNECTIONS, - pipelining: H2_MAX_CONCURRENT_STREAMS, - connect: { rejectUnauthorized: false }, - }); - }); - - afterAll(async () => { - diagnostics_channel.unsubscribe('undici:client:connected', onConnected); - await agent.close(); - await new Promise((r) => server.close(() => r())); - }); - - test('N concurrent requests reuse <= H2_MAX_CONNECTIONS sessions and overlap', async () => { - const N = 50; - const responses = await Promise.all( - Array.from({ length: N }, () => undiciFetchImpl(url, { dispatcher: agent })), - ); - const bodies = await Promise.all(responses.map((r) => r.json())); - - expect(bodies).toHaveLength(N); - expect((bodies as Array<{ ok?: boolean }>).every((b) => b.ok === true)).toBe(true); - expect(alpn).toContain('h2'); // really h2, not an h1 fallback - expect(state.sessions).toBeLessThanOrEqual(H2_MAX_CONNECTIONS); // bounded pool, not 1-per-request - // Genuine stream concurrency: many requests in flight at once. A `pipelining: 1` regression - // would cap maxInFlight at the connection count, so this is what guards the multiplexing. - expect(state.maxInFlight).toBeGreaterThan(H2_MAX_CONNECTIONS); - }); + test('wraps an ArrayBuffer in a Buffer', () => { + const out = normalizeBody(new Uint8Array([1, 2, 3]).buffer); + expect(out.isStream).toBe(false); + expect(Buffer.isBuffer(out.body)).toBe(true); }); - // ── abort: the real adapter forwards the AbortSignal (plain http, no TLS needed) ── - describe('abort handling (real undiciFetch)', () => { - let server: http.Server; - let base: string; - - beforeAll(async () => { - server = http.createServer((req, res) => { - const timer = setTimeout(() => { - res.writeHead(200); - res.end('{}'); - }, 10_000); - timer.unref?.(); - req.on('close', () => clearTimeout(timer)); - }); - await new Promise((r) => server.listen(0, '127.0.0.1', () => r())); - base = `http://127.0.0.1:${(server.address() as { port: number }).port}`; - }); + test('returns an undefined body for null / undefined', () => { + expect(normalizeBody(null)).toEqual({ body: undefined, isStream: false }); + expect(normalizeBody(undefined)).toEqual({ body: undefined, isStream: false }); + }); - afterAll(async () => { - await __closeDispatchersForTest(); - await new Promise((r) => server.close(() => r())); - }); + test('converts a Node Readable to a web ReadableStream and flags isStream', () => { + const out = normalizeBody(Readable.from(['x'])); + expect(out.isStream).toBe(true); + expect(typeof out.body.getReader).toBe('function'); // WHATWG ReadableStream + }); - test('rejects with AbortError when the request signal is aborted', async () => { - const controller = new AbortController(); - const promise = undiciFetch(`${base}/slow`, { method: 'GET', signal: controller.signal } as any); - setTimeout(() => controller.abort(), 50); - await expect(promise).rejects.toMatchObject({ name: 'AbortError' }); - }); + test('unwraps MultipartBody to its inner stream', () => { + const out = normalizeBody(new MultipartBody(Readable.from(['x']))); + expect(out.isStream).toBe(true); + expect(typeof out.body.getReader).toBe('function'); }); }); diff --git a/tests/smoketests/scripts/h2-transport-bench.mjs b/tests/smoketests/scripts/h2-transport-bench.mjs deleted file mode 100644 index ab223c3c9..000000000 --- a/tests/smoketests/scripts/h2-transport-bench.mjs +++ /dev/null @@ -1,369 +0,0 @@ -/** - * HTTP/2 transport micro-benchmark — settles "#791 lacks the perf bump that #792 adds". - * - * Runs entirely locally against a self-signed node:http2 server (no API key, no network). - * The SERVER counts how many TLS connections / H2 sessions each transport opens for N - * concurrent requests — that is the ground truth for "does it multiplex?". We also record - * latency percentiles, wall time, failures, and whether a transport crashes the process. - * - * Transports under test (faithful to each branch's src/lib/undici-fetch.ts): - * - undici-allowH2 = PR #791 (undici Agent({ allowH2:true }), default pipelining=1 → no multiplexing) - * - undici-Pool/Agent pipelining=64 = THIS branch (bounded pool that multiplexes; needs undici >= 7.23.0) - * - node-http2-pool = PR #792 (verbatim copy of the hand-rolled H2Pool, types stripped) - * - undici-h1 = baseline (undici Agent({ allowH2:false, connections:4 })) - * - * Usage (needs undici resolvable from cwd; >= 7.23.0 for the pipelining variants — multiplexed - * H2 assert-crashes on 6.x, see undici PR #4845): - * node h2-transport-bench.mjs - * REQUESTS=2000 SERVER_DELAY_MS=25 MAX_STREAMS=100 node h2-transport-bench.mjs - * ONLY=poolpipe,agentpipe,792 node h2-transport-bench.mjs - */ -process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; // localhost self-signed cert only - -import http2 from 'node:http2'; -import fs from 'node:fs'; -import os from 'node:os'; -import path from 'node:path'; -import diagnosticsChannel from 'node:diagnostics_channel'; -import { execFileSync } from 'node:child_process'; -import { performance } from 'node:perf_hooks'; -import { Readable } from 'node:stream'; -import { Agent, Pool, Headers, Response, fetch as undiciFetchImpl, request as undiciRequest } from 'undici'; - -const REQUESTS = Number(process.env.REQUESTS ?? 1000); -const SERVER_DELAY_MS = Number(process.env.SERVER_DELAY_MS ?? 20); -const MAX_STREAMS = Number(process.env.MAX_STREAMS ?? 100); // server's SETTINGS_MAX_CONCURRENT_STREAMS - -let crashes = 0; -const crashMsgs = new Set(); -process.on('uncaughtException', (e) => { crashes++; crashMsgs.add(String(e?.message ?? e)); }); -process.on('unhandledRejection', (e) => { crashes++; crashMsgs.add(String(e?.message ?? e)); }); - -// ───────────────────────── transport #791: undici Agent allowH2 ───────────────────────── -const agent791 = new Agent({ allowH2: true, keepAliveTimeout: 600000, keepAliveMaxTimeout: 600000 }); -function fetch791(url, init) { - const { agent: _a, signal, ...rest } = init ?? {}; - return undiciFetchImpl(url, { ...rest, dispatcher: agent791, signal: signal ?? undefined }); -} - -// ───────────────────────── baseline: undici h1 (4 connections) ───────────────────────── -const agentH1 = new Agent({ allowH2: false, connections: 4, keepAliveTimeout: 600000, keepAliveMaxTimeout: 600000 }); -function fetchH1(url, init) { - const { agent: _a, signal, ...rest } = init ?? {}; - return undiciFetchImpl(url, { ...rest, dispatcher: agentH1, signal: signal ?? undefined }); -} - -// ════════════════ transport #792: verbatim copy of H2Pool (types stripped) ════════════════ -// Source: PR #792 tode/http2-updates-shared-pool-test:src/lib/undici-fetch.ts. Re-sync if it changes. -class MultipartBody { constructor(body) { this.body = body; } } -const MAX_H2_SESSIONS = 4; -const MAX_H2_STREAMS_PER_SESSION = 64; -const KEEP_ALIVE_TIMEOUT_MS = 10 * 60 * 1000; -const h1Dispatcher = new Agent({ allowH2: false, connections: 4, keepAliveTimeout: KEEP_ALIVE_TIMEOUT_MS, keepAliveMaxTimeout: KEEP_ALIVE_TIMEOUT_MS }); -const connectedChannel = diagnosticsChannel.channel('undici:client:connected'); -const pools = new Map(); -function normalizeBody(body) { - if (body == null) return undefined; - if (typeof body === 'string') return body; - if (Buffer.isBuffer(body)) return body; - if (body instanceof MultipartBody) return normalizeBody(body.body); - if (body instanceof Readable) return body; - if (ArrayBuffer.isView(body)) return body; - if (body instanceof ArrayBuffer) return Buffer.from(body); - return String(body); -} -function toResponseHeaders(headers) { - const responseHeaders = new Headers(); - for (const [name, value] of Object.entries(headers)) { - if (value === undefined) continue; - if (Array.isArray(value)) { for (const item of value) responseHeaders.append(name, item); } - else responseHeaders.append(name, value); - } - return responseHeaders; -} -function statusMustNotHaveBody(status) { return status === 204 || status === 205 || status === 304; } -function abortError() { const e = new Error('The operation was aborted'); e.name = 'AbortError'; return e; } -function originFor(url) { return `${url.protocol}//${url.host}`; } -function pathFor(url) { return `${url.pathname}${url.search}`; } -function filterH2RequestHeaders(headers) { - const filtered = {}; - if (!headers) return filtered; - for (const [rawName, rawValue] of Object.entries(headers)) { - if (rawValue == null) continue; - const name = rawName.toLowerCase(); - if (name === 'connection' || name === 'keep-alive' || name === 'proxy-connection' || name === 'transfer-encoding' || name === 'upgrade' || name === 'host') continue; - filtered[name] = String(rawValue); - } - return filtered; -} -function toH2ResponseHeaders(headers) { - const responseHeaders = new Headers(); - for (const [name, value] of Object.entries(headers)) { - if (name.startsWith(':') || value === undefined) continue; - if (Array.isArray(value)) { for (const item of value) responseHeaders.append(name, String(item)); } - else responseHeaders.append(name, String(value)); - } - return responseHeaders; -} -function getPool(url) { - const origin = originFor(url); - let pool = pools.get(origin); - if (!pool) { pool = new H2Pool(origin); pools.set(origin, pool); } - return pool; -} -class H2Pool { - constructor(origin) { this.origin = origin; this.sessions = []; this.waiters = []; } - async request(url, init) { - const entry = await this.acquire(); - try { await entry.ready; } catch (error) { this.release(entry); throw error; } - if (entry.alpnProtocol !== 'h2') { - this.release(entry); - entry.session.close(); - throw new Error(`HTTP/2 was not negotiated; ALPN=${String(entry.alpnProtocol || 'none')}`); - } - return this.dispatch(entry, url, init); - } - acquire() { - const existing = this.sessions.find((entry) => !entry.closed && entry.activeStreams < MAX_H2_STREAMS_PER_SESSION); - if (existing) { - if (existing.idleTimer) { clearTimeout(existing.idleTimer); existing.idleTimer = undefined; } - existing.session.ref?.(); - existing.activeStreams++; - return Promise.resolve(existing); - } - if (this.sessions.filter((entry) => !entry.closed).length < MAX_H2_SESSIONS) { - const created = this.createSession(); - created.session.ref?.(); - created.activeStreams++; - return Promise.resolve(created); - } - return new Promise((resolve) => { this.waiters.push(resolve); }); - } - createSession() { - const session = http2.connect(this.origin, { ALPNProtocols: ['h2', 'http/1.1'] }); - const entry = { session, activeStreams: 0, ready: Promise.resolve(), alpnProtocol: undefined, closed: false, idleTimer: undefined }; - entry.ready = new Promise((resolve, reject) => { - session.once('connect', () => { - entry.alpnProtocol = session.socket.alpnProtocol; - connectedChannel.publish({ socket: session.socket }); - resolve(); - }); - session.once('error', reject); - }); - session.once('close', () => { - entry.closed = true; - if (entry.idleTimer) clearTimeout(entry.idleTimer); - this.sessions = this.sessions.filter((item) => item !== entry); - this.drainWaiters(); - }); - session.on('error', () => { entry.closed = true; }); - this.sessions.push(entry); - return entry; - } - dispatch(entry, url, init) { - return new Promise((resolve, reject) => { - if (init.signal?.aborted) { this.release(entry); reject(abortError()); return; } - const body = normalizeBody(init.body); - const requestHeaders = { - ...filterH2RequestHeaders(init.headers), - [http2.constants.HTTP2_HEADER_METHOD]: init.method ?? 'GET', - [http2.constants.HTTP2_HEADER_SCHEME]: url.protocol.slice(0, -1), - [http2.constants.HTTP2_HEADER_AUTHORITY]: url.host, - [http2.constants.HTTP2_HEADER_PATH]: pathFor(url), - }; - const stream = entry.session.request(requestHeaders, { endStream: body === undefined }); - let settled = false, released = false; - const releaseOnce = () => { if (released) return; released = true; this.release(entry); }; - const rejectOnce = (error) => { if (settled) return; settled = true; releaseOnce(); reject(error); }; - const onAbort = () => { stream.close(http2.constants.NGHTTP2_CANCEL); rejectOnce(abortError()); }; - init.signal?.addEventListener('abort', onAbort, { once: true }); - stream.once('response', (headers) => { - if (settled) return; - settled = true; - const status = Number(headers[http2.constants.HTTP2_HEADER_STATUS] ?? 0); - const responseBody = statusMustNotHaveBody(status) || init.method === 'HEAD' ? null : stream; - if (responseBody === null) stream.resume(); - const response = new Response(responseBody, { status, headers: toH2ResponseHeaders(headers) }); - Object.defineProperty(response, 'url', { value: String(url) }); - stream.once('end', releaseOnce); - stream.once('close', releaseOnce); - stream.once('error', releaseOnce); - resolve(response); - }); - stream.once('error', (error) => { rejectOnce(error); }); - stream.once('close', () => { - init.signal?.removeEventListener('abort', onAbort); - if (!settled) rejectOnce(new Error('HTTP/2 stream closed before response headers')); - }); - if (body instanceof Readable) { body.once('error', (error) => stream.destroy(error)); body.pipe(stream); } - else if (body !== undefined) { stream.end(body); } - }); - } - release(entry) { - entry.activeStreams = Math.max(0, entry.activeStreams - 1); - this.drainWaiters(); - if (entry.activeStreams === 0 && !entry.closed && !entry.idleTimer) { - entry.session.unref?.(); - entry.idleTimer = setTimeout(() => { entry.session.close(); }, KEEP_ALIVE_TIMEOUT_MS); - entry.idleTimer.unref?.(); - } - } - drainWaiters() { - while (this.waiters.length > 0) { - const entry = this.sessions.find((candidate) => !candidate.closed && candidate.activeStreams < MAX_H2_STREAMS_PER_SESSION); - if (!entry) return; - if (entry.idleTimer) { clearTimeout(entry.idleTimer); entry.idleTimer = undefined; } - entry.session.ref?.(); - entry.activeStreams++; - const resolve = this.waiters.shift(); - resolve?.(entry); - } - } -} -async function undiciFallbackFetch(url, init) { - const result = await undiciRequest(url, { ...init, body: normalizeBody(init.body), dispatcher: h1Dispatcher }); - const responseBody = statusMustNotHaveBody(result.statusCode) || init.method === 'HEAD' ? null : result.body; - if (responseBody === null) await result.body.dump(); - const response = new Response(responseBody, { status: result.statusCode, headers: toResponseHeaders(result.headers) }); - Object.defineProperty(response, 'url', { value: String(url) }); - return response; -} -const fetch792 = async (url, init) => { - const { agent: _a, body: rawBody, duplex: _d, redirect, signal, ...rest } = init ?? {}; - const requestURL = new URL(String(url)); - const requestInit = { ...rest, body: rawBody, maxRedirections: redirect === 'manual' || redirect === 'error' ? 0 : 20, signal: signal ?? undefined }; - if (requestURL.protocol !== 'https:') return undiciFallbackFetch(requestURL, requestInit); - try { - return await getPool(requestURL).request(requestURL, requestInit); - } catch (error) { - if (error instanceof Error && /^HTTP\/2 was not negotiated/.test(error.message)) return undiciFallbackFetch(requestURL, requestInit); - throw error; - } -}; -// ════════════════════════════════ end #792 copy ════════════════════════════════ - -// ───────────────────────────────── local h2 server ───────────────────────────────── -function makeCert() { - const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'h2cert-')); - const keyPath = path.join(dir, 'key.pem'), certPath = path.join(dir, 'cert.pem'); - execFileSync('openssl', ['req', '-x509', '-newkey', 'rsa:2048', '-nodes', '-keyout', keyPath, '-out', certPath, - '-days', '1', '-subj', '/CN=localhost', '-addext', 'subjectAltName=DNS:localhost,IP:127.0.0.1'], { stdio: 'ignore' }); - return { key: fs.readFileSync(keyPath), cert: fs.readFileSync(certPath), dir }; -} - -const counters = { tls: 0, sessions: 0, reqs: 0 }; -function startServer() { - const { key, cert } = makeCert(); - const server = http2.createSecureServer({ key, cert, allowHTTP1: true, settings: { maxConcurrentStreams: MAX_STREAMS } }); - server.on('secureConnection', () => counters.tls++); - server.on('session', () => counters.sessions++); - // Use ONLY the http1/http2 compatibility `request` API — it serves BOTH h1 and h2 - // and responds exactly once per request (no 'stream' listener => no double-respond). - server.on('request', (req, res) => { - counters.reqs++; - req.resume(); - const send = () => { res.writeHead(200, { 'content-type': 'application/json' }); res.end('{"ok":true}'); }; - SERVER_DELAY_MS ? setTimeout(send, SERVER_DELAY_MS) : send(); - }); - return new Promise((resolve) => { - server.listen(0, '127.0.0.1', () => resolve({ server, port: server.address().port })); - }); -} - -// ───────────────────────────────── load driver ───────────────────────────────── -function pct(sorted, p) { return sorted.length ? sorted[Math.min(sorted.length - 1, Math.ceil((p / 100) * sorted.length) - 1)] : 0; } -function r(n) { return Math.round(n); } - -async function runPass(name, fetchFn, url) { - const snap = { ...counters }; - const crashStart = crashes; - const lat = []; - let fails = 0; - const t0 = performance.now(); - await Promise.all(Array.from({ length: REQUESTS }, async () => { - const r0 = performance.now(); - try { - const res = await fetchFn(url, { method: 'GET', headers: { accept: 'application/json' } }); - await res.text(); // consume body so the stream completes & releases - if (!res.ok) fails++; - } catch { fails++; } - lat.push(performance.now() - r0); - })); - const wall = performance.now() - t0; - await new Promise((res) => setTimeout(res, 300)); // let late async H2 uncaught errors surface - lat.sort((a, b) => a - b); - return { - transport: name, - tlsConns: counters.tls - snap.tls, - h2sessions: counters.sessions - snap.sessions, - reqsServed: counters.reqs - snap.reqs, - fails, - crashes: crashes - crashStart, - p50: r(pct(lat, 50)), p90: r(pct(lat, 90)), p99: r(pct(lat, 99)), max: r(lat.at(-1) ?? 0), - wallMs: r(wall), - }; -} - -async function main() { - console.log(`node ${process.version} | undici ${(await import('undici/package.json', { with: { type: 'json' } })).default.version} | REQUESTS=${REQUESTS} concurrent | server delay=${SERVER_DELAY_MS}ms | server maxConcurrentStreams=${MAX_STREAMS}\n`); - const { server, port } = await startServer(); - const url = `https://127.0.0.1:${port}/health`; - - // Bounded undici Pool with allowH2, default pipelining=1 — pools connections but does NOT - // multiplex streams (one in-flight request per session) → as slow as h1. - const pool4 = new Pool(`https://127.0.0.1:${port}`, { allowH2: true, connections: 4, keepAliveTimeout: 600000, keepAliveMaxTimeout: 600000 }); - const fetchPool4 = (u, init) => { const { agent: _a, signal, ...rest } = init ?? {}; return undiciFetchImpl(u, { ...rest, dispatcher: pool4, signal: signal ?? undefined }); }; - - // Bounded undici Pool with pipelining>1 — the approach this branch ships: real H2 stream - // multiplexing over a few sessions. 4 x 64 matches src/lib/undici-fetch.ts. - // (On undici < 7.23.0 this assert-crashes; on >= 7.23.0 it is fast and stable.) - const pool4pipe = new Pool(`https://127.0.0.1:${port}`, { allowH2: true, connections: 4, pipelining: 64, keepAliveTimeout: 600000, keepAliveMaxTimeout: 600000 }); - const fetchPool4pipe = (u, init) => { const { agent: _a, signal, ...rest } = init ?? {}; return undiciFetchImpl(u, { ...rest, dispatcher: pool4pipe, signal: signal ?? undefined }); }; - - // Same, via Agent (what the SDK actually constructs) instead of a single-origin Pool. - const agentPipe = new Agent({ allowH2: true, connections: 4, pipelining: 64, keepAliveTimeout: 600000, keepAliveMaxTimeout: 600000 }); - const fetchAgentPipe = (u, init) => { const { agent: _a, signal, ...rest } = init ?? {}; return undiciFetchImpl(u, { ...rest, dispatcher: agentPipe, signal: signal ?? undefined }); }; - - const all = [ - ['h1', 'undici-h1 (baseline)', fetchH1], - ['791', 'undici-allowH2 Agent (#791)', fetch791], - ['pool', 'undici-Pool allowH2 conns=4 (pipelining=1)', fetchPool4], - ['poolpipe', 'undici-Pool conns=4 pipelining=64', fetchPool4pipe], - ['agentpipe', 'undici-Agent conns=4 pipelining=64', fetchAgentPipe], - ['792', 'node-http2-pool (#792)', fetch792], - ]; - const only = process.env.ONLY ? process.env.ONLY.split(',') : null; - const selected = only ? all.filter(([k]) => only.includes(k)) : all; - const rows = []; - for (const [, name, fn] of selected) rows.push(await runPass(name, fn, url)); - - console.log('Each row = ' + REQUESTS + ' requests fired concurrently:\n'); - console.table( - rows.map((x) => ({ - transport: x.transport, - 'TLS conns opened': x.tlsConns, - 'h2 sessions': x.h2sessions, - 'reqs served': x.reqsServed, - fails: x.fails, - crashes: x.crashes, - 'p50 ms': x.p50, 'p90 ms': x.p90, 'p99 ms': x.p99, - 'wall ms': x.wallMs, - })), - ); - console.log(`\ntotal process-crashing errors: ${crashes} (unique: ${[...crashMsgs].join(' | ') || 'none'})`); - await pool4.close().catch(() => {}); - await pool4pipe.close().catch(() => {}); - await agentPipe.close().catch(() => {}); - console.log('\nKey question: for ' + REQUESTS + ' concurrent requests, how many TLS conns / h2 sessions did #791 open vs #792?'); - console.log(' - If #791 opens few sessions & is as fast as #792 -> #791 already has the perf bump.'); - console.log(' - If #791 opens ~1 per request or is far slower -> it does not multiplex.'); - - await agent791.close().catch(() => {}); - await agentH1.close().catch(() => {}); - await h1Dispatcher.close().catch(() => {}); - for (const pool of pools.values()) for (const e of pool.sessions) try { e.session.close(); } catch {} - server.close(); - setTimeout(() => process.exit(0), 200).unref(); -} - -main().catch((e) => { console.error('bench failed:', e); process.exit(1); });