From 367f7d47aeb04d081d5b193af813681e603d0cfb Mon Sep 17 00:00:00 2001 From: Copilot <223556219+Copilot@users.noreply.github.com> Date: Tue, 5 May 2026 09:23:35 +0000 Subject: [PATCH 1/3] fix: add missing EVENTSENT handler to orchestration executor The orchestration executor's processEvent switch statement was missing a case for the EVENTSENT history event type. When an orchestrator calls ctx.sendEvent(), the sidecar records an EventSentEvent to confirm the action was processed. Without handling this event during replay, the sendEvent action remained in _pendingActions and was returned to the sidecar again via getActions(), potentially causing duplicate event delivery. This adds a handleEventSent method that validates and removes the sendEvent action from _pendingActions on replay, consistent with how handleTaskScheduled, handleTimerCreated, and handleSubOrchestrationCreated handle their respective confirmation events. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/utils/pb-helper.util.ts | 16 +++ .../src/worker/orchestration-executor.ts | 20 ++++ .../test/orchestration_executor.spec.ts | 113 ++++++++++++++++++ 3 files changed, 149 insertions(+) diff --git a/packages/durabletask-js/src/utils/pb-helper.util.ts b/packages/durabletask-js/src/utils/pb-helper.util.ts index dd03077..100f0b2 100644 --- a/packages/durabletask-js/src/utils/pb-helper.util.ts +++ b/packages/durabletask-js/src/utils/pb-helper.util.ts @@ -236,6 +236,22 @@ export function newEventRaisedEvent(name: string, encodedInput?: string): pb.His return event; } +export function newEventSentEvent(eventId: number, instanceId: string, name: string, encodedInput?: string): pb.HistoryEvent { + const ts = new Timestamp(); + + const eventSent = new pb.EventSentEvent(); + eventSent.setInstanceid(instanceId); + eventSent.setName(name); + eventSent.setInput(getStringValue(encodedInput)); + + const event = new pb.HistoryEvent(); + event.setEventid(eventId); + event.setTimestamp(ts); + event.setEventsent(eventSent); + + return event; +} + export function newSuspendEvent(): pb.HistoryEvent { const ts = new Timestamp(); diff --git a/packages/durabletask-js/src/worker/orchestration-executor.ts b/packages/durabletask-js/src/worker/orchestration-executor.ts index e081d32..a6f17c2 100644 --- a/packages/durabletask-js/src/worker/orchestration-executor.ts +++ b/packages/durabletask-js/src/worker/orchestration-executor.ts @@ -162,6 +162,9 @@ export class OrchestrationExecutor { case pb.HistoryEvent.EventtypeCase.EVENTRAISED: await this.handleEventRaised(ctx, event); break; + case pb.HistoryEvent.EventtypeCase.EVENTSENT: + await this.handleEventSent(ctx, event); + break; case pb.HistoryEvent.EventtypeCase.EXECUTIONSUSPENDED: await this.handleExecutionSuspended(ctx, event); break; @@ -455,6 +458,23 @@ export class OrchestrationExecutor { } } + private async handleEventSent(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + // This history event confirms that a sendEvent action was successfully processed by the sidecar. + // Remove the action from the pending action list so we don't send it again. + const eventId = event.getEventid(); + const action = ctx._pendingActions[eventId]; + delete ctx._pendingActions[eventId]; + + const isSendEventAction = action?.hasSendevent(); + + if (!action) { + throw getNonDeterminismError(eventId, getName(ctx.sendEvent)); + } else if (!isSendEventAction) { + const expectedMethodName = getName(ctx.sendEvent); + throw getWrongActionTypeError(eventId, expectedMethodName, action); + } + } + private async handleExecutionSuspended(ctx: RuntimeOrchestrationContext, _event: pb.HistoryEvent): Promise { if (!this._isSuspended && !ctx._isReplaying) { WorkerLogs.orchestrationSuspended(this._logger, ctx._instanceId); diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts index 599453f..c4d41ab 100644 --- a/packages/durabletask-js/test/orchestration_executor.spec.ts +++ b/packages/durabletask-js/test/orchestration_executor.spec.ts @@ -5,6 +5,7 @@ import { CompleteOrchestrationAction, OrchestratorAction } from "../src/proto/or import { OrchestrationContext } from "../src/task/context/orchestration-context"; import { newEventRaisedEvent, + newEventSentEvent, newExecutionStartedEvent, newOrchestratorStartedEvent, newResumeEvent, @@ -2119,6 +2120,118 @@ describe("Orchestration Executor", () => { }); }); +describe("EventSent Handler", () => { + it("should remove sendEvent action from pendingActions on replay when EVENTSENT event is received", async () => { + // Orchestrator sends an event then calls an activity + const myActivity = () => "activity-result"; + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + ctx.sendEvent("target-instance", "my-event", { key: "value" }); + const result = yield ctx.callActivity(myActivity); + return result; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + registry.addActivity(myActivity); + + // Simulate replay: oldEvents contain the execution start, EventSent confirmation, + // TaskScheduled confirmation, and task completion + const oldEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, "test-instance"), + // EventSent confirms the sendEvent action (ID=1 since it's the first action) + newEventSentEvent(1, "target-instance", "my-event", JSON.stringify({ key: "value" })), + // TaskScheduled confirms the activity action (ID=2) + newTaskScheduledEvent(2, "myActivity"), + newTaskCompletedEvent(2, JSON.stringify("activity-result")), + ]; + + const newEvents = [newOrchestratorStartedEvent()]; + + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute("test-instance", oldEvents, newEvents); + + // Only the complete action should remain - sendEvent should NOT be re-sent + expect(result.actions.length).toEqual(1); + const action = result.actions[0]; + expect(action.hasCompleteorchestration()).toBe(true); + expect(action.getCompleteorchestration()?.getResult()?.getValue()).toEqual( + JSON.stringify("activity-result"), + ); + }); + + it("should throw NonDeterminismError when EVENTSENT event has no matching action", async () => { + // Orchestrator does NOT call sendEvent but gets an EVENTSENT history event + const myActivity = () => "result"; + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(myActivity); + return result; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + registry.addActivity(myActivity); + + // oldEvents contain an EVENTSENT event with ID=99 that has no corresponding action + const oldEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, "test-instance"), + newEventSentEvent(99, "target-instance", "phantom-event"), + ]; + + const newEvents = [newOrchestratorStartedEvent()]; + + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute("test-instance", oldEvents, newEvents); + + // Should fail with NonDeterminismError + const completeAction = result.actions.find((a) => a.hasCompleteorchestration()); + expect(completeAction).toBeDefined(); + expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, + ); + const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails(); + expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError"); + expect(failureDetails?.getErrormessage()).toMatch(/sendEvent.*ID=99/); + }); + + it("should throw when EVENTSENT event matches a non-sendEvent action type", async () => { + // Orchestrator calls an activity (creates ScheduleTask action with ID=1) + // but history says ID=1 is an EventSent event + const myActivity = () => "result"; + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(myActivity); + return result; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + registry.addActivity(myActivity); + + // EVENTSENT event with ID=1 conflicts with ScheduleTask action at ID=1 + const oldEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, "test-instance"), + newEventSentEvent(1, "target-instance", "my-event"), + ]; + + const newEvents = [newOrchestratorStartedEvent()]; + + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute("test-instance", oldEvents, newEvents); + + // Should fail with NonDeterminismError (wrong action type) + const completeAction = result.actions.find((a) => a.hasCompleteorchestration()); + expect(completeAction).toBeDefined(); + expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, + ); + const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails(); + expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError"); + expect(failureDetails?.getErrormessage()).toMatch(/sendEvent/); + }); +}); + function getAndValidateSingleCompleteOrchestrationAction( result: OrchestrationExecutionResult, ): CompleteOrchestrationAction | undefined { From d9cebc5f6d7a961ecf1a0f04929dd181475c5464 Mon Sep 17 00:00:00 2001 From: wangbill Date: Fri, 12 Jun 2026 09:22:23 -0700 Subject: [PATCH 2/3] fix: validate EventSent replay details Validate sendEvent names when replaying EventSent history and throw a focused NonDeterminismError for wrong action types without relying on getMethodNameForAction. Update tests to use registered activity names and cover mismatched event names. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/worker/orchestration-executor.ts | 12 ++++++- .../test/orchestration_executor.spec.ts | 33 +++++++++++++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/packages/durabletask-js/src/worker/orchestration-executor.ts b/packages/durabletask-js/src/worker/orchestration-executor.ts index a6f17c2..3566e5c 100644 --- a/packages/durabletask-js/src/worker/orchestration-executor.ts +++ b/packages/durabletask-js/src/worker/orchestration-executor.ts @@ -13,6 +13,7 @@ import { Logger, ConsoleLogger } from "../types/logger.type"; import { getName } from "../task"; import * as WorkerLogs from "./logs"; import { OrchestrationStateError } from "../task/exception/orchestration-state-error"; +import { NonDeterminismError } from "../task/exception/non-determinism-error"; import { CompletableTask } from "../task/completable-task"; import { RetryableTask } from "../task/retryable-task"; import { RetryHandlerTask } from "../task/retry-handler-task"; @@ -471,7 +472,16 @@ export class OrchestrationExecutor { throw getNonDeterminismError(eventId, getName(ctx.sendEvent)); } else if (!isSendEventAction) { const expectedMethodName = getName(ctx.sendEvent); - throw getWrongActionTypeError(eventId, expectedMethodName, action); + throw new NonDeterminismError( + `A previous execution called ${expectedMethodName} with ID=${eventId}, but the current execution is instead trying to call a different method as part of rebuilding its history.`, + ); + } + + const eventSent = event.getEventsent(); + const expectedEventName = eventSent?.getName(); + const actualEventName = action.getSendevent()?.getName(); + if (expectedEventName !== actualEventName) { + throw getWrongActionNameError(eventId, getName(ctx.sendEvent), expectedEventName, actualEventName); } } diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts index c4d41ab..d667c17 100644 --- a/packages/durabletask-js/test/orchestration_executor.spec.ts +++ b/packages/durabletask-js/test/orchestration_executor.spec.ts @@ -2132,7 +2132,7 @@ describe("EventSent Handler", () => { const registry = new Registry(); const name = registry.addOrchestrator(orchestrator); - registry.addActivity(myActivity); + const activityName = registry.addActivity(myActivity); // Simulate replay: oldEvents contain the execution start, EventSent confirmation, // TaskScheduled confirmation, and task completion @@ -2142,7 +2142,7 @@ describe("EventSent Handler", () => { // EventSent confirms the sendEvent action (ID=1 since it's the first action) newEventSentEvent(1, "target-instance", "my-event", JSON.stringify({ key: "value" })), // TaskScheduled confirms the activity action (ID=2) - newTaskScheduledEvent(2, "myActivity"), + newTaskScheduledEvent(2, activityName), newTaskCompletedEvent(2, JSON.stringify("activity-result")), ]; @@ -2160,6 +2160,35 @@ describe("EventSent Handler", () => { ); }); + it("should throw NonDeterminismError when EVENTSENT event name does not match sendEvent action", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + ctx.sendEvent("target-instance", "my-event", { key: "value" }); + yield ctx.createTimer(1); + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + + const oldEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, "test-instance"), + newEventSentEvent(1, "target-instance", "different-event", JSON.stringify({ key: "value" })), + ]; + + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute("test-instance", oldEvents, [newOrchestratorStartedEvent()]); + + const completeAction = result.actions.find((a) => a.hasCompleteorchestration()); + expect(completeAction).toBeDefined(); + expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, + ); + const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails(); + expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError"); + expect(failureDetails?.getErrormessage()).toContain("different-event"); + expect(failureDetails?.getErrormessage()).toContain("my-event"); + }); + it("should throw NonDeterminismError when EVENTSENT event has no matching action", async () => { // Orchestrator does NOT call sendEvent but gets an EVENTSENT history event const myActivity = () => "result"; From d91c5d36b4db2392221683981e1aa8c3a17616cf Mon Sep 17 00:00:00 2001 From: wangbill Date: Fri, 12 Jun 2026 10:11:13 -0700 Subject: [PATCH 3/3] fix: validate EventSent target instance Validate EventSent target instance IDs before removing replayed sendEvent actions from pending actions. Move pending-action cleanup after replay validation succeeds and add regression coverage for target mismatch. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/worker/orchestration-executor.ts | 16 ++++++++-- .../test/orchestration_executor.spec.ts | 29 +++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/packages/durabletask-js/src/worker/orchestration-executor.ts b/packages/durabletask-js/src/worker/orchestration-executor.ts index 3566e5c..0699e6d 100644 --- a/packages/durabletask-js/src/worker/orchestration-executor.ts +++ b/packages/durabletask-js/src/worker/orchestration-executor.ts @@ -461,10 +461,8 @@ export class OrchestrationExecutor { private async handleEventSent(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { // This history event confirms that a sendEvent action was successfully processed by the sidecar. - // Remove the action from the pending action list so we don't send it again. const eventId = event.getEventid(); const action = ctx._pendingActions[eventId]; - delete ctx._pendingActions[eventId]; const isSendEventAction = action?.hasSendevent(); @@ -479,10 +477,22 @@ export class OrchestrationExecutor { const eventSent = event.getEventsent(); const expectedEventName = eventSent?.getName(); - const actualEventName = action.getSendevent()?.getName(); + const sendEventAction = action.getSendevent()!; + const actualEventName = sendEventAction.getName(); if (expectedEventName !== actualEventName) { throw getWrongActionNameError(eventId, getName(ctx.sendEvent), expectedEventName, actualEventName); } + + const expectedInstanceId = eventSent?.getInstanceid(); + const actualInstanceId = sendEventAction.getInstance()?.getInstanceid(); + if (expectedInstanceId !== actualInstanceId) { + throw new NonDeterminismError( + `Failed to restore orchestration state due to a history mismatch: A previous execution called ${getName(ctx.sendEvent)} with target instance '${expectedInstanceId}' and sequence number ${eventId}, but the current execution is instead trying to target instance '${actualInstanceId}' as part of rebuilding its history.`, + ); + } + + // Remove the action from the pending action list only after replay validation succeeds. + delete ctx._pendingActions[eventId]; } private async handleExecutionSuspended(ctx: RuntimeOrchestrationContext, _event: pb.HistoryEvent): Promise { diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts index d667c17..a0b2edb 100644 --- a/packages/durabletask-js/test/orchestration_executor.spec.ts +++ b/packages/durabletask-js/test/orchestration_executor.spec.ts @@ -2189,6 +2189,35 @@ describe("EventSent Handler", () => { expect(failureDetails?.getErrormessage()).toContain("my-event"); }); + it("should throw NonDeterminismError when EVENTSENT target instance does not match sendEvent action", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + ctx.sendEvent("target-instance", "my-event", { key: "value" }); + yield ctx.createTimer(1); + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + + const oldEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, "test-instance"), + newEventSentEvent(1, "different-target", "my-event", JSON.stringify({ key: "value" })), + ]; + + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute("test-instance", oldEvents, [newOrchestratorStartedEvent()]); + + const completeAction = result.actions.find((a) => a.hasCompleteorchestration()); + expect(completeAction).toBeDefined(); + expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, + ); + const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails(); + expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError"); + expect(failureDetails?.getErrormessage()).toContain("different-target"); + expect(failureDetails?.getErrormessage()).toContain("target-instance"); + }); + it("should throw NonDeterminismError when EVENTSENT event has no matching action", async () => { // Orchestrator does NOT call sendEvent but gets an EVENTSENT history event const myActivity = () => "result";