diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts index b5e38bd..161e89e 100644 --- a/src/core/consumer/RunMQConsumerCreator.ts +++ b/src/core/consumer/RunMQConsumerCreator.ts @@ -73,6 +73,39 @@ export class RunMQConsumerCreator { // drops there (issue #19). await consumerChannel.confirmSelect(); + // Build the processor decorator chain ONCE per consumer. Every + // processor (and the deserializer) is stateless across messages — the + // only per-message input is the RabbitMQMessage passed to consume(). + // Rebuilding the chain per delivery allocated 8 objects on every hot- + // path message; hoisting it leaves only the unavoidable + // RabbitMQMessage allocation below. + const processorChain = new RunMQExceptionLoggerProcessor( + new RunMQSucceededMessageAcknowledgerProcessor( + new RunMQFailedMessageRejecterProcessor( + new RunMQRetriesCheckerProcessor( + new RunMQFailureLoggerProcessor( + new RunMQSchemaFailureProcessor( + new RunMQBaseProcessor( + consumerConfiguration.processor, + consumerConfiguration.processorConfig, + new DefaultDeserializer() + ), + consumerConfiguration.processorConfig, + DLQPublisher, + this.logger + ), + this.logger, + this.logFullMessagePayload, + ), + consumerConfiguration.processorConfig, + this.logger, + this.logFullMessagePayload, + ), + this.logger + ), + this.logger + ), this.logger); + const prefetchCount = consumerConfiguration.processorConfig.prefetch ?? DEFAULTS.PREFETCH_COUNT; await consumerChannel.prefetch(prefetchCount); await consumerChannel.consume(consumerConfiguration.processorConfig.name, async (msg) => { @@ -86,32 +119,7 @@ export class RunMQConsumerCreator { msg.properties.headers, ) try { - await new RunMQExceptionLoggerProcessor( - new RunMQSucceededMessageAcknowledgerProcessor( - new RunMQFailedMessageRejecterProcessor( - new RunMQRetriesCheckerProcessor( - new RunMQFailureLoggerProcessor( - new RunMQSchemaFailureProcessor( - new RunMQBaseProcessor( - consumerConfiguration.processor, - consumerConfiguration.processorConfig, - new DefaultDeserializer() - ), - consumerConfiguration.processorConfig, - DLQPublisher, - this.logger - ), - this.logger, - this.logFullMessagePayload, - ), - consumerConfiguration.processorConfig, - this.logger, - this.logFullMessagePayload, - ), - this.logger - ), - this.logger - ), this.logger).consume(rabbitmqMessage) + await processorChain.consume(rabbitmqMessage) } catch (e) { // Last-resort guard: nothing above should throw, but if it does // we must not let the rejection propagate into amqplib's