Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 167 additions & 4 deletions src/providers/codex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const CODEX_VERBOSITY_ENV = "GAMBIT_CODEX_VERBOSITY";
const CODEX_BIN_ENV = "GAMBIT_CODEX_BIN";
const CODEX_SKIP_SANDBOX_CONFIG_ENV = "GAMBIT_CODEX_SKIP_SANDBOX_CONFIG";
const CODEX_DISABLE_WEBSOCKETS_ENV = "GAMBIT_CODEX_DISABLE_WEBSOCKETS";
const CODEX_APP_SERVER_TIMING_ENV = "GAMBIT_CODEX_APP_SERVER_TIMING";
const CODEX_DANGEROUS_BYPASS_ENV =
"GAMBIT_CODEX_DANGEROUSLY_BYPASS_APPROVALS_AND_SANDBOX";
const MCP_DENO_BIN_ENV = "GAMBIT_MCP_DENO_BIN";
Expand Down Expand Up @@ -253,6 +254,27 @@ function parseTruthy(value: string): boolean {
return normalized === "1" || normalized === "true" || normalized === "yes";
}

function elapsedMs(startedAt: number): number {
return Math.max(0, Math.round(performance.now() - startedAt));
}

function shouldLogCodexAppServerTiming(): boolean {
return parseTruthy(Deno.env.get(CODEX_APP_SERVER_TIMING_ENV) ?? "") ||
parseTruthy(Deno.env.get("GAMBIT_CODEX_APP_SERVER_DEBUG") ?? "");
}

function logCodexAppServerTiming(
event: string,
details?: Record<string, unknown>,
): void {
if (!shouldLogCodexAppServerTiming()) return;
globalThis.console.error(
"[gambit-codex-app-server-timing]",
event,
details ?? {},
);
}

function shouldDangerouslyBypassCodexApprovalsAndSandbox(
params?: Record<string, unknown>,
): boolean {
Expand Down Expand Up @@ -1122,6 +1144,7 @@ function appServerNotificationToCodexEvent(
async function defaultAppServerTurnRunner(
input: AppServerTurnRunnerInput,
): Promise<AppServerTurnRunnerOutput> {
const turnRunnerStartedAt = performance.now();
const codexBin = Deno.env.get(CODEX_BIN_ENV)?.trim() || "codex";
const dangerousBypass = shouldDangerouslyBypassCodexApprovalsAndSandbox(
input.params,
Expand All @@ -1141,6 +1164,15 @@ async function defaultAppServerTurnRunner(
: []),
"app-server",
];
logCodexAppServerTiming("turn:start", {
cwd: input.cwd,
deckPathPresent: Boolean(input.deckPath?.trim()),
messageCount: input.messages.length,
model: input.model,
priorThreadIdPresent: Boolean(input.priorThreadId),
promptLength: input.prompt.length,
toolCount: input.tools?.length ?? 0,
});
logCodexMcpDebug("appServer:spawn", {
argv: [codexBin, ...sanitizeCodexSpawnArgsForDebug(spawnArgs)],
codexBin,
Expand All @@ -1151,13 +1183,20 @@ async function defaultAppServerTurnRunner(
),
skipSandboxConfig,
});
const spawnStartedAt = performance.now();
const child = new Deno.Command(codexBin, {
args: spawnArgs,
cwd: input.cwd,
stdin: "piped",
stdout: "piped",
stderr: "piped",
}).spawn();
logCodexAppServerTiming("spawn:complete", {
argCount: spawnArgs.length,
codexBin,
durationMs: elapsedMs(spawnStartedAt),
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});

const abort = () => {
try {
Expand Down Expand Up @@ -1203,6 +1242,9 @@ async function defaultAppServerTurnRunner(
let cleanupError: Error | null = null;
let output: AppServerTurnRunnerOutput | null = null;
let childClosedError: Error | null = null;
let sawFirstAppServerMessage = false;
let sawFirstAppServerNotification = false;
let sawFirstCallerStreamEvent = false;

const appServerClosedError = () => {
if (childClosedError) {
Expand Down Expand Up @@ -1235,6 +1277,11 @@ async function defaultAppServerTurnRunner(
childClosed.catch(() => undefined);
const childStatus = child.status.then((status) => {
childExitStatus = status;
logCodexAppServerTiming("child:exit", {
code: status.code,
success: status.success,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});
const error = appServerClosedError();
for (const pendingRequest of pending.values()) {
pendingRequest.reject(error);
Expand All @@ -1251,8 +1298,25 @@ async function defaultAppServerTurnRunner(
await stdinWriter.write(encoder.encode(`${JSON.stringify(message)}\n`));
};

const emitStreamEvent = (event: Record<string, JSONValue>) => {
if (!sawFirstCallerStreamEvent) {
sawFirstCallerStreamEvent = true;
logCodexAppServerTiming("stream_event:first", {
eventType: typeof event.type === "string" ? event.type : null,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});
}
input.onStreamEvent?.(event);
};

const request = (method: string, params: Record<string, unknown>) => {
const id = String(nextRequestId++);
const requestStartedAt = performance.now();
logCodexAppServerTiming("request:start", {
method,
requestId: id,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});
const promise = new Promise<unknown>((resolve, reject) => {
pending.set(id, { resolve, reject });
});
Expand All @@ -1263,9 +1327,39 @@ async function defaultAppServerTurnRunner(
return Promise.race([writeMessage({ id, method, params }), childClosed])
.catch((error) => {
pending.delete(id);
logCodexAppServerTiming("request:failed", {
durationMs: elapsedMs(requestStartedAt),
error: error instanceof Error ? error.message : String(error),
method,
phase: "write",
requestId: id,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});
throw error;
})
.then(() => Promise.race([requestPromise, childClosed]));
.then(() => Promise.race([requestPromise, childClosed]))
.then(
(result) => {
logCodexAppServerTiming("request:complete", {
durationMs: elapsedMs(requestStartedAt),
method,
requestId: id,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});
return result;
},
(error) => {
logCodexAppServerTiming("request:failed", {
durationMs: elapsedMs(requestStartedAt),
error: error instanceof Error ? error.message : String(error),
method,
phase: "response",
requestId: id,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});
throw error;
},
);
};

const readLoop = async () => {
Expand Down Expand Up @@ -1295,13 +1389,22 @@ async function defaultAppServerTurnRunner(
});
const method = typeof parsed.method === "string" ? parsed.method : "";
const params = asRecord(parsed.params);
const hasId = Object.prototype.hasOwnProperty.call(parsed, "id");
if (!sawFirstAppServerMessage) {
sawFirstAppServerMessage = true;
logCodexAppServerTiming("stdout:first_message", {
hasId,
method: method || null,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});
}
if (method) {
if (Object.prototype.hasOwnProperty.call(parsed, "id")) {
if (hasId) {
const requestId = typeof parsed.id === "string" ||
typeof parsed.id === "number" || parsed.id === null
? parsed.id
: null;
input.onStreamEvent?.({
emitStreamEvent({
type: "app_server.request",
requestId: requestId === null ? "null" : String(requestId),
method,
Expand All @@ -1327,9 +1430,16 @@ async function defaultAppServerTurnRunner(
});
continue;
}
if (!sawFirstAppServerNotification) {
sawFirstAppServerNotification = true;
logCodexAppServerTiming("notification:first", {
method,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});
}
const pseudoEvent = appServerNotificationToCodexEvent(method, params);
if (pseudoEvent) {
input.onStreamEvent?.(pseudoEvent);
emitStreamEvent(pseudoEvent);
}
if (method === "rawResponseItem/completed") {
const rawItem = codexRawResponseItemRecord(params.item);
Expand Down Expand Up @@ -1371,6 +1481,10 @@ async function defaultAppServerTurnRunner(
if (typeof turn.id === "string") {
turnState.id = turn.id;
}
logCodexAppServerTiming("notification:turn_started", {
totalDurationMs: elapsedMs(turnRunnerStartedAt),
turnIdPresent: Boolean(turnState.id),
});
} else if (method === "turn/completed") {
const turn = asRecord(params.turn);
if (typeof turn.id === "string") {
Expand All @@ -1386,6 +1500,11 @@ async function defaultAppServerTurnRunner(
turnState.error = new DOMException("Run canceled", "AbortError");
}
turnState.completed = true;
logCodexAppServerTiming("notification:turn_completed", {
status: typeof turn.status === "string" ? turn.status : null,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
turnIdPresent: Boolean(turnState.id),
});
}
continue;
}
Expand Down Expand Up @@ -1445,6 +1564,7 @@ async function defaultAppServerTurnRunner(
const stdoutLoop = readLoop();

try {
const initializeStartedAt = performance.now();
await request("initialize", {
clientInfo: {
name: "gambit",
Expand All @@ -1455,10 +1575,26 @@ async function defaultAppServerTurnRunner(
experimentalApi: true,
},
});
logCodexAppServerTiming("initialize:complete", {
durationMs: elapsedMs(initializeStartedAt),
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});
await writeMessage({ method: "initialized", params: {} });
logCodexAppServerTiming("initialized_notification:sent", {
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});
const authStartedAt = performance.now();
await bootstrapCodexExternalAuth({ request });
logCodexAppServerTiming("auth_bootstrap:complete", {
durationMs: elapsedMs(authStartedAt),
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});

const model = normalizeCodexModel(input.model);
const threadOperation = input.priorThreadId
? "thread/resume"
: "thread/start";
const threadStartedAt = performance.now();
const threadResult = input.priorThreadId
? await request("thread/resume", {
threadId: input.priorThreadId,
Expand All @@ -1483,6 +1619,11 @@ async function defaultAppServerTurnRunner(
experimentalRawEvents: true,
persistExtendedHistory: false,
}) as Record<string, unknown>;
logCodexAppServerTiming("thread:complete", {
durationMs: elapsedMs(threadStartedAt),
operation: threadOperation,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});

const thread = asRecord(threadResult.thread);
const threadId = typeof thread.id === "string"
Expand All @@ -1496,6 +1637,7 @@ async function defaultAppServerTurnRunner(
? [{ type: "text", text: input.prompt }]
: [];

const turnStartRequestStartedAt = performance.now();
await request("turn/start", {
threadId,
input: turnInput,
Expand All @@ -1507,7 +1649,12 @@ async function defaultAppServerTurnRunner(
}),
model: model && model !== "default" ? model : null,
});
logCodexAppServerTiming("turn_start_request:complete", {
durationMs: elapsedMs(turnStartRequestStartedAt),
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});

const turnWaitStartedAt = performance.now();
while (!turnState.completed) {
if (input.signal?.aborted) {
throw new DOMException("Run canceled", "AbortError");
Expand All @@ -1530,6 +1677,11 @@ async function defaultAppServerTurnRunner(
clearTimeout(pollTimer);
}
}
logCodexAppServerTiming("turn_wait:complete", {
durationMs: elapsedMs(turnWaitStartedAt),
sawCompletedNotification: turnState.completed,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});

if (turnState.error) throw turnState.error;

Expand All @@ -1540,6 +1692,7 @@ async function defaultAppServerTurnRunner(
usage: turnState.usage,
};
} finally {
const cleanupStartedAt = performance.now();
for (const { reject } of pending.values()) {
reject(new Error("Codex app-server session closed"));
}
Expand All @@ -1562,13 +1715,23 @@ async function defaultAppServerTurnRunner(
} else if (!turnState.completed && !turnState.error) {
cleanupError = appServerClosedError();
}
logCodexAppServerTiming("cleanup:complete", {
durationMs: elapsedMs(cleanupStartedAt),
totalDurationMs: elapsedMs(turnRunnerStartedAt),
});
}
if (cleanupError) {
throw cleanupError;
}
if (!output) {
throw new Error("Codex app-server turn runner completed without a result.");
}
logCodexAppServerTiming("turn:complete", {
assistantMessageCount: output.assistantMessages.length,
rawResponseItemCount: output.rawResponseItems?.length ?? 0,
totalDurationMs: elapsedMs(turnRunnerStartedAt),
usagePresent: Boolean(output.usage),
});
return output;
}

Expand Down
Loading