diff --git a/packages/durabletask-js/src/utils/pb-helper.util.ts b/packages/durabletask-js/src/utils/pb-helper.util.ts index dd03077..100f0b2 100644 --- a/packages/durabletask-js/src/utils/pb-helper.util.ts +++ b/packages/durabletask-js/src/utils/pb-helper.util.ts @@ -236,6 +236,22 @@ export function newEventRaisedEvent(name: string, encodedInput?: string): pb.His return event; } +export function newEventSentEvent(eventId: number, instanceId: string, name: string, encodedInput?: string): pb.HistoryEvent { + const ts = new Timestamp(); + + const eventSent = new pb.EventSentEvent(); + eventSent.setInstanceid(instanceId); + eventSent.setName(name); + eventSent.setInput(getStringValue(encodedInput)); + + const event = new pb.HistoryEvent(); + event.setEventid(eventId); + event.setTimestamp(ts); + event.setEventsent(eventSent); + + return event; +} + export function newSuspendEvent(): pb.HistoryEvent { const ts = new Timestamp(); diff --git a/packages/durabletask-js/src/worker/orchestration-executor.ts b/packages/durabletask-js/src/worker/orchestration-executor.ts index e081d32..0699e6d 100644 --- a/packages/durabletask-js/src/worker/orchestration-executor.ts +++ b/packages/durabletask-js/src/worker/orchestration-executor.ts @@ -13,6 +13,7 @@ import { Logger, ConsoleLogger } from "../types/logger.type"; import { getName } from "../task"; import * as WorkerLogs from "./logs"; import { OrchestrationStateError } from "../task/exception/orchestration-state-error"; +import { NonDeterminismError } from "../task/exception/non-determinism-error"; import { CompletableTask } from "../task/completable-task"; import { RetryableTask } from "../task/retryable-task"; import { RetryHandlerTask } from "../task/retry-handler-task"; @@ -162,6 +163,9 @@ export class OrchestrationExecutor { case pb.HistoryEvent.EventtypeCase.EVENTRAISED: await this.handleEventRaised(ctx, event); break; + case pb.HistoryEvent.EventtypeCase.EVENTSENT: + await this.handleEventSent(ctx, event); + break; case pb.HistoryEvent.EventtypeCase.EXECUTIONSUSPENDED: await this.handleExecutionSuspended(ctx, event); break; @@ -455,6 +459,42 @@ export class OrchestrationExecutor { } } + private async handleEventSent(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + // This history event confirms that a sendEvent action was successfully processed by the sidecar. + const eventId = event.getEventid(); + const action = ctx._pendingActions[eventId]; + + const isSendEventAction = action?.hasSendevent(); + + if (!action) { + throw getNonDeterminismError(eventId, getName(ctx.sendEvent)); + } else if (!isSendEventAction) { + const expectedMethodName = getName(ctx.sendEvent); + throw new NonDeterminismError( + `A previous execution called ${expectedMethodName} with ID=${eventId}, but the current execution is instead trying to call a different method as part of rebuilding its history.`, + ); + } + + const eventSent = event.getEventsent(); + const expectedEventName = eventSent?.getName(); + const sendEventAction = action.getSendevent()!; + const actualEventName = sendEventAction.getName(); + if (expectedEventName !== actualEventName) { + throw getWrongActionNameError(eventId, getName(ctx.sendEvent), expectedEventName, actualEventName); + } + + const expectedInstanceId = eventSent?.getInstanceid(); + const actualInstanceId = sendEventAction.getInstance()?.getInstanceid(); + if (expectedInstanceId !== actualInstanceId) { + throw new NonDeterminismError( + `Failed to restore orchestration state due to a history mismatch: A previous execution called ${getName(ctx.sendEvent)} with target instance '${expectedInstanceId}' and sequence number ${eventId}, but the current execution is instead trying to target instance '${actualInstanceId}' as part of rebuilding its history.`, + ); + } + + // Remove the action from the pending action list only after replay validation succeeds. + delete ctx._pendingActions[eventId]; + } + private async handleExecutionSuspended(ctx: RuntimeOrchestrationContext, _event: pb.HistoryEvent): Promise { if (!this._isSuspended && !ctx._isReplaying) { WorkerLogs.orchestrationSuspended(this._logger, ctx._instanceId); diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts index 599453f..a0b2edb 100644 --- a/packages/durabletask-js/test/orchestration_executor.spec.ts +++ b/packages/durabletask-js/test/orchestration_executor.spec.ts @@ -5,6 +5,7 @@ import { CompleteOrchestrationAction, OrchestratorAction } from "../src/proto/or import { OrchestrationContext } from "../src/task/context/orchestration-context"; import { newEventRaisedEvent, + newEventSentEvent, newExecutionStartedEvent, newOrchestratorStartedEvent, newResumeEvent, @@ -2119,6 +2120,176 @@ describe("Orchestration Executor", () => { }); }); +describe("EventSent Handler", () => { + it("should remove sendEvent action from pendingActions on replay when EVENTSENT event is received", async () => { + // Orchestrator sends an event then calls an activity + const myActivity = () => "activity-result"; + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + ctx.sendEvent("target-instance", "my-event", { key: "value" }); + const result = yield ctx.callActivity(myActivity); + return result; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const activityName = registry.addActivity(myActivity); + + // Simulate replay: oldEvents contain the execution start, EventSent confirmation, + // TaskScheduled confirmation, and task completion + const oldEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, "test-instance"), + // EventSent confirms the sendEvent action (ID=1 since it's the first action) + newEventSentEvent(1, "target-instance", "my-event", JSON.stringify({ key: "value" })), + // TaskScheduled confirms the activity action (ID=2) + newTaskScheduledEvent(2, activityName), + newTaskCompletedEvent(2, JSON.stringify("activity-result")), + ]; + + const newEvents = [newOrchestratorStartedEvent()]; + + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute("test-instance", oldEvents, newEvents); + + // Only the complete action should remain - sendEvent should NOT be re-sent + expect(result.actions.length).toEqual(1); + const action = result.actions[0]; + expect(action.hasCompleteorchestration()).toBe(true); + expect(action.getCompleteorchestration()?.getResult()?.getValue()).toEqual( + JSON.stringify("activity-result"), + ); + }); + + it("should throw NonDeterminismError when EVENTSENT event name does not match sendEvent action", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + ctx.sendEvent("target-instance", "my-event", { key: "value" }); + yield ctx.createTimer(1); + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + + const oldEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, "test-instance"), + newEventSentEvent(1, "target-instance", "different-event", JSON.stringify({ key: "value" })), + ]; + + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute("test-instance", oldEvents, [newOrchestratorStartedEvent()]); + + const completeAction = result.actions.find((a) => a.hasCompleteorchestration()); + expect(completeAction).toBeDefined(); + expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, + ); + const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails(); + expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError"); + expect(failureDetails?.getErrormessage()).toContain("different-event"); + expect(failureDetails?.getErrormessage()).toContain("my-event"); + }); + + it("should throw NonDeterminismError when EVENTSENT target instance does not match sendEvent action", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + ctx.sendEvent("target-instance", "my-event", { key: "value" }); + yield ctx.createTimer(1); + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + + const oldEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, "test-instance"), + newEventSentEvent(1, "different-target", "my-event", JSON.stringify({ key: "value" })), + ]; + + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute("test-instance", oldEvents, [newOrchestratorStartedEvent()]); + + const completeAction = result.actions.find((a) => a.hasCompleteorchestration()); + expect(completeAction).toBeDefined(); + expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, + ); + const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails(); + expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError"); + expect(failureDetails?.getErrormessage()).toContain("different-target"); + expect(failureDetails?.getErrormessage()).toContain("target-instance"); + }); + + it("should throw NonDeterminismError when EVENTSENT event has no matching action", async () => { + // Orchestrator does NOT call sendEvent but gets an EVENTSENT history event + const myActivity = () => "result"; + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(myActivity); + return result; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + registry.addActivity(myActivity); + + // oldEvents contain an EVENTSENT event with ID=99 that has no corresponding action + const oldEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, "test-instance"), + newEventSentEvent(99, "target-instance", "phantom-event"), + ]; + + const newEvents = [newOrchestratorStartedEvent()]; + + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute("test-instance", oldEvents, newEvents); + + // Should fail with NonDeterminismError + const completeAction = result.actions.find((a) => a.hasCompleteorchestration()); + expect(completeAction).toBeDefined(); + expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, + ); + const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails(); + expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError"); + expect(failureDetails?.getErrormessage()).toMatch(/sendEvent.*ID=99/); + }); + + it("should throw when EVENTSENT event matches a non-sendEvent action type", async () => { + // Orchestrator calls an activity (creates ScheduleTask action with ID=1) + // but history says ID=1 is an EventSent event + const myActivity = () => "result"; + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(myActivity); + return result; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + registry.addActivity(myActivity); + + // EVENTSENT event with ID=1 conflicts with ScheduleTask action at ID=1 + const oldEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, "test-instance"), + newEventSentEvent(1, "target-instance", "my-event"), + ]; + + const newEvents = [newOrchestratorStartedEvent()]; + + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute("test-instance", oldEvents, newEvents); + + // Should fail with NonDeterminismError (wrong action type) + const completeAction = result.actions.find((a) => a.hasCompleteorchestration()); + expect(completeAction).toBeDefined(); + expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, + ); + const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails(); + expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError"); + expect(failureDetails?.getErrormessage()).toMatch(/sendEvent/); + }); +}); + function getAndValidateSingleCompleteOrchestrationAction( result: OrchestrationExecutionResult, ): CompleteOrchestrationAction | undefined {