diff --git a/.changeset/fix-slack-schedule-legacy-workflow-resume.md b/.changeset/fix-slack-schedule-legacy-workflow-resume.md new file mode 100644 index 00000000..fa04968e --- /dev/null +++ b/.changeset/fix-slack-schedule-legacy-workflow-resume.md @@ -0,0 +1,5 @@ +--- +"eve": patch +--- + +Resume parked sessions created before eve's agent-scoped Workflow queues by issuing a legacy queue wake-up when the target deployment does not consume the current namespace. This restores follow-up handling for Slack threads started by scheduled runs before the queue namespace change. diff --git a/packages/eve/src/internal/workflow/queue-namespace.ts b/packages/eve/src/internal/workflow/queue-namespace.ts index 8a660ec8..7c9a4a28 100644 --- a/packages/eve/src/internal/workflow/queue-namespace.ts +++ b/packages/eve/src/internal/workflow/queue-namespace.ts @@ -1,4 +1,5 @@ export const WORKFLOW_QUEUE_NAMESPACE_ENV = "WORKFLOW_QUEUE_NAMESPACE"; +export const LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE = "eve"; /** Derives a stable Workflow queue namespace from an eve agent's unique name. */ export function deriveEveWorkflowQueueNamespace(agentName: string): string { diff --git a/packages/eve/src/internal/workflow/runtime.test.ts b/packages/eve/src/internal/workflow/runtime.test.ts new file mode 100644 index 00000000..421f4955 --- /dev/null +++ b/packages/eve/src/internal/workflow/runtime.test.ts @@ -0,0 +1,132 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; + +import { + LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE, + WORKFLOW_QUEUE_NAMESPACE_ENV, +} from "#internal/workflow/queue-namespace.js"; +import { resumeHook } from "#internal/workflow/runtime.js"; + +const getWorldMock = vi.fn(); +const healthCheckMock = vi.fn(); +const queueMock = vi.fn(); +const resumeHookMock = vi.fn(); +const setWorldMock = vi.fn(); + +vi.mock("#compiled/@workflow/core/runtime.js", () => ({ + getWorld: (...args: unknown[]) => getWorldMock(...args), + healthCheck: (...args: unknown[]) => healthCheckMock(...args), + resumeHook: (...args: unknown[]) => resumeHookMock(...args), + setWorld: (...args: unknown[]) => setWorldMock(...args), +})); + +afterEach(() => { + getWorldMock.mockReset(); + healthCheckMock.mockReset(); + queueMock.mockReset(); + resumeHookMock.mockReset(); + setWorldMock.mockReset(); + vi.unstubAllEnvs(); +}); + +describe("workflow runtime resumeHook", () => { + const hook = { + createdAt: new Date(), + environment: "production", + hookId: "hook_session", + ownerId: "team_test", + projectId: "project_test", + runId: "wrun_session", + token: "slack:C1:T1", + }; + + function mockWorld(run: { + readonly deploymentId: string; + readonly executionContext?: Record; + readonly runId?: string; + readonly specVersion?: number; + readonly workflowName?: string; + }) { + getWorldMock.mockResolvedValue({ + queue: queueMock, + runs: { + get: vi.fn().mockResolvedValue({ + createdAt: new Date(), + deploymentId: run.deploymentId, + executionContext: run.executionContext, + runId: run.runId ?? hook.runId, + specVersion: run.specVersion, + status: "running", + updatedAt: new Date(), + workflowName: run.workflowName ?? "workflowEntry", + }), + }, + }); + } + + it("does not probe or fan out when the active namespace is legacy", async () => { + vi.stubEnv(WORKFLOW_QUEUE_NAMESPACE_ENV, LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE); + vi.stubEnv("VERCEL_DEPLOYMENT_ID", "dpl_current"); + resumeHookMock.mockResolvedValue(hook); + + await expect(resumeHook(hook.token, { kind: "deliver" })).resolves.toEqual(hook); + + expect(resumeHookMock).toHaveBeenCalledWith(hook.token, { kind: "deliver" }); + expect(getWorldMock).not.toHaveBeenCalled(); + expect(queueMock).not.toHaveBeenCalled(); + }); + + it("does not fan out for hooks owned by the current deployment", async () => { + vi.stubEnv(WORKFLOW_QUEUE_NAMESPACE_ENV, "eve6167656e74"); + vi.stubEnv("VERCEL_DEPLOYMENT_ID", "dpl_current"); + resumeHookMock.mockResolvedValue(hook); + mockWorld({ deploymentId: "dpl_current" }); + + await resumeHook(hook.token, { kind: "deliver" }); + + expect(healthCheckMock).not.toHaveBeenCalled(); + expect(queueMock).not.toHaveBeenCalled(); + }); + + it("does not fan out when the target deployment responds on the current namespace", async () => { + vi.stubEnv(WORKFLOW_QUEUE_NAMESPACE_ENV, "eve6167656e74"); + vi.stubEnv("VERCEL_DEPLOYMENT_ID", "dpl_current"); + resumeHookMock.mockResolvedValue(hook); + healthCheckMock.mockResolvedValue({ healthy: true }); + mockWorld({ deploymentId: "dpl_previous" }); + + await resumeHook(hook.token, { kind: "deliver" }); + + expect(healthCheckMock).toHaveBeenCalledWith(expect.anything(), "workflow", { + deploymentId: "dpl_previous", + namespace: "eve6167656e74", + timeout: 1_000, + }); + expect(queueMock).not.toHaveBeenCalled(); + }); + + it("queues a legacy wake-up for old deployments that do not consume the current namespace", async () => { + vi.stubEnv(WORKFLOW_QUEUE_NAMESPACE_ENV, "eve6167656e74"); + vi.stubEnv("VERCEL_DEPLOYMENT_ID", "dpl_current"); + resumeHookMock.mockResolvedValue(hook); + healthCheckMock.mockResolvedValue({ healthy: false }); + mockWorld({ + deploymentId: "dpl_legacy", + executionContext: { traceCarrier: { traceparent: "00-test" } }, + specVersion: 4, + }); + + await resumeHook(hook.token, { kind: "deliver" }); + + expect(queueMock).toHaveBeenCalledWith( + "__eve_wkf_workflow_workflowEntry", + { + runId: hook.runId, + traceCarrier: { traceparent: "00-test" }, + }, + { + deploymentId: "dpl_legacy", + specVersion: 4, + }, + ); + }); +}); diff --git a/packages/eve/src/internal/workflow/runtime.ts b/packages/eve/src/internal/workflow/runtime.ts index 71bb520b..2105f647 100644 --- a/packages/eve/src/internal/workflow/runtime.ts +++ b/packages/eve/src/internal/workflow/runtime.ts @@ -1,4 +1,15 @@ import * as workflowRuntime from "#compiled/@workflow/core/runtime.js"; +import type { CryptoKey } from "#compiled/@workflow/core/encryption.js"; +import type { + Hook, + ValidQueueName, + WorkflowRunWithoutData, +} from "#compiled/@workflow/world/index.js"; +import { createLogger, logError } from "#internal/logging.js"; +import { + LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE, + WORKFLOW_QUEUE_NAMESPACE_ENV, +} from "#internal/workflow/queue-namespace.js"; // Workflow turbo backgrounds run_started and forces optimistic inline start. // Keep eve on the fully ordered runtime path until that beta behavior is safe. @@ -11,7 +22,122 @@ export type { WorkflowMetadata, } from "#compiled/@workflow/core/runtime/start.js"; +const log = createLogger("workflow.runtime"); +const VERCEL_DEPLOYMENT_ID_ENV = "VERCEL_DEPLOYMENT_ID"; +const QUEUE_NAMESPACE_PROBE_TIMEOUT_MS = 1_000; +const namespaceHealthCache = new Map>(); + /** Installs a World across source and vendored Workflow package identities. */ export function setWorld(world: unknown): void { workflowRuntime.setWorld(world as Parameters[0]); } + +/** + * Resumes a Workflow hook and wakes legacy eve deployments whose workflow + * handlers still subscribe to the pre-agent-scoped queue namespace. + */ +export async function resumeHook( + tokenOrHook: string | Hook, + payload: T, + encryptionKeyOverride?: CryptoKey, +): Promise { + const hook = + encryptionKeyOverride === undefined + ? await workflowRuntime.resumeHook(tokenOrHook, payload) + : await workflowRuntime.resumeHook(tokenOrHook, payload, encryptionKeyOverride); + + try { + await enqueueLegacyWorkflowResumeIfNeeded(hook); + } catch (error) { + logError(log, "failed to enqueue legacy workflow resume", error, { + hookId: hook.hookId, + runId: hook.runId, + }); + } + + return hook; +} + +async function enqueueLegacyWorkflowResumeIfNeeded(hook: Hook): Promise { + const currentNamespace = currentWorkflowQueueNamespace(); + if (currentNamespace === undefined || currentNamespace === LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE) { + return; + } + + const currentDeploymentId = currentVercelDeploymentId(); + if (currentDeploymentId === undefined) return; + + const world = await workflowRuntime.getWorld(); + const run = await world.runs.get(hook.runId, { resolveData: "none" }); + if (run.deploymentId === currentDeploymentId) return; + + const currentNamespaceIsReachable = await isWorkflowNamespaceReachable({ + deploymentId: run.deploymentId, + namespace: currentNamespace, + }); + if (currentNamespaceIsReachable) return; + + await world.queue( + workflowQueueName(run.workflowName, LEGACY_EVE_WORKFLOW_QUEUE_NAMESPACE), + { + runId: run.runId, + traceCarrier: readTraceCarrier(run), + }, + { + deploymentId: run.deploymentId, + specVersion: run.specVersion ?? 1, + }, + ); +} + +async function isWorkflowNamespaceReachable(input: { + readonly deploymentId: string; + readonly namespace: string; +}): Promise { + const cacheKey = `${input.deploymentId}\0${input.namespace}`; + let cached = namespaceHealthCache.get(cacheKey); + if (cached === undefined) { + cached = probeWorkflowNamespace(input); + namespaceHealthCache.set(cacheKey, cached); + } + return cached; +} + +async function probeWorkflowNamespace(input: { + readonly deploymentId: string; + readonly namespace: string; +}): Promise { + try { + const world = await workflowRuntime.getWorld(); + const result = await workflowRuntime.healthCheck(world, "workflow", { + deploymentId: input.deploymentId, + namespace: input.namespace, + timeout: QUEUE_NAMESPACE_PROBE_TIMEOUT_MS, + }); + return result.healthy; + } catch { + return false; + } +} + +function currentWorkflowQueueNamespace(): string | undefined { + const value = process.env[WORKFLOW_QUEUE_NAMESPACE_ENV]?.trim(); + return value ? value : undefined; +} + +function currentVercelDeploymentId(): string | undefined { + const value = process.env[VERCEL_DEPLOYMENT_ID_ENV]?.trim(); + return value ? value : undefined; +} + +function readTraceCarrier(run: WorkflowRunWithoutData): Record | undefined { + const traceCarrier = run.executionContext?.["traceCarrier"]; + if (traceCarrier === undefined || traceCarrier === null || typeof traceCarrier !== "object") { + return undefined; + } + return traceCarrier as Record; +} + +function workflowQueueName(workflowName: string, namespace: string): ValidQueueName { + return `__${namespace}_wkf_workflow_${workflowName}` as ValidQueueName; +}