Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/durabletask-js/src/utils/pb-helper.util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,22 @@ export function newEventRaisedEvent(name: string, encodedInput?: string): pb.His
return event;
}

export function newEventSentEvent(eventId: number, instanceId: string, name: string, encodedInput?: string): pb.HistoryEvent {
const ts = new Timestamp();

const eventSent = new pb.EventSentEvent();
eventSent.setInstanceid(instanceId);
eventSent.setName(name);
eventSent.setInput(getStringValue(encodedInput));

const event = new pb.HistoryEvent();
event.setEventid(eventId);
event.setTimestamp(ts);
event.setEventsent(eventSent);

return event;
}

export function newSuspendEvent(): pb.HistoryEvent {
const ts = new Timestamp();

Expand Down
40 changes: 40 additions & 0 deletions packages/durabletask-js/src/worker/orchestration-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { Logger, ConsoleLogger } from "../types/logger.type";
import { getName } from "../task";
import * as WorkerLogs from "./logs";
import { OrchestrationStateError } from "../task/exception/orchestration-state-error";
import { NonDeterminismError } from "../task/exception/non-determinism-error";
import { CompletableTask } from "../task/completable-task";
import { RetryableTask } from "../task/retryable-task";
import { RetryHandlerTask } from "../task/retry-handler-task";
Expand Down Expand Up @@ -162,6 +163,9 @@ export class OrchestrationExecutor {
case pb.HistoryEvent.EventtypeCase.EVENTRAISED:
await this.handleEventRaised(ctx, event);
break;
case pb.HistoryEvent.EventtypeCase.EVENTSENT:
await this.handleEventSent(ctx, event);
break;
case pb.HistoryEvent.EventtypeCase.EXECUTIONSUSPENDED:
await this.handleExecutionSuspended(ctx, event);
break;
Expand Down Expand Up @@ -455,6 +459,42 @@ export class OrchestrationExecutor {
}
}

private async handleEventSent(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise<void> {
// This history event confirms that a sendEvent action was successfully processed by the sidecar.
const eventId = event.getEventid();
const action = ctx._pendingActions[eventId];

const isSendEventAction = action?.hasSendevent();

if (!action) {
throw getNonDeterminismError(eventId, getName(ctx.sendEvent));
} else if (!isSendEventAction) {
const expectedMethodName = getName(ctx.sendEvent);
throw new NonDeterminismError(
`A previous execution called ${expectedMethodName} with ID=${eventId}, but the current execution is instead trying to call a different method as part of rebuilding its history.`,
);
}

const eventSent = event.getEventsent();
const expectedEventName = eventSent?.getName();
const sendEventAction = action.getSendevent()!;
const actualEventName = sendEventAction.getName();
if (expectedEventName !== actualEventName) {
throw getWrongActionNameError(eventId, getName(ctx.sendEvent), expectedEventName, actualEventName);
}

const expectedInstanceId = eventSent?.getInstanceid();
const actualInstanceId = sendEventAction.getInstance()?.getInstanceid();
if (expectedInstanceId !== actualInstanceId) {
throw new NonDeterminismError(
`Failed to restore orchestration state due to a history mismatch: A previous execution called ${getName(ctx.sendEvent)} with target instance '${expectedInstanceId}' and sequence number ${eventId}, but the current execution is instead trying to target instance '${actualInstanceId}' as part of rebuilding its history.`,
);
}

// Remove the action from the pending action list only after replay validation succeeds.
delete ctx._pendingActions[eventId];
}
Comment thread
YunchuWang marked this conversation as resolved.

private async handleExecutionSuspended(ctx: RuntimeOrchestrationContext, _event: pb.HistoryEvent): Promise<void> {
if (!this._isSuspended && !ctx._isReplaying) {
WorkerLogs.orchestrationSuspended(this._logger, ctx._instanceId);
Expand Down
171 changes: 171 additions & 0 deletions packages/durabletask-js/test/orchestration_executor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { CompleteOrchestrationAction, OrchestratorAction } from "../src/proto/or
import { OrchestrationContext } from "../src/task/context/orchestration-context";
import {
newEventRaisedEvent,
newEventSentEvent,
newExecutionStartedEvent,
newOrchestratorStartedEvent,
newResumeEvent,
Expand Down Expand Up @@ -2119,6 +2120,176 @@ describe("Orchestration Executor", () => {
});
});

describe("EventSent Handler", () => {
it("should remove sendEvent action from pendingActions on replay when EVENTSENT event is received", async () => {
// Orchestrator sends an event then calls an activity
const myActivity = () => "activity-result";
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
ctx.sendEvent("target-instance", "my-event", { key: "value" });
const result = yield ctx.callActivity(myActivity);
return result;
};

const registry = new Registry();
const name = registry.addOrchestrator(orchestrator);
const activityName = registry.addActivity(myActivity);

// Simulate replay: oldEvents contain the execution start, EventSent confirmation,
// TaskScheduled confirmation, and task completion
const oldEvents = [
newOrchestratorStartedEvent(),
newExecutionStartedEvent(name, "test-instance"),
// EventSent confirms the sendEvent action (ID=1 since it's the first action)
newEventSentEvent(1, "target-instance", "my-event", JSON.stringify({ key: "value" })),
// TaskScheduled confirms the activity action (ID=2)
newTaskScheduledEvent(2, activityName),
newTaskCompletedEvent(2, JSON.stringify("activity-result")),
Comment thread
Copilot marked this conversation as resolved.
];

const newEvents = [newOrchestratorStartedEvent()];

const executor = new OrchestrationExecutor(registry);
const result = await executor.execute("test-instance", oldEvents, newEvents);

// Only the complete action should remain - sendEvent should NOT be re-sent
expect(result.actions.length).toEqual(1);
const action = result.actions[0];
expect(action.hasCompleteorchestration()).toBe(true);
expect(action.getCompleteorchestration()?.getResult()?.getValue()).toEqual(
JSON.stringify("activity-result"),
);
});

it("should throw NonDeterminismError when EVENTSENT event name does not match sendEvent action", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
ctx.sendEvent("target-instance", "my-event", { key: "value" });
yield ctx.createTimer(1);
};

const registry = new Registry();
const name = registry.addOrchestrator(orchestrator);

const oldEvents = [
newOrchestratorStartedEvent(),
newExecutionStartedEvent(name, "test-instance"),
newEventSentEvent(1, "target-instance", "different-event", JSON.stringify({ key: "value" })),
];

const executor = new OrchestrationExecutor(registry);
const result = await executor.execute("test-instance", oldEvents, [newOrchestratorStartedEvent()]);

const completeAction = result.actions.find((a) => a.hasCompleteorchestration());
expect(completeAction).toBeDefined();
expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual(
pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED,
);
const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails();
expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError");
expect(failureDetails?.getErrormessage()).toContain("different-event");
expect(failureDetails?.getErrormessage()).toContain("my-event");
});

it("should throw NonDeterminismError when EVENTSENT target instance does not match sendEvent action", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
ctx.sendEvent("target-instance", "my-event", { key: "value" });
yield ctx.createTimer(1);
};

const registry = new Registry();
const name = registry.addOrchestrator(orchestrator);

const oldEvents = [
newOrchestratorStartedEvent(),
newExecutionStartedEvent(name, "test-instance"),
newEventSentEvent(1, "different-target", "my-event", JSON.stringify({ key: "value" })),
];

const executor = new OrchestrationExecutor(registry);
const result = await executor.execute("test-instance", oldEvents, [newOrchestratorStartedEvent()]);

const completeAction = result.actions.find((a) => a.hasCompleteorchestration());
expect(completeAction).toBeDefined();
expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual(
pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED,
);
const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails();
expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError");
expect(failureDetails?.getErrormessage()).toContain("different-target");
expect(failureDetails?.getErrormessage()).toContain("target-instance");
});

it("should throw NonDeterminismError when EVENTSENT event has no matching action", async () => {
// Orchestrator does NOT call sendEvent but gets an EVENTSENT history event
const myActivity = () => "result";
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
const result = yield ctx.callActivity(myActivity);
return result;
};

const registry = new Registry();
const name = registry.addOrchestrator(orchestrator);
registry.addActivity(myActivity);

// oldEvents contain an EVENTSENT event with ID=99 that has no corresponding action
const oldEvents = [
newOrchestratorStartedEvent(),
newExecutionStartedEvent(name, "test-instance"),
newEventSentEvent(99, "target-instance", "phantom-event"),
];

const newEvents = [newOrchestratorStartedEvent()];

const executor = new OrchestrationExecutor(registry);
const result = await executor.execute("test-instance", oldEvents, newEvents);

// Should fail with NonDeterminismError
const completeAction = result.actions.find((a) => a.hasCompleteorchestration());
expect(completeAction).toBeDefined();
expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual(
pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED,
);
const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails();
expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError");
expect(failureDetails?.getErrormessage()).toMatch(/sendEvent.*ID=99/);
});

it("should throw when EVENTSENT event matches a non-sendEvent action type", async () => {
// Orchestrator calls an activity (creates ScheduleTask action with ID=1)
// but history says ID=1 is an EventSent event
const myActivity = () => "result";
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
const result = yield ctx.callActivity(myActivity);
return result;
};

const registry = new Registry();
const name = registry.addOrchestrator(orchestrator);
registry.addActivity(myActivity);

// EVENTSENT event with ID=1 conflicts with ScheduleTask action at ID=1
const oldEvents = [
newOrchestratorStartedEvent(),
newExecutionStartedEvent(name, "test-instance"),
newEventSentEvent(1, "target-instance", "my-event"),
];

const newEvents = [newOrchestratorStartedEvent()];

const executor = new OrchestrationExecutor(registry);
const result = await executor.execute("test-instance", oldEvents, newEvents);

// Should fail with NonDeterminismError (wrong action type)
const completeAction = result.actions.find((a) => a.hasCompleteorchestration());
expect(completeAction).toBeDefined();
expect(completeAction?.getCompleteorchestration()?.getOrchestrationstatus()).toEqual(
pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED,
);
const failureDetails = completeAction?.getCompleteorchestration()?.getFailuredetails();
expect(failureDetails?.getErrortype()).toEqual("NonDeterminismError");
expect(failureDetails?.getErrormessage()).toMatch(/sendEvent/);
});
});

function getAndValidateSingleCompleteOrchestrationAction(
result: OrchestrationExecutionResult,
): CompleteOrchestrationAction | undefined {
Expand Down
Loading