From ef80d975daaf1e71dff30ff920a98675d4c64df1 Mon Sep 17 00:00:00 2001 From: Fawzi Abdulfattah Date: Sun, 7 Jun 2026 00:58:37 +0200 Subject: [PATCH] perf(consumer): build processor chain once per consumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The processor decorator chain (8 objects + the deserializer) was rebuilt on every delivery inside the consume callback. Every processor and the deserializer are stateless across messages — the only per-message input is the RabbitMQMessage passed to consume() — so the chain can be constructed once per consumer and reused. Hoisting it out of the hot path removes 8 allocations per consumed message, leaving only the unavoidable RabbitMQMessage allocation. Behavior is unchanged; the last-resort try/catch around consume() is preserved. --- src/core/consumer/RunMQConsumerCreator.ts | 60 +++++++++++++---------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts index b5e38bd..161e89e 100644 --- a/src/core/consumer/RunMQConsumerCreator.ts +++ b/src/core/consumer/RunMQConsumerCreator.ts @@ -73,6 +73,39 @@ export class RunMQConsumerCreator { // drops there (issue #19). await consumerChannel.confirmSelect(); + // Build the processor decorator chain ONCE per consumer. Every + // processor (and the deserializer) is stateless across messages — the + // only per-message input is the RabbitMQMessage passed to consume(). + // Rebuilding the chain per delivery allocated 8 objects on every hot- + // path message; hoisting it leaves only the unavoidable + // RabbitMQMessage allocation below. + const processorChain = new RunMQExceptionLoggerProcessor( + new RunMQSucceededMessageAcknowledgerProcessor( + new RunMQFailedMessageRejecterProcessor( + new RunMQRetriesCheckerProcessor( + new RunMQFailureLoggerProcessor( + new RunMQSchemaFailureProcessor( + new RunMQBaseProcessor( + consumerConfiguration.processor, + consumerConfiguration.processorConfig, + new DefaultDeserializer() + ), + consumerConfiguration.processorConfig, + DLQPublisher, + this.logger + ), + this.logger, + this.logFullMessagePayload, + ), + consumerConfiguration.processorConfig, + this.logger, + this.logFullMessagePayload, + ), + this.logger + ), + this.logger + ), this.logger); + const prefetchCount = consumerConfiguration.processorConfig.prefetch ?? DEFAULTS.PREFETCH_COUNT; await consumerChannel.prefetch(prefetchCount); await consumerChannel.consume(consumerConfiguration.processorConfig.name, async (msg) => { @@ -86,32 +119,7 @@ export class RunMQConsumerCreator { msg.properties.headers, ) try { - await new RunMQExceptionLoggerProcessor( - new RunMQSucceededMessageAcknowledgerProcessor( - new RunMQFailedMessageRejecterProcessor( - new RunMQRetriesCheckerProcessor( - new RunMQFailureLoggerProcessor( - new RunMQSchemaFailureProcessor( - new RunMQBaseProcessor( - consumerConfiguration.processor, - consumerConfiguration.processorConfig, - new DefaultDeserializer() - ), - consumerConfiguration.processorConfig, - DLQPublisher, - this.logger - ), - this.logger, - this.logFullMessagePayload, - ), - consumerConfiguration.processorConfig, - this.logger, - this.logFullMessagePayload, - ), - this.logger - ), - this.logger - ), this.logger).consume(rabbitmqMessage) + await processorChain.consume(rabbitmqMessage) } catch (e) { // Last-resort guard: nothing above should throw, but if it does // we must not let the rejection propagate into amqplib's