Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 34 additions & 26 deletions src/core/consumer/RunMQConsumerCreator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,39 @@ export class RunMQConsumerCreator {
// drops there (issue #19).
await consumerChannel.confirmSelect();

// Build the processor decorator chain ONCE per consumer. Every
// processor (and the deserializer) is stateless across messages — the
// only per-message input is the RabbitMQMessage passed to consume().
// Rebuilding the chain per delivery allocated 8 objects on every hot-
// path message; hoisting it leaves only the unavoidable
// RabbitMQMessage allocation below.
const processorChain = new RunMQExceptionLoggerProcessor(
new RunMQSucceededMessageAcknowledgerProcessor(
new RunMQFailedMessageRejecterProcessor(
new RunMQRetriesCheckerProcessor(
new RunMQFailureLoggerProcessor(
new RunMQSchemaFailureProcessor(
new RunMQBaseProcessor<T>(
consumerConfiguration.processor,
consumerConfiguration.processorConfig,
new DefaultDeserializer<T>()
),
consumerConfiguration.processorConfig,
DLQPublisher,
this.logger
),
this.logger,
this.logFullMessagePayload,
),
consumerConfiguration.processorConfig,
this.logger,
this.logFullMessagePayload,
),
this.logger
),
this.logger
), this.logger);

const prefetchCount = consumerConfiguration.processorConfig.prefetch ?? DEFAULTS.PREFETCH_COUNT;
await consumerChannel.prefetch(prefetchCount);
await consumerChannel.consume(consumerConfiguration.processorConfig.name, async (msg) => {
Expand All @@ -86,32 +119,7 @@ export class RunMQConsumerCreator {
msg.properties.headers,
)
try {
await new RunMQExceptionLoggerProcessor(
new RunMQSucceededMessageAcknowledgerProcessor(
new RunMQFailedMessageRejecterProcessor(
new RunMQRetriesCheckerProcessor(
new RunMQFailureLoggerProcessor(
new RunMQSchemaFailureProcessor(
new RunMQBaseProcessor<T>(
consumerConfiguration.processor,
consumerConfiguration.processorConfig,
new DefaultDeserializer<T>()
),
consumerConfiguration.processorConfig,
DLQPublisher,
this.logger
),
this.logger,
this.logFullMessagePayload,
),
consumerConfiguration.processorConfig,
this.logger,
this.logFullMessagePayload,
),
this.logger
),
this.logger
), this.logger).consume(rabbitmqMessage)
await processorChain.consume(rabbitmqMessage)
} catch (e) {
// Last-resort guard: nothing above should throw, but if it does
// we must not let the rejection propagate into amqplib's
Expand Down
Loading