Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/agent-worker/.dev.vars.example
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ OUTPUT_DOWNLOAD_SIGNING_SECRET=

# Used to verify internal DSR maintenance calls from webhooks-worker.
INTERNAL_MAINTENANCE_SECRET=

# Platform DeepSeek API key for the free 200K-token tier (https://platform.deepseek.com/api_keys).
# Leave blank to disable the free tier locally; set a real key to test free DeepSeek runs.
DEEPSEEK_PLATFORM_API_KEY=
1 change: 1 addition & 0 deletions apps/agent-worker/src/agent-routing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ export async function startAgentRun(
...(run.importRepoUrl ? { importRepoUrl: run.importRepoUrl } : {}),
messageText,
model: body.model ?? run.modelId,
modelExplicit: Boolean(body.model?.trim()),
projectId: run.projectId,
...(run.projectMode ? { projectMode: run.projectMode } : {}),
...(policy.quotaWarning ? { quotaWarning: policy.quotaWarning } : {}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { StartRunInput } from "./agent-run-schemas";
import { persistAgentRunUsage } from "./agent-run-status-persistence";
import {
appendBudgetEvent,
getRunStateValue,
readStoredRunSnapshot,
type StoredBudgetSnapshot,
} from "./agent-run-storage";
Expand All @@ -30,8 +31,14 @@ export async function recordBudgetDelta(
event: BudgetDelta,
): Promise<StoredBudgetSnapshot> {
const storedBeforeAppend = readStoredRunSnapshot(ctx);
const model = input.model ?? storedBeforeAppend?.modelId ?? "unknown";
const usd = await budgetEventUsd(model, event);
// Prefer the credential's resolved accounting slug (set for DeepSeek runs) so usage is
// attributed to the model that actually served the run — not the Auto/default request.
const resolvedModelId = getRunStateValue(ctx, "resolved_model_id");
const model = resolvedModelId ?? input.model ?? storedBeforeAppend?.modelId ?? "unknown";
const isPlatformFree = getRunStateValue(ctx, "credit_source") === "platform_free";
// Free DeepSeek runs are $0 to the user (metered by tokens, not USD), so they never
// consume the user's run/daily USD budget cap.
const usd = isPlatformFree ? 0 : await budgetEventUsd(model, event);
appendBudgetEvent(ctx, {
kind: event.kind,
modelId: model,
Expand All @@ -41,10 +48,13 @@ export async function recordBudgetDelta(
});
const stored = readStoredRunSnapshot(ctx);
const provider = providerFromModel(model);
const freeDeepseekTokens =
isPlatformFree && event.kind === "llm_usage" ? event.tokensIn + event.tokensOut : 0;
ctx.waitUntil(
persistAgentRunUsage(env, {
costUsd: usd,
eventType: event.kind,
...(freeDeepseekTokens > 0 ? { freeDeepseekTokens } : {}),
inputTokens: event.tokensIn,
model,
outputTokens: event.tokensOut,
Expand Down Expand Up @@ -76,10 +86,13 @@ async function budgetEventUsd(model: string, event: BudgetDelta): Promise<number
});
}

function providerFromModel(model: string): "anthropic" | "openai" | undefined {
function providerFromModel(model: string): "anthropic" | "deepseek" | "openai" | undefined {
if (model.startsWith("anthropic/") || model.startsWith("claude-")) {
return "anthropic";
}
if (model.startsWith("deepseek/") || model.startsWith("deepseek-")) {
return "deepseek";
}
if (model.startsWith("openai/") || model.startsWith("gpt-") || model.startsWith("o")) {
return "openai";
}
Expand Down
13 changes: 13 additions & 0 deletions apps/agent-worker/src/durable-objects/agent-run-budget.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ export function dailyCostCapReachedChunk(): UIMessageChunk {
};
}

/**
 * Builds the `data-error` stream chunk emitted when the free DeepSeek token
 * allowance has been used up.
 *
 * @returns A non-retriable error chunk carrying the
 *   `deepseek_free_quota_exhausted` code and the user-facing message.
 */
export function freeDeepseekQuotaReachedChunk(): UIMessageChunk {
  // Annotate the local so the literal is still checked against UIMessageChunk.
  const quotaChunk: UIMessageChunk = {
    data: {
      code: "deepseek_free_quota_exhausted",
      message:
        "Your 200,000 free DeepSeek tokens are used up. Add your own key in Settings → Models to keep building.",
      retriable: false,
      v: 1,
    },
    type: "data-error",
  };
  return quotaChunk;
}

export function effectiveRunBudgetCapUsd(input: BudgetedRunInput): number {
return input.budgetCapUsd ?? DEFAULT_RUN_BUDGET_CAP_USD;
}
53 changes: 52 additions & 1 deletion apps/agent-worker/src/durable-objects/agent-run-cost-caps.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { APIError, emitUserEvent } from "@cheatcode/observability";
import { FREE_DEEPSEEK_TOKEN_LIMIT } from "@cheatcode/types";
import type { UIMessageChunk } from "ai";
import {
budgetCapReachedChunk,
budgetChunk,
dailyCostCapReachedChunk,
freeDeepseekQuotaReachedChunk,
isBudgetExhausted,
isDailyCostCapExhausted,
} from "./agent-run-budget";
Expand All @@ -14,7 +16,11 @@ import {
} from "./agent-run-budget-persistence";
import type { AgentRunEnv } from "./agent-run-env";
import type { StartRunInput } from "./agent-run-schemas";
import { readStoredRunSnapshot, type StoredBudgetSnapshot } from "./agent-run-storage";
import {
getRunStateValue,
readStoredRunSnapshot,
type StoredBudgetSnapshot,
} from "./agent-run-storage";

export type CostCapExhaustion = {
chunk: UIMessageChunk;
Expand Down Expand Up @@ -105,6 +111,51 @@ export async function enforceCostCaps(
});
}

/**
 * Hard stop for `platform_free` DeepSeek runs, decided synchronously from
 * DO-local run state.
 *
 * The projected lifetime usage is the baseline captured at credential
 * resolution (`free_deepseek_start_used`) plus the tokens this run has
 * accumulated so far. There is deliberately no mid-run DB read: the per-step
 * DB increment is best-effort and lags, so it cannot act as the bound. This
 * bounds a single run; the residual cross-run overshoot (~one run) is
 * considered negligible. Free runs are $0, so the USD cap never bounds them.
 *
 * @param deps - Accounting dependencies (run-state context, chunk appender,
 *   completion and subscriber hooks).
 * @param input - The run's start input, forwarded to the status/completion hooks.
 * @param snapshot - Token totals accumulated by this run so far.
 * @param isAnswerTextOpen - When true, a `text-end` chunk for the `answer`
 *   part is appended before the stream is finished.
 * @throws APIError 402 `deepseek_free_quota_exhausted` once the projected
 *   usage reaches `FREE_DEEPSEEK_TOKEN_LIMIT`.
 */
export async function enforceFreeDeepseekCap(
  deps: BudgetAccountingDeps,
  input: StartRunInput,
  snapshot: { tokensIn: number; tokensOut: number },
  isAnswerTextOpen: boolean,
): Promise<void> {
  // Only runs billed to the platform's free tier are subject to this cap.
  const creditSource = getRunStateValue(deps.ctx, "credit_source");
  if (creditSource !== "platform_free") {
    return;
  }

  const projectedLifetimeTokens =
    freeDeepseekStartUsed(deps.ctx) + snapshot.tokensIn + snapshot.tokensOut;
  const allowanceRemains = projectedLifetimeTokens < FREE_DEEPSEEK_TOKEN_LIMIT;
  if (allowanceRemains) {
    return;
  }

  // Allowance reached: surface the error, close any open answer text, publish
  // the stored budget status, finish the stream, then mark the run completed.
  await deps.append(freeDeepseekQuotaReachedChunk());
  if (isAnswerTextOpen) {
    await deps.append({ id: "answer", type: "text-end" });
  }
  await appendStoredBudgetStatus(deps, input);
  await deps.append({ finishReason: "stop", type: "finish" });
  await deps.markCompleted(input);
  deps.closeSubscribers();

  // Throw so the caller stops driving the run after the stream was closed.
  throw new APIError(
    402,
    "deepseek_free_quota_exhausted",
    "Free DeepSeek token allowance reached.",
    { retriable: false },
  );
}

/**
 * Reads the free-tier baseline (`free_deepseek_start_used`) from run state.
 *
 * @param ctx - Durable Object state holding the run-state values.
 * @returns The stored value truncated to an integer, or 0 when it is missing,
 *   non-finite, or not strictly positive.
 */
function freeDeepseekStartUsed(ctx: DurableObjectState): number {
  const stored = getRunStateValue(ctx, "free_deepseek_start_used");
  if (stored === undefined) {
    return 0;
  }
  const baseline = Number(stored);
  // Reject NaN/±Infinity and non-positive values; they all count as "nothing used".
  if (!Number.isFinite(baseline) || !(baseline > 0)) {
    return 0;
  }
  return Math.trunc(baseline);
}

export function costCapExhaustion(
input: StartRunInput,
nextRunCostUsd: number,
Expand Down
1 change: 1 addition & 0 deletions apps/agent-worker/src/durable-objects/agent-run-env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { ProjectSandbox } from "./project-sandbox";

export interface AgentRunEnv extends AnalyticsBindings {
COMPOSIO_API_KEY?: WorkerSecret;
DEEPSEEK_PLATFORM_API_KEY?: WorkerSecret;
HYPERDRIVE: Hyperdrive;
OUTPUT_DOWNLOAD_BASE_URL?: string;
OUTPUT_DOWNLOAD_SIGNING_SECRET: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export async function runMastraStream(options: MastraStreamOptions): Promise<voi
composioConnectedAccounts: toolCredentials.composioConnectedAccounts,
composioQuotaMeter: toolCredentials.composioQuotaMeter,
composioUserId: toolCredentials.composioUserId,
deepseekApiKey: credential.provider === "deepseek" ? credential.apiKey : undefined,
elevenlabsApiKey: toolCredentials.elevenlabsApiKey,
exaApiKey: toolCredentials.exaApiKey,
falApiKey: toolCredentials.falApiKey,
Expand All @@ -81,6 +82,11 @@ export async function runMastraStream(options: MastraStreamOptions): Promise<voi
researchFanoutSubagentLimit: input.researchFanoutSubagentLimit,
},
),
// DeepSeek V4 defaults to thinking mode; disable it so tool-calling stays a clean
// OpenAI-style loop (avoids the reasoning_content round-trip). No-op for other providers.
...(credential.provider === "deepseek"
? { providerOptions: { deepseek: { thinking: { type: "disabled" } } } }
: {}),
stopWhen: stepCountIs(AGENT_LOOP_MAX_STEPS),
}),
timeoutMs: MASTRA_FIRST_CHUNK_TIMEOUT_MS,
Expand Down
3 changes: 3 additions & 0 deletions apps/agent-worker/src/durable-objects/agent-run-schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ export const StartRunInputSchema = z
userId: z.string().uuid(),
messageText: z.string().min(1),
model: z.string().trim().min(1).max(200).optional(),
// Whether the user explicitly picked `model` (vs an Auto/implicit default). Gates
// the free-DeepSeek last-resort fallback so explicit non-free picks aren't swapped.
modelExplicit: z.boolean().optional(),
projectMode: z.enum(["app-builder", "app-builder-mobile", "general"]).default("general"),
isFirstRun: z.boolean().default(false),
researchFanoutSubagentLimit: z.number().int().positive().max(25).default(3),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface PersistAgentRunStatusInput {
export interface PersistAgentRunUsageInput {
costUsd: number;
eventType: string;
freeDeepseekTokens?: number;
inputTokens: number;
model?: string;
outputTokens: number;
Expand Down Expand Up @@ -79,6 +80,7 @@ export async function persistAgentRunUsage(
agentRunId: AgentRunId(input.runId),
costUsd: input.costUsd,
eventType: input.eventType,
...(input.freeDeepseekTokens ? { freeDeepseekTokens: input.freeDeepseekTokens } : {}),
inputTokens: input.inputTokens,
...(input.model ? { model: input.model } : {}),
outputTokens: input.outputTokens,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export interface StreamDriverDeps {
createBroker: () => ApprovalBroker;
env: AgentRunEnv;
hasPendingDecision: () => boolean;
persistResolvedCredential: (credential: LlmCredential) => void;
setRunStage: (stage: string) => void;
}

Expand Down Expand Up @@ -98,6 +99,10 @@ async function streamMastraRun(
params: StreamRunParams,
credential: LlmCredential,
): Promise<void> {
// Persist the resolved credit source + accounting slug + free-tier baseline to run-state
// (covers both the primary and OpenAI-fallback credentials) before any tokens stream, so
// per-step metering and the free-quota hard stop read the model that actually served.
deps.persistResolvedCredential(credential);
await runMastraStream({
abortSignal: params.abortSignal,
...(params.agentContextNote === undefined ? {} : { agentContextNote: params.agentContextNote }),
Expand Down
23 changes: 23 additions & 0 deletions apps/agent-worker/src/durable-objects/agent-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
type BudgetAccountingDeps,
costCapExhaustion,
enforceCostCaps,
enforceFreeDeepseekCap,
isCostCapAPIError,
recordBudgetDelta,
} from "./agent-run-cost-caps";
Expand Down Expand Up @@ -72,6 +73,7 @@ import {
saveTakeoverStateInStorage,
} from "./agent-run-takeover-state";
import { isAppBuilderRequest, missingInternalUserResponse } from "./agent-run-utils";
import type { LlmCredential } from "./llm-provider";
import {
mastraChunkError,
mastraChunkToUiChunks,
Expand Down Expand Up @@ -548,10 +550,30 @@ export class AgentRun extends DurableObject<AgentRunEnv> {
createBroker: () => this.approvals.createBroker(),
env: this.env,
hasPendingDecision: () => this.approvals.hasPendingDecision(),
persistResolvedCredential: (credential) => this.persistResolvedCredential(credential),
setRunStage: (stage) => this.setRunStage(stage),
};
}

/**
 * Writes the resolved credential's accounting facts into run state:
 * the credit source, the `deepseek/<modelId>` slug for DeepSeek credentials,
 * and the free-tier usage baseline for `platform_free` credentials.
 *
 * NOTE(review): `resolved_model_id` is only ever written for DeepSeek
 * credentials and is not cleared otherwise. If this can be called a second
 * time in the same run with a non-DeepSeek credential, the earlier slug would
 * remain in run state — confirm against the callers.
 */
private persistResolvedCredential(credential: LlmCredential): void {
  setRunStateValue(this.ctx, "credit_source", credential.creditSource);

  // DeepSeek credentials carry the bare provider model id; store it with the
  // `deepseek/` prefix so it matches the catalog/accounting slug format.
  if (credential.provider === "deepseek") {
    const accountingSlug = `deepseek/${credential.modelId}`;
    setRunStateValue(this.ctx, "resolved_model_id", accountingSlug);
  }

  // The baseline is only recorded for free-tier runs that captured one.
  if (credential.creditSource !== "platform_free") {
    return;
  }
  if (credential.freeTokensUsedAtResolve === undefined) {
    return;
  }
  setRunStateValue(
    this.ctx,
    "free_deepseek_start_used",
    String(credential.freeTokensUsedAtResolve),
  );
}

private async appendMastraChunk(chunk: unknown): Promise<number> {
let appendedCount = 0;
for (const uiChunk of mastraChunkToUiChunks(chunk)) {
Expand Down Expand Up @@ -590,6 +612,7 @@ export class AgentRun extends DurableObject<AgentRunEnv> {
usd: usage.costUsd ?? 0,
});
await enforceCostCaps(this.accountingDeps(), input, snapshot, true);
await enforceFreeDeepseekCap(this.accountingDeps(), input, snapshot, true);
}
return appendedCount;
}
Expand Down
Loading
Loading