Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions controller/src/app-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { resolve } from "node:path";
import { createConfig, type Config } from "./config/env";
import { createEventManager, type EventManager } from "./modules/system/event-manager";
import { createLaunchState, type LaunchState } from "./modules/engines/process/launch-state";
import {
createLaunchFailureBudget,
type LaunchFailureBudget,
} from "./modules/engines/process/launch-failure-budget";
import { createMetrics, type ControllerMetrics, type MetricsRegistry } from "./modules/system/metrics";
import { createProcessManager, type ProcessManager } from "./modules/engines/process/process-manager";
import { DownloadManager } from "./modules/engines/downloads/download-manager";
Expand All @@ -21,6 +25,7 @@ export interface AppContext {
logger: Logger;
eventManager: EventManager;
launchState: LaunchState;
launchFailureBudget: LaunchFailureBudget;
metrics: ControllerMetrics;
metricsRegistry: MetricsRegistry;
processManager: ProcessManager;
Expand Down Expand Up @@ -80,6 +85,7 @@ export const createAppContext = (): AppContext => {
}

const launchState = createLaunchState();
const launchFailureBudget = createLaunchFailureBudget();
const { registry: metricsRegistry, metrics } = createMetrics();
const processManager = createProcessManager(config, logger, eventManager);
const downloadManager = new DownloadManager(config, downloadStore, eventManager, logger);
Expand All @@ -92,6 +98,7 @@ export const createAppContext = (): AppContext => {
recipeStore,
downloadManager,
abortRunsForModel: () => 0,
launchFailureBudget,
});

lifetimeMetricsStore.ensureFirstStarted();
Expand All @@ -101,6 +108,7 @@ export const createAppContext = (): AppContext => {
logger,
eventManager,
launchState,
launchFailureBudget,
metrics,
metricsRegistry,
processManager,
Expand Down
37 changes: 30 additions & 7 deletions controller/src/modules/engines/engine-coordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import type { Logger } from "../../core/logger"; import type { ProcessManager }
import type { RecipeStore } from "../models/recipes/recipe-store"; import { LIFECYCLE_READY_TIMEOUT_MS } from "./configs";
import type { EngineService, DownloadRequest, HfModel, SetActiveRecipeResult, SetActiveRecipeOptions } from "./engine-service"; import type { ModelDownload } from "../shared/recipe-types";
import type { DownloadManager } from "./downloads/download-manager";
import type { LaunchFailureBudget } from "./process/launch-failure-budget";
import { formatLaunchFailureBudgetMessage } from "./process/launch-failure-budget";
import { fetchHuggingFaceModelInfo } from "./downloads/huggingface-api";
interface CoordinatorDeps { config: Config;
logger: Logger; eventManager: EventManager;
processManager: ProcessManager; recipeStore: RecipeStore;
downloadManager: DownloadManager; abortRunsForModel?: (modelName: string) => number;
downloadManager: DownloadManager; launchFailureBudget: LaunchFailureBudget; abortRunsForModel?: (modelName: string) => number;
}
export class EngineCoordinator implements EngineService {
private readonly switchLock = new AsyncLock();
Expand Down Expand Up @@ -59,10 +61,17 @@ export class EngineCoordinator implements EngineService {
const postEvictAbort = await abortIfNeeded(recipe);
if (postEvictAbort) return postEvictAbort;
if (!recipe) { return { ok: true }; }
const blocked = this.deps.launchFailureBudget.isBlocked(recipe.id);
if (blocked) {
const message = formatLaunchFailureBudgetMessage(blocked);
await this.deps.eventManager.publishLaunchProgress(recipe.id, "error", message, 0);
return { ok: false, error: message };
}
await this.deps.eventManager.publishLaunchProgress(recipe.id, "launching", `Starting ${recipe.name}...`, 0.25);
const launch = await this.deps.processManager.launchModel(recipe); spawnedPid = launch.pid;
this.activeLaunchPid = launch.pid; if (!launch.success) {
await this.deps.eventManager.publishLaunchProgress(recipe.id, "error", launch.message, 0); return { ok: false, error: launch.message };
const failure = this.deps.launchFailureBudget.recordFailure(recipe.id);
await this.deps.eventManager.publishLaunchProgress(recipe.id, "error", `${launch.message} (${failure.failure_count}/${failure.limit} launch failures in the current window)`, 0); return { ok: false, error: launch.message };
}
const postLaunchAbort = await abortIfNeeded(recipe); if (postLaunchAbort) return postLaunchAbort;
await this.deps.eventManager.publishLaunchProgress(recipe.id, "waiting", "Loading model... (0s)", 0.5);
Expand All @@ -74,11 +83,13 @@ export class EngineCoordinator implements EngineService {
if (isAborted()) {
return publishCancelled(recipe); }
if (ready.ready) {
this.deps.launchFailureBudget.reset(recipe.id);
await this.deps.eventManager.publishLaunchProgress(recipe.id, "ready", "Model is ready!", 1);
return { ok: true }; }
if (launch.pid) {
await this.deps.processManager.killProcess(launch.pid, true); }
await this.deps.eventManager.publishLaunchProgress(recipe.id, "error", ready.message, 0); return { ok: false, error: ready.message };
const failure = this.deps.launchFailureBudget.recordFailure(recipe.id);
await this.deps.eventManager.publishLaunchProgress(recipe.id, "error", `${ready.message} (${failure.failure_count}/${failure.limit} launch failures in the current window)`, 0); return { ok: false, error: ready.message };
} finally { if (this.activeLifecycleAbort === lifecycleAbort) {
this.activeLifecycleAbort = null; }
if (this.activeLaunchPid === spawnedPid) { this.activeLaunchPid = null;
Expand Down Expand Up @@ -145,6 +156,10 @@ export class EngineCoordinator implements EngineService {
return { switched: false,
error: "Model auto-loading is disabled because the model was manually stopped. Start a model from vLLM Studio before sending local inference requests.", };
}
const blocked = this.deps.launchFailureBudget.isBlocked(recipe.id);
if (blocked) {
return { switched: false, error: formatLaunchFailureBudgetMessage(blocked) };
}
const publishEvents = options.publish_events !== false; const observedProcess = latest ?? existing;
const fromRecipe = observedProcess ? this.findRecipeForProcess(observedProcess) : null; const fromModel = fromRecipe ? (fromRecipe.served_model_name ?? fromRecipe.id) : observedProcess ? observedProcess.model_path : null;
const fromBackend = observedProcess?.backend ?? fromRecipe?.backend ?? "unknown";
Expand All @@ -161,7 +176,8 @@ export class EngineCoordinator implements EngineService {
return { switched: true, error: "Model switch cancelled" }; }
const launch = await this.deps.processManager.launchModel(recipe); launchPid = launch.pid;
this.activeLaunchPid = launch.pid; if (!launch.success) {
const message = `Failed to launch model ${recipe.id}: ${launch.message}`; if (publishEvents) {
const failure = this.deps.launchFailureBudget.recordFailure(recipe.id);
const message = `Failed to launch model ${recipe.id}: ${launch.message} (${failure.failure_count}/${failure.limit} launch failures in the current window)`; if (publishEvents) {
await this.deps.eventManager.publish( new Event(CONTROLLER_EVENTS.MODEL_SWITCH, {
status: "error", to_recipe_id: recipe.id,
to_model: recipe.served_model_name ?? recipe.id, to_backend: recipe.backend,
Expand All @@ -176,6 +192,7 @@ export class EngineCoordinator implements EngineService {
if (launch.pid) { await this.deps.processManager.killProcess(launch.pid, true);
} return { switched: true, error: "Model switch cancelled" };
} if (ready.ready) {
this.deps.launchFailureBudget.reset(recipe.id);
if (publishEvents) { await this.deps.eventManager.publish(
new Event(CONTROLLER_EVENTS.MODEL_SWITCH, { status: "ready",
to_recipe_id: recipe.id, to_model: recipe.served_model_name ?? recipe.id,
Expand All @@ -185,18 +202,24 @@ export class EngineCoordinator implements EngineService {
return { switched: true, error: null };
}
if (launch.pid) { await this.deps.processManager.killProcess(launch.pid, true);
} if (publishEvents) {
} const failure = this.deps.launchFailureBudget.recordFailure(recipe.id);
const message = `${ready.message} (${failure.failure_count}/${failure.limit} launch failures in the current window)`;
if (publishEvents) {
await this.deps.eventManager.publish( new Event(CONTROLLER_EVENTS.MODEL_SWITCH, {
status: "error", to_recipe_id: recipe.id,
to_model: recipe.served_model_name ?? recipe.id, to_backend: recipe.backend,
reason: ready.message, })
reason: message, })
); }
return { switched: true, error: ready.message }; } finally {
return { switched: true, error: message }; } finally {
if (this.activeLifecycleAbort === lifecycleAbort) { this.activeLifecycleAbort = null;
} if (this.activeLaunchPid === launchPid) {
this.activeLaunchPid = null; }
release(); }
}
resetLaunchFailureBudget(recipeId: string): void {
this.deps.launchFailureBudget.reset(recipeId);
}

async getCurrentProcess(): Promise<ProcessInfo | null> { return this.deps.processManager.findInferenceProcess(this.deps.config.inference_port);
}

Expand Down
1 change: 1 addition & 0 deletions controller/src/modules/engines/engine-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export interface EngineService {
options?: SetActiveRecipeOptions
): Promise<SetActiveRecipeResult>;
ensureActive(recipe: Recipe, options?: EnsureActiveOptions): Promise<EnsureActiveResult>;
resetLaunchFailureBudget(recipeId: string): void;

getCurrentProcess(): Promise<ProcessInfo | null>;

Expand Down
86 changes: 86 additions & 0 deletions controller/src/modules/engines/process/launch-failure-budget.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
export interface LaunchFailureBudgetSnapshot {
recipe_id: string;
failure_count: number;
limit: number;
window_ms: number;
reset_at: string;
blocked: boolean;
}

export interface LaunchFailureBudget {
get(recipeId: string): LaunchFailureBudgetSnapshot | null;
isBlocked(recipeId: string): LaunchFailureBudgetSnapshot | null;
listActive(): LaunchFailureBudgetSnapshot[];
recordFailure(recipeId: string): LaunchFailureBudgetSnapshot;
reset(recipeId: string): void;
}

export const LAUNCH_FAILURE_LIMIT = 3;
export const LAUNCH_FAILURE_WINDOW_MS = 10 * 60 * 1000;

export const formatLaunchFailureBudgetMessage = (
snapshot: LaunchFailureBudgetSnapshot
): string => {
return `Launch crash-loop budget exhausted for ${snapshot.recipe_id}: ${snapshot.failure_count}/${snapshot.limit} failed attempts in ${Math.round(snapshot.window_ms / 60_000)} minutes. Edit the recipe or retry after ${snapshot.reset_at}.`;
};

export const createLaunchFailureBudget = (
limit = LAUNCH_FAILURE_LIMIT,
windowMs = LAUNCH_FAILURE_WINDOW_MS
): LaunchFailureBudget => {
const failuresByRecipe = new Map<string, number[]>();

const prune = (recipeId: string, now = Date.now()): number[] => {
const cutoff = now - windowMs;
const kept = (failuresByRecipe.get(recipeId) ?? []).filter(
(timestamp) => timestamp > cutoff
);
if (kept.length > 0) {
failuresByRecipe.set(recipeId, kept);
} else {
failuresByRecipe.delete(recipeId);
}
return kept;
};

const snapshot = (
recipeId: string,
failures: number[]
): LaunchFailureBudgetSnapshot | null => {
if (failures.length === 0) return null;
const oldest = Math.min(...failures);
return {
recipe_id: recipeId,
failure_count: failures.length,
limit,
window_ms: windowMs,
reset_at: new Date(oldest + windowMs).toISOString(),
blocked: failures.length >= limit,
};
};

return {
get(recipeId) {
return snapshot(recipeId, prune(recipeId));
},
isBlocked(recipeId) {
const current = snapshot(recipeId, prune(recipeId));
return current?.blocked ? current : null;
},
listActive() {
const now = Date.now();
return [...failuresByRecipe.keys()]
.map((recipeId) => snapshot(recipeId, prune(recipeId, now)))
.filter((entry): entry is LaunchFailureBudgetSnapshot => entry !== null);
},
recordFailure(recipeId) {
const failures = prune(recipeId);
failures.push(Date.now());
failuresByRecipe.set(recipeId, failures);
return snapshot(recipeId, failures)!;
},
reset(recipeId) {
failuresByRecipe.delete(recipeId);
},
};
};
8 changes: 6 additions & 2 deletions controller/src/modules/engines/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ export const registerEngineRoutes: RouteRegistrar = (app, context) => {
// forever and a launching one as "stopped".)
const launchingId = context.launchState.getLaunchingRecipeId();
const result = recipes.map((recipe) => {
let status = "stopped";
const crashLoop = context.launchFailureBudget.get(recipe.id);
let status = crashLoop?.blocked ? "error" : "stopped";
if (launchingId === recipe.id) status = "starting";
if (current && isRecipeRunning(recipe, current)) status = "running";
return { ...recipe, status };
return { ...recipe, status, crash_loop: crashLoop };
});
return ctx.json(result);
});
Expand All @@ -105,6 +106,7 @@ export const registerEngineRoutes: RouteRegistrar = (app, context) => {
try {
const recipe = parseRecipe(body);
context.stores.recipeStore.save(recipe);
context.engineService.resetLaunchFailureBudget(recipe.id);
await context.eventManager.publish(new Event(CONTROLLER_EVENTS.RECIPE_CREATED, { recipe }));
return ctx.json({ success: true, id: recipe.id });
} catch (error) {
Expand All @@ -118,6 +120,7 @@ export const registerEngineRoutes: RouteRegistrar = (app, context) => {
try {
const recipe = parseRecipe({ ...body, id: recipeId });
context.stores.recipeStore.save(recipe);
context.engineService.resetLaunchFailureBudget(recipe.id);
await context.eventManager.publish(new Event(CONTROLLER_EVENTS.RECIPE_UPDATED, { recipe }));
return ctx.json({ success: true, id: recipe.id });
} catch (error) {
Expand All @@ -129,6 +132,7 @@ export const registerEngineRoutes: RouteRegistrar = (app, context) => {
const recipeId = ctx.req.param("recipeId");
const deleted = context.stores.recipeStore.delete(recipeId);
if (!deleted) throw notFound("Recipe not found");
context.engineService.resetLaunchFailureBudget(recipeId);
await context.eventManager.publish(
new Event(CONTROLLER_EVENTS.RECIPE_DELETED, { recipe_id: recipeId })
);
Expand Down
1 change: 1 addition & 0 deletions controller/src/modules/system/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export const registerSystemRoutes: RouteRegistrar = (app, context) => {
process: current,
inference_port: context.config.inference_port,
launching: context.launchState.getLaunchingRecipeId(),
launch_failures: context.launchFailureBudget.listActive(),
});
});

Expand Down
8 changes: 8 additions & 0 deletions frontend/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ export type Recipe = RecipePayload;

export interface RecipeWithStatus extends RecipeBase {
status: "running" | "stopped" | "starting" | "error";
crash_loop?: {
recipe_id: string;
failure_count: number;
limit: number;
window_ms: number;
reset_at: string;
blocked: boolean;
} | null;
tp?: number;
pp?: number;
}
Expand Down
Loading