Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ export class OrchestrationExecutor {

for (const oldEvent of oldEvents) {
await this.processEvent(ctx, oldEvent);
// Stop replaying once the orchestration has completed (e.g. EVENTRAISED resolved a
// whenAny that caused the workflow to return). Processing additional history events
// after completion can trigger spurious non-determinism errors (e.g. a TIMERCREATED
// confirmation that follows EVENTRAISED in the same batch) which would overwrite the
// correct COMPLETED status with a FAILED one and delay state visibility.
if (ctx._isComplete) break;
}

// Get new actions by executing newly received events into the orchestrator function
Expand All @@ -76,6 +82,11 @@ export class OrchestrationExecutor {

for (const newEvent of newEvents) {
await this.processEvent(ctx, newEvent);
// Same guard for new events: once complete, don't process further events in this
// batch. This is the primary fix for issue #734 where TIMERCREATED arrives in the
// same new-events batch as EVENTRAISED and (when ordered after it) incorrectly fails
// the already-completed orchestration.
if (ctx._isComplete) break;
}
} catch (e: any) {
ctx.setFailed(e);
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/helpers/CustomNodeEnvironment.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ limitations under the License.
*/

// eslint-disable-next-line @typescript-eslint/no-require-imports
const { TestEnvironment: NodeEnvironment } = require("jest-environment-node");
const NodeEnvironment = require("jest-environment-node");

/**
* Custom Jest test environment that installs a filter on the process object
Expand Down
68 changes: 68 additions & 0 deletions test/e2e/workflow/workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,74 @@ describe("workflow", () => {
expect(state?.serializedOutput).toEqual(JSON.stringify(16));
}, 31000);

// Regression test for https://github.com/dapr/js-sdk/issues/734
//
// The bug: after a workflow completes via whenAny([externalEvent, timer]) where
// the external event arrives *after* a preceding activity has already finished,
// getWorkflowState() returns RUNNING indefinitely despite internal completion.
//
// Root cause: the event is delivered "live" to the waiting workflow (not buffered
// before waitForExternalEvent is registered), taking a code path that fails to
// flush the completed status back through the state store in a timely manner.
//
// This test will FAIL when the bug is present (getWorkflowState stays RUNNING for
// 10+ seconds) and PASS once the fix lands (status syncs within ~5 seconds).
it("should reflect COMPLETED in getWorkflowState after whenAny resolves via live external event following activity (issue #734)", async () => {
// A trivial activity that just echoes its input; enough to force the workflow
// through an activity step before reaching the whenAny yield point.
const setupActivity = async (_: WorkflowActivityContext, jobName: string) => {
return { jobName, status: "created" };
};

const workflow: TWorkflow = async function* (ctx: WorkflowContext, jobName: string): any {
// Step 1: run an activity so the whenAny tasks are created *after* at least
// one history replay round-trip. This is the precondition that surfaces #734.
yield ctx.callActivity(setupActivity, jobName);

// Step 2: the problematic whenAny pattern from the issue
const eventTask = ctx.waitForExternalEvent("job-done");
const timerTask = ctx.createTimer(30); // long timeout so the event wins

const winner = yield ctx.whenAny([eventTask, timerTask]);

if (winner === eventTask) {
return "completed";
} else {
return "timeout";
}
};

workflowRuntime.registerWorkflow(workflow).registerActivity(setupActivity);
await workflowRuntime.start();

const id = await workflowClient.scheduleNewWorkflow(workflow, "test-job-734");

// Wait for the workflow to start and the activity to finish before we raise
// the event. Raising it before waitForExternalEvent is registered causes Dapr
// to buffer the event (a different code path that does *not* reproduce #734).
await workflowClient.waitForWorkflowStart(id, false, 15);
// Extra headroom for the activity round-trip through the Dapr sidecar.
await new Promise((resolve) => setTimeout(resolve, 2000));

// Simulate the K8s job callback: raise the event *after* the workflow is
// already sitting at the whenAny yield point.
await workflowClient.raiseEvent(id, "job-done", { success: true });

// Poll getWorkflowState() directly — this is the API that the issue reports
// as stuck. waitForWorkflowCompletion uses a server-side blocking RPC and
// would mask the bug; we intentionally use the snapshot query here.
const pollDeadlineMs = Date.now() + 8000; // 8 s: passes after fix, fails with bug
let lastState = await workflowClient.getWorkflowState(id, true);

while (Date.now() < pollDeadlineMs && lastState?.runtimeStatus !== WorkflowRuntimeStatus.COMPLETED) {
await new Promise((resolve) => setTimeout(resolve, 250));
lastState = await workflowClient.getWorkflowState(id, true);
}

expect(lastState?.runtimeStatus).toEqual(WorkflowRuntimeStatus.COMPLETED);
expect(lastState?.serializedOutput).toEqual(JSON.stringify("completed"));
}, 60000);

it("should be able to purge orchestration by id", async () => {
const plusOneActivity = async (_: WorkflowActivityContext, input: number) => {
return input + 1;
Expand Down
Loading