Redis-backed distributed job processing for fluo. It features decorator-based worker discovery, JSON-safe job serialization, and lifecycle-managed execution.
npm install @fluojs/queue @fluojs/redis

- When you need to process long-running or resource-intensive tasks in the background.
- When you want to decouple expensive operations (e.g., sending emails, image processing) from the request-response cycle.
- When you need a distributed queue with retry logic, backoff, and dead-letter handling.
Create a job class and a worker class decorated with @QueueWorker.
import { QueueWorker } from '@fluojs/queue';

export class ProcessOrderJob {
  constructor(public readonly orderId: string) {}
}

@QueueWorker(ProcessOrderJob, { attempts: 3, backoff: { type: 'fixed', delayMs: 5000 } })
export class OrderWorker {
  async handle(job: ProcessOrderJob) {
    console.log(`Processing order: ${job.orderId}`);
    // Your logic here
  }
}

Import QueueModule and inject QueueLifecycleService to enqueue jobs.
QueueModule.forRoot(...) is the supported root entrypoint for application-level queue registration.
import { Module, Inject } from '@fluojs/core';
import { QueueModule, QueueLifecycleService } from '@fluojs/queue';
import { RedisModule } from '@fluojs/redis';

@Inject(QueueLifecycleService)
export class OrderService {
  constructor(private readonly queue: QueueLifecycleService) {}

  async placeOrder(id: string) {
    await this.queue.enqueue(new ProcessOrderJob(id));
  }
}

@Module({
  imports: [
    RedisModule.forRoot({ host: 'localhost', port: 6379 }),
    QueueModule.forRoot(),
  ],
  providers: [OrderService, OrderWorker],
})
export class AppModule {}

Leave clientName unset to keep using the default @fluojs/redis client from your app. If your queues should use a non-default Redis connection, set clientName to the name registered with RedisModule.forRoot({ name, ... }).
QueueModule.forRoot({ clientName: 'jobs' })

@fluojs/queue resolves that Redis client during application bootstrap, then creates queue-owned duplicate connections for BullMQ. The shared @fluojs/redis client remains owned by RedisModule; Queue closes only the duplicate BullMQ connections it creates. Those duplicate connections are configured with BullMQ's required maxRetriesPerRequest: null worker setting so startup behavior matches BullMQ's runtime constraints.
Queue discovers workers and creates queue-owned BullMQ resources during application bootstrap, but BullMQ worker processors are started only after the runtime marks the full application bootstrap/readiness sequence complete. Jobs enqueued by other onApplicationBootstrap() hooks can be accepted once the Queue service is initialized, and their processors run after the bootstrap-ready handoff instead of racing ahead of later async bootstrap hooks or application readiness. Queue status reports degraded readiness until those BullMQ processors have actually started; if a processor fails to start, the lifecycle moves to failed and status snapshots expose the failure instead of reporting the workers as ready.
Application shutdown marks Queue as stopping, rejects new enqueue attempts, closes queue-owned workers/queues/connections, and drains pending dead-letter writes. Worker shutdown is bounded by workerShutdownTimeoutMs so an active processor that never settles cannot block application shutdown indefinitely. When the timeout elapses, Queue logs the timeout and asks BullMQ to force-close the worker before continuing resource cleanup.
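The bounded worker shutdown described above can be pictured as a race between a graceful close and a timer. The following is a minimal sketch, not the package's implementation: ClosableWorker, closeWithTimeout, and the log callback are hypothetical names introduced for illustration, and the close(force) signature is an assumption rather than a documented API.

```typescript
// Hypothetical stand-in for a worker handle; not the BullMQ or @fluojs/queue API.
interface ClosableWorker {
  close(force: boolean): Promise<void>;
}

type ShutdownOutcome = 'graceful' | 'forced';

// Waits for a graceful close, but never longer than timeoutMs.
async function closeWithTimeout(
  worker: ClosableWorker,
  timeoutMs: number,
  log: (message: string) => void = console.warn,
): Promise<ShutdownOutcome> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  // Resolves to 'forced' if the graceful close has not settled in time.
  const timeout = new Promise<ShutdownOutcome>((resolve) => {
    timer = setTimeout(() => resolve('forced'), timeoutMs);
  });
  const graceful = worker.close(false).then((): ShutdownOutcome => 'graceful');

  let outcome: ShutdownOutcome;
  try {
    outcome = await Promise.race([graceful, timeout]);
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    clearTimeout(timer);
  }

  if (outcome === 'forced') {
    log(`worker did not stop within ${timeoutMs}ms; forcing close`);
    await worker.close(true);
  }
  return outcome;
}
```

With a worker whose graceful close never settles, the sketch logs once and falls through to the forced close, so the remaining cleanup can continue.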
Workers can be configured with a maximum number of attempts and backoff strategies to handle transient failures automatically.
@QueueWorker(MyJob, {
  attempts: 5,
  backoff: { type: 'exponential', delayMs: 1000 }
})

When a worker exhausts its retry attempts, Queue appends a dead-letter record to Redis (fluo:queue:dead-letter:<jobName>) for manual inspection or recovery. Queue does not move the BullMQ job itself.
QueueModule.forRoot() keeps the most recent 1_000 dead-letter entries per job by default. Set defaultDeadLetterMaxEntries: false to opt out, or provide a smaller positive number when operators need a tighter retention budget.
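To make the retry and retention settings concrete, here is a small self-contained sketch of a retry schedule and a capped dead-letter list. It rests on assumptions: the exponential delay doubles per failed attempt (which mirrors BullMQ's built-in exponential strategy; whether Queue passes delayMs through unchanged is not stated here), an in-memory array stands in for the Redis structure, and DeadLetterRecord, retrySchedule, and appendDeadLetter are hypothetical names.

```typescript
type BackoffType = 'fixed' | 'exponential';

interface BackoffOptions {
  type: BackoffType;
  delayMs: number;
}

// Delay before the retry that follows failed attempt number `attemptsMade` (1-based).
// Assumption: exponential backoff doubles the delay after each failed attempt.
function retryDelayMs(backoff: BackoffOptions, attemptsMade: number): number {
  return backoff.type === 'fixed'
    ? backoff.delayMs
    : backoff.delayMs * 2 ** (attemptsMade - 1);
}

// `attempts` counts the first run too, so there are attempts - 1 retries.
function retrySchedule(attempts: number, backoff: BackoffOptions): number[] {
  const delays: number[] = [];
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(retryDelayMs(backoff, attemptsMade));
  }
  return delays;
}

// Key pattern taken from the text above.
function deadLetterKey(jobName: string): string {
  return `fluo:queue:dead-letter:${jobName}`;
}

// Hypothetical record shape; the real record fields are not documented here.
interface DeadLetterRecord {
  jobName: string;
  payload: unknown;
  failedReason: string;
}

// Appends a record and keeps only the most recent `maxEntries` (false disables trimming).
function appendDeadLetter(
  entries: readonly string[],
  record: DeadLetterRecord,
  maxEntries: number | false,
): string[] {
  const next = [...entries, JSON.stringify(record)];
  if (maxEntries === false) return next;
  if (maxEntries <= 0) throw new RangeError('maxEntries must be a positive number');
  return next.slice(-maxEntries);
}
```

Under these assumptions, retrySchedule(5, { type: 'exponential', delayMs: 1000 }) yields delays of 1000, 2000, 4000, and 8000 ms, and retrySchedule(3, { type: 'fixed', delayMs: 5000 }) yields two delays of 5000 ms.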
Jobs must be JSON-serializable plain objects. Queue serializes the job payload before enqueueing and rehydrates the job prototype on the worker side.
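The serialize-then-rehydrate round trip can be sketched in plain TypeScript. This is an illustration of the general technique, not the package's code: serializeJob and rehydrateJob are hypothetical helpers, and the describe() method is added only to show that prototype methods are available again after rehydration.

```typescript
class ProcessOrderJob {
  constructor(public readonly orderId: string) {}

  // Illustrative method; shows that the prototype is restored on the worker side.
  describe(): string {
    return `order ${this.orderId}`;
  }
}

// Constructor type used to look up the prototype for rehydration.
type JobType<T> = new (...args: any[]) => T;

// Producer side: only JSON-safe own properties survive this step.
function serializeJob(job: object): string {
  return JSON.stringify(job);
}

// Worker side: parse the payload and attach the job class prototype.
// The constructor is not called; fields are copied onto a bare instance.
function rehydrateJob<T extends object>(jobType: JobType<T>, json: string): T {
  const data: unknown = JSON.parse(json);
  if (typeof data !== 'object' || data === null || Array.isArray(data)) {
    throw new TypeError('job payload must be a plain JSON object');
  }
  return Object.assign(Object.create(jobType.prototype) as T, data);
}

const wire = serializeJob(new ProcessOrderJob('o-1'));
const job = rehydrateJob(ProcessOrderJob, wire);
console.log(job instanceof ProcessOrderJob, job.describe());
```

The same round trip explains the JSON-safety requirement: a Date field would arrive as an ISO string, and values such as Map, Set, or functions would not survive JSON.stringify in a usable form.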
Treat low-level provider assembly as an internal implementation detail: low-level provider helpers are not part of the documented root-barrel contract.
- QueueModule: Main entry point for queue registration.
- QueueModule.forRoot(options): Registers queue support for an application module.
- QueueLifecycleService: Primary service for enqueuing jobs and creating lifecycle/status snapshots (enqueue(job), createPlatformStatusSnapshot()).
- @QueueWorker(JobClass, options?): Decorator to mark a class as a job handler.
- QUEUE: Compatibility injection token for the queue facade.
- createQueuePlatformStatusSnapshot(...): Status snapshot helper for lifecycle/readiness diagnostics.
- Queue: Compatibility facade with enqueue(job) for application code and the QUEUE token.
- QueueJobType: Constructor type used to identify and rehydrate a job payload class.
- QueueModuleOptions: Global queue settings (global, clientName, default attempts, defaultBackoff, concurrency, rate limiting, dead-letter retention).
- QueueWorkerOptions: Per-job settings (attempts, backoff, concurrency, jobName, rate limiting).
- QueueBackoffType: Supported retry backoff strategy names (fixed, exponential).
- QueueBackoffOptions: Retry backoff settings (type, delayMs).
- QueueRateLimiterOptions: Worker-level distributed rate limiter settings (max, duration).
- QueueLifecycleState: Lifecycle states reported by Queue status adapters (idle, starting, started, stopping, stopped, failed).
- QueueStatusAdapterInput: Normalized queue metrics and worker-start diagnostics passed to createQueuePlatformStatusSnapshot(...).
- QueuePlatformStatusSnapshot: Queue-specific readiness, health, ownership, and detail snapshot returned by the status helper and QueueLifecycleService.createPlatformStatusSnapshot().
QueueModuleOptions also includes lifecycle and dead-letter retention controls such as workerShutdownTimeoutMs and defaultDeadLetterMaxEntries.
QueueModuleOptions lifecycle/status controls:
- global: whether the queue module registration is global. Defaults to true; set false when queue providers should stay scoped to the importing module graph.
- workerShutdownTimeoutMs: maximum time to wait for active worker processors during shutdown before force-closing the BullMQ worker. Defaults to 30_000.
- defaultDeadLetterMaxEntries: maximum retained dead-letter records per job, or false to disable trimming. Defaults to 1_000.
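As a quick way to see how the three documented defaults interact with explicit values, the sketch below resolves them on a local mirror of the option names. QueueLifecycleOptionsSketch and resolveQueueOptions are hypothetical and are not imported from @fluojs/queue; how the package resolves defaults internally is an assumption.

```typescript
// Local mirror of three documented option names; not the package's own type.
interface QueueLifecycleOptionsSketch {
  global?: boolean;
  workerShutdownTimeoutMs?: number;
  defaultDeadLetterMaxEntries?: number | false;
}

// Applies the documented defaults. `??` only replaces undefined/null,
// so an explicit `false` is kept rather than overwritten.
function resolveQueueOptions(
  options: QueueLifecycleOptionsSketch = {},
): Required<QueueLifecycleOptionsSketch> {
  return {
    global: options.global ?? true,
    workerShutdownTimeoutMs: options.workerShutdownTimeoutMs ?? 30_000,
    defaultDeadLetterMaxEntries: options.defaultDeadLetterMaxEntries ?? 1_000,
  };
}

const resolved = resolveQueueOptions({
  global: false,
  defaultDeadLetterMaxEntries: false,
});
console.log(resolved);
```

The nullish-coalescing operator matters here: using `||` instead would silently turn global: false back into true and re-enable trimming when defaultDeadLetterMaxEntries is false.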
QueueLifecycleService.createPlatformStatusSnapshot() uses the same public snapshot contract as createQueuePlatformStatusSnapshot(...). It reports readiness as ready only after Queue reaches started and every discovered BullMQ worker processor has started. started resources with pending processors report degraded readiness, starting reports degraded readiness, stopping/stopped report not-ready, and worker-start failures report not-ready/unhealthy with workerStartFailures and lastWorkerStartFailure details. Snapshot details include the Redis dependency id, lifecycle state, ready/discovered worker counts, pending dead-letter writes, the dead-letter drain timeout, and workerShutdownTimeoutMs.
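The readiness rules in the paragraph above can be restated as a small decision function. This is a sketch for reading the rules, not the package's snapshot code: Readiness, WorkerCounts, and readinessFor are hypothetical names, and the handling of the idle state is an assumption because the text does not say how idle is reported.

```typescript
type QueueLifecycleState =
  | 'idle'
  | 'starting'
  | 'started'
  | 'stopping'
  | 'stopped'
  | 'failed';

type Readiness = 'ready' | 'degraded' | 'not-ready';

// Hypothetical summary of worker-start diagnostics.
interface WorkerCounts {
  discovered: number;
  ready: number;
  startFailures: number;
}

function readinessFor(state: QueueLifecycleState, workers: WorkerCounts): Readiness {
  // Worker-start failures (and the failed lifecycle state) are never ready.
  if (state === 'failed' || workers.startFailures > 0) return 'not-ready';

  // Started is ready only once every discovered processor has started.
  if (state === 'started') {
    return workers.ready === workers.discovered ? 'ready' : 'degraded';
  }

  if (state === 'starting') return 'degraded';
  if (state === 'stopping' || state === 'stopped') return 'not-ready';

  // Assumption: 'idle' is not covered by the text; 'not-ready' is a guess.
  return 'not-ready';
}
```

For example, a started queue with two discovered workers and only one running processor maps to degraded, and it flips to ready once the second processor has started.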
Only singleton @QueueWorker() providers/controllers are registered. Request/transient workers are skipped during discovery.
- @fluojs/redis: Required as the backing store for job persistence.
- @fluojs/cron: For scheduled/recurring background tasks.

- packages/queue/src/module.test.ts: Worker discovery and enqueueing tests.
- packages/queue/src/public-surface.test.ts: Public API contract verification.
- packages/queue/src/status.test.ts: Queue lifecycle status snapshot tests.