Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions docs/1.docs/50.tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,18 @@ The `defineTask` helper accepts an object with the following properties:

- **`meta`** (optional): An object with optional `name` and `description` string fields used for display in the dev server and CLI.
- **`run`** (required): A function that receives a [`TaskEvent`](#taskevent) and returns (or resolves to) an object with an optional `result` property.
- **`concurrency`** (optional): Controls how concurrent calls are handled. Defaults to `{ mode: "dedupe" }`.

```ts
interface Task<RT = unknown> {
meta?: { name?: string; description?: string };
run(event: TaskEvent): { result?: RT } | Promise<{ result?: RT }>;
concurrency?:
| { mode: "parallel" }
| {
mode: "dedupe" | "serial";
key?: (event: TaskEvent) => string;
};
}
```

Expand Down Expand Up @@ -276,7 +283,24 @@ The `--payload` flag accepts a JSON string that will be parsed and passed to the

### Concurrency

Each task can have **one running instance**. Calling a task of same name multiple times in parallel, results in calling it once and all callers will get the same return value.
By default, each task can have **one running instance for the same payload**. Calling a task of the same name and payload multiple times in parallel runs it once, and all callers receive the same return value.

> [!NOTE]
> Nitro tasks can be running multiple times and in parallel.
You can customize this behavior with the `concurrency` option:

- **`dedupe`**: Coalesces concurrent calls to the same task and key into one execution. All callers wait for the same result. This is the default mode.
- **`parallel`**: Allows every call to run as an independent task instance.
- **`serial`**: Queues concurrent calls to the same task and key so they run one after another.

For `dedupe` and `serial`, you can provide a `key` function to derive the execution key from the task event. If no key function is provided, Nitro uses the hash of the task payload.

```ts [tasks/report/generate.ts]
export default defineTask({
concurrency: {
mode: "serial",
key: ({ payload }) => payload.tenant,
},
async run({ payload }) {
return { result: await generateTenantReport(payload.tenant) };
},
});
```
89 changes: 77 additions & 12 deletions src/runtime/internal/task.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { Cron } from "croner";
import { HTTPError } from "h3";
import type { Task, TaskContext, TaskEvent, TaskPayload, TaskResult } from "nitro/types";
import { hash } from "ohash";
import type {
Task,
TaskContext,
TaskConcurrency,
TaskEvent,
TaskPayload,
TaskResult,
} from "nitro/types";
import { scheduledTasks, tasks } from "#nitro/virtual/tasks";

/** @experimental */
Expand All @@ -13,17 +21,14 @@ export function defineTask<RT = unknown>(def: Task<RT>): Task<RT> {
return def;
}

const __runningTasks__: { [name: string]: ReturnType<Task<any>["run"]> } = {};
const __runningTasks__ = new Map<string, Promise<TaskResult>>();
const __serialQueues__ = new Map<string, Promise<void>>();

/** @experimental */
export async function runTask<RT = unknown>(
name: string,
{ payload = {}, context = {} }: { payload?: TaskPayload; context?: TaskContext } = {}
): Promise<TaskResult<RT>> {
if (__runningTasks__[name]) {
return __runningTasks__[name];
}

if (!(name in tasks)) {
throw new HTTPError({
message: `Task \`${name}\` is not available!`,
Expand All @@ -40,13 +45,24 @@ export async function runTask<RT = unknown>(

const handler = (await tasks[name].resolve!()) as Task<RT>;
const taskEvent: TaskEvent = { name, payload, context };
__runningTasks__[name] = handler.run(taskEvent);
const concurrency: TaskConcurrency = handler.concurrency ?? { mode: "dedupe" };

try {
const res = await __runningTasks__[name];
return res;
} finally {
delete __runningTasks__[name];
switch (concurrency.mode) {
case "parallel": {
return _callTask(handler, taskEvent);
}
case "dedupe": {
const key = _getTaskConcurrencyKey(concurrency, taskEvent);
return _runTaskOnce(key, () => _callTask(handler, taskEvent));
}
case "serial": {
const key = _getTaskConcurrencyKey(concurrency, taskEvent);
return _runTaskSerially(key, () => _callTask(handler, taskEvent));
}
default: {
const mode = (concurrency as { mode: string }).mode;
throw new Error(`Task \`${name}\` has an invalid concurrency mode: "${mode}"`);
}
}
}

Expand Down Expand Up @@ -92,3 +108,52 @@ export function runCronTasks(
): Promise<TaskResult[]> {
return Promise.all(getCronTasks(cron).map((name) => runTask(name, ctx)));
}

async function _callTask<RT>(handler: Task<RT>, taskEvent: TaskEvent): Promise<TaskResult<RT>> {
return await handler.run(taskEvent);
}

function _getTaskConcurrencyKey(
concurrency: Exclude<TaskConcurrency, { mode: "parallel" }>,
taskEvent: TaskEvent
): string {
const key = concurrency.key ? concurrency.key(taskEvent) : hash(taskEvent.payload);
return `${taskEvent.name}:${key}`;
}

function _runTaskOnce<RT>(
key: string,
run: () => Promise<TaskResult<RT>>
): Promise<TaskResult<RT>> {
const running = __runningTasks__.get(key);
if (running) {
return running as Promise<TaskResult<RT>>;
}

const promise = run().finally(() => {
if (__runningTasks__.get(key) === promise) {
__runningTasks__.delete(key);
}
});
__runningTasks__.set(key, promise);

return promise;
}

function _runTaskSerially<RT>(
key: string,
run: () => Promise<TaskResult<RT>>
): Promise<TaskResult<RT>> {
const previous = __serialQueues__.get(key) ?? Promise.resolve();
const promise = previous.then(run);
const queue = promise
.catch(() => {})
.then(() => {
if (__serialQueues__.get(key) === queue) {
__serialQueues__.delete(key);
}
});
__serialQueues__.set(key, queue);

return promise;
}
25 changes: 25 additions & 0 deletions src/types/runtime/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,35 @@ export interface TaskResult<RT = unknown> {
result?: RT;
}

/**
* Controls how concurrent calls to the same task are handled.
*
* - `"parallel"`: Allow multiple instances of the same task to run concurrently.
* - `"dedupe"`: Coalesce concurrent calls with the same key into a single execution.
* All callers await the same promise and receive the same result. (default)
* - `"serial"`: Queue concurrent calls with the same key so they run one after another.
*
* @experimental
* @default { mode: "dedupe" }
*/
export type TaskConcurrency =
| { mode: "parallel" }
| {
mode: "dedupe" | "serial";
/**
* Derives the dedupe or serial queue key from the task event.
* If omitted, the task payload hash is used.
*
* @default (event) => hash(event.payload)
*/
key?: (event: TaskEvent) => string;
};

/** @experimental */
export interface Task<RT = unknown> {
meta?: TaskMeta;
run(event: TaskEvent): MaybePromise<{ result?: RT }>;
concurrency?: TaskConcurrency;
}

/** @experimental */
Expand Down
Loading