Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-slack-schedule-legacy-workflow-resume.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"eve": patch
---

Resume parked sessions created before eve's agent-scoped Workflow queues by issuing a legacy queue wake-up when the target deployment does not consume the current namespace. This restores follow-up handling for Slack threads started by scheduled runs before the queue namespace change.
1 change: 1 addition & 0 deletions packages/eve/src/internal/workflow/queue-namespace.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export const WORKFLOW_QUEUE_NAMESPACE_ENV = "WORKFLOW_QUEUE_NAMESPACE";
export const LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE = "eve";

/** Derives a stable Workflow queue namespace from an eve agent's unique name. */
export function deriveEveWorkflowQueueNamespace(agentName: string): string {
Expand Down
132 changes: 132 additions & 0 deletions packages/eve/src/internal/workflow/runtime.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { afterEach, describe, expect, it, vi } from "vitest";

import {
LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE,
WORKFLOW_QUEUE_NAMESPACE_ENV,
} from "#internal/workflow/queue-namespace.js";
import { resumeHook } from "#internal/workflow/runtime.js";

const getWorldMock = vi.fn();
const healthCheckMock = vi.fn();
const queueMock = vi.fn();
const resumeHookMock = vi.fn();
const setWorldMock = vi.fn();

vi.mock("#compiled/@workflow/core/runtime.js", () => ({
getWorld: (...args: unknown[]) => getWorldMock(...args),
healthCheck: (...args: unknown[]) => healthCheckMock(...args),
resumeHook: (...args: unknown[]) => resumeHookMock(...args),
setWorld: (...args: unknown[]) => setWorldMock(...args),
}));

afterEach(() => {
getWorldMock.mockReset();
healthCheckMock.mockReset();
queueMock.mockReset();
resumeHookMock.mockReset();
setWorldMock.mockReset();
vi.unstubAllEnvs();
});

describe("workflow runtime resumeHook", () => {
const hook = {
createdAt: new Date(),
environment: "production",
hookId: "hook_session",
ownerId: "team_test",
projectId: "project_test",
runId: "wrun_session",
token: "slack:C1:T1",
};

function mockWorld(run: {
readonly deploymentId: string;
readonly executionContext?: Record<string, unknown>;
readonly runId?: string;
readonly specVersion?: number;
readonly workflowName?: string;
}) {
getWorldMock.mockResolvedValue({
queue: queueMock,
runs: {
get: vi.fn().mockResolvedValue({
createdAt: new Date(),
deploymentId: run.deploymentId,
executionContext: run.executionContext,
runId: run.runId ?? hook.runId,
specVersion: run.specVersion,
status: "running",
updatedAt: new Date(),
workflowName: run.workflowName ?? "workflowEntry",
}),
},
});
}

it("does not probe or fan out when the active namespace is legacy", async () => {
vi.stubEnv(WORKFLOW_QUEUE_NAMESPACE_ENV, LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE);
vi.stubEnv("VERCEL_DEPLOYMENT_ID", "dpl_current");
resumeHookMock.mockResolvedValue(hook);

await expect(resumeHook(hook.token, { kind: "deliver" })).resolves.toEqual(hook);

expect(resumeHookMock).toHaveBeenCalledWith(hook.token, { kind: "deliver" });
expect(getWorldMock).not.toHaveBeenCalled();
expect(queueMock).not.toHaveBeenCalled();
});

it("does not fan out for hooks owned by the current deployment", async () => {
vi.stubEnv(WORKFLOW_QUEUE_NAMESPACE_ENV, "eve6167656e74");
vi.stubEnv("VERCEL_DEPLOYMENT_ID", "dpl_current");
resumeHookMock.mockResolvedValue(hook);
mockWorld({ deploymentId: "dpl_current" });

await resumeHook(hook.token, { kind: "deliver" });

expect(healthCheckMock).not.toHaveBeenCalled();
expect(queueMock).not.toHaveBeenCalled();
});

it("does not fan out when the target deployment responds on the current namespace", async () => {
vi.stubEnv(WORKFLOW_QUEUE_NAMESPACE_ENV, "eve6167656e74");
vi.stubEnv("VERCEL_DEPLOYMENT_ID", "dpl_current");
resumeHookMock.mockResolvedValue(hook);
healthCheckMock.mockResolvedValue({ healthy: true });
mockWorld({ deploymentId: "dpl_previous" });

await resumeHook(hook.token, { kind: "deliver" });

expect(healthCheckMock).toHaveBeenCalledWith(expect.anything(), "workflow", {
deploymentId: "dpl_previous",
namespace: "eve6167656e74",
timeout: 1_000,
});
expect(queueMock).not.toHaveBeenCalled();
});

it("queues a legacy wake-up for old deployments that do not consume the current namespace", async () => {
vi.stubEnv(WORKFLOW_QUEUE_NAMESPACE_ENV, "eve6167656e74");
vi.stubEnv("VERCEL_DEPLOYMENT_ID", "dpl_current");
resumeHookMock.mockResolvedValue(hook);
healthCheckMock.mockResolvedValue({ healthy: false });
mockWorld({
deploymentId: "dpl_legacy",
executionContext: { traceCarrier: { traceparent: "00-test" } },
specVersion: 4,
});

await resumeHook(hook.token, { kind: "deliver" });

expect(queueMock).toHaveBeenCalledWith(
"__eve_wkf_workflow_workflowEntry",
{
runId: hook.runId,
traceCarrier: { traceparent: "00-test" },
},
{
deploymentId: "dpl_legacy",
specVersion: 4,
},
);
});
});
126 changes: 126 additions & 0 deletions packages/eve/src/internal/workflow/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
import * as workflowRuntime from "#compiled/@workflow/core/runtime.js";
import type { CryptoKey } from "#compiled/@workflow/core/encryption.js";
import type {
Hook,
ValidQueueName,
WorkflowRunWithoutData,
} from "#compiled/@workflow/world/index.js";
import { createLogger, logError } from "#internal/logging.js";
import {
LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE,
WORKFLOW_QUEUE_NAMESPACE_ENV,
} from "#internal/workflow/queue-namespace.js";

// Workflow turbo backgrounds run_started and forces optimistic inline start.
// Keep eve on the fully ordered runtime path until that beta behavior is safe.
Expand All @@ -11,7 +22,122 @@ export type {
WorkflowMetadata,
} from "#compiled/@workflow/core/runtime/start.js";

const log = createLogger("workflow.runtime");
const VERCEL_DEPLOYMENT_ID_ENV = "VERCEL_DEPLOYMENT_ID";
const QUEUE_NAMESPACE_PROBE_TIMEOUT_MS = 1_000;
const namespaceHealthCache = new Map<string, Promise<boolean>>();

/** Installs a World across source and vendored Workflow package identities. */
export function setWorld(world: unknown): void {
workflowRuntime.setWorld(world as Parameters<typeof workflowRuntime.setWorld>[0]);
}

/**
* Resumes a Workflow hook and wakes legacy eve deployments whose workflow
* handlers still subscribe to the pre-agent-scoped queue namespace.
*/
export async function resumeHook<T = any>(
tokenOrHook: string | Hook,
payload: T,
encryptionKeyOverride?: CryptoKey,
): Promise<Hook> {
const hook =
encryptionKeyOverride === undefined
? await workflowRuntime.resumeHook(tokenOrHook, payload)
: await workflowRuntime.resumeHook(tokenOrHook, payload, encryptionKeyOverride);

try {
await enqueueLegacyWorkflowResumeIfNeeded(hook);
} catch (error) {
logError(log, "failed to enqueue legacy workflow resume", error, {
hookId: hook.hookId,
runId: hook.runId,
});
}

return hook;
}

async function enqueueLegacyWorkflowResumeIfNeeded(hook: Hook): Promise<void> {
const currentNamespace = currentWorkflowQueueNamespace();
if (currentNamespace === undefined || currentNamespace === LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE) {
return;
}

const currentDeploymentId = currentVercelDeploymentId();
if (currentDeploymentId === undefined) return;

const world = await workflowRuntime.getWorld();
const run = await world.runs.get(hook.runId, { resolveData: "none" });
if (run.deploymentId === currentDeploymentId) return;

const currentNamespaceIsReachable = await isWorkflowNamespaceReachable({
deploymentId: run.deploymentId,
namespace: currentNamespace,
});
if (currentNamespaceIsReachable) return;

await world.queue(
workflowQueueName(run.workflowName, LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE),
{
runId: run.runId,
traceCarrier: readTraceCarrier(run),
},
{
deploymentId: run.deploymentId,
specVersion: run.specVersion ?? 1,
},
);
}

async function isWorkflowNamespaceReachable(input: {
readonly deploymentId: string;
readonly namespace: string;
}): Promise<boolean> {
const cacheKey = `${input.deploymentId}\0${input.namespace}`;
let cached = namespaceHealthCache.get(cacheKey);
if (cached === undefined) {
cached = probeWorkflowNamespace(input);
namespaceHealthCache.set(cacheKey, cached);
}
return cached;
}

async function probeWorkflowNamespace(input: {
readonly deploymentId: string;
readonly namespace: string;
}): Promise<boolean> {
try {
const world = await workflowRuntime.getWorld();
const result = await workflowRuntime.healthCheck(world, "workflow", {
deploymentId: input.deploymentId,
namespace: input.namespace,
timeout: QUEUE_NAMESPACE_PROBE_TIMEOUT_MS,
});
return result.healthy;
} catch {
return false;
}
}

function currentWorkflowQueueNamespace(): string | undefined {
const value = process.env[WORKFLOW_QUEUE_NAMESPACE_ENV]?.trim();
return value ? value : undefined;
}

function currentVercelDeploymentId(): string | undefined {
const value = process.env[VERCEL_DEPLOYMENT_ID_ENV]?.trim();
return value ? value : undefined;
}

function readTraceCarrier(run: WorkflowRunWithoutData): Record<string, string> | undefined {
const traceCarrier = run.executionContext?.["traceCarrier"];
if (traceCarrier === undefined || traceCarrier === null || typeof traceCarrier !== "object") {
return undefined;
}
return traceCarrier as Record<string, string>;
}

function workflowQueueName(workflowName: string, namespace: string): ValidQueueName {
return `__${namespace}_wkf_workflow_${workflowName}` as ValidQueueName;
}
Loading