Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions controller/src/core/errors.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
import type { RuntimeFailureReason } from "../../../shared/contracts/runtime-failures";

/**
 * Error that carries an HTTP status code and a human-readable detail message,
 * plus an optional structured failure reason and an optional string code.
 */
export class HttpStatus extends Error {
/** HTTP status code supplied to the constructor. */
public readonly status: number;
/** Detail text; the same string is passed to `Error` as the message. */
public readonly detail: string;
/** Optional structured failure reason; only assigned when one was supplied. */
public readonly reason?: RuntimeFailureReason;
/** Optional string code; only assigned when one was supplied. */
public readonly code?: string;

// NOTE(review): two constructor headers appear back to back because this text
// is a rendered diff; the two-argument form looks like the removed line and
// the four-argument form its replacement — confirm against the real file.
/**
 * @param status HTTP status code stored on the instance.
 * @param detail Message stored as `detail` and forwarded to `Error`.
 * @param reason Optional failure reason; stored only when not `undefined`.
 * @param code Optional string code; stored only when not `undefined`.
 */
public constructor(status: number, detail: string) {
public constructor(status: number, detail: string, reason?: RuntimeFailureReason, code?: string) {
super(detail);
this.status = status;
this.detail = detail;
// Assign the optional fields only when a value was given, so an omitted
// argument leaves the property unset instead of explicitly `undefined`.
if (reason !== undefined) {
this.reason = reason;
}
if (code !== undefined) {
this.code = code;
}
}
}

/**
 * Type guard that reports whether `value` is an `HttpStatus` instance.
 *
 * @param value Any value, typically something caught in an error handler.
 * @returns `true` when `value instanceof HttpStatus` holds, narrowing the type.
 */
export const isHttpStatus = (value: unknown): value is HttpStatus => {
  return value instanceof HttpStatus;
};

/**
 * Builds an `HttpStatus` with status 404 and the given detail message.
 * The three-parameter form also forwards the optional `reason` and `code`.
 *
 * NOTE(review): both declarations are present because this text is a rendered
 * diff; the one-parameter form looks like the removed line — confirm.
 */
export const notFound = (detail: string): HttpStatus => new HttpStatus(404, detail);
export const notFound = (detail: string, reason?: RuntimeFailureReason, code?: string): HttpStatus =>
new HttpStatus(404, detail, reason, code);

/**
 * Builds an `HttpStatus` with status 400 and the given detail message.
 * The three-parameter form also forwards the optional `reason` and `code`.
 *
 * NOTE(review): both declarations are present because this text is a rendered
 * diff; the one-parameter form looks like the removed line — confirm.
 */
export const badRequest = (detail: string): HttpStatus => new HttpStatus(400, detail);
export const badRequest = (detail: string, reason?: RuntimeFailureReason, code?: string): HttpStatus =>
new HttpStatus(400, detail, reason, code);

/**
 * Builds an `HttpStatus` with status 503 and the given detail message.
 * The three-parameter form also forwards the optional `reason` and `code`.
 *
 * NOTE(review): both declarations are present because this text is a rendered
 * diff; the one-parameter form looks like the removed line — confirm.
 */
export const serviceUnavailable = (detail: string): HttpStatus => new HttpStatus(503, detail);
export const serviceUnavailable = (detail: string, reason?: RuntimeFailureReason, code?: string): HttpStatus =>
new HttpStatus(503, detail, reason, code);
9 changes: 8 additions & 1 deletion controller/src/http/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,14 @@ export const createApp = (context: AppContext): Hono => {

app.onError((error, ctx) => {
if (isHttpStatus(error)) {
return ctx.json({ detail: error.detail }, { status: error.status });
const body: Record<string, unknown> = { detail: error.detail };
if (error.reason !== undefined) {
body["reason"] = error.reason;
}
if (error.code !== undefined) {
body["code"] = error.code;
}
return ctx.json(body, { status: error.status });
}
// Client-initiated disconnects (stream cancel, page close, Droid
// cancelling an in-flight request to start a new turn) are not our
Expand Down
95 changes: 72 additions & 23 deletions controller/src/modules/engines/engine-coordinator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { AsyncLock, delay } from "../../core/async"; import { primaryLogPathFor, readFileTailBytes } from "../../core/log-files";
import { Event, type EventManager } from "../system/event-manager"; import { CONTROLLER_EVENTS } from "../../../../shared/contracts/controller-events";
import { pidExists } from "./process/process-utilities"; import { isRecipeRunning } from "../models/recipes/recipe-matching";
import { classifyLaunchFailure } from "./process/launch-failure-classifier";
import type { RuntimeFailureReason } from "../../../../shared/contracts/runtime-failures";
import type { ProcessInfo, Recipe } from "../models/types"; import type { Config } from "../../config/env";
import type { Logger } from "../../core/logger"; import type { ProcessManager } from "./process/process-manager";
import type { RecipeStore } from "../models/recipes/recipe-store"; import { LIFECYCLE_READY_TIMEOUT_MS } from "./configs";
Expand Down Expand Up @@ -62,7 +64,8 @@ export class EngineCoordinator implements EngineService {
await this.deps.eventManager.publishLaunchProgress(recipe.id, "launching", `Starting ${recipe.name}...`, 0.25);
const launch = await this.deps.processManager.launchModel(recipe); spawnedPid = launch.pid;
this.activeLaunchPid = launch.pid; if (!launch.success) {
await this.deps.eventManager.publishLaunchProgress(recipe.id, "error", launch.message, 0); return { ok: false, error: launch.message };
await this.deps.eventManager.publishLaunchProgress(recipe.id, "error", launch.message, 0, launch.reason, launch.code);
return { ok: false, error: launch.message, ...(launch.reason ? { reason: launch.reason } : {}), ...(launch.code ? { code: launch.code } : {}) };
}
const postLaunchAbort = await abortIfNeeded(recipe); if (postLaunchAbort) return postLaunchAbort;
await this.deps.eventManager.publishLaunchProgress(recipe.id, "waiting", "Loading model... (0s)", 0.5);
Expand All @@ -78,27 +81,55 @@ export class EngineCoordinator implements EngineService {
return { ok: true }; }
if (launch.pid) {
await this.deps.processManager.killProcess(launch.pid, true); }
await this.deps.eventManager.publishLaunchProgress(recipe.id, "error", ready.message, 0); return { ok: false, error: ready.message };
await this.deps.eventManager.publishLaunchProgress(recipe.id, "error", ready.message, 0, ready.reason, ready.code);
return { ok: false, error: ready.message, ...(ready.reason ? { reason: ready.reason } : {}), ...(ready.code ? { code: ready.code } : {}) };
} finally { if (this.activeLifecycleAbort === lifecycleAbort) {
this.activeLifecycleAbort = null; }
if (this.activeLaunchPid === spawnedPid) { this.activeLaunchPid = null;
} options.signal?.removeEventListener("abort", abortLifecycle);
release(); }
}
private async waitForReady(options: { recipe: Recipe; pid: number | null; logFilePath: string | null; cancel?: AbortSignal; timeoutMs?: number; fatalPatterns?: string[]; onProgress?: (elapsedSeconds: number) => Promise<void> }): Promise<{ ready: true } | { ready: false; message: string }> {
private async waitForReady(
options: {
recipe: Recipe;
pid: number | null;
logFilePath: string | null;
cancel?: AbortSignal;
timeoutMs?: number;
fatalPatterns?: string[];
onProgress?: (elapsedSeconds: number) => Promise<void>;
}
): Promise<{ ready: true } | { ready: false; message: string; reason?: RuntimeFailureReason; code?: string }> {
const timeout = options.timeoutMs ?? LIFECYCLE_READY_TIMEOUT_MS; const start = Date.now();
while (Date.now() - start < timeout) {
if (options.cancel?.aborted) { return { ready: false, message: "Launch cancelled" };
}
if (options.pid && !pidExists(options.pid)) { const errorTail = options.logFilePath ? readFileTailBytes(options.logFilePath, 500) : "";
return { ready: false,
message: `Model ${options.recipe.id} crashed during startup: ${errorTail.slice(-200)}`, };
if (options.pid && !pidExists(options.pid)) {
const errorTail = options.logFilePath ? readFileTailBytes(options.logFilePath, 500) : "";
const message = `Model ${options.recipe.id} crashed during startup: ${errorTail.slice(-200)}`;
return {
ready: false,
message,
reason: classifyLaunchFailure(message, { logTail: errorTail }) ?? "process_exited_early",
code: "crash",
};
}
if (options.logFilePath && options.fatalPatterns && options.fatalPatterns.length > 0) {
const logTail = readFileTailBytes(options.logFilePath, 3000);
for (const pattern of options.fatalPatterns) {
if (!logTail.includes(pattern)) continue;
const lines = logTail.split("\n");
const index = lines.findIndex((line) => line.includes(pattern));
const snippet = index >= 0 ? lines.slice(Math.max(0, index - 1), index + 3).join("\n") : pattern;
const message = `Fatal error: ${snippet.slice(0, 300)}`;
return {
ready: false,
message,
reason: classifyLaunchFailure(message, { logTail }) ?? "unknown",
code: "fatal-pattern",
};
}
}
if (options.logFilePath && options.fatalPatterns && options.fatalPatterns.length > 0) { const logTail = readFileTailBytes(options.logFilePath, 3000);
for (const pattern of options.fatalPatterns) { if (!logTail.includes(pattern)) continue;
const lines = logTail.split("\n"); const index = lines.findIndex((line) => line.includes(pattern));
const snippet = index >= 0 ? lines.slice(Math.max(0, index - 1), index + 3).join("\n") : pattern; return { ready: false, message: `Fatal error: ${snippet.slice(0, 300)}` };
} }
try {
const { fetchLocal } = await import("../../http/local-fetch"); const response = await fetchLocal(this.deps.config.inference_port, "/health", {
host: this.deps.config.inference_host,
Expand All @@ -110,8 +141,12 @@ export class EngineCoordinator implements EngineService {
await options.onProgress(elapsedSeconds); }
await delay(2000); }
return {
ready: false, message: `Model ${options.recipe.id} failed to become ready (timeout)`,
}; }
ready: false,
message: `Model ${options.recipe.id} failed to become ready (timeout)`,
reason: "health_timeout",
code: "timeout",
};
}
private findRecipeForProcess(current: ProcessInfo): Recipe | null {
for (const candidate of this.deps.recipeStore.list()) { if (isRecipeRunning(candidate, current, { allowEitherPathContains: true })) {
return candidate; }
Expand All @@ -128,12 +163,21 @@ export class EngineCoordinator implements EngineService {
recipe_id: recipe.id, aborted_runs: totalAborted,
}); }
}
async ensureActive(recipe: Recipe, options: { force_evict?: boolean; publish_events?: boolean } = {}): Promise<{ switched: boolean; error: string | null }> {
async ensureActive(
recipe: Recipe,
options: { force_evict?: boolean; publish_events?: boolean } = {}
): Promise<{ switched: boolean; error: string | null; reason?: RuntimeFailureReason; code?: string }> {
const existing = await this.deps.processManager.findInferenceProcess(this.deps.config.inference_port); if (existing && isRecipeRunning(recipe, existing)) {
return { switched: false, error: null }; }
if (this.autoActivationBlocked) { return {
switched: false, error: "Model auto-loading is disabled because the model was manually stopped. Start a model from vLLM Studio before sending local inference requests.",
}; }
if (this.autoActivationBlocked) {
const message = "Model auto-loading is disabled because the model was manually stopped. Start a model from vLLM Studio before sending local inference requests.";
return {
switched: false,
error: message,
reason: classifyLaunchFailure(message) ?? "model_not_served",
code: "auto-loading-blocked",
};
}
const intentSerial = ++this.lifecycleIntentSerial;
const lifecycleAbort = new AbortController(); this.activeLifecycleAbort = lifecycleAbort;
let launchPid: number | null = null;
Expand All @@ -142,8 +186,13 @@ export class EngineCoordinator implements EngineService {
} const latest = await this.deps.processManager.findInferenceProcess(this.deps.config.inference_port);
if (latest && isRecipeRunning(recipe, latest)) { return { switched: false, error: null };
} if (this.autoActivationBlocked) {
return { switched: false,
error: "Model auto-loading is disabled because the model was manually stopped. Start a model from vLLM Studio before sending local inference requests.", };
const message = "Model auto-loading is disabled because the model was manually stopped. Start a model from vLLM Studio before sending local inference requests.";
return {
switched: false,
error: message,
reason: classifyLaunchFailure(message) ?? "model_not_served",
code: "auto-loading-blocked",
};
}
const publishEvents = options.publish_events !== false; const observedProcess = latest ?? existing;
const fromRecipe = observedProcess ? this.findRecipeForProcess(observedProcess) : null; const fromModel = fromRecipe ? (fromRecipe.served_model_name ?? fromRecipe.id) : observedProcess ? observedProcess.model_path : null;
Expand All @@ -165,9 +214,9 @@ export class EngineCoordinator implements EngineService {
await this.deps.eventManager.publish( new Event(CONTROLLER_EVENTS.MODEL_SWITCH, {
status: "error", to_recipe_id: recipe.id,
to_model: recipe.served_model_name ?? recipe.id, to_backend: recipe.backend,
reason: message, })
reason: message, code: launch.code, })
); }
return { switched: true, error: message }; }
return { switched: true, error: message, ...(launch.reason ? { reason: launch.reason } : {}), ...(launch.code ? { code: launch.code } : {}) }; }
const logFilePath = primaryLogPathFor(this.deps.config.data_dir, recipe.id);
const ready = await this.waitForReady({ recipe,
pid: launch.pid, logFilePath,
Expand All @@ -189,9 +238,9 @@ export class EngineCoordinator implements EngineService {
await this.deps.eventManager.publish( new Event(CONTROLLER_EVENTS.MODEL_SWITCH, {
status: "error", to_recipe_id: recipe.id,
to_model: recipe.served_model_name ?? recipe.id, to_backend: recipe.backend,
reason: ready.message, })
reason: ready.message, code: ready.code, })
); }
return { switched: true, error: ready.message }; } finally {
return { switched: true, error: ready.message, ...(ready.reason ? { reason: ready.reason } : {}), ...(ready.code ? { code: ready.code } : {}) }; } finally {
if (this.activeLifecycleAbort === lifecycleAbort) { this.activeLifecycleAbort = null;
} if (this.activeLaunchPid === launchPid) {
this.activeLaunchPid = null; }
Expand Down
7 changes: 6 additions & 1 deletion controller/src/modules/engines/engine-service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Recipe, ProcessInfo } from "../models/types";
import type { ModelDownload } from "../shared/recipe-types";
import type { RuntimeFailureReason } from "../../../../shared/contracts/runtime-failures";

export type { Recipe, ProcessInfo };
export type { ModelDownload };
Expand All @@ -22,14 +23,18 @@ export interface HfModel {
/**
 * Result shape for an "ensure this recipe is active" request.
 * On failure it can carry a structured reason and a short string code
 * alongside the error message.
 */
export interface EnsureActiveResult {
/** Whether a model switch was attempted — presumably; verify against the implementation. */
switched: boolean;
/** Failure message, or `null` when no error is reported. */
error: string | null;
/** Optional structured classification of the failure. */
reason?: RuntimeFailureReason;
/** Optional short failure code (e.g. "timeout", "crash" — TODO confirm the full set). */
code?: string;
}

/** Optional flags accepted when ensuring a recipe is active. */
export interface EnsureActiveOptions {
/** NOTE(review): presumably allows evicting the currently running model — usage not visible here; confirm. */
force_evict?: boolean;
/** Set to `false` to suppress event publishing; any other value (including omitted) leaves it enabled. */
publish_events?: boolean;
}

/**
 * Outcome of setting the active recipe, discriminated on `ok`.
 * The failure variant carries an error message and, in the multi-line form,
 * an optional structured `reason` and string `code`.
 *
 * NOTE(review): both alias declarations are present because this text is a
 * rendered diff; the single-line form looks like the removed line — confirm.
 */
export type SetActiveRecipeResult = { ok: true } | { ok: false; error: string };
export type SetActiveRecipeResult =
| { ok: true }
| { ok: false; error: string; reason?: RuntimeFailureReason; code?: string };

/** Options for setting the active recipe. */
export interface SetActiveRecipeOptions {
Expand Down
Loading