diff --git a/package-lock.json b/package-lock.json index 3007de4..b7e5f7e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -226,6 +226,7 @@ "integrity": "sha512-H3mcG6ZDLTlYfaSNi0iOKkigqMFvkTKlGUYlD8GW7nNOYRrevuA46iTypPyv+06V3fEmvvazfntkBU34L0azAw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@babel/code-frame": "^7.28.6", "@babel/generator": "^7.28.6", @@ -897,6 +898,7 @@ "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.14.3.tgz", "integrity": "sha512-Iq8QQQ/7X3Sac15oB6p0FmUg/klxQvXLeileoqrTRGJYLV+/9tubbr9ipz0GKHjmXVsgFPo/+W+2cA8eNcR+XA==", "license": "Apache-2.0", + "peer": true, "dependencies": { "@grpc/proto-loader": "^0.8.0", "@js-sdsl/ordered-map": "^4.4.2" @@ -1493,6 +1495,7 @@ "integrity": "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==", "dev": true, "license": "Apache-2.0", + "peer": true, "engines": { "node": ">=8.0.0" } @@ -1669,6 +1672,7 @@ "dev": true, "hasInstallScript": true, "license": "Apache-2.0", + "peer": true, "dependencies": { "@swc/counter": "^0.1.3", "@swc/types": "^0.1.25" @@ -1884,6 +1888,7 @@ "integrity": "sha512-TXTnIcNJQEKwThMMqBXsZ4VGAza6bvN4pa41Rkqoio6QBKMvo+5lexeTMScGCIxtzgQJzElcvIltani+adC5PQ==", "dev": true, "license": "Apache-2.0", + "peer": true, "dependencies": { "tslib": "^2.8.0" } @@ -2045,6 +2050,7 @@ "resolved": "https://registry.npmjs.org/@types/node/-/node-22.19.9.tgz", "integrity": "sha512-PD03/U8g1F9T9MI+1OBisaIARhSzeidsUjQaf51fOxrfjeiKN9bLVO06lHuHYjxdnqLWJijJHfqXPSJri2EM2A==", "license": "MIT", + "peer": true, "dependencies": { "undici-types": "~6.21.0" } @@ -2118,6 +2124,7 @@ "integrity": "sha512-BtE0k6cjwjLZoZixN0t5AKP0kSzlGu7FctRXYuPAm//aaiZhmfq1JwdYpYr1brzEspYyFeF+8XF5j2VK6oalrA==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.54.0", "@typescript-eslint/types": "8.54.0", @@ -2359,6 +2366,7 @@ "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "dev": true, "license": "MIT", + "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -2683,6 +2691,7 @@ } ], "license": "MIT", + "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -3330,6 +3339,7 @@ "integrity": "sha512-LEyamqS7W5HB3ujJyvi0HQK/dtVINZvd5mAAp9eT5S/ujByGjiZLCzPcHVzuXbpJDJF/cxwHlfceVUDZ2lnSTw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -4245,6 +4255,7 @@ "integrity": "sha512-NIy3oAFp9shda19hy4HK0HRTWKtPJmGdnvywu01nOqNC2vZg+Z+fvJDxpMQA88eb2I9EcafcdjYgsDthnYTvGw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@jest/core": "^29.7.0", "@jest/types": "^29.6.3", @@ -5991,6 +6002,7 @@ "integrity": "sha512-UOnG6LftzbdaHZcKoPFtOcCKztrQ57WkHDeRD9t/PTQtmT0NHSeWWepj6pS0z/N7+08BHFDQVUrfmfMRcZwbMg==", "dev": true, "license": "MIT", + "peer": true, "bin": { "prettier": "bin/prettier.cjs" }, @@ -6715,6 +6727,7 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -6866,6 +6879,7 @@ "integrity": "sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@cspotcode/source-map-support": "^0.8.0", "@tsconfig/node10": "^1.0.7", @@ -6952,6 +6966,7 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" diff --git a/packages/durabletask-js/src/index.ts b/packages/durabletask-js/src/index.ts index 96d7bc9..57d6f99 100644 --- a/packages/durabletask-js/src/index.ts +++ b/packages/durabletask-js/src/index.ts @@ -62,18 +62,32 @@ export { // Proto types (for advanced usage) export { OrchestrationStatus as ProtoOrchestrationStatus } from "./proto/orchestrator_service_pb"; +// Failure details +export { FailureDetails, TaskFailureDetails } from "./task/failure-details"; + // Task utilities export { getName, whenAll, whenAny } from "./task"; export { Task } from "./task/task"; // Retry policies and task options -export { RetryPolicy, RetryPolicyOptions } from "./task/retry"; +export { + RetryPolicy, + RetryPolicyOptions, + FailureHandlerPredicate, + RetryContext, + createRetryContext, + RetryHandler, + AsyncRetryHandler, + RetryHandlerResult, + toAsyncRetryHandler, +} from "./task/retry"; export { TaskOptions, SubOrchestrationOptions, StartOrchestrationOptions, - taskOptionsFromRetryPolicy, - subOrchestrationOptionsFromRetryPolicy, + TaskRetryOptions, + isRetryPolicy, + isRetryHandler, } from "./task/options"; // Types diff --git a/packages/durabletask-js/src/task/exception/task-failed-error.ts b/packages/durabletask-js/src/task/exception/task-failed-error.ts index a4caff2..030ab52 100644 --- a/packages/durabletask-js/src/task/exception/task-failed-error.ts +++ b/packages/durabletask-js/src/task/exception/task-failed-error.ts @@ -9,6 +9,7 @@ export class TaskFailedError extends Error { constructor(message: string, details: pb.TaskFailureDetails) { super(message); + this.name = "TaskFailedError"; this._details = new FailureDetails( details.getErrormessage(), diff --git a/packages/durabletask-js/src/task/failure-details.ts b/packages/durabletask-js/src/task/failure-details.ts index 7047465..2dec0d7 100644 --- a/packages/durabletask-js/src/task/failure-details.ts +++ b/packages/durabletask-js/src/task/failure-details.ts @@ -1,7 +1,20 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -export class FailureDetails { +/** + * Interface representing task failure details. + * This is used for retry handlers to inspect failure information. + */ +export interface TaskFailureDetails { + /** The type/class name of the error */ + readonly errorType: string; + /** The error message */ + readonly message: string; + /** The stack trace, if available */ + readonly stackTrace?: string; +} + +export class FailureDetails implements TaskFailureDetails { private _message: string; private _errorType: string; private _stackTrace: string | undefined; diff --git a/packages/durabletask-js/src/task/options/index.ts b/packages/durabletask-js/src/task/options/index.ts index bdcc4ab..8ea3058 100644 --- a/packages/durabletask-js/src/task/options/index.ts +++ b/packages/durabletask-js/src/task/options/index.ts @@ -5,6 +5,7 @@ export { TaskOptions, SubOrchestrationOptions, StartOrchestrationOptions, - taskOptionsFromRetryPolicy, - subOrchestrationOptionsFromRetryPolicy, + TaskRetryOptions, + isRetryPolicy, + isRetryHandler, } from "./task-options"; diff --git a/packages/durabletask-js/src/task/options/task-options.ts b/packages/durabletask-js/src/task/options/task-options.ts index 5cb11ed..765baa8 100644 --- a/packages/durabletask-js/src/task/options/task-options.ts +++ b/packages/durabletask-js/src/task/options/task-options.ts @@ -2,16 +2,31 @@ // Licensed under the MIT License. import { RetryPolicy } from "../retry/retry-policy"; +import { AsyncRetryHandler, RetryHandler } from "../retry/retry-handler"; + +/** + * Union type representing the available retry strategies for a task. + * + * - {@link RetryPolicy} for declarative retry control (with backoff, max attempts, etc.) + * - {@link AsyncRetryHandler} for asynchronous imperative retry control + * - {@link RetryHandler} for synchronous imperative retry control + * + * When a synchronous {@link RetryHandler} is provided, it is automatically + * wrapped into an {@link AsyncRetryHandler} internally. + */ +export type TaskRetryOptions = RetryPolicy | AsyncRetryHandler | RetryHandler; /** * Options that can be used to control the behavior of orchestrator task execution. */ export interface TaskOptions { /** - * The retry policy for the task. - * Controls how many times a task is retried and the delay between retries. + * The retry options for the task. + * Can be a RetryPolicy for declarative retry control, + * an AsyncRetryHandler for async imperative retry control, + * or a RetryHandler for sync imperative retry control. */ - retry?: RetryPolicy; + retry?: TaskRetryOptions; /** * The tags to associate with the task. */ @@ -62,45 +77,22 @@ export interface StartOrchestrationOptions { } /** - * Creates a TaskOptions instance from a RetryPolicy. + * Type guard to check if the retry option is a RetryPolicy. * - * @param policy - The retry policy to use - * @returns A TaskOptions instance configured with the retry policy - * - * @example - * ```typescript - * const retryPolicy = new RetryPolicy({ - * maxNumberOfAttempts: 3, - * firstRetryIntervalInMilliseconds: 1000 - * }); - * - * const options = taskOptionsFromRetryPolicy(retryPolicy); - * ``` + * @param retry - The retry option to check + * @returns true if the retry option is a RetryPolicy, false otherwise */ -export function taskOptionsFromRetryPolicy(policy: RetryPolicy): TaskOptions { - return { retry: policy }; +export function isRetryPolicy(retry: TaskRetryOptions | undefined): retry is RetryPolicy { + return retry instanceof RetryPolicy; } /** - * Creates a SubOrchestrationOptions instance from a RetryPolicy and optional instance ID. - * - * @param policy - The retry policy to use - * @param instanceId - Optional instance ID for the sub-orchestration - * @returns A SubOrchestrationOptions instance configured with the retry policy - * - * @example - * ```typescript - * const retryPolicy = new RetryPolicy({ - * maxNumberOfAttempts: 3, - * firstRetryIntervalInMilliseconds: 1000 - * }); + * Type guard to check if the retry option is a retry handler function + * (either {@link AsyncRetryHandler} or {@link RetryHandler}). * - * const options = subOrchestrationOptionsFromRetryPolicy(retryPolicy, "my-sub-orch-123"); - * ``` + * @param retry - The retry option to check + * @returns true if the retry option is a handler function, false otherwise */ -export function subOrchestrationOptionsFromRetryPolicy( - policy: RetryPolicy, - instanceId?: string, -): SubOrchestrationOptions { - return { retry: policy, instanceId }; +export function isRetryHandler(retry: TaskRetryOptions | undefined): retry is AsyncRetryHandler | RetryHandler { + return typeof retry === "function"; } diff --git a/packages/durabletask-js/src/task/retry-handler-task.ts b/packages/durabletask-js/src/task/retry-handler-task.ts new file mode 100644 index 0000000..68024dd --- /dev/null +++ b/packages/durabletask-js/src/task/retry-handler-task.ts @@ -0,0 +1,93 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import * as pb from "../proto/orchestrator_service_pb"; +import { OrchestrationContext } from "./context/orchestration-context"; +import { RetryTaskBase, RetryTaskType } from "./retry-task-base"; +import { AsyncRetryHandler, RetryHandlerResult } from "./retry/retry-handler"; +import { createRetryContext } from "./retry/retry-context"; +import { TaskFailureDetails } from "./failure-details"; + +/** + * A task that uses an AsyncRetryHandler for imperative retry control. + * + * @remarks + * Unlike RetryableTask which uses a declarative RetryPolicy, this task delegates + * all retry decisions to a user-provided handler function. The handler receives + * a RetryContext with failure details, attempt count, and elapsed time, and + * returns true to retry or false to stop. + * + * This mirrors the .NET SDK's InvokeWithCustomRetryHandler pattern, where the + * retry handler runs as orchestrator code (subject to replay). + */ +export class RetryHandlerTask extends RetryTaskBase { + private readonly _handler: AsyncRetryHandler; + private readonly _orchestrationContext: OrchestrationContext; + + /** + * Creates a new RetryHandlerTask instance. + * + * @param handler - The async retry handler for imperative retry decisions + * @param orchestrationContext - The orchestration context for the current execution + * @param action - The orchestrator action associated with this task + * @param startTime - The time when the task was first scheduled + * @param taskType - The type of task (activity or sub-orchestration) + */ + constructor( + handler: AsyncRetryHandler, + orchestrationContext: OrchestrationContext, + action: pb.OrchestratorAction, + startTime: Date, + taskType: RetryTaskType, + ) { + super(action, startTime, taskType); + if (!handler) { + throw new Error("RetryHandlerTask requires a non-null handler"); + } + this._handler = handler; + this._orchestrationContext = orchestrationContext; + } + + /** + * Gets the async retry handler for this task. + */ + get handler(): AsyncRetryHandler { + return this._handler; + } + + /** + * Invokes the async retry handler to determine whether to retry. + * + * @param currentTime - The current orchestration time (for deterministic replay) + * @returns A Promise that resolves to `true` to retry immediately, + * `false` to stop retrying, or a positive number indicating the + * delay in milliseconds before the next retry attempt + */ + async shouldRetry(currentTime: Date): Promise { + if (!this.lastFailure) { + return false; + } + + // Check for non-retriable failures (e.g., activity not found) + if (this.lastFailure.getIsnonretriable()) { + return false; + } + + const failureDetails: TaskFailureDetails = { + errorType: this.lastFailure.getErrortype() || "Error", + message: this.lastFailure.getErrormessage() || "", + stackTrace: this.lastFailure.getStacktrace()?.getValue(), + }; + + const totalRetryTimeMs = currentTime.getTime() - this.startTime.getTime(); + + const retryContext = createRetryContext( + this._orchestrationContext, + this.attemptCount, + failureDetails, + totalRetryTimeMs, + ); + + return this._handler(retryContext); + } +} diff --git a/packages/durabletask-js/src/task/retry-task-base.ts b/packages/durabletask-js/src/task/retry-task-base.ts new file mode 100644 index 0000000..7e82664 --- /dev/null +++ b/packages/durabletask-js/src/task/retry-task-base.ts @@ -0,0 +1,158 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import * as pb from "../proto/orchestrator_service_pb"; +import { TaskFailedError } from "./exception/task-failed-error"; +import { CompletableTask } from "./completable-task"; + +/** + * Represents the type of retryable task - activity or sub-orchestration. + */ +export type RetryTaskType = "activity" | "subOrchestration"; + +/** + * Abstract base class for tasks that support retry behavior. + * + * @remarks + * This class provides shared state and behavior for both policy-based retry + * (RetryableTask) and handler-based retry (RetryHandlerTask). It manages the + * action, attempt count, start time, task type, and failure tracking that are + * common to both retry strategies. + */ +export abstract class RetryTaskBase extends CompletableTask { + private _action: pb.OrchestratorAction; + private readonly _startTime: Date; + private readonly _taskType: RetryTaskType; + private _attemptCount: number; + private _lastFailure: pb.TaskFailureDetails | undefined; + + /** + * Creates a new RetryTaskBase instance. + * + * @param action - The orchestrator action associated with this task + * @param startTime - The time when the task was first scheduled + * @param taskType - The type of task (activity or sub-orchestration) + */ + constructor( + action: pb.OrchestratorAction, + startTime: Date, + taskType: RetryTaskType, + ) { + super(); + this._action = action; + this._startTime = startTime; + this._taskType = taskType; + this._attemptCount = 1; + } + + /** + * Gets the orchestrator action associated with this task. + */ + get action(): pb.OrchestratorAction { + return this._action; + } + + /** + * Gets the current attempt count. + */ + get attemptCount(): number { + return this._attemptCount; + } + + /** + * Gets the time when the task was first scheduled. + */ + get startTime(): Date { + return this._startTime; + } + + /** + * Gets the type of task (activity or sub-orchestration). + */ + get taskType(): RetryTaskType { + return this._taskType; + } + + /** + * Gets the name of the task from the underlying action. + * For activities, this is the activity name; for sub-orchestrations, the orchestrator name. + */ + get taskName(): string { + if (this._taskType === "activity") { + return this._action.getScheduletask()?.getName() ?? "(unknown)"; + } + return this._action.getCreatesuborchestration()?.getName() ?? "(unknown)"; + } + + /** + * Gets the last failure details. + */ + get lastFailure(): pb.TaskFailureDetails | undefined { + return this._lastFailure; + } + + /** + * Increments the attempt count. + */ + incrementAttemptCount(): void { + this._attemptCount++; + } + + /** + * Updates the action associated with this task. + * This is called when the task is rescheduled for retry with a new sequence ID. + * + * @param action - The new orchestrator action + */ + updateAction(action: pb.OrchestratorAction): void { + this._action = action; + } + + /** + * Records a failure for potential retry. + * + * @param message - The failure message + * @param details - The failure details from the protobuf + */ + recordFailure(message: string, details?: pb.TaskFailureDetails): void { + details = details ?? new pb.TaskFailureDetails(); + this._lastFailure = details; + // Defer _exception creation to fail() - only create it when retry is exhausted + } + + /** + * Completes the task with the given result. + * Clears any previously recorded failure since the retry succeeded. + * + * @param result - The result of the task + */ + override complete(result: T): void { + // Clear any previously recorded failure since the retry succeeded + this._exception = undefined; + this._lastFailure = undefined; + + // Call the parent implementation + super.complete(result); + } + + /** + * Marks the task as failed after all retry attempts are exhausted. + * + * @param message - The failure message + * @param details - Optional failure details + */ + override fail(message: string, details?: pb.TaskFailureDetails): void { + if (this._isComplete) { + throw new Error("Task is already completed"); + } + + details = details ?? new pb.TaskFailureDetails(); + + this._exception = new TaskFailedError(message, details); + this._isComplete = true; + + if (this._parent) { + this._parent.onChildCompleted(this); + } + } +} diff --git a/packages/durabletask-js/src/task/retry-timer-task.ts b/packages/durabletask-js/src/task/retry-timer-task.ts index b010afd..6ee959b 100644 --- a/packages/durabletask-js/src/task/retry-timer-task.ts +++ b/packages/durabletask-js/src/task/retry-timer-task.ts @@ -2,7 +2,7 @@ // Licensed under the MIT License. import { CompletableTask } from "./completable-task"; -import { RetryableTask } from "./retryable-task"; +import { RetryTaskBase } from "./retry-task-base"; /** * A timer task that is associated with a retryable task for retry purposes. @@ -10,14 +10,14 @@ import { RetryableTask } from "./retryable-task"; * When this timer fires, the retryable task should be rescheduled for another attempt. */ export class RetryTimerTask extends CompletableTask { - private readonly _retryableParent: RetryableTask; + private readonly _retryableParent: RetryTaskBase; /** * Creates a new RetryTimerTask. * * @param retryableParent - The retryable task that this timer is associated with */ - constructor(retryableParent: RetryableTask) { + constructor(retryableParent: RetryTaskBase) { super(); this._retryableParent = retryableParent; } @@ -25,7 +25,7 @@ export class RetryTimerTask extends CompletableTask { /** * Gets the retryable task that this timer is associated with. */ - get retryableParent(): RetryableTask { + get retryableParent(): RetryTaskBase { return this._retryableParent; } } diff --git a/packages/durabletask-js/src/task/retry/index.ts b/packages/durabletask-js/src/task/retry/index.ts index fc4250a..20398bd 100644 --- a/packages/durabletask-js/src/task/retry/index.ts +++ b/packages/durabletask-js/src/task/retry/index.ts @@ -1,4 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -export { RetryPolicy, RetryPolicyOptions } from "./retry-policy"; +export { RetryPolicy, RetryPolicyOptions, FailureHandlerPredicate } from "./retry-policy"; +export { RetryContext, createRetryContext } from "./retry-context"; +export { RetryHandler, AsyncRetryHandler, RetryHandlerResult, toAsyncRetryHandler } from "./retry-handler"; diff --git a/packages/durabletask-js/src/task/retry/retry-context.ts b/packages/durabletask-js/src/task/retry/retry-context.ts new file mode 100644 index 0000000..c33217d --- /dev/null +++ b/packages/durabletask-js/src/task/retry/retry-context.ts @@ -0,0 +1,94 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { OrchestrationContext } from "../context/orchestration-context"; +import { TaskFailureDetails } from "../failure-details"; + +/** + * Retry context data that's provided to task retry handler implementations. + * + * @remarks + * This context is passed to custom retry handlers to provide information about + * the current retry state and allow making informed decisions about whether + * to continue retrying. + * + * Retry handler code is an extension of the orchestrator code and must therefore + * comply with all the determinism requirements of orchestrator code. The + * {@link orchestrationContext} property can be used to check + * {@link OrchestrationContext.isReplaying} to guard side-effects such as logging. + * + * @example + * ```typescript + * const retryHandler: RetryHandler = (context: RetryContext) => { + * // Guard side-effects with isReplaying + * if (!context.orchestrationContext.isReplaying) { + * console.log(`Retry attempt ${context.lastAttemptNumber}`); + * } + * // Don't retry after 5 attempts + * if (context.lastAttemptNumber >= 5) { + * return false; + * } + * // Don't retry if we've been retrying for more than 5 minutes + * if (context.totalRetryTimeInMilliseconds > 300000) { + * return false; + * } + * // Don't retry certain error types + * if (context.lastFailure.errorType === "ValidationError") { + * return false; + * } + * return true; + * }; + * ``` + */ +export interface RetryContext { + /** + * The orchestration context for the currently executing orchestration. + * + * @remarks + * This is primarily useful for checking {@link OrchestrationContext.isReplaying} + * to guard side-effects like logging or counter increments inside retry handlers, + * since retry handler code runs as part of the orchestrator and is subject to replay. + */ + readonly orchestrationContext: OrchestrationContext; + + /** + * The previous retry attempt number. + * This is 1 after the first failure, 2 after the second, etc. + */ + readonly lastAttemptNumber: number; + + /** + * The details of the previous task failure. + * Contains the error type, message, and stack trace. + */ + readonly lastFailure: TaskFailureDetails; + + /** + * The total amount of time spent in the retry loop for the current task, in milliseconds. + * This includes the time spent executing the task and waiting between retries. + */ + readonly totalRetryTimeInMilliseconds: number; +} + +/** + * Creates a new RetryContext object. + * + * @param orchestrationContext - The orchestration context for the current execution + * @param lastAttemptNumber - The previous retry attempt number + * @param lastFailure - The details of the previous task failure + * @param totalRetryTimeInMilliseconds - The total time spent retrying + * @returns A RetryContext object + */ +export function createRetryContext( + orchestrationContext: OrchestrationContext, + lastAttemptNumber: number, + lastFailure: TaskFailureDetails, + totalRetryTimeInMilliseconds: number, +): RetryContext { + return { + orchestrationContext, + lastAttemptNumber, + lastFailure, + totalRetryTimeInMilliseconds, + }; +} diff --git a/packages/durabletask-js/src/task/retry/retry-handler.ts b/packages/durabletask-js/src/task/retry/retry-handler.ts new file mode 100644 index 0000000..8b81643 --- /dev/null +++ b/packages/durabletask-js/src/task/retry/retry-handler.ts @@ -0,0 +1,104 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { RetryContext } from "./retry-context"; + +/** + * Delegate for manually handling task retries (synchronous version). + * + * @remarks + * Retry handler code is an extension of the orchestrator code and must therefore + * comply with all the determinism requirements of orchestrator code. This means: + * - No I/O operations (file, network, database) + * - No random number generation + * - No accessing current date/time directly + * - No accessing environment variables + * + * @param retryContext - Retry context that's updated between each retry attempt + * @returns Returns `true` to retry immediately, `false` to stop retrying, + * or a positive number to retry after that many milliseconds + * + * @example + * ```typescript + * const handler: RetryHandler = (context) => { + * // Retry up to 5 times + * if (context.lastAttemptNumber >= 5) { + * return false; + * } + * // Don't retry validation errors + * if (context.lastFailure.errorType === "ValidationError") { + * return false; + * } + * // Exponential backoff: 1s, 2s, 4s, 8s, ... + * return 1000 * Math.pow(2, context.lastAttemptNumber - 1); + * }; + * + * await ctx.callActivity("myActivity", input, { retry: handler }); + * ``` + */ +export type RetryHandler = (retryContext: RetryContext) => boolean | number; + +/** + * Delegate for manually handling task retries (asynchronous version). + * + * @remarks + * Retry handler code is an extension of the orchestrator code and must therefore + * comply with all the determinism requirements of orchestrator code. This means: + * - No I/O operations (file, network, database) + * - No random number generation + * - No accessing current date/time directly + * - No accessing environment variables + * + * While this handler is async, the async operations should only be used for + * deterministic orchestration operations (like waiting for events or timers), + * not for non-deterministic I/O. + * + * @param retryContext - Retry context that's updated between each retry attempt + * @returns Returns a Promise that resolves to `true` to retry immediately, + * `false` to stop retrying, or a positive number to retry after that many milliseconds + * + * @example + * ```typescript + * const asyncHandler: AsyncRetryHandler = async (context) => { + * // Retry up to 5 times + * if (context.lastAttemptNumber >= 5) { + * return false; + * } + * // Exponential backoff: 1s, 2s, 4s, 8s, ... + * return 1000 * Math.pow(2, context.lastAttemptNumber - 1); + * }; + * + * await ctx.callActivity("myActivity", input, { retry: asyncHandler }); + * ``` + */ +export type AsyncRetryHandler = (retryContext: RetryContext) => Promise; + +/** + * The result type returned by retry handlers. + * + * - `true` — retry immediately + * - `false` — stop retrying (the task fails) + * - a positive `number` — retry after that many milliseconds + */ +export type RetryHandlerResult = boolean | number; + +/** + * Normalizes a retry handler to an {@link AsyncRetryHandler}. + * + * If the handler is already an {@link AsyncRetryHandler}, wrapping it with + * `Promise.resolve` is a no-op since `Promise.resolve(promise)` returns the + * same promise. If it is a synchronous {@link RetryHandler}, the result + * is lifted into a resolved `Promise`. + * + * @param handler - A synchronous or asynchronous retry handler + * @returns An AsyncRetryHandler + * + * @example + * ```typescript + * const syncHandler: RetryHandler = (context) => context.lastAttemptNumber < 3; + * const asyncHandler = toAsyncRetryHandler(syncHandler); + * ``` + */ +export function toAsyncRetryHandler(handler: RetryHandler | AsyncRetryHandler): AsyncRetryHandler { + return (context: RetryContext) => Promise.resolve(handler(context)); +} diff --git a/packages/durabletask-js/src/task/retry/retry-policy.ts b/packages/durabletask-js/src/task/retry/retry-policy.ts index c057b66..8dfb45c 100644 --- a/packages/durabletask-js/src/task/retry/retry-policy.ts +++ b/packages/durabletask-js/src/task/retry/retry-policy.ts @@ -1,6 +1,14 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +import { TaskFailureDetails } from "../failure-details"; + +/** + * Type for a predicate function that determines whether a failure should be retried. + * Receives the failure details and returns true to retry, false to stop. + */ +export type FailureHandlerPredicate = (failure: TaskFailureDetails) => boolean; + /** * A declarative retry policy that can be configured for activity or sub-orchestration calls. * @@ -18,6 +26,19 @@ * retryTimeoutInMilliseconds: 300000 * }); * ``` + * + * @example + * ```typescript + * // With handleFailure predicate to filter which errors to retry + * const retryPolicy = new RetryPolicy({ + * maxNumberOfAttempts: 3, + * firstRetryIntervalInMilliseconds: 1000, + * handleFailure: (failure) => { + * // Only retry transient errors, not validation errors + * return failure.errorType !== "ValidationError"; + * } + * }); + * ``` */ export class RetryPolicy { private readonly _maxNumberOfAttempts: number; @@ -25,6 +46,7 @@ export class RetryPolicy { private readonly _backoffCoefficient: number; private readonly _maxRetryIntervalInMilliseconds: number; private readonly _retryTimeoutInMilliseconds: number; + private readonly _handleFailure: FailureHandlerPredicate; /** * Creates a new RetryPolicy instance. @@ -39,6 +61,7 @@ export class RetryPolicy { backoffCoefficient = 1.0, maxRetryIntervalInMilliseconds, retryTimeoutInMilliseconds, + handleFailure, } = options; // Validation aligned with .NET SDK @@ -77,6 +100,8 @@ export class RetryPolicy { this._maxRetryIntervalInMilliseconds = maxRetryIntervalInMilliseconds ?? 3600000; // Default to -1 (infinite) if not specified this._retryTimeoutInMilliseconds = retryTimeoutInMilliseconds ?? -1; + // Default to always retry (return true for all failures) + this._handleFailure = handleFailure ?? (() => true); } /** @@ -117,6 +142,31 @@ export class RetryPolicy { get retryTimeoutInMilliseconds(): number { return this._retryTimeoutInMilliseconds; } + + /** + * Gets the predicate function that determines whether a specific failure should be retried. + * + * @remarks + * This predicate is called for each failure to determine if a retry should be attempted. + * Even if this predicate allows a retry, time-based and attempt-count constraints may still + * prevent another attempt from being scheduled. + * Defaults to a function that always returns true (all failures are retried). + * + * @returns A function that takes TaskFailureDetails and returns true to retry, false to stop. + */ + get handleFailure(): FailureHandlerPredicate { + return this._handleFailure; + } + + /** + * Evaluates whether a failure should be retried based on the handleFailure predicate. + * + * @param failure - The failure details to evaluate + * @returns true if the failure should be retried, false otherwise + */ + shouldRetry(failure: TaskFailureDetails): boolean { + return this._handleFailure(failure); + } } /** @@ -156,4 +206,33 @@ export interface RetryPolicyOptions { * @default -1 (infinite) */ retryTimeoutInMilliseconds?: number; + + /** + * Optional predicate to determine if a specific failure should be retried. + * + * @remarks + * This predicate receives TaskFailureDetails and should return true to retry + * or false to stop retrying. The predicate is evaluated first to enable fail-fast + * behavior for non-retriable errors, but any retry that it allows is still subject + * to the overall time and attempt count constraints of this policy. + * + * @default A function that always returns true (all failures are retried) + * + * @example + * ```typescript + * const policy = new RetryPolicy({ + * maxNumberOfAttempts: 5, + * firstRetryIntervalInMilliseconds: 1000, + * handleFailure: (failure) => { + * // Don't retry validation errors + * if (failure.errorType === "ValidationError") { + * return false; + * } + * // Retry all other errors + * return true; + * } + * }); + * ``` + */ + handleFailure?: FailureHandlerPredicate; } diff --git a/packages/durabletask-js/src/task/retryable-task.ts b/packages/durabletask-js/src/task/retryable-task.ts index 1c073a6..d1d4855 100644 --- a/packages/durabletask-js/src/task/retryable-task.ts +++ b/packages/durabletask-js/src/task/retryable-task.ts @@ -1,31 +1,20 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -import * as pb from "../proto/orchestrator_service_pb"; -import { TaskFailedError } from "./exception/task-failed-error"; -import { CompletableTask } from "./completable-task"; +import { RetryTaskBase, RetryTaskType } from "./retry-task-base"; import { RetryPolicy } from "./retry/retry-policy"; +import * as pb from "../proto/orchestrator_service_pb"; /** - * Represents the type of retryable task - activity or sub-orchestration. - */ -export type RetryableTaskType = "activity" | "subOrchestration"; - -/** - * A task that can be retried according to a retry policy. + * A task that can be retried according to a declarative retry policy. * * @remarks - * This class extends CompletableTask and adds retry tracking and delay computation. - * It tracks the number of attempts and computes the next retry delay based on - * exponential backoff with the configured retry policy. + * This class extends RetryTaskBase and adds policy-driven delay computation. + * It tracks attempts and computes the next retry delay based on exponential + * backoff with the configured retry policy. */ -export class RetryableTask extends CompletableTask { +export class RetryableTask extends RetryTaskBase { private readonly _retryPolicy: RetryPolicy; - private readonly _action: pb.OrchestratorAction; - private readonly _startTime: Date; - private readonly _taskType: RetryableTaskType; - private _attemptCount: number; - private _lastFailure: pb.TaskFailureDetails | undefined; /** * Creates a new RetryableTask instance. @@ -39,14 +28,10 @@ export class RetryableTask extends CompletableTask { retryPolicy: RetryPolicy, action: pb.OrchestratorAction, startTime: Date, - taskType: RetryableTaskType, + taskType: RetryTaskType, ) { - super(); + super(action, startTime, taskType); this._retryPolicy = retryPolicy; - this._action = action; - this._startTime = startTime; - this._taskType = taskType; - this._attemptCount = 1; } /** @@ -56,98 +41,6 @@ export class RetryableTask extends CompletableTask { return this._retryPolicy; } - /** - * Gets the orchestrator action associated with this task. - */ - get action(): pb.OrchestratorAction { - return this._action; - } - - /** - * Gets the current attempt count. - */ - get attemptCount(): number { - return this._attemptCount; - } - - /** - * Gets the time when the task was first scheduled. - */ - get startTime(): Date { - return this._startTime; - } - - /** - * Gets the type of task (activity or sub-orchestration). - */ - get taskType(): RetryableTaskType { - return this._taskType; - } - - /** - * Gets the last failure details. - */ - get lastFailure(): pb.TaskFailureDetails | undefined { - return this._lastFailure; - } - - /** - * Increments the attempt count. - */ - incrementAttemptCount(): void { - this._attemptCount++; - } - - /** - * Records a failure for potential retry. - * - * @param message - The failure message - * @param details - The failure details from the protobuf - */ - recordFailure(message: string, details?: pb.TaskFailureDetails): void { - details = details ?? new pb.TaskFailureDetails(); - this._lastFailure = details; - - // Store the exception for later if we exhaust retries - this._exception = new TaskFailedError(message, details); - } - - /** - * Completes the task with the given result. - * Clears any previously recorded failure since the retry succeeded. - * - * @param result - The result of the task - */ - override complete(result: T): void { - // Clear any previously recorded failure since the retry succeeded - this._exception = undefined; - this._lastFailure = undefined; - - // Call the parent implementation - super.complete(result); - } - - /** - * Marks the task as failed after exhausting all retry attempts. - * - * @param message - The failure message - * @param details - Optional failure details - */ - override fail(message: string, details?: pb.TaskFailureDetails): void { - if (this._isComplete) { - throw new Error("Task is already completed"); - } - - details = details ?? new pb.TaskFailureDetails(); - - this._exception = new TaskFailedError(message, details); - this._isComplete = true; - - if (this._parent) { - this._parent.onChildCompleted(this); - } - } - /** * Computes the next retry delay in milliseconds. * @@ -156,6 +49,7 @@ export class RetryableTask extends CompletableTask { * * @remarks * Returns undefined if: + * - The handleFailure predicate returns false for the last failure * - The maximum number of attempts has been reached * - The retry timeout has been exceeded * @@ -165,14 +59,32 @@ export class RetryableTask extends CompletableTask { * The delay is capped at maxRetryInterval. */ computeNextDelayInMilliseconds(currentTime: Date): number | undefined { + // Check for non-retriable failures first (e.g., activity not found, version mismatch) + if (this.lastFailure?.getIsnonretriable()) { + return undefined; + } + + // Check if handleFailure predicate says we should NOT retry this failure type + if (this.lastFailure) { + const failureDetails = { + errorType: this.lastFailure.getErrortype() || "Error", + message: this.lastFailure.getErrormessage() || "", + stackTrace: this.lastFailure.getStacktrace()?.getValue(), + }; + + if (!this._retryPolicy.shouldRetry(failureDetails)) { + return undefined; + } + } + // Check if we've exhausted max attempts - if (this._attemptCount >= this._retryPolicy.maxNumberOfAttempts) { + if (this.attemptCount >= this._retryPolicy.maxNumberOfAttempts) { return undefined; } // Check if we've exceeded the retry timeout if (this._retryPolicy.retryTimeoutInMilliseconds !== -1) { - const elapsedTime = currentTime.getTime() - this._startTime.getTime(); + const elapsedTime = currentTime.getTime() - this.startTime.getTime(); if (elapsedTime >= this._retryPolicy.retryTimeoutInMilliseconds) { return undefined; } @@ -181,7 +93,7 @@ export class RetryableTask extends CompletableTask { // Calculate the next delay using exponential backoff // delay = firstRetryInterval * (backoffCoefficient ^ (attemptCount - 1)) const backoffCoefficient = this._retryPolicy.backoffCoefficient; - const exponent = this._attemptCount - 1; + const exponent = this.attemptCount - 1; let nextDelay = this._retryPolicy.firstRetryIntervalInMilliseconds * Math.pow(backoffCoefficient, exponent); @@ -192,7 +104,7 @@ export class RetryableTask extends CompletableTask { // Check if the computed delay would exceed the retry timeout if (this._retryPolicy.retryTimeoutInMilliseconds !== -1) { - const elapsedTime = currentTime.getTime() - this._startTime.getTime(); + const elapsedTime = currentTime.getTime() - this.startTime.getTime(); const remainingTime = this._retryPolicy.retryTimeoutInMilliseconds - elapsedTime; if (nextDelay > remainingTime) { // No more retries - timeout would be exceeded diff --git a/packages/durabletask-js/src/utils/pb-helper.util.ts b/packages/durabletask-js/src/utils/pb-helper.util.ts index 1b9303f..1cddc5d 100644 --- a/packages/durabletask-js/src/utils/pb-helper.util.ts +++ b/packages/durabletask-js/src/utils/pb-helper.util.ts @@ -115,11 +115,15 @@ export function newTaskCompletedEvent(eventId: number, encodedOutput?: string): return event; } -export function newTaskFailedEvent(eventId: number, ex: Error): pb.HistoryEvent { +export function newTaskFailedEvent( + eventId: number, + ex: Error, + failureDetails?: pb.TaskFailureDetails, +): pb.HistoryEvent { const ts = new Timestamp(); const taskFailedEvent = new pb.TaskFailedEvent(); - taskFailedEvent.setFailuredetails(newFailureDetails(ex)); + taskFailedEvent.setFailuredetails(failureDetails ?? newFailureDetails(ex)); taskFailedEvent.setTaskscheduledid(eventId); const event = new pb.HistoryEvent(); @@ -183,8 +187,9 @@ export function newSubOrchestrationFailedEvent(eventId: number, ex: Error): pb.H export function newFailureDetails(e: unknown): pb.TaskFailureDetails { const failure = new pb.TaskFailureDetails(); - - const errorType = e instanceof Error ? e.constructor.name : "UnknownError"; + // Use e.name (which can be customized) over constructor.name (which is always the class name) + // This allows users to set error.name = "CustomError" and have it preserved in failure details + const errorType = e instanceof Error ? e.name : "UnknownError"; const errorMessage = e instanceof Error ? e.message : String(e); const stack = e instanceof Error ? e.stack : undefined; diff --git a/packages/durabletask-js/src/worker/orchestration-executor.ts b/packages/durabletask-js/src/worker/orchestration-executor.ts index 7c2823a..35340ac 100644 --- a/packages/durabletask-js/src/worker/orchestration-executor.ts +++ b/packages/durabletask-js/src/worker/orchestration-executor.ts @@ -13,7 +13,9 @@ import { Logger, ConsoleLogger } from "../types/logger.type"; import { getName } from "../task"; import * as WorkerLogs from "./logs"; import { OrchestrationStateError } from "../task/exception/orchestration-state-error"; +import { CompletableTask } from "../task/completable-task"; import { RetryableTask } from "../task/retryable-task"; +import { RetryHandlerTask } from "../task/retry-handler-task"; import { RetryTimerTask } from "../task/retry-timer-task"; import { TOrchestrator } from "../types/orchestrator.type"; import { enumValueToKey } from "../utils/enum.util"; @@ -225,10 +227,10 @@ export class OrchestrationExecutor { // Check if this is a retry timer if (timerTask instanceof RetryTimerTask) { - // Get the retryable parent task and reschedule it - const retryableTask = timerTask.retryableParent; + // Get the parent retry task and reschedule it + const retryTask = timerTask.retryableParent; // Reschedule the original action - this will add it back to pendingActions - ctx.rescheduleRetryableTask(retryableTask); + ctx.rescheduleRetryTask(retryTask); // Don't resume the orchestrator - we're just rescheduling the task return; } @@ -319,19 +321,10 @@ export class OrchestrationExecutor { return; } - // Check if this is a retryable task and if we should retry - if (activityTask instanceof RetryableTask) { - activityTask.recordFailure(errorMessage, failureDetails); - const nextDelayMs = activityTask.computeNextDelayInMilliseconds(ctx._currentUtcDatetime); - - if (nextDelayMs !== undefined) { - // Schedule a retry timer - activityTask.incrementAttemptCount(); - ctx.createRetryTimer(activityTask, nextDelayMs); - // Remove from pendingTasks - the task will be re-added with a new ID when rescheduled - delete ctx._pendingTasks[taskId]; - return; - } + // Check if this task supports retry and handle it + const retried = await this.tryHandleRetry(activityTask, errorMessage, failureDetails, taskId, ctx); + if (retried) { + return; } // No retry - fail the task @@ -427,19 +420,10 @@ export class OrchestrationExecutor { return; } - // Check if this is a retryable task and if we should retry - if (subOrchTask instanceof RetryableTask) { - subOrchTask.recordFailure(errorMessage, failureDetails); - const nextDelayMs = subOrchTask.computeNextDelayInMilliseconds(ctx._currentUtcDatetime); - - if (nextDelayMs !== undefined) { - // Schedule a retry timer - subOrchTask.incrementAttemptCount(); - ctx.createRetryTimer(subOrchTask, nextDelayMs); - // Remove from pendingTasks - the task will be re-added with a new ID when rescheduled - delete ctx._pendingTasks[taskId]; - return; - } + // Check if this task supports retry and handle it + const retried = await this.tryHandleRetry(subOrchTask, errorMessage, failureDetails, taskId, ctx); + if (retried) { + return; } // No retry - fail the task @@ -564,4 +548,63 @@ export class OrchestrationExecutor { throw e; } } + + /** + * Checks if a failed task supports retry and handles the retry if applicable. + * Supports both RetryableTask (policy-based with timer delay) and RetryHandlerTask + * (handler-based: returns true for immediate reschedule, or a number for delayed retry via timer). + * + * @param task - The task that failed + * @param errorMessage - The failure error message + * @param failureDetails - The protobuf failure details + * @param taskId - The task's sequence ID in pendingTasks + * @param ctx - The orchestration context + * @returns true if the task was retried, false otherwise + */ + private async tryHandleRetry( + task: CompletableTask, + errorMessage: string, + failureDetails: pb.TaskFailureDetails | undefined, + taskId: number, + ctx: RuntimeOrchestrationContext, + ): Promise { + if (task instanceof RetryableTask) { + task.recordFailure(errorMessage, failureDetails); + const nextDelayMs = task.computeNextDelayInMilliseconds(ctx._currentUtcDatetime); + + if (nextDelayMs !== undefined) { + WorkerLogs.retryingTask(this._logger, ctx._instanceId, task.taskName, task.attemptCount); + task.incrementAttemptCount(); + ctx.createRetryTimer(task, nextDelayMs); + delete ctx._pendingTasks[taskId]; + return true; + } + } else if (task instanceof RetryHandlerTask) { + task.recordFailure(errorMessage, failureDetails); + const retryResult = await task.shouldRetry(ctx._currentUtcDatetime); + + if (retryResult !== false) { + WorkerLogs.retryingTask(this._logger, ctx._instanceId, task.taskName, task.attemptCount); + task.incrementAttemptCount(); + + if (typeof retryResult === "number") { + if (retryResult <= 0) { + // Handler returned true — retry immediately + ctx.rescheduleRetryTask(task); + } else { + // Handler returned a delay in milliseconds — use a timer + ctx.createRetryTimer(task, retryResult); + } + } else { + // Handler returned true — retry immediately + ctx.rescheduleRetryTask(task); + } + + delete ctx._pendingTasks[taskId]; + return true; + } + } + + return false; + } } diff --git a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts index 595feef..73ec5b2 100644 --- a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts +++ b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts @@ -8,9 +8,12 @@ import { ParentOrchestrationInstance } from "../types/parent-orchestration-insta import * as pb from "../proto/orchestrator_service_pb"; import * as ph from "../utils/pb-helper.util"; import { CompletableTask } from "../task/completable-task"; +import { RetryTaskBase, RetryTaskType } from "../task/retry-task-base"; import { RetryableTask } from "../task/retryable-task"; +import { RetryHandlerTask } from "../task/retry-handler-task"; import { RetryTimerTask } from "../task/retry-timer-task"; -import { TaskOptions, SubOrchestrationOptions } from "../task/options"; +import { TaskOptions, SubOrchestrationOptions, isRetryPolicy, isRetryHandler } from "../task/options"; +import { toAsyncRetryHandler } from "../task/retry/retry-handler"; import { TActivity } from "../types/activity.type"; import { TOrchestrator } from "../types/orchestrator.type"; import { Task } from "../task/task"; @@ -294,20 +297,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { const action = ph.newScheduleTaskAction(id, name, encodedInput, options?.tags, options?.version); this._pendingActions[action.getId()] = action; - // If a retry policy is provided, create a RetryableTask - if (options?.retry) { - const retryableTask = new RetryableTask( - options.retry, - action, - this._currentUtcDatetime, - "activity", - ); - this._pendingTasks[id] = retryableTask; - return retryableTask; - } - - const task = new CompletableTask(); - this._pendingTasks[id] = task; + const task = this.createRetryTaskOrDefault(action, id, options, "activity"); return task; } @@ -338,20 +328,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput, options?.tags, options?.version); this._pendingActions[action.getId()] = action; - // If a retry policy is provided, create a RetryableTask - if (options?.retry) { - const retryableTask = new RetryableTask( - options.retry, - action, - this._currentUtcDatetime, - "subOrchestration", - ); - this._pendingTasks[id] = retryableTask; - return retryableTask; - } - - const task = new CompletableTask(); - this._pendingTasks[id] = task; + const task = this.createRetryTaskOrDefault(action, id, options, "subOrchestration"); return task; } @@ -516,7 +493,11 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { * @param delayMs - The delay in milliseconds before the timer fires * @returns The timer task */ - createRetryTimer(retryableTask: RetryableTask, delayMs: number): RetryTimerTask { + createRetryTimer(retryableTask: RetryTaskBase, delayMs: number): RetryTimerTask { + if (delayMs <= 0) { + throw new Error(`delayMs must be a positive number, but received ${delayMs}`); + } + const timerId = this.nextSequenceNumber(); const fireAt = new Date(this._currentUtcDatetime.getTime() + delayMs); const timerAction = ph.newCreateTimerAction(timerId, fireAt); @@ -530,44 +511,95 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { } /** - * Reschedules a retryable task for retry by creating a new action with a new ID. - * This is called when a retry timer fires and the task needs to be retried. + * Creates the appropriate retry task or a plain CompletableTask based on the options. * - * @param retryableTask - The retryable task to reschedule + * @param action - The orchestrator action + * @param id - The sequence ID for task tracking + * @param options - The task options (may contain retry configuration) + * @param taskType - Whether this is an activity or sub-orchestration + * @returns The created task */ - rescheduleRetryableTask(retryableTask: RetryableTask): void { - const originalAction = retryableTask.action; + private createRetryTaskOrDefault( + action: pb.OrchestratorAction, + id: number, + options: TaskOptions | SubOrchestrationOptions | undefined, + taskType: RetryTaskType + ): CompletableTask { + if (options?.retry && isRetryPolicy(options.retry)) { + const retryableTask = new RetryableTask( + options.retry, + action, + this._currentUtcDatetime, + taskType, + ); + this._pendingTasks[id] = retryableTask; + return retryableTask; + } + + if (options?.retry && isRetryHandler(options.retry)) { + // Normalize to AsyncRetryHandler — wraps sync handlers via Promise.resolve, + // and is a no-op for handlers that already return a Promise. + const handler = toAsyncRetryHandler(options.retry); + const retryHandlerTask = new RetryHandlerTask( + handler, + this, + action, + this._currentUtcDatetime, + taskType, + ); + this._pendingTasks[id] = retryHandlerTask; + return retryHandlerTask; + } + + const task = new CompletableTask(); + this._pendingTasks[id] = task; + return task; + } + + /** + * Reschedules a retry task for retry by creating a new action with a new ID. + * This is called when a retry timer fires or a retry handler returns true. + * + * @param retryTask - The retry task to reschedule (RetryableTask or RetryHandlerTask) + */ + rescheduleRetryTask(retryTask: RetryTaskBase): void { + const originalAction = retryTask.action; const newId = this.nextSequenceNumber(); let newAction: pb.OrchestratorAction; - if (retryableTask.taskType === "activity") { + if (retryTask.taskType === "activity") { // Reschedule an activity task const scheduleTask = originalAction.getScheduletask(); if (!scheduleTask) { - throw new Error("Expected ScheduleTaskAction on activity retryable task"); + throw new Error("Expected ScheduleTaskAction on activity retry task"); } const name = scheduleTask.getName(); const input = scheduleTask.getInput()?.getValue(); const tags = mapToRecord(scheduleTask.getTagsMap()); - newAction = ph.newScheduleTaskAction(newId, name, input, tags); + const version = scheduleTask.getVersion()?.getValue(); + newAction = ph.newScheduleTaskAction(newId, name, input, tags, version); } else { // Reschedule a sub-orchestration task const subOrch = originalAction.getCreatesuborchestration(); if (!subOrch) { - throw new Error("Expected CreateSubOrchestrationAction on sub-orchestration retryable task"); + throw new Error("Expected CreateSubOrchestrationAction on sub-orchestration retry task"); } const name = subOrch.getName(); const instanceId = subOrch.getInstanceid(); const input = subOrch.getInput()?.getValue(); const tags = mapToRecord(subOrch.getTagsMap()); - newAction = ph.newCreateSubOrchestrationAction(newId, name, instanceId, input, tags); + const version = subOrch.getVersion()?.getValue(); + newAction = ph.newCreateSubOrchestrationAction(newId, name, instanceId, input, tags, version); } // Register the new action this._pendingActions[newAction.getId()] = newAction; - // Map the retryable task to the new action ID - this._pendingTasks[newId] = retryableTask; + // Update the task's action reference so it stays in sync with the new ID + retryTask.updateAction(newAction); + + // Map the retry task to the new action ID + this._pendingTasks[newId] = retryTask; } } diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts index dd22084..dda9478 100644 --- a/packages/durabletask-js/test/orchestration_executor.spec.ts +++ b/packages/durabletask-js/test/orchestration_executor.spec.ts @@ -840,181 +840,290 @@ describe("Orchestration Executor", () => { expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput); }); + // ==================== Retry Tests (shared helpers) ==================== + + /** + * Helper that registers an orchestrator, starts it, and returns the accumulated + * event log together with a convenience function for replaying subsequent steps. + */ + async function startOrchestration( + orchestrator: TOrchestrator, + input?: unknown, + startTime?: Date, + ) { + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const allEvents = [ + newOrchestratorStartedEvent(startTime), + newExecutionStartedEvent(name, TEST_INSTANCE_ID, input !== undefined ? JSON.stringify(input) : undefined), + ]; + + const result = await new OrchestrationExecutor(registry, testLogger) + .execute(TEST_INSTANCE_ID, [], allEvents); + + /** Append old-events, create a fresh executor, and replay with newEvents. */ + const replay = async (...oldAndNew: [old: pb.HistoryEvent[], newEvts: pb.HistoryEvent[]]) => { + const [old, newEvts] = oldAndNew; + old.forEach((e) => allEvents.push(e)); + return new OrchestrationExecutor(registry, testLogger) + .execute(TEST_INSTANCE_ID, allEvents, newEvts); + }; + + return { registry, result, allEvents, replay }; + } + // ==================== Retry Policy Tests ==================== describe("Retry Policy", () => { it("should schedule retry timer when activity fails with retry policy", async () => { - // Arrange const { RetryPolicy } = await import("../src/task/retry/retry-policy"); - - const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { - const retryPolicy = new RetryPolicy({ - maxNumberOfAttempts: 3, - firstRetryIntervalInMilliseconds: 1000, - backoffCoefficient: 1.0, - }); - const result = yield ctx.callActivity("flakyActivity", input, { retry: retryPolicy }); - return result; - }; - const registry = new Registry(); - const name = registry.addOrchestrator(orchestrator); + const { result: startResult, replay } = await startOrchestration( + async function* (ctx: OrchestrationContext, input: number): any { + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, firstRetryIntervalInMilliseconds: 1000, backoffCoefficient: 1.0, + }); + return yield ctx.callActivity("flakyActivity", input, { retry: retryPolicy }); + }, + 21, + ); - // Act - Step 1: Start orchestration - let executor = new OrchestrationExecutor(registry, testLogger); - let newEvents = [ - newOrchestratorStartedEvent(), - newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(21)), - ]; - let result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + expect(startResult.actions.length).toBe(1); + expect(startResult.actions[0].hasScheduletask()).toBe(true); + expect(startResult.actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); - // Assert - Step 1: Should schedule activity - expect(result.actions.length).toBe(1); - expect(result.actions[0].hasScheduletask()).toBe(true); - expect(result.actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); - - // Act - Step 2: Activity scheduled, then fails - const oldEvents = [ - ...newEvents, - newTaskScheduledEvent(1, "flakyActivity"), - ]; - executor = new OrchestrationExecutor(registry, testLogger); - newEvents = [ - newTaskFailedEvent(1, new Error("Transient failure on attempt 1")), - ]; - result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + // Activity fails → should schedule a retry timer + const result = await replay( + [newTaskScheduledEvent(1, "flakyActivity")], + [newTaskFailedEvent(1, new Error("Transient failure"))], + ); - // Assert - Step 2: Should schedule a retry timer expect(result.actions.length).toBe(1); expect(result.actions[0].hasCreatetimer()).toBe(true); }); it("should complete successfully after retry timer fires and activity succeeds", async () => { - // Arrange const { RetryPolicy } = await import("../src/task/retry/retry-policy"); - - const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { - const retryPolicy = new RetryPolicy({ - maxNumberOfAttempts: 3, - firstRetryIntervalInMilliseconds: 1000, - backoffCoefficient: 1.0, - }); - const result = yield ctx.callActivity("flakyActivity", input, { - retry: retryPolicy, - tags: { env: "test" }, - }); - return result; - }; - const registry = new Registry(); - const name = registry.addOrchestrator(orchestrator); - const startTime = new Date(); - - // Step 1: Start orchestration - let executor = new OrchestrationExecutor(registry, testLogger); - const allEvents = [ - newOrchestratorStartedEvent(startTime), - newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(21)), - ]; - let result = await executor.execute(TEST_INSTANCE_ID, [], allEvents); - expect(result.actions.length).toBe(1); - expect(result.actions[0].hasScheduletask()).toBe(true); + const { result: startResult, replay } = await startOrchestration( + async function* (ctx: OrchestrationContext, input: number): any { + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, firstRetryIntervalInMilliseconds: 1000, backoffCoefficient: 1.0, + }); + return yield ctx.callActivity("flakyActivity", input, { retry: retryPolicy, tags: { env: "test" } }); + }, + 21, + new Date(), + ); - // Step 2: Activity scheduled, then fails - allEvents.push(newTaskScheduledEvent(1, "flakyActivity")); - executor = new OrchestrationExecutor(registry, testLogger); - result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ - newTaskFailedEvent(1, new Error("Transient failure on attempt 1")), - ]); - expect(result.actions.length).toBe(1); + expect(startResult.actions.length).toBe(1); + expect(startResult.actions[0].hasScheduletask()).toBe(true); + + // Activity fails → timer created + let result = await replay( + [newTaskScheduledEvent(1, "flakyActivity")], + [newTaskFailedEvent(1, new Error("Transient failure"))], + ); expect(result.actions[0].hasCreatetimer()).toBe(true); const timerFireAt = result.actions[0].getCreatetimer()?.getFireat()?.toDate(); expect(timerFireAt).toBeDefined(); - // Step 3: Timer created, then fires - allEvents.push(newTaskFailedEvent(1, new Error("Transient failure on attempt 1"))); - allEvents.push(newTimerCreatedEvent(2, timerFireAt!)); - executor = new OrchestrationExecutor(registry, testLogger); - result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ - newTimerFiredEvent(2, timerFireAt!), - ]); - // Should reschedule the activity with a new ID - expect(result.actions.length).toBe(1); + // Timer fires → activity rescheduled with new ID + preserved tags + result = await replay( + [newTaskFailedEvent(1, new Error("Transient failure")), newTimerCreatedEvent(2, timerFireAt!)], + [newTimerFiredEvent(2, timerFireAt!)], + ); expect(result.actions[0].hasScheduletask()).toBe(true); expect(result.actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); expect(result.actions[0].getScheduletask()?.getTagsMap().get("env")).toBe("test"); - expect(result.actions[0].getId()).toBe(3); // New ID after timer - - // Step 4: Retried activity scheduled, then completes - allEvents.push(newTimerFiredEvent(2, timerFireAt!)); - allEvents.push(newTaskScheduledEvent(3, "flakyActivity")); - executor = new OrchestrationExecutor(registry, testLogger); - result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ - newTaskCompletedEvent(3, JSON.stringify(42)), - ]); - - // Assert: Orchestration should complete successfully + expect(result.actions[0].getId()).toBe(3); + + // Retried activity completes + result = await replay( + [newTimerFiredEvent(2, timerFireAt!), newTaskScheduledEvent(3, "flakyActivity")], + [newTaskCompletedEvent(3, JSON.stringify(42))], + ); const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(JSON.stringify(42)); }); it("should fail after exhausting all retry attempts", async () => { - // Arrange const { RetryPolicy } = await import("../src/task/retry/retry-policy"); - - const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { - const retryPolicy = new RetryPolicy({ - maxNumberOfAttempts: 2, - firstRetryIntervalInMilliseconds: 100, - backoffCoefficient: 1.0, - }); - const result = yield ctx.callActivity("alwaysFailsActivity", input, { retry: retryPolicy }); - return result; - }; - const registry = new Registry(); - const name = registry.addOrchestrator(orchestrator); - const startTime = new Date(); - - // Step 1: Start orchestration - let executor = new OrchestrationExecutor(registry, testLogger); - const allEvents = [ - newOrchestratorStartedEvent(startTime), - newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(21)), - ]; - let result = await executor.execute(TEST_INSTANCE_ID, [], allEvents); - expect(result.actions.length).toBe(1); - expect(result.actions[0].hasScheduletask()).toBe(true); + const { result: startResult, replay } = await startOrchestration( + async function* (ctx: OrchestrationContext, input: number): any { + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 2, firstRetryIntervalInMilliseconds: 100, backoffCoefficient: 1.0, + }); + return yield ctx.callActivity("alwaysFailsActivity", input, { retry: retryPolicy }); + }, + 21, + new Date(), + ); - // Step 2: Activity fails - first attempt - allEvents.push(newTaskScheduledEvent(1, "alwaysFailsActivity")); - executor = new OrchestrationExecutor(registry, testLogger); - result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ - newTaskFailedEvent(1, new Error("Failure on attempt 1")), - ]); - expect(result.actions.length).toBe(1); + expect(startResult.actions[0].hasScheduletask()).toBe(true); + + // First failure → timer + let result = await replay( + [newTaskScheduledEvent(1, "alwaysFailsActivity")], + [newTaskFailedEvent(1, new Error("Failure 1"))], + ); expect(result.actions[0].hasCreatetimer()).toBe(true); const timerFireAt = result.actions[0].getCreatetimer()?.getFireat()?.toDate(); - // Step 3: Timer fires, activity is rescheduled - allEvents.push(newTaskFailedEvent(1, new Error("Failure on attempt 1"))); - allEvents.push(newTimerCreatedEvent(2, timerFireAt!)); - executor = new OrchestrationExecutor(registry, testLogger); - result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ - newTimerFiredEvent(2, timerFireAt!), - ]); + // Timer fires → rescheduled + result = await replay( + [newTaskFailedEvent(1, new Error("Failure 1")), newTimerCreatedEvent(2, timerFireAt!)], + [newTimerFiredEvent(2, timerFireAt!)], + ); + expect(result.actions[0].hasScheduletask()).toBe(true); + + // Second failure → max attempts exhausted → orchestration fails + result = await replay( + [newTimerFiredEvent(2, timerFireAt!), newTaskScheduledEvent(3, "alwaysFailsActivity")], + [newTaskFailedEvent(3, new Error("Failure 2"))], + ); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); + expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + }); + }); + + // ==================== Retry Handler Tests ==================== + describe("Retry Handler (AsyncRetryHandler)", () => { + it("should reschedule activity when retry handler returns true", async () => { + const { result: startResult, replay } = await startOrchestration( + async function* (ctx: OrchestrationContext, input: number): any { + const retryHandler = async (retryCtx: any) => retryCtx.lastAttemptNumber < 3; + return yield ctx.callActivity("flakyActivity", input, { retry: retryHandler }); + }, + 21, + ); + + expect(startResult.actions[0].hasScheduletask()).toBe(true); + expect(startResult.actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); + + // Activity fails → handler returns true → rescheduled immediately (no timer) + const result = await replay( + [newTaskScheduledEvent(1, "flakyActivity")], + [newTaskFailedEvent(1, new Error("Transient failure"))], + ); expect(result.actions.length).toBe(1); expect(result.actions[0].hasScheduletask()).toBe(true); + expect(result.actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); + }); + + it("should complete successfully after retry handler reschedules and activity succeeds", async () => { + const { result: startResult, replay } = await startOrchestration( + async function* (ctx: OrchestrationContext, input: number): any { + const retryHandler = async (retryCtx: any) => retryCtx.lastAttemptNumber < 5; + return yield ctx.callActivity("flakyActivity", input, { retry: retryHandler, tags: { env: "test" } }); + }, + 21, + new Date(), + ); + + expect(startResult.actions[0].hasScheduletask()).toBe(true); + + // Activity fails → rescheduled immediately with new ID + preserved tags + let result = await replay( + [newTaskScheduledEvent(1, "flakyActivity")], + [newTaskFailedEvent(1, new Error("Transient failure"))], + ); + expect(result.actions[0].hasScheduletask()).toBe(true); + expect(result.actions[0].getScheduletask()?.getTagsMap().get("env")).toBe("test"); + expect(result.actions[0].getId()).toBe(2); + + // Retried activity completes + result = await replay( + [newTaskFailedEvent(1, new Error("Transient failure")), newTaskScheduledEvent(2, "flakyActivity")], + [newTaskCompletedEvent(2, JSON.stringify(42))], + ); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); + expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(completeAction?.getResult()?.getValue()).toEqual(JSON.stringify(42)); + }); + + it("should fail when retry handler returns false", async () => { + const { result: startResult, replay } = await startOrchestration( + async function* (ctx: OrchestrationContext): any { + const retryHandler = async (retryCtx: any) => retryCtx.lastFailure.errorType !== "PermanentError"; + return yield ctx.callActivity("failingActivity", undefined, { retry: retryHandler }); + }, + ); + + expect(startResult.actions[0].hasScheduletask()).toBe(true); - // Step 4: Second activity attempt fails - max attempts reached - allEvents.push(newTimerFiredEvent(2, timerFireAt!)); - allEvents.push(newTaskScheduledEvent(3, "alwaysFailsActivity")); - executor = new OrchestrationExecutor(registry, testLogger); - result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ - newTaskFailedEvent(3, new Error("Failure on attempt 2")), - ]); + // Activity fails with PermanentError → handler returns false → orchestration fails + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("PermanentError"); + failureDetails.setErrormessage("This is permanent"); - // Assert: Orchestration should fail + const result = await replay( + [newTaskScheduledEvent(1, "failingActivity")], + [newTaskFailedEvent(1, new Error("This is permanent"), failureDetails)], + ); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); + expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + }); + + it("should handle sub-orchestration retry with handler", async () => { + const { registry, result: startResult, replay } = await startOrchestration( + async function* (ctx: OrchestrationContext, input: number): any { + const retryHandler = async (retryCtx: any) => retryCtx.lastAttemptNumber < 3; + return yield ctx.callSubOrchestrator("subOrch", input, { retry: retryHandler }); + }, + 21, + ); + registry.addNamedOrchestrator("subOrch", async function* () { yield; }); + + expect(startResult.actions[0].hasCreatesuborchestration()).toBe(true); + + // Sub-orchestration fails → rescheduled immediately + let result = await replay( + [newSubOrchestrationCreatedEvent(1, "subOrch", "sub-orch-1")], + [newSubOrchestrationFailedEvent(1, new Error("Sub-orch failed"))], + ); + expect(result.actions[0].hasCreatesuborchestration()).toBe(true); + + // Retried sub-orchestration completes + result = await replay( + [newSubOrchestrationFailedEvent(1, new Error("Sub-orch failed")), newSubOrchestrationCreatedEvent(2, "subOrch", "sub-orch-2")], + [newSubOrchestrationCompletedEvent(2, JSON.stringify(42))], + ); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); + expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(completeAction?.getResult()?.getValue()).toEqual(JSON.stringify(42)); + }); + + it("should fail after handler returns false on third attempt", async () => { + const { result: startResult, replay } = await startOrchestration( + async function* (ctx: OrchestrationContext): any { + const retryHandler = async (retryCtx: any) => retryCtx.lastAttemptNumber < 3; + return yield ctx.callActivity("flakyActivity", undefined, { retry: retryHandler }); + }, + ); + + expect(startResult.actions[0].hasScheduletask()).toBe(true); + + // Failure 1 → rescheduled (attempt 1 < 3) + let result = await replay( + [newTaskScheduledEvent(1, "flakyActivity")], + [newTaskFailedEvent(1, new Error("Failure 1"))], + ); + expect(result.actions[0].hasScheduletask()).toBe(true); + + // Failure 2 → rescheduled (attempt 2 < 3) + result = await replay( + [newTaskFailedEvent(1, new Error("Failure 1")), newTaskScheduledEvent(2, "flakyActivity")], + [newTaskFailedEvent(2, new Error("Failure 2"))], + ); + expect(result.actions[0].hasScheduletask()).toBe(true); + + // Failure 3 → handler returns false (3 < 3 is false) → orchestration fails + result = await replay( + [newTaskFailedEvent(2, new Error("Failure 2")), newTaskScheduledEvent(3, "flakyActivity")], + [newTaskFailedEvent(3, new Error("Failure 3"))], + ); const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); }); diff --git a/packages/durabletask-js/test/retry-context.spec.ts b/packages/durabletask-js/test/retry-context.spec.ts new file mode 100644 index 0000000..9906d65 --- /dev/null +++ b/packages/durabletask-js/test/retry-context.spec.ts @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { RetryContext, createRetryContext } from "../src/task/retry/retry-context"; +import { OrchestrationContext } from "../src/task/context/orchestration-context"; +import { TaskFailureDetails } from "../src/task/failure-details"; + +describe("RetryContext", () => { + const mockOrchCtx = {} as OrchestrationContext; + const mockFailureDetails: TaskFailureDetails = { + errorType: "TestError", + message: "Test error message", + stackTrace: "at TestFile.ts:10", + }; + + describe("createRetryContext", () => { + it("should create a RetryContext with all required properties", () => { + const context = createRetryContext(mockOrchCtx, 1, mockFailureDetails, 5000); + + expect(context.orchestrationContext).toBe(mockOrchCtx); + expect(context.lastAttemptNumber).toBe(1); + expect(context.lastFailure).toBe(mockFailureDetails); + expect(context.totalRetryTimeInMilliseconds).toBe(5000); + }); + + it("should preserve failure details correctly", () => { + const detailedFailure: TaskFailureDetails = { + errorType: "ValidationError", + message: "Input validation failed", + stackTrace: "at Validator.ts:25\nat Handler.ts:50", + }; + + const context = createRetryContext(mockOrchCtx, 1, detailedFailure, 1000); + + expect(context.lastFailure.errorType).toBe("ValidationError"); + expect(context.lastFailure.message).toBe("Input validation failed"); + expect(context.lastFailure.stackTrace).toContain("Validator.ts"); + }); + + it("should handle zero retry time", () => { + const context = createRetryContext(mockOrchCtx, 1, mockFailureDetails, 0); + + expect(context.totalRetryTimeInMilliseconds).toBe(0); + }); + + it("should handle large retry times", () => { + const context = createRetryContext(mockOrchCtx, 10, mockFailureDetails, 3600000); // 1 hour + + expect(context.totalRetryTimeInMilliseconds).toBe(3600000); + }); + + it("should handle high attempt numbers", () => { + const context = createRetryContext(mockOrchCtx, 100, mockFailureDetails, 500000); + + expect(context.lastAttemptNumber).toBe(100); + }); + }); + + describe("RetryContext interface", () => { + it("should be readonly - properties cannot be modified", () => { + const context: RetryContext = createRetryContext(mockOrchCtx, 1, mockFailureDetails, 5000); + + // TypeScript should prevent these at compile time, but we verify the object structure + expect(Object.keys(context)).toContain("orchestrationContext"); + expect(Object.keys(context)).toContain("lastAttemptNumber"); + expect(Object.keys(context)).toContain("lastFailure"); + expect(Object.keys(context)).toContain("totalRetryTimeInMilliseconds"); + }); + }); +}); diff --git a/packages/durabletask-js/test/retry-handler-task.spec.ts b/packages/durabletask-js/test/retry-handler-task.spec.ts new file mode 100644 index 0000000..cad9595 --- /dev/null +++ b/packages/durabletask-js/test/retry-handler-task.spec.ts @@ -0,0 +1,427 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { RetryHandlerTask } from "../src/task/retry-handler-task"; +import { AsyncRetryHandler } from "../src/task/retry/retry-handler"; +import { OrchestrationContext } from "../src/task/context/orchestration-context"; +import * as pb from "../src/proto/orchestrator_service_pb"; +import { StringValue } from "google-protobuf/google/protobuf/wrappers_pb"; + +/** + * Creates a minimal mock OrchestrationContext for testing. + */ +function createMockOrchestrationContext(overrides?: { isReplaying?: boolean }): OrchestrationContext { + return { + instanceId: "test-instance-id", + parent: undefined, + currentUtcDateTime: new Date(), + isReplaying: overrides?.isReplaying ?? false, + version: "", + compareVersionTo: () => 0, + createTimer: () => { + throw new Error("Not implemented"); + }, + callActivity: () => { + throw new Error("Not implemented"); + }, + callSubOrchestrator: () => { + throw new Error("Not implemented"); + }, + waitForExternalEvent: () => { + throw new Error("Not implemented"); + }, + continueAsNew: () => { + throw new Error("Not implemented"); + }, + createReplaySafeLogger: () => { + throw new Error("Not implemented"); + }, + setCustomStatus: () => { + throw new Error("Not implemented"); + }, + sendEvent: () => { + throw new Error("Not implemented"); + }, + } as unknown as OrchestrationContext; +} + +describe("RetryHandlerTask", () => { + let mockCtx: OrchestrationContext; + + beforeEach(() => { + mockCtx = createMockOrchestrationContext(); + }); + + describe("constructor", () => { + it("should create a retry handler task with correct initial state", () => { + // Arrange + const handler: AsyncRetryHandler = async () => true; + const action = new pb.OrchestratorAction(); + const startTime = new Date(); + + // Act + const task = new RetryHandlerTask(handler, mockCtx, action, startTime, "activity"); + + // Assert + expect(task.attemptCount).toBe(1); + expect(task.taskType).toBe("activity"); + expect(task.action).toBe(action); + expect(task.startTime).toBe(startTime); + expect(task.handler).toBe(handler); + expect(task.lastFailure).toBeUndefined(); + }); + + it("should create a sub-orchestration retry handler task", () => { + // Arrange + const handler: AsyncRetryHandler = async () => false; + const action = new pb.OrchestratorAction(); + const startTime = new Date(); + + // Act + const task = new RetryHandlerTask(handler, mockCtx, action, startTime, "subOrchestration"); + + // Assert + expect(task.taskType).toBe("subOrchestration"); + }); + }); + + describe("recordFailure", () => { + it("should store failure details", () => { + // Arrange + const handler: AsyncRetryHandler = async () => true; + const action = new pb.OrchestratorAction(); + const task = new RetryHandlerTask(handler, mockCtx, action, new Date(), "activity"); + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("TransientError"); + failureDetails.setErrormessage("Connection timeout"); + + // Act + task.recordFailure("Connection timeout", failureDetails); + + // Assert + expect(task.lastFailure).toBe(failureDetails); + }); + + it("should create default failure details when none provided", () => { + // Arrange + const handler: AsyncRetryHandler = async () => true; + const action = new pb.OrchestratorAction(); + const task = new RetryHandlerTask(handler, mockCtx, action, new Date(), "activity"); + + // Act + task.recordFailure("Something failed"); + + // Assert + expect(task.lastFailure).toBeDefined(); + }); + }); + + describe("incrementAttemptCount", () => { + it("should increment the attempt count", () => { + // Arrange + const handler: AsyncRetryHandler = async () => true; + const action = new pb.OrchestratorAction(); + const task = new RetryHandlerTask(handler, mockCtx, action, new Date(), "activity"); + + // Act + task.incrementAttemptCount(); + task.incrementAttemptCount(); + + // Assert + expect(task.attemptCount).toBe(3); + }); + }); + + describe("complete", () => { + it("should clear failure state on successful completion", () => { + // Arrange + const handler: AsyncRetryHandler = async () => true; + const action = new pb.OrchestratorAction(); + const task = new RetryHandlerTask(handler, mockCtx, action, new Date(), "activity"); + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("TransientError"); + task.recordFailure("Previous failure", failureDetails); + + // Act + task.complete("success"); + + // Assert + expect(task.isComplete).toBe(true); + expect(task.isFailed).toBe(false); + expect(task.lastFailure).toBeUndefined(); + expect(task.getResult()).toBe("success"); + }); + }); + + describe("fail", () => { + it("should mark task as failed", () => { + // Arrange + const handler: AsyncRetryHandler = async () => true; + const action = new pb.OrchestratorAction(); + const task = new RetryHandlerTask(handler, mockCtx, action, new Date(), "activity"); + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("PermanentError"); + failureDetails.setErrormessage("Not retryable"); + + // Act + task.fail("Not retryable", failureDetails); + + // Assert + expect(task.isComplete).toBe(true); + expect(task.isFailed).toBe(true); + }); + + it("should throw if task is already completed", () => { + // Arrange + const handler: AsyncRetryHandler = async () => true; + const action = new pb.OrchestratorAction(); + const task = new RetryHandlerTask(handler, mockCtx, action, new Date(), "activity"); + task.complete("done"); + + // Act & Assert + expect(() => task.fail("fail")).toThrow("Task is already completed"); + }); + }); + + describe("shouldRetry", () => { + it("should return false when no failure recorded", async () => { + // Arrange + const handler: AsyncRetryHandler = async () => true; + const action = new pb.OrchestratorAction(); + const task = new RetryHandlerTask(handler, mockCtx, action, new Date(), "activity"); + + // Act + const result = await task.shouldRetry(new Date()); + + // Assert + expect(result).toBe(false); + }); + + it("should return false when failure is non-retriable", async () => { + // Arrange + const handler: AsyncRetryHandler = async () => true; + const action = new pb.OrchestratorAction(); + const task = new RetryHandlerTask(handler, mockCtx, action, new Date(), "activity"); + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("NotFoundError"); + failureDetails.setErrormessage("Activity not found"); + failureDetails.setIsnonretriable(true); + task.recordFailure("Activity not found", failureDetails); + + // Act + const result = await task.shouldRetry(new Date()); + + // Assert + expect(result).toBe(false); + }); + + it("should call handler and return true when handler says retry", async () => { + // Arrange + const handler: AsyncRetryHandler = async (ctx) => { + return ctx.lastAttemptNumber < 3; + }; + const action = new pb.OrchestratorAction(); + const startTime = new Date(2025, 0, 1); + const task = new RetryHandlerTask(handler, mockCtx, action, startTime, "activity"); + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("TransientError"); + failureDetails.setErrormessage("Connection timeout"); + task.recordFailure("Connection timeout", failureDetails); + + const currentTime = new Date(2025, 0, 1, 0, 0, 1); // 1 second later + + // Act + const result = await task.shouldRetry(currentTime); + + // Assert + expect(result).toBe(true); + }); + + it("should call handler and return false when handler says stop", async () => { + // Arrange + const handler: AsyncRetryHandler = async (ctx) => { + return ctx.lastFailure.errorType !== "PermanentError"; + }; + const action = new pb.OrchestratorAction(); + const task = new RetryHandlerTask(handler, mockCtx, action, new Date(), "activity"); + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("PermanentError"); + failureDetails.setErrormessage("Permanent failure"); + task.recordFailure("Permanent failure", failureDetails); + + // Act + const result = await task.shouldRetry(new Date()); + + // Assert + expect(result).toBe(false); + }); + + it("should pass correct retry context to handler", async () => { + // Arrange + let capturedContext: any; + const handler: AsyncRetryHandler = async (ctx) => { + capturedContext = ctx; + return true; + }; + const action = new pb.OrchestratorAction(); + const startTime = new Date(2025, 0, 1, 0, 0, 0); + const task = new RetryHandlerTask(handler, mockCtx, action, startTime, "activity"); + + // Simulate 2 previous failures + task.incrementAttemptCount(); // attempt count = 2 + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("TransientError"); + failureDetails.setErrormessage("Connection timeout"); + const stackTrace = new StringValue(); + stackTrace.setValue("Error: Connection timeout\n at line 1"); + failureDetails.setStacktrace(stackTrace); + task.recordFailure("Connection timeout", failureDetails); + + const currentTime = new Date(2025, 0, 1, 0, 0, 5); // 5 seconds later + + // Act + await task.shouldRetry(currentTime); + + // Assert + expect(capturedContext).toBeDefined(); + expect(capturedContext.orchestrationContext).toBe(mockCtx); + expect(capturedContext.lastAttemptNumber).toBe(2); + expect(capturedContext.lastFailure.errorType).toBe("TransientError"); + expect(capturedContext.lastFailure.message).toBe("Connection timeout"); + expect(capturedContext.lastFailure.stackTrace).toBe("Error: Connection timeout\n at line 1"); + expect(capturedContext.totalRetryTimeInMilliseconds).toBe(5000); + }); + + it("should expose orchestrationContext.isReplaying in retry context", async () => { + // Arrange + const replayingCtx = createMockOrchestrationContext({ isReplaying: true }); + let capturedIsReplaying: boolean | undefined; + const handler: AsyncRetryHandler = async (ctx) => { + capturedIsReplaying = ctx.orchestrationContext.isReplaying; + return true; + }; + const action = new pb.OrchestratorAction(); + const task = new RetryHandlerTask(handler, replayingCtx, action, new Date(), "activity"); + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("TransientError"); + failureDetails.setErrormessage("Timeout"); + task.recordFailure("Timeout", failureDetails); + + // Act + await task.shouldRetry(new Date()); + + // Assert + expect(capturedIsReplaying).toBe(true); + }); + + it("should allow handler to make decisions based on totalRetryTime", async () => { + // Arrange - handler that stops after 10 seconds + const handler: AsyncRetryHandler = async (ctx) => { + return ctx.totalRetryTimeInMilliseconds < 10000; + }; + const action = new pb.OrchestratorAction(); + const startTime = new Date(2025, 0, 1, 0, 0, 0); + const task = new RetryHandlerTask(handler, mockCtx, action, startTime, "activity"); + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("TransientError"); + failureDetails.setErrormessage("Timeout"); + task.recordFailure("Timeout", failureDetails); + + // Act - 5 seconds elapsed (under limit) + const result1 = await task.shouldRetry(new Date(2025, 0, 1, 0, 0, 5)); + + // Assert + expect(result1).toBe(true); + + // Act - 15 seconds elapsed (over limit) + const result2 = await task.shouldRetry(new Date(2025, 0, 1, 0, 0, 15)); + + // Assert + expect(result2).toBe(false); + }); + + it("should return a delay in milliseconds when handler returns a number", async () => { + // Arrange - handler that returns exponential backoff delay + const handler: AsyncRetryHandler = async (ctx) => { + return 1000 * Math.pow(2, ctx.lastAttemptNumber - 1); + }; + const action = new pb.OrchestratorAction(); + const startTime = new Date(2025, 0, 1); + const task = new RetryHandlerTask(handler, mockCtx, action, startTime, "activity"); + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("TransientError"); + failureDetails.setErrormessage("Connection timeout"); + task.recordFailure("Connection timeout", failureDetails); + + const currentTime = new Date(2025, 0, 1, 0, 0, 1); + + // Act + const result = await task.shouldRetry(currentTime); + + // Assert - first attempt → 1000 * 2^0 = 1000ms + expect(result).toBe(1000); + }); + + it("should support sync handler returning a number", async () => { + // Arrange + const handler = (_ctx: any) => 5000; + const action = new pb.OrchestratorAction(); + const task = new RetryHandlerTask(handler as any, mockCtx, action, new Date(), "activity"); + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("TransientError"); + failureDetails.setErrormessage("Timeout"); + task.recordFailure("Timeout", failureDetails); + + // Act + const result = await task.shouldRetry(new Date()); + + // Assert + expect(result).toBe(5000); + }); + + it("should support handler switching from delay to immediate retry", async () => { + // Arrange - handler that returns delay for first 2 attempts, then true for immediate retry + const handler: AsyncRetryHandler = async (ctx) => { + if (ctx.lastAttemptNumber <= 2) { + return 2000; // delay 2 seconds + } + return true; // retry immediately after that + }; + const action = new pb.OrchestratorAction(); + const startTime = new Date(2025, 0, 1, 0, 0, 0); + const task = new RetryHandlerTask(handler, mockCtx, action, startTime, "activity"); + + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("TransientError"); + failureDetails.setErrormessage("Timeout"); + task.recordFailure("Timeout", failureDetails); + + // Act - first attempt (attempt count = 1) + const result1 = await task.shouldRetry(new Date(2025, 0, 1, 0, 0, 1)); + + // Assert - should return delay + expect(result1).toBe(2000); + + // Simulate progressing past attempt 2 + task.incrementAttemptCount(); // attempt 2 + task.incrementAttemptCount(); // attempt 3 + + // Act - third attempt + const result2 = await task.shouldRetry(new Date(2025, 0, 1, 0, 0, 5)); + + // Assert - should return immediate retry + expect(result2).toBe(true); + }); + }); +}); diff --git a/packages/durabletask-js/test/retry-handler.spec.ts b/packages/durabletask-js/test/retry-handler.spec.ts new file mode 100644 index 0000000..08f5a34 --- /dev/null +++ b/packages/durabletask-js/test/retry-handler.spec.ts @@ -0,0 +1,189 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { RetryHandler, AsyncRetryHandler, toAsyncRetryHandler } from "../src/task/retry/retry-handler"; +import { RetryContext, createRetryContext } from "../src/task/retry/retry-context"; +import { OrchestrationContext } from "../src/task/context/orchestration-context"; +import { TaskFailureDetails } from "../src/task/failure-details"; + +describe("RetryHandler", () => { + const mockOrchCtx = {} as OrchestrationContext; + const mockFailureDetails: TaskFailureDetails = { + errorType: "TestError", + message: "Test error message", + stackTrace: "at TestFile.ts:10", + }; + + const createTestContext = (attemptNumber: number, errorType: string = "TestError"): RetryContext => { + return createRetryContext( + mockOrchCtx, + attemptNumber, + { ...mockFailureDetails, errorType }, + attemptNumber * 1000, + ); + }; + + describe("RetryHandler type", () => { + it("should return true to continue retrying", () => { + const handler: RetryHandler = () => true; + + const result = handler(createTestContext(1)); + + expect(result).toBe(true); + }); + + it("should return false to stop retrying", () => { + const handler: RetryHandler = () => false; + + const result = handler(createTestContext(1)); + + expect(result).toBe(false); + }); + + it("should be able to check attempt number", () => { + const handler: RetryHandler = (context) => context.lastAttemptNumber < 3; + + expect(handler(createTestContext(1))).toBe(true); + expect(handler(createTestContext(2))).toBe(true); + expect(handler(createTestContext(3))).toBe(false); + expect(handler(createTestContext(4))).toBe(false); + }); + + it("should be able to check error type", () => { + const handler: RetryHandler = (context) => { + return context.lastFailure.errorType !== "ValidationError"; + }; + + expect(handler(createTestContext(1, "TransientError"))).toBe(true); + expect(handler(createTestContext(1, "ValidationError"))).toBe(false); + }); + + it("should be able to check total retry time", () => { + const handler: RetryHandler = (context) => { + // Stop retrying after 5 seconds total + return context.totalRetryTimeInMilliseconds < 5000; + }; + + expect(handler(createRetryContext(mockOrchCtx, 1, mockFailureDetails, 1000))).toBe(true); + expect(handler(createRetryContext(mockOrchCtx, 3, mockFailureDetails, 4999))).toBe(true); + expect(handler(createRetryContext(mockOrchCtx, 5, mockFailureDetails, 5000))).toBe(false); + }); + + it("should support complex retry logic", () => { + const handler: RetryHandler = (context) => { + // Don't retry validation errors + if (context.lastFailure.errorType === "ValidationError") return false; + + // Don't retry after 5 attempts + if (context.lastAttemptNumber >= 5) return false; + + // Don't retry after 30 seconds + if (context.totalRetryTimeInMilliseconds > 30000) return false; + + return true; + }; + + // Should retry for normal errors under limits + expect(handler(createRetryContext(mockOrchCtx, 1, mockFailureDetails, 1000))).toBe(true); + + // Should not retry validation errors + expect( + handler( + createRetryContext( + mockOrchCtx, + 1, + { ...mockFailureDetails, errorType: "ValidationError" }, + 1000, + ), + ), + ).toBe(false); + + // Should not retry after 5 attempts + expect(handler(createRetryContext(mockOrchCtx, 5, mockFailureDetails, 5000))).toBe(false); + + // Should not retry after 30 seconds + expect(handler(createRetryContext(mockOrchCtx, 3, mockFailureDetails, 31000))).toBe(false); + }); + }); + + describe("AsyncRetryHandler type", () => { + it("should return Promise that resolves to true", async () => { + const handler: AsyncRetryHandler = async () => true; + + const result = await handler(createTestContext(1)); + + expect(result).toBe(true); + }); + + it("should return Promise that resolves to false", async () => { + const handler: AsyncRetryHandler = async () => false; + + const result = await handler(createTestContext(1)); + + expect(result).toBe(false); + }); + + it("should support async logic", async () => { + const handler: AsyncRetryHandler = async (context) => { + // Simulate some async operation (in practice this would be deterministic) + await Promise.resolve(); + return context.lastAttemptNumber < 3; + }; + + expect(await handler(createTestContext(1))).toBe(true); + expect(await handler(createTestContext(2))).toBe(true); + expect(await handler(createTestContext(3))).toBe(false); + }); + }); + + describe("toAsyncRetryHandler", () => { + it("should wrap a sync handler in a Promise", async () => { + const syncHandler: RetryHandler = (context) => context.lastAttemptNumber < 3; + const asyncHandler = toAsyncRetryHandler(syncHandler); + + expect(await asyncHandler(createTestContext(1))).toBe(true); + expect(await asyncHandler(createTestContext(2))).toBe(true); + expect(await asyncHandler(createTestContext(3))).toBe(false); + }); + + it("should preserve the logic of the original handler", async () => { + const syncHandler: RetryHandler = (context) => { + return context.lastFailure.errorType !== "FatalError" && context.lastAttemptNumber < 5; + }; + + const asyncHandler = toAsyncRetryHandler(syncHandler); + + expect(await asyncHandler(createTestContext(1, "TransientError"))).toBe(true); + expect(await asyncHandler(createTestContext(1, "FatalError"))).toBe(false); + expect(await asyncHandler(createTestContext(5, "TransientError"))).toBe(false); + }); + + it("should return a function", () => { + const syncHandler: RetryHandler = () => true; + const asyncHandler = toAsyncRetryHandler(syncHandler); + + expect(typeof asyncHandler).toBe("function"); + }); + + it("should return a Promise", () => { + const syncHandler: RetryHandler = () => true; + const asyncHandler = toAsyncRetryHandler(syncHandler); + + const result = asyncHandler(createTestContext(1)); + + expect(result).toBeInstanceOf(Promise); + }); + + it("should wrap a sync handler that returns a delay number", async () => { + const syncHandler: RetryHandler = (context) => { + if (context.lastAttemptNumber >= 3) return false; + return 1000 * context.lastAttemptNumber; + }; + const asyncHandler = toAsyncRetryHandler(syncHandler); + + expect(await asyncHandler(createTestContext(1))).toBe(1000); + expect(await asyncHandler(createTestContext(2))).toBe(2000); + expect(await asyncHandler(createTestContext(3))).toBe(false); + }); + }); +}); diff --git a/packages/durabletask-js/test/retry-policy-handle-failure.spec.ts b/packages/durabletask-js/test/retry-policy-handle-failure.spec.ts new file mode 100644 index 0000000..761f1d5 --- /dev/null +++ b/packages/durabletask-js/test/retry-policy-handle-failure.spec.ts @@ -0,0 +1,188 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { RetryPolicy, FailureHandlerPredicate } from "../src/task/retry/retry-policy"; +import { TaskFailureDetails } from "../src/task/failure-details"; + +describe("RetryPolicy handleFailure", () => { + const defaultOptions = { + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + }; + + const createFailure = (errorType: string, message: string = "Error"): TaskFailureDetails => ({ + errorType, + message, + stackTrace: "at test.ts:1", + }); + + describe("default handleFailure behavior", () => { + it("should default to always returning true (retry all failures)", () => { + const policy = new RetryPolicy(defaultOptions); + + expect(policy.shouldRetry(createFailure("TestError"))).toBe(true); + expect(policy.shouldRetry(createFailure("ValidationError"))).toBe(true); + expect(policy.shouldRetry(createFailure("NetworkError"))).toBe(true); + }); + + it("should expose handleFailure getter", () => { + const policy = new RetryPolicy(defaultOptions); + + expect(typeof policy.handleFailure).toBe("function"); + }); + }); + + describe("custom handleFailure predicate", () => { + it("should use custom predicate when provided", () => { + const handleFailure: FailureHandlerPredicate = (failure) => { + return failure.errorType !== "ValidationError"; + }; + + const policy = new RetryPolicy({ + ...defaultOptions, + handleFailure, + }); + + expect(policy.shouldRetry(createFailure("TransientError"))).toBe(true); + expect(policy.shouldRetry(createFailure("ValidationError"))).toBe(false); + }); + + it("should support filtering by error message", () => { + const policy = new RetryPolicy({ + ...defaultOptions, + handleFailure: (failure) => !failure.message.includes("permanent"), + }); + + expect(policy.shouldRetry(createFailure("Error", "Temporary issue"))).toBe(true); + expect(policy.shouldRetry(createFailure("Error", "This is a permanent failure"))).toBe(false); + }); + + it("should support filtering by stack trace", () => { + const policy = new RetryPolicy({ + ...defaultOptions, + handleFailure: (failure) => { + return !failure.stackTrace?.includes("CriticalService"); + }, + }); + + const normalFailure = createFailure("Error"); + const criticalFailure: TaskFailureDetails = { + errorType: "Error", + message: "Error", + stackTrace: "at CriticalService.ts:100", + }; + + expect(policy.shouldRetry(normalFailure)).toBe(true); + expect(policy.shouldRetry(criticalFailure)).toBe(false); + }); + + it("should support multiple error types to exclude", () => { + const nonRetryableErrors = ["ValidationError", "AuthenticationError", "AuthorizationError"]; + + const policy = new RetryPolicy({ + ...defaultOptions, + handleFailure: (failure) => !nonRetryableErrors.includes(failure.errorType), + }); + + expect(policy.shouldRetry(createFailure("TransientError"))).toBe(true); + expect(policy.shouldRetry(createFailure("NetworkError"))).toBe(true); + expect(policy.shouldRetry(createFailure("ValidationError"))).toBe(false); + expect(policy.shouldRetry(createFailure("AuthenticationError"))).toBe(false); + expect(policy.shouldRetry(createFailure("AuthorizationError"))).toBe(false); + }); + + it("should support multiple error types to include (whitelist)", () => { + const retryableErrors = ["TransientError", "NetworkError", "TimeoutError"]; + + const policy = new RetryPolicy({ + ...defaultOptions, + handleFailure: (failure) => retryableErrors.includes(failure.errorType), + }); + + expect(policy.shouldRetry(createFailure("TransientError"))).toBe(true); + expect(policy.shouldRetry(createFailure("NetworkError"))).toBe(true); + expect(policy.shouldRetry(createFailure("TimeoutError"))).toBe(true); + expect(policy.shouldRetry(createFailure("ValidationError"))).toBe(false); + expect(policy.shouldRetry(createFailure("UnknownError"))).toBe(false); + }); + + it("should handle undefined stack trace gracefully", () => { + const policy = new RetryPolicy({ + ...defaultOptions, + handleFailure: (failure) => { + // Safe check for stack trace + return failure.stackTrace?.includes("retry") ?? true; + }, + }); + + const failureWithStack: TaskFailureDetails = { + errorType: "Error", + message: "Error", + stackTrace: "at retry.ts:10", + }; + + const failureWithoutStack: TaskFailureDetails = { + errorType: "Error", + message: "Error", + stackTrace: undefined, + }; + + expect(policy.shouldRetry(failureWithStack)).toBe(true); + expect(policy.shouldRetry(failureWithoutStack)).toBe(true); + }); + }); + + describe("handleFailure getter vs shouldRetry method", () => { + it("handleFailure returns the predicate function", () => { + const predicate: FailureHandlerPredicate = (failure) => failure.errorType === "RetryableError"; + + const policy = new RetryPolicy({ + ...defaultOptions, + handleFailure: predicate, + }); + + // The getter returns the function itself + expect(policy.handleFailure).toBe(predicate); + }); + + it("shouldRetry is a convenience method that calls the predicate", () => { + const calls: TaskFailureDetails[] = []; + const predicate: FailureHandlerPredicate = (failure) => { + calls.push(failure); + return true; + }; + + const policy = new RetryPolicy({ + ...defaultOptions, + handleFailure: predicate, + }); + + const failure = createFailure("TestError"); + policy.shouldRetry(failure); + + expect(calls).toHaveLength(1); + expect(calls[0]).toBe(failure); + }); + }); + + describe("integration with other RetryPolicy options", () => { + it("should work with all retry options together", () => { + const policy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 1000, + backoffCoefficient: 2.0, + maxRetryIntervalInMilliseconds: 30000, + retryTimeoutInMilliseconds: 300000, + handleFailure: (failure) => failure.errorType !== "PermanentError", + }); + + expect(policy.maxNumberOfAttempts).toBe(5); + expect(policy.firstRetryIntervalInMilliseconds).toBe(1000); + expect(policy.backoffCoefficient).toBe(2.0); + expect(policy.maxRetryIntervalInMilliseconds).toBe(30000); + expect(policy.retryTimeoutInMilliseconds).toBe(300000); + expect(policy.shouldRetry(createFailure("TransientError"))).toBe(true); + expect(policy.shouldRetry(createFailure("PermanentError"))).toBe(false); + }); + }); +}); diff --git a/packages/durabletask-js/test/retryable-task.spec.ts b/packages/durabletask-js/test/retryable-task.spec.ts index 0aeb801..be72860 100644 --- a/packages/durabletask-js/test/retryable-task.spec.ts +++ b/packages/durabletask-js/test/retryable-task.spec.ts @@ -194,6 +194,108 @@ describe("RetryableTask", () => { // Assert - should still allow retry (timeout is infinite) expect(delay).toBeGreaterThan(0); }); + + it("should return undefined when handleFailure predicate returns false", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 10, + firstRetryIntervalInMilliseconds: 1000, + handleFailure: (failure) => failure.errorType !== "FatalError", + }); + const action = new pb.OrchestratorAction(); + const task = new RetryableTask(retryPolicy, action, new Date(), "activity"); + + // Record a FatalError (should NOT be retried) + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("FatalError"); + failureDetails.setErrormessage("This is a fatal error"); + task.recordFailure("This is a fatal error", failureDetails); + + const currentTime = new Date(); + + // Act + const delay = task.computeNextDelayInMilliseconds(currentTime); + + // Assert - should return undefined because handleFailure returns false for FatalError + expect(delay).toBeUndefined(); + }); + + it("should allow retry when handleFailure predicate returns true", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 10, + firstRetryIntervalInMilliseconds: 1000, + handleFailure: (failure) => failure.errorType !== "FatalError", + }); + const action = new pb.OrchestratorAction(); + const task = new RetryableTask(retryPolicy, action, new Date(), "activity"); + + // Record a TransientError (should be retried) + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("TransientError"); + failureDetails.setErrormessage("This is a transient error"); + task.recordFailure("This is a transient error", failureDetails); + + const currentTime = new Date(); + + // Act + const delay = task.computeNextDelayInMilliseconds(currentTime); + + // Assert - should return a delay because handleFailure returns true for TransientError + expect(delay).toBeDefined(); + expect(delay).toBeGreaterThan(0); + }); + + it("should filter based on error message content in handleFailure", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 10, + firstRetryIntervalInMilliseconds: 1000, + handleFailure: (failure) => failure.message?.includes("timeout") ?? false, + }); + const action = new pb.OrchestratorAction(); + const task = new RetryableTask(retryPolicy, action, new Date(), "activity"); + + // Record a validation error (should NOT be retried - no "timeout" in message) + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("ValidationError"); + failureDetails.setErrormessage("Invalid input: field is required"); + task.recordFailure("Invalid input: field is required", failureDetails); + + const currentTime = new Date(); + + // Act + const delay = task.computeNextDelayInMilliseconds(currentTime); + + // Assert - should return undefined because message doesn't contain "timeout" + expect(delay).toBeUndefined(); + }); + + it("should retry when error message matches handleFailure criteria", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 10, + firstRetryIntervalInMilliseconds: 1000, + handleFailure: (failure) => failure.message?.includes("timeout") ?? false, + }); + const action = new pb.OrchestratorAction(); + const task = new RetryableTask(retryPolicy, action, new Date(), "activity"); + + // Record a timeout error (should be retried - has "timeout" in message) + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("NetworkError"); + failureDetails.setErrormessage("Connection timeout - please retry"); + task.recordFailure("Connection timeout - please retry", failureDetails); + + const currentTime = new Date(); + + // Act + const delay = task.computeNextDelayInMilliseconds(currentTime); + + // Assert - should return a delay because message contains "timeout" + expect(delay).toBeDefined(); + expect(delay).toBeGreaterThan(0); + }); }); describe("incrementAttemptCount", () => { diff --git a/packages/durabletask-js/test/task-options-retry-handler.spec.ts b/packages/durabletask-js/test/task-options-retry-handler.spec.ts new file mode 100644 index 0000000..6761107 --- /dev/null +++ b/packages/durabletask-js/test/task-options-retry-handler.spec.ts @@ -0,0 +1,155 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { RetryPolicy } from "../src/task/retry/retry-policy"; +import { RetryHandler, AsyncRetryHandler, toAsyncRetryHandler } from "../src/task/retry/retry-handler"; +import { + TaskOptions, + SubOrchestrationOptions, + TaskRetryOptions, + isRetryPolicy, + isRetryHandler, +} from "../src/task/options/task-options"; + +describe("TaskOptions with RetryHandler", () => { + const mockRetryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + }); + + const mockAsyncRetryHandler: AsyncRetryHandler = async (context) => { + return context.lastAttemptNumber < 3; + }; + + const mockSyncRetryHandler: RetryHandler = (context) => { + return context.lastAttemptNumber < 3; + }; + + describe("TaskRetryOptions type", () => { + it("should accept RetryPolicy", () => { + const options: TaskRetryOptions = mockRetryPolicy; + expect(options).toBe(mockRetryPolicy); + }); + + it("should accept AsyncRetryHandler", () => { + const options: TaskRetryOptions = mockAsyncRetryHandler; + expect(options).toBe(mockAsyncRetryHandler); + }); + + it("should accept synchronous RetryHandler directly", () => { + const options: TaskRetryOptions = mockSyncRetryHandler; + expect(options).toBe(mockSyncRetryHandler); + }); + }); + + describe("isRetryPolicy type guard", () => { + it("should return true for RetryPolicy instances", () => { + expect(isRetryPolicy(mockRetryPolicy)).toBe(true); + }); + + it("should return false for AsyncRetryHandler", () => { + expect(isRetryPolicy(mockAsyncRetryHandler)).toBe(false); + }); + + it("should return false for undefined", () => { + expect(isRetryPolicy(undefined)).toBe(false); + }); + + it("should return false for null", () => { + expect(isRetryPolicy(null as any)).toBe(false); + }); + }); + + describe("isRetryHandler type guard", () => { + it("should return true for AsyncRetryHandler", () => { + expect(isRetryHandler(mockAsyncRetryHandler)).toBe(true); + }); + + it("should return true for sync RetryHandler", () => { + expect(isRetryHandler(mockSyncRetryHandler)).toBe(true); + }); + + it("should return true for wrapped sync handler", () => { + const wrapped = toAsyncRetryHandler(mockSyncRetryHandler); + expect(isRetryHandler(wrapped)).toBe(true); + }); + + it("should return false for RetryPolicy instances", () => { + expect(isRetryHandler(mockRetryPolicy)).toBe(false); + }); + + it("should return false for undefined", () => { + expect(isRetryHandler(undefined)).toBe(false); + }); + }); + + describe("TaskOptions interface", () => { + it("should allow RetryPolicy as retry option", () => { + const options: TaskOptions = { + retry: mockRetryPolicy, + }; + + expect(isRetryPolicy(options.retry)).toBe(true); + }); + + it("should allow AsyncRetryHandler as retry option", () => { + const options: TaskOptions = { + retry: mockAsyncRetryHandler, + }; + + expect(isRetryHandler(options.retry)).toBe(true); + }); + + it("should allow synchronous RetryHandler as retry option", () => { + const options: TaskOptions = { + retry: mockSyncRetryHandler, + }; + + expect(isRetryHandler(options.retry)).toBe(true); + }); + + it("should allow combination with other options", () => { + const options: TaskOptions = { + retry: mockRetryPolicy, + tags: { env: "test" }, + version: "1.0.0", + }; + + expect(options.retry).toBe(mockRetryPolicy); + expect(options.tags).toEqual({ env: "test" }); + expect(options.version).toBe("1.0.0"); + }); + }); + + describe("SubOrchestrationOptions interface", () => { + it("should allow RetryPolicy with instanceId", () => { + const options: SubOrchestrationOptions = { + retry: mockRetryPolicy, + instanceId: "sub-orch-1", + }; + + expect(options.retry).toBe(mockRetryPolicy); + expect(options.instanceId).toBe("sub-orch-1"); + }); + + it("should allow AsyncRetryHandler with instanceId", () => { + const options: SubOrchestrationOptions = { + retry: mockAsyncRetryHandler, + instanceId: "sub-orch-2", + }; + + expect(isRetryHandler(options.retry)).toBe(true); + expect(options.instanceId).toBe("sub-orch-2"); + }); + + it("should allow synchronous RetryHandler with instanceId", () => { + const options: SubOrchestrationOptions = { + retry: mockSyncRetryHandler, + instanceId: "sub-orch-3", + }; + + expect(isRetryHandler(options.retry)).toBe(true); + expect(options.instanceId).toBe("sub-orch-3"); + }); + }); +}); diff --git a/test/e2e-azuremanaged/retry-advanced.spec.ts b/test/e2e-azuremanaged/retry-advanced.spec.ts new file mode 100644 index 0000000..3712fce --- /dev/null +++ b/test/e2e-azuremanaged/retry-advanced.spec.ts @@ -0,0 +1,443 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * E2E tests for advanced retry features in Durable Task Scheduler (DTS). + * + * These tests cover: + * - handleFailure predicate to filter which errors to retry + * - maxRetryIntervalInMilliseconds to cap backoff intervals + * - retryTimeoutInMilliseconds to limit total retry duration + * + * Environment variables (choose one): + * - DTS_CONNECTION_STRING: Full connection string + * OR + * - ENDPOINT: The endpoint for the DTS emulator (default: localhost:8080) + * - TASKHUB: The task hub name (default: default) + */ + +import { + TaskHubGrpcClient, + TaskHubGrpcWorker, + ProtoOrchestrationStatus as OrchestrationStatus, + ActivityContext, + OrchestrationContext, + TOrchestrator, + RetryPolicy, +} from "@microsoft/durabletask-js"; +import { + DurableTaskAzureManagedClientBuilder, + DurableTaskAzureManagedWorkerBuilder, +} from "@microsoft/durabletask-js-azuremanaged"; + +// Read environment variables +const connectionString = process.env.DTS_CONNECTION_STRING; +const endpoint = process.env.ENDPOINT || "localhost:8080"; +const taskHub = process.env.TASKHUB || "default"; + +function createClient(): TaskHubGrpcClient { + if (connectionString) { + return new DurableTaskAzureManagedClientBuilder().connectionString(connectionString).build(); + } + return new DurableTaskAzureManagedClientBuilder() + .endpoint(endpoint, taskHub, null) // null credential for emulator (no auth) + .build(); +} + +function createWorker(): TaskHubGrpcWorker { + if (connectionString) { + return new DurableTaskAzureManagedWorkerBuilder().connectionString(connectionString).build(); + } + return new DurableTaskAzureManagedWorkerBuilder() + .endpoint(endpoint, taskHub, null) // null credential for emulator (no auth) + .build(); +} + +describe("Advanced Retry Policy E2E Tests", () => { + let taskHubClient: TaskHubGrpcClient; + let taskHubWorker: TaskHubGrpcWorker; + + beforeEach(async () => { + taskHubClient = createClient(); + taskHubWorker = createWorker(); + }); + + afterEach(async () => { + try { + await taskHubWorker.stop(); + } catch { + // Worker wasn't started, ignore the error + } + await taskHubClient.stop(); + }); + + // ==================== handleFailure Predicate Tests ==================== + + describe("handleFailure predicate", () => { + it("should retry when handleFailure returns true", async () => { + let attemptCount = 0; + + const flakyActivity = async (_: ActivityContext, input: number) => { + attemptCount++; + if (attemptCount < 3) { + const error = new Error(`Transient failure on attempt ${attemptCount}`); + error.name = "TransientError"; + throw error; + } + return input * 2; + }; + + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 100, + backoffCoefficient: 1.0, + handleFailure: (failure) => { + // Only retry TransientError errors + return failure.errorType === "TransientError"; + }, + }); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + const result = yield ctx.callActivity(flakyActivity, input, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addActivity(flakyActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 21); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.serializedOutput).toEqual(JSON.stringify(42)); + expect(attemptCount).toBe(3); // Failed twice with TransientError, succeeded on third + }, 31000); + + it("should stop retrying when handleFailure returns false", async () => { + let attemptCount = 0; + + const activityWithPermanentError = async (_: ActivityContext) => { + attemptCount++; + const error = new Error("This is a permanent error"); + error.name = "PermanentError"; + throw error; + }; + + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 100, + backoffCoefficient: 1.0, + handleFailure: (failure) => { + // Don't retry PermanentError - stop immediately + return failure.errorType !== "PermanentError"; + }, + }); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(activityWithPermanentError, undefined, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addActivity(activityWithPermanentError); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + // Should only attempt once since handleFailure returns false for PermanentError + expect(attemptCount).toBe(1); + }, 31000); + + it("should filter retries based on error message content", async () => { + let attemptCount = 0; + + const activityWithSpecificError = async (_: ActivityContext) => { + attemptCount++; + if (attemptCount === 1) { + throw new Error("Database connection timeout - please retry"); + } else if (attemptCount === 2) { + throw new Error("Invalid input: field 'email' is required"); + } + return "success"; + }; + + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 100, + backoffCoefficient: 1.0, + handleFailure: (failure) => { + // Only retry timeout errors, not validation errors + return failure.message?.includes("timeout") ?? false; + }, + }); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(activityWithSpecificError, undefined, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addActivity(activityWithSpecificError); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + // First attempt: timeout error (retry), Second attempt: validation error (no retry) + expect(attemptCount).toBe(2); + }, 31000); + + it("should apply handleFailure to sub-orchestration retries", async () => { + let attemptCount = 0; + + // eslint-disable-next-line require-yield + const subOrchestrator: TOrchestrator = async function* (_ctx: OrchestrationContext): any { + attemptCount++; + const error = new Error("Sub-orchestration failed"); + error.name = attemptCount === 1 ? "RetryableError" : "FatalError"; + throw error; + }; + + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 100, + backoffCoefficient: 1.0, + handleFailure: (failure) => { + // Only retry RetryableError + return failure.errorType === "RetryableError"; + }, + }); + + const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callSubOrchestrator(subOrchestrator, undefined, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addOrchestrator(subOrchestrator); + taskHubWorker.addOrchestrator(parentOrchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(parentOrchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + // First: RetryableError (retry), Second: FatalError (no retry) + expect(attemptCount).toBe(2); + }, 31000); + }); + + // ==================== maxRetryIntervalInMilliseconds Tests ==================== + + describe("maxRetryIntervalInMilliseconds", () => { + it("should cap retry interval at maxRetryIntervalInMilliseconds", async () => { + const attemptTimes: number[] = []; + + const flakyActivity = async (_: ActivityContext) => { + attemptTimes.push(Date.now()); + if (attemptTimes.length < 4) { + throw new Error(`Transient failure on attempt ${attemptTimes.length}`); + } + return "success"; + }; + + // With backoffCoefficient=2.0 and first interval 200ms: + // Without cap: 200ms, 400ms, 800ms + // With cap at 300ms: 200ms, 300ms, 300ms + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 200, + backoffCoefficient: 2.0, + maxRetryIntervalInMilliseconds: 300, + }); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(flakyActivity, undefined, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addActivity(flakyActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(attemptTimes.length).toBe(4); + + // Verify delays are capped + if (attemptTimes.length >= 4) { + const delay1 = attemptTimes[1] - attemptTimes[0]; + const delay2 = attemptTimes[2] - attemptTimes[1]; + const delay3 = attemptTimes[3] - attemptTimes[2]; + + // First delay should be ~200ms + expect(delay1).toBeGreaterThanOrEqual(150); + expect(delay1).toBeLessThanOrEqual(400); + + // Second and third delays should be capped at ~300ms (not growing exponentially) + expect(delay2).toBeLessThanOrEqual(500); // With tolerance + expect(delay3).toBeLessThanOrEqual(500); // With tolerance + + // Without cap, third delay would be ~800ms; with cap it should be ~300ms + // So third delay should NOT be much larger than second delay + expect(delay3).toBeLessThanOrEqual(delay2 * 1.5); + } + }, 31000); + }); + + // ==================== retryTimeoutInMilliseconds Tests ==================== + + describe("retryTimeoutInMilliseconds", () => { + it("should stop retrying after retryTimeoutInMilliseconds expires", async () => { + let attemptCount = 0; + const startTime = Date.now(); + let endTime = 0; + + const slowFailingActivity = async (_: ActivityContext) => { + attemptCount++; + endTime = Date.now(); + throw new Error(`Failure on attempt ${attemptCount}`); + }; + + // Retry timeout of 800ms with 300ms intervals means max 2-3 attempts + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 10, // High max, but timeout should stop it earlier + firstRetryIntervalInMilliseconds: 300, + backoffCoefficient: 1.0, + retryTimeoutInMilliseconds: 800, + }); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(slowFailingActivity, undefined, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addActivity(slowFailingActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + + // Should have stopped before 10 attempts due to timeout + expect(attemptCount).toBeLessThan(10); + // Should have at least tried a couple times + expect(attemptCount).toBeGreaterThanOrEqual(2); + + // Total time should be roughly within the timeout window (with some tolerance) + const totalTime = endTime - startTime; + // The orchestration framework timing may add overhead, but attempts should stop reasonably quickly + expect(totalTime).toBeLessThan(2000); // Very generous upper bound + }, 31000); + }); + + // ==================== Combined Features Tests ==================== + + describe("combined retry features", () => { + it("should respect both handleFailure and maxRetryIntervalInMilliseconds", async () => { + let attemptCount = 0; + const attemptTimes: number[] = []; + + const flakyActivity = async (_: ActivityContext) => { + attemptCount++; + attemptTimes.push(Date.now()); + + if (attemptCount < 3) { + const error = new Error(`Transient failure on attempt ${attemptCount}`); + error.name = "TransientError"; + throw error; + } + return "success"; + }; + + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 200, + backoffCoefficient: 2.0, + maxRetryIntervalInMilliseconds: 250, + handleFailure: (failure) => failure.errorType === "TransientError", + }); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(flakyActivity, undefined, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addActivity(flakyActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(attemptCount).toBe(3); + + // Verify second delay is capped (would be 400ms without cap, should be ~250ms with cap) + if (attemptTimes.length >= 3) { + const delay2 = attemptTimes[2] - attemptTimes[1]; + expect(delay2).toBeLessThanOrEqual(400); // Should be capped at ~250ms + } + }, 31000); + + it("should fail fast when handleFailure returns false regardless of other settings", async () => { + let attemptCount = 0; + + const activityWithFatalError = async (_: ActivityContext) => { + attemptCount++; + const error = new Error("Fatal configuration error"); + error.name = "ConfigurationError"; + throw error; + }; + + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 10, + firstRetryIntervalInMilliseconds: 100, + backoffCoefficient: 2.0, + maxRetryIntervalInMilliseconds: 5000, + retryTimeoutInMilliseconds: 60000, + handleFailure: (failure) => { + // Never retry configuration errors + return failure.errorType !== "ConfigurationError"; + }, + }); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(activityWithFatalError, undefined, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addActivity(activityWithFatalError); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + // Should fail immediately on first attempt due to handleFailure returning false + expect(attemptCount).toBe(1); + }, 31000); + }); +}); diff --git a/test/e2e-azuremanaged/retry-handler.spec.ts b/test/e2e-azuremanaged/retry-handler.spec.ts new file mode 100644 index 0000000..c2ff2e3 --- /dev/null +++ b/test/e2e-azuremanaged/retry-handler.spec.ts @@ -0,0 +1,573 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * E2E tests for AsyncRetryHandler (custom retry logic) in Durable Task Scheduler (DTS). + * + * These tests cover: + * - Activity retry with custom retry handler + * - Sub-orchestration retry with custom retry handler + * - Retry handler that filters by error type + * - Retry handler that limits by attempt count + * - Sync RetryHandler support + * + * Environment variables (choose one): + * - DTS_CONNECTION_STRING: Full connection string + * OR + * - ENDPOINT: The endpoint for the DTS emulator (default: localhost:8080) + * - TASKHUB: The task hub name (default: default) + */ + +import { + TaskHubGrpcClient, + TaskHubGrpcWorker, + ProtoOrchestrationStatus as OrchestrationStatus, + ActivityContext, + OrchestrationContext, + TOrchestrator, + AsyncRetryHandler, + RetryHandler, + RetryContext, + toAsyncRetryHandler, +} from "@microsoft/durabletask-js"; +import { + DurableTaskAzureManagedClientBuilder, + DurableTaskAzureManagedWorkerBuilder, +} from "@microsoft/durabletask-js-azuremanaged"; + +// Read environment variables +const connectionString = process.env.DTS_CONNECTION_STRING; +const endpoint = process.env.ENDPOINT || "localhost:8080"; +const taskHub = process.env.TASKHUB || "default"; + +function createClient(): TaskHubGrpcClient { + if (connectionString) { + return new DurableTaskAzureManagedClientBuilder().connectionString(connectionString).build(); + } + return new DurableTaskAzureManagedClientBuilder() + .endpoint(endpoint, taskHub, null) // null credential for emulator (no auth) + .build(); +} + +function createWorker(): TaskHubGrpcWorker { + if (connectionString) { + return new DurableTaskAzureManagedWorkerBuilder().connectionString(connectionString).build(); + } + return new DurableTaskAzureManagedWorkerBuilder() + .endpoint(endpoint, taskHub, null) // null credential for emulator (no auth) + .build(); +} + +describe("Retry Handler E2E Tests", () => { + let taskHubClient: TaskHubGrpcClient; + let taskHubWorker: TaskHubGrpcWorker; + + beforeEach(async () => { + taskHubClient = createClient(); + taskHubWorker = createWorker(); + }); + + afterEach(async () => { + try { + await taskHubWorker.stop(); + } catch { + // Worker wasn't started, ignore the error + } + await taskHubClient.stop(); + }); + + // ==================== Activity Retry Handler Tests ==================== + + describe("activity retry with AsyncRetryHandler", () => { + it("should retry activity when handler returns true and succeed after transient failures", async () => { + let attemptCount = 0; + + const flakyActivity = async (_: ActivityContext, input: number) => { + attemptCount++; + if (attemptCount < 3) { + throw new Error(`Transient failure on attempt ${attemptCount}`); + } + return input * 2; + }; + + // Retry handler: retry up to 5 attempts + const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => { + return ctx.lastAttemptNumber < 5; + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + const result = yield ctx.callActivity(flakyActivity, input, { retry: retryHandler }); + return result; + }; + + taskHubWorker.addActivity(flakyActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 21); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.serializedOutput).toEqual(JSON.stringify(42)); + expect(attemptCount).toBe(3); // Failed twice, succeeded on third + }, 31000); + + it("should fail when retry handler returns false", async () => { + let attemptCount = 0; + + const failingActivity = async (_: ActivityContext) => { + attemptCount++; + const error = new Error("Permanent failure"); + error.name = "PermanentError"; + throw error; + }; + + // Retry handler: don't retry PermanentError + const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => { + return ctx.lastFailure.errorType !== "PermanentError"; + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(failingActivity, undefined, { retry: retryHandler }); + return result; + }; + + taskHubWorker.addActivity(failingActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + // Should only attempt once since handler returns false for PermanentError + expect(attemptCount).toBe(1); + }, 31000); + + it("should exhaust retries when handler limits by attempt count", async () => { + let attemptCount = 0; + const maxAttempts = 3; + + const alwaysFailsActivity = async (_: ActivityContext) => { + attemptCount++; + throw new Error(`Failure on attempt ${attemptCount}`); + }; + + // Retry handler: retry up to maxAttempts + const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => { + return ctx.lastAttemptNumber < maxAttempts; + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(alwaysFailsActivity, undefined, { retry: retryHandler }); + return result; + }; + + taskHubWorker.addActivity(alwaysFailsActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + expect(attemptCount).toBe(maxAttempts); + }, 31000); + + it("should filter retries based on error type using handler", async () => { + let attemptCount = 0; + + const activityWithMixedErrors = async (_: ActivityContext) => { + attemptCount++; + if (attemptCount === 1) { + const error = new Error("Connection timeout"); + error.name = "TransientError"; + throw error; + } else { + const error = new Error("Invalid input"); + error.name = "ValidationError"; + throw error; + } + }; + + // Retry handler: only retry TransientError, not ValidationError + const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => { + return ctx.lastFailure.errorType === "TransientError"; + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(activityWithMixedErrors, undefined, { retry: retryHandler }); + return result; + }; + + taskHubWorker.addActivity(activityWithMixedErrors); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + // First: TransientError (retry), Second: ValidationError (stop) + expect(attemptCount).toBe(2); + }, 31000); + }); + + // ==================== Sub-Orchestration Retry Handler Tests ==================== + + describe("sub-orchestration retry with AsyncRetryHandler", () => { + it("should retry sub-orchestration when handler returns true", async () => { + let attemptCount = 0; + + // eslint-disable-next-line require-yield + const failingSubOrchestrator: TOrchestrator = async function* (_ctx: OrchestrationContext): any { + attemptCount++; + if (attemptCount < 3) { + throw new Error(`Sub-orchestration failure on attempt ${attemptCount}`); + } + return "sub-orch-result"; + }; + + const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => { + return ctx.lastAttemptNumber < 5; + }; + + const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callSubOrchestrator(failingSubOrchestrator, undefined, { + retry: retryHandler, + }); + return result; + }; + + taskHubWorker.addOrchestrator(failingSubOrchestrator); + taskHubWorker.addOrchestrator(parentOrchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(parentOrchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.serializedOutput).toEqual(JSON.stringify("sub-orch-result")); + expect(attemptCount).toBe(3); + }, 31000); + + it("should fail sub-orchestration when handler returns false", async () => { + let attemptCount = 0; + + // eslint-disable-next-line require-yield + const alwaysFailsSubOrch: TOrchestrator = async function* (_ctx: OrchestrationContext): any { + attemptCount++; + const error = new Error("Fatal sub-orchestration error"); + error.name = "FatalError"; + throw error; + }; + + // Don't retry FatalError + const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => { + return ctx.lastFailure.errorType !== "FatalError"; + }; + + const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callSubOrchestrator(alwaysFailsSubOrch, undefined, { + retry: retryHandler, + }); + return result; + }; + + taskHubWorker.addOrchestrator(alwaysFailsSubOrch); + taskHubWorker.addOrchestrator(parentOrchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(parentOrchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + // Should only attempt once since handler returns false for FatalError + expect(attemptCount).toBe(1); + }, 31000); + }); + + // ==================== Retry Handler with Delay (number return) Tests ==================== + + describe("retry handler returning delay in milliseconds", () => { + it("should retry activity after specified delay when handler returns a positive number", async () => { + let attemptCount = 0; + const attemptTimes: number[] = []; + + const flakyActivity = async (_: ActivityContext, input: number) => { + attemptCount++; + attemptTimes.push(Date.now()); + if (attemptCount < 3) { + throw new Error(`Transient failure on attempt ${attemptCount}`); + } + return input * 2; + }; + + // Retry handler returning delay in ms (fixed 500ms delay) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const retryHandler: any = async (ctx: RetryContext): Promise => { + if (ctx.lastAttemptNumber >= 5) { + return false; + } + return 500; // retry after 500ms + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + const result = yield ctx.callActivity(flakyActivity, input, { retry: retryHandler }); + return result; + }; + + taskHubWorker.addActivity(flakyActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 21); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.serializedOutput).toEqual(JSON.stringify(42)); + expect(attemptCount).toBe(3); + + // Verify delays are approximately 500ms + if (attemptTimes.length >= 3) { + const delay1 = attemptTimes[1] - attemptTimes[0]; + const delay2 = attemptTimes[2] - attemptTimes[1]; + // Allow generous tolerance for timer scheduling overhead + expect(delay1).toBeGreaterThanOrEqual(400); + expect(delay2).toBeGreaterThanOrEqual(400); + } + }, 31000); + + it("should support exponential backoff via handler returning increasing delays", async () => { + let attemptCount = 0; + const attemptTimes: number[] = []; + + const flakyActivity = async (_: ActivityContext) => { + attemptCount++; + attemptTimes.push(Date.now()); + if (attemptCount < 4) { + throw new Error(`Failure on attempt ${attemptCount}`); + } + return "success"; + }; + + // Retry handler implementing manual exponential backoff: 200ms, 400ms, 800ms + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const retryHandler: any = async (ctx: RetryContext): Promise => { + if (ctx.lastAttemptNumber >= 5) { + return false; + } + return 200 * Math.pow(2, ctx.lastAttemptNumber - 1); + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(flakyActivity, undefined, { retry: retryHandler }); + return result; + }; + + taskHubWorker.addActivity(flakyActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.serializedOutput).toEqual(JSON.stringify("success")); + expect(attemptCount).toBe(4); + + // Verify delays are increasing + if (attemptTimes.length >= 4) { + const delay1 = attemptTimes[1] - attemptTimes[0]; + const delay2 = attemptTimes[2] - attemptTimes[1]; + const delay3 = attemptTimes[3] - attemptTimes[2]; + + // Each delay should be roughly double the previous (with tolerance) + expect(delay2).toBeGreaterThan(delay1 * 0.8); + expect(delay3).toBeGreaterThan(delay2 * 0.8); + } + }, 31000); + + it("should retry sub-orchestration with delay when handler returns a number", async () => { + let attemptCount = 0; + + // eslint-disable-next-line require-yield + const failingSubOrch: TOrchestrator = async function* (_ctx: OrchestrationContext): any { + attemptCount++; + if (attemptCount < 3) { + throw new Error(`Sub-orch failure on attempt ${attemptCount}`); + } + return "sub-orch-result"; + }; + + // Retry handler returning fixed delay + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const retryHandler: any = async (ctx: RetryContext): Promise => { + if (ctx.lastAttemptNumber >= 5) { + return false; + } + return 300; // 300ms delay + }; + + const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callSubOrchestrator(failingSubOrch, undefined, { + retry: retryHandler, + }); + return result; + }; + + taskHubWorker.addOrchestrator(failingSubOrch); + taskHubWorker.addOrchestrator(parentOrchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(parentOrchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.serializedOutput).toEqual(JSON.stringify("sub-orch-result")); + expect(attemptCount).toBe(3); + }, 31000); + + it("should handle handler switching between delay and false (stop)", async () => { + let attemptCount = 0; + + const activityWithMixedErrors = async (_: ActivityContext) => { + attemptCount++; + if (attemptCount === 1) { + const error = new Error("Transient error"); + error.name = "TransientError"; + throw error; + } else { + const error = new Error("Fatal error"); + error.name = "FatalError"; + throw error; + } + }; + + // Handler: returns delay for TransientError, false for FatalError + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const retryHandler: any = async (ctx: RetryContext): Promise => { + if (ctx.lastFailure.errorType === "TransientError") { + return 200; // retry after 200ms + } + return false; // don't retry + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(activityWithMixedErrors, undefined, { retry: retryHandler }); + return result; + }; + + taskHubWorker.addActivity(activityWithMixedErrors); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + // First: TransientError (retry with 200ms delay), Second: FatalError (stop) + expect(attemptCount).toBe(2); + }, 31000); + + it("should use sync RetryHandler returning a delay number", async () => { + let attemptCount = 0; + + const flakyActivity = async (_: ActivityContext) => { + attemptCount++; + if (attemptCount < 3) { + throw new Error(`Failure on attempt ${attemptCount}`); + } + return "success"; + }; + + // Sync handler returning a delay + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const syncHandler: any = (ctx: RetryContext): boolean | number => { + if (ctx.lastAttemptNumber >= 5) { + return false; + } + return 300; + }; + const retryHandler = toAsyncRetryHandler(syncHandler); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(flakyActivity, undefined, { retry: retryHandler }); + return result; + }; + + taskHubWorker.addActivity(flakyActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.serializedOutput).toEqual(JSON.stringify("success")); + expect(attemptCount).toBe(3); + }, 31000); + }); + + // ==================== Sync RetryHandler Tests ==================== + + describe("sync RetryHandler support", () => { + it("should support synchronous RetryHandler via toAsyncRetryHandler", async () => { + let attemptCount = 0; + + const flakyActivity = async (_: ActivityContext) => { + attemptCount++; + if (attemptCount < 3) { + throw new Error(`Transient failure on attempt ${attemptCount}`); + } + return "success"; + }; + + // Use synchronous RetryHandler converted via toAsyncRetryHandler + const syncHandler: RetryHandler = (ctx: RetryContext) => { + return ctx.lastAttemptNumber < 5; + }; + const retryHandler = toAsyncRetryHandler(syncHandler); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(flakyActivity, undefined, { retry: retryHandler }); + return result; + }; + + taskHubWorker.addActivity(flakyActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.serializedOutput).toEqual(JSON.stringify("success")); + expect(attemptCount).toBe(3); + }, 31000); + }); +});