From a4c02a51c6a3cc2d2963320a6f712b09f1a638d2 Mon Sep 17 00:00:00 2001 From: Fawzi Abdulfattah Date: Sun, 10 May 2026 03:01:22 +0200 Subject: [PATCH 1/4] feat!: publisher confirms (DLQ always-on, user-publish on by default) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #19, closes #28. #19 — DLQ publish is fire-and-forget, then immediately ack'd The retry checker called this.DLQPublisher.publish(...) without awaiting and called acknowledgeMessage(message) on the next line. If the broker didn't accept the DLQ publish (channel closed mid-flight, alarm state, no route), the original message was ack'd and the DLQ publish silently disappeared — permanent message loss. #28 — No publisher confirms anywhere RabbitMQClientChannel.publish returned true immediately. There was no way for callers to detect a broker-rejected publish. Changes: - AMQPChannel.publish becomes Promise. Added confirmSelect() to the channel interface. - RabbitMQClientChannel.publish awaits the underlying basicPublish. When confirmSelect() was called on the channel, that promise resolves only after broker ack — true publisher confirms. - RunMQPublisher.publish, RunMQBaseProducer.publish, RunMQFailureLoggerProducer.publish, and RunMQ.publish are all async (Promise). - Consumer channels enable confirmSelect() unconditionally — DLQ publishes flow through them and the message-loss risk is not negotiable. - The user publish channel enables confirmSelect() by default. Set RunMQConnectionConfig.usePublisherConfirms to false to opt out and fall back to fire-and-forget publishing. - RunMQRetriesCheckerProcessor.moveToFinalDeadLetter is now async and awaited. On DLQ publish failure: log + nack(false) so the message goes back through the retry pipeline. The retry-delay queue's TTL provides natural backoff before the next DLQ attempt — message is never lost. Tests: - Updated mocks (MockedAMQPChannel, MockedRabbitMQChannel, MockedRabbitMQPublisher) for the new async signatures and confirmSelect. - Updated RunMQ + producer unit tests for await-able publish. - New e2e suite tests/e2e/RunMQ.dlqConfirm.e2e.test.ts: * Injects a failure on the first DLQ publish; verifies the message is redelivered (handler called again) and eventually lands in DLQ on the next attempt — proves no message loss. * Happy-path: handler always fails, attempts=1, message reaches DLQ cleanly. All 140 unit + 83 e2e tests pass. BREAKING CHANGE: RunMQ.publish is now async and returns Promise instead of void, and resolves only after broker ack by default. Callers must await the call (or chain `.then`/`.catch`) to surface broker rejections — silent drops are no longer possible unless you opt out via `usePublisherConfirms: false` in the connection config. The same breaking change applies to AMQPChannel.publish and RunMQPublisher.publish on the public surface. Co-Authored-By: Claude Opus 4.7 --- src/core/RunMQ.ts | 18 ++- src/core/clients/RabbitMQClientChannel.ts | 9 +- src/core/consumer/RunMQConsumerCreator.ts | 5 + .../RunMQRetriesCheckerProcessor.ts | 22 ++- .../publisher/producers/RunMQBaseProducer.ts | 4 +- .../producers/RunMQFailureLoggerProducer.ts | 7 +- src/types/index.ts | 29 +++- tests/e2e/RunMQ.dlqConfirm.e2e.test.ts | 131 ++++++++++++++++++ tests/e2e/RunMQ.e2e.test.ts | 7 +- tests/mocks/MockedAMQPChannel.ts | 4 +- tests/mocks/MockedRabbitMQChannel.ts | 3 +- tests/mocks/MockedRunMQPublisher.ts | 2 +- tests/unit/core/RunMQ.test.ts | 11 +- .../producers/RunMQBaseProducer.test.ts | 12 +- .../RunMQFailureLoggerProducer.test.ts | 117 +++++----------- 15 files changed, 260 insertions(+), 121 deletions(-) create mode 100644 tests/e2e/RunMQ.dlqConfirm.e2e.test.ts diff --git a/src/core/RunMQ.ts b/src/core/RunMQ.ts index b98bece..b6ca8e3 100644 --- a/src/core/RunMQ.ts +++ b/src/core/RunMQ.ts @@ -56,17 +56,25 @@ export class RunMQ { } /** - * Publishes a message to the specified topic with an optional correlation ID + * Publishes a message to the specified topic with an optional correlation ID. + * + * If publisher confirms are enabled (`usePublisherConfirms: true` in the + * connection config), the returned promise resolves only after RabbitMQ + * acknowledges the message; if the broker rejects, the promise rejects. + * Otherwise it resolves once the message is flushed to the TCP socket + * (fire-and-forget, no delivery guarantee — same behavior as before + * publisher confirms were introduced). + * * @param topic The name of the topic to publish the message to * @param message The message payload to be published * @param correlationId (Optional) A unique identifier for correlating messages; if not provided, a new UUID will be generated */ - public publish(topic: string, message: Record, correlationId: string = RunMQUtils.generateUUID()): void { + public async publish(topic: string, message: Record, correlationId: string = RunMQUtils.generateUUID()): Promise { if (!this.publisher || !this.defaultChannel) { throw new RunMQException(Exceptions.NOT_INITIALIZED, {}); } RunMQUtils.assertRecord(message); - this.publisher.publish(topic, + await this.publisher.publish(topic, RabbitMQMessage.from( message, this.defaultChannel, @@ -76,7 +84,6 @@ export class RunMQ { this.logger.info(`Published message`, { topic, correlationId, - message, }); } @@ -137,6 +144,9 @@ export class RunMQ { this.defaultChannel = await this.client.getDefaultChannel(); await this.defaultChannel.assertExchange(Constants.ROUTER_EXCHANGE_NAME, 'direct', {durable: true}); await this.defaultChannel.assertExchange(Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME, 'direct', {durable: true}); + if (this.config.usePublisherConfirms) { + await this.defaultChannel.confirmSelect(); + } this.publisher = new RunMQPublisherCreator(this.logger).createPublisher(); } } \ No newline at end of file diff --git a/src/core/clients/RabbitMQClientChannel.ts b/src/core/clients/RabbitMQClientChannel.ts index c710309..eff0f32 100644 --- a/src/core/clients/RabbitMQClientChannel.ts +++ b/src/core/clients/RabbitMQClientChannel.ts @@ -106,8 +106,8 @@ export class RabbitMQClientChannel implements AMQPChannel { }); } - publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): boolean { - this.channel.basicPublish({ + async publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): Promise { + await this.channel.basicPublish({ exchange, routingKey, correlationId: options?.correlationId, @@ -124,7 +124,10 @@ export class RabbitMQClientChannel implements AMQPChannel { userId: options?.userId, appId: options?.appId, }, content); - return true; + } + + async confirmSelect(): Promise { + await this.channel.confirmSelect(); } async consume( diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts index f01ecb3..b948ebe 100644 --- a/src/core/consumer/RunMQConsumerCreator.ts +++ b/src/core/consumer/RunMQConsumerCreator.ts @@ -56,6 +56,11 @@ export class RunMQConsumerCreator { const consumerChannel = await this.getProcessorChannel(); const DLQPublisher = new RunMQPublisherCreator(this.logger).createPublisher(Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME); + // Always enable publisher confirms on the consumer channel: DLQ + // publishes flow through this channel and we cannot tolerate silent + // drops there (issue #19). + await consumerChannel.confirmSelect(); + await consumerChannel.prefetch(DEFAULTS.PREFETCH_COUNT); await consumerChannel.consume(consumerConfiguration.processorConfig.name, async (msg) => { if (msg) { diff --git a/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts b/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts index 727d253..ba369c1 100644 --- a/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts +++ b/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts @@ -22,7 +22,23 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer { } catch (e: unknown) { if (this.hasReachedMaxRetries(message)) { this.logMaxRetriesReached(message); - this.moveToFinalDeadLetter(message); + try { + await this.moveToFinalDeadLetter(message); + } catch (publishError) { + // DLQ publish failed (broker rejected, channel closed, etc.). + // Do NOT ack — that would lose the message. nack(false) + // sends it back through the retry pipeline, where it'll + // come right back here on the next attempt with a natural + // backoff (the retry-delay-queue TTL). If the underlying + // failure is transient, the next attempt's DLQ publish + // will succeed. + this.logger.error('Failed to publish to DLQ — message will be redelivered', { + correlationId: message.correlationId, + cause: publishError instanceof Error ? publishError.message : String(publishError), + }); + message.nack(false); + return false; + } this.acknowledgeMessage(message); return false; } @@ -45,7 +61,7 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer { ); } - private moveToFinalDeadLetter(message: RabbitMQMessage) { + private async moveToFinalDeadLetter(message: RabbitMQMessage): Promise { const originalPayload = this.extractOriginalPayload(message); const dlqMessage = new RabbitMQMessage( originalPayload, @@ -55,7 +71,7 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer { message.amqpMessage, message.headers ); - this.DLQPublisher.publish(ConsumerCreatorUtils.getDLQTopicName(this.config.name), dlqMessage) + await this.DLQPublisher.publish(ConsumerCreatorUtils.getDLQTopicName(this.config.name), dlqMessage); } private extractOriginalPayload(message: RabbitMQMessage): any { diff --git a/src/core/publisher/producers/RunMQBaseProducer.ts b/src/core/publisher/producers/RunMQBaseProducer.ts index b220825..d1640cb 100644 --- a/src/core/publisher/producers/RunMQBaseProducer.ts +++ b/src/core/publisher/producers/RunMQBaseProducer.ts @@ -8,7 +8,7 @@ export class RunMQBaseProducer implements RunMQPublisher { constructor(private serializer: Serializer, private exchange = Constants.ROUTER_EXCHANGE_NAME) { } - publish(topic: string, message: RabbitMQMessage): void { + async publish(topic: string, message: RabbitMQMessage): Promise { const runMQMessage = new RunMQMessage( message.message, new RunMQMessageMeta( @@ -17,7 +17,7 @@ export class RunMQBaseProducer implements RunMQPublisher { message.correlationId, )); const serialized = this.serializer.serialize(runMQMessage); - message.channel.publish(this.exchange, topic, Buffer.from(serialized), { + await message.channel.publish(this.exchange, topic, Buffer.from(serialized), { correlationId: message.correlationId, messageId: message.id, headers: message.headers, diff --git a/src/core/publisher/producers/RunMQFailureLoggerProducer.ts b/src/core/publisher/producers/RunMQFailureLoggerProducer.ts index 91e0b2f..c427daf 100644 --- a/src/core/publisher/producers/RunMQFailureLoggerProducer.ts +++ b/src/core/publisher/producers/RunMQFailureLoggerProducer.ts @@ -6,12 +6,13 @@ export class RunMQFailureLoggerProducer implements RunMQPublisher { constructor(private producer: RunMQPublisher, private logger: RunMQLogger) { } - publish(topic: string, message: RabbitMQMessage): void { + async publish(topic: string, message: RabbitMQMessage): Promise { try { - this.producer.publish(topic, message); + await this.producer.publish(topic, message); } catch (e) { this.logger.error('Message publishing failed', { - message: message, + topic, + correlationId: message.correlationId, error: e instanceof Error ? e.message : JSON.stringify(e), stack: e instanceof Error ? e.stack : undefined, }); diff --git a/src/types/index.ts b/src/types/index.ts index 15c0c5f..10fcf21 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -150,8 +150,20 @@ export interface AMQPChannel { /** * Publishes a message to an exchange. + * + * If `confirmSelect()` was called on this channel, the returned promise + * resolves only after the broker acknowledges the message; if the broker + * rejects, the promise rejects. Otherwise it resolves once the message is + * flushed to the TCP socket. + */ + publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): Promise; + + /** + * Enables publisher confirms on this channel. After this is called every + * `publish()` waits for broker acknowledgement before resolving. + * https://www.rabbitmq.com/confirms.html#publisher-confirms */ - publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): boolean; + confirmSelect(): Promise; /** * Starts consuming messages from a queue. @@ -215,6 +227,19 @@ export interface RunMQConnectionConfig { username: string; password: string; }; + /** + * If true, `runMQ.publish()` waits for RabbitMQ to acknowledge each + * message before resolving, and rejects on broker error (e.g. mandatory + * routing failure, alarm state). Default false — publish resolves once + * the message is written to the TCP socket (fire-and-forget). + * + * Trade-off: confirms add a broker round-trip per publish (typically a + * few hundred microseconds), but they're the only way to detect silent + * publish failures. DLQ publishes from the consumer chain are *always* + * confirmed regardless of this setting — the message-loss risk there is + * not negotiable. + */ + usePublisherConfirms?: boolean; } export type SchemaFailureStrategy = 'dlq' @@ -322,5 +347,5 @@ export interface RunMQConsumer { export interface RunMQPublisher { - publish: (topic: string, message: RabbitMQMessage) => void; + publish: (topic: string, message: RabbitMQMessage) => Promise; } \ No newline at end of file diff --git a/tests/e2e/RunMQ.dlqConfirm.e2e.test.ts b/tests/e2e/RunMQ.dlqConfirm.e2e.test.ts new file mode 100644 index 0000000..f5a3629 --- /dev/null +++ b/tests/e2e/RunMQ.dlqConfirm.e2e.test.ts @@ -0,0 +1,131 @@ +import {RunMQ} from '@src/core/RunMQ'; +import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter"; +import {RabbitMQClientChannel} from "@src/core/clients/RabbitMQClientChannel"; +import {Constants} from "@src/core/constants"; +import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers"; +import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; +import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; +import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample"; +import {RunMQProcessorConfigurationExample} from "@tests/Examples/RunMQProcessorConfigurationExample"; +import {RunMQMessageExample} from "@tests/Examples/RunMQMessageExample"; +import {MessageTestUtils} from "@tests/helpers/MessageTestUtils"; + +/** + * Integration tests for issues #19 + #28: publisher confirms. + * + * Before this fix, DLQ publishes from RunMQRetriesCheckerProcessor were + * fire-and-forget — the message was ack'd immediately even if the DLQ + * publish silently failed. That meant a network blip during a + * retry-exhausted message's DLQ handoff resulted in permanent message loss. + * + * The fix: enable publisher confirms on the consumer channel and await the + * DLQ publish. On failure, nack(false) so the message goes back through the + * retry pipeline (with natural backoff via the retry-delay queue's TTL) and + * gets another shot at reaching the DLQ. + */ +describe('RunMQ DLQ Publisher Confirms E2E', () => { + const validConfig = RunMQConnectionConfigExample.valid(); + + beforeEach(() => { + jest.restoreAllMocks(); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + it('does not lose the message when the DLQ publish fails — message is redelivered', async () => { + const configuration = RunMQProcessorConfigurationExample.simpleNoSchema('dlq_confirm_redeliver'); + + const testingConnection = new RabbitMQClientAdapter(validConfig); + const channel = await testingConnection.getChannel(); + await ChannelTestHelpers.deleteQueue(channel, configuration.name); + + const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); + let handlerCalls = 0; + await runMQ.process('dlq.confirm', configuration, () => { + handlerCalls++; + throw new Error('handler intentionally fails'); + }); + + // Make the FIRST DLQ publish fail (simulating broker rejection or + // channel closure mid-flight). Subsequent publishes succeed. + // This is the scenario where the old code silently lost messages. + let dlqPublishesAttempted = 0; + const original = RabbitMQClientChannel.prototype.publish; + jest.spyOn(RabbitMQClientChannel.prototype, 'publish') + .mockImplementation(async function (this: RabbitMQClientChannel, exchange: string, routingKey: string, content: Buffer, options) { + if (exchange === Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME) { + dlqPublishesAttempted++; + if (dlqPublishesAttempted === 1) { + throw new Error('simulated broker rejection on first DLQ publish'); + } + } + return original.call(this, exchange, routingKey, content, options); + }); + + try { + // Publish a message. Handler fails on first delivery → DLQ publish + // attempts → first publish fails → message is nacked back into the + // retry pipeline → retry-delay TTL → handler fails again → DLQ + // publish succeeds → message lands in DLQ. + channel.publish( + Constants.ROUTER_EXCHANGE_NAME, + 'dlq.confirm', + MessageTestUtils.buffer(RunMQMessageExample.random()), + ); + + // Default attemptsDelay is 100ms (simpleNoSchema config). + // We need: handler call, DLQ publish fail, retry-delay wait, handler call, DLQ publish succeeds. + await RunMQUtils.delay(2000); + + // Both DLQ publish attempts were made (first failed, second succeeded). + expect(dlqPublishesAttempted).toBeGreaterThanOrEqual(2); + // Handler ran multiple times due to redelivery — proves the message + // was nack'd back, NOT silently dropped. + expect(handlerCalls).toBeGreaterThanOrEqual(2); + // Message landed in DLQ on the second attempt. + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1); + // Main queue is empty. + await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0); + } finally { + await runMQ.disconnect(); + await testingConnection.disconnect(); + } + }); + + it('confirms DLQ publishes by default — broker ack required before original is acked', async () => { + const configuration = RunMQProcessorConfigurationExample.random( + 'dlq_confirm_happy_path', + 1, + 1, // attempts=1, so first failure → DLQ + 100, + ); + + const testingConnection = new RabbitMQClientAdapter(validConfig); + const channel = await testingConnection.getChannel(); + await ChannelTestHelpers.deleteQueue(channel, configuration.name); + + const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); + await runMQ.process('dlq.happy', configuration, () => { + throw new Error('handler always fails'); + }); + + channel.publish( + Constants.ROUTER_EXCHANGE_NAME, + 'dlq.happy', + MessageTestUtils.buffer(RunMQMessageExample.random()), + ); + + await RunMQUtils.delay(500); + + // Message should be in DLQ (with confirm received from broker). + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 1); + await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0); + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0); + + await runMQ.disconnect(); + await testingConnection.disconnect(); + }); +}); diff --git a/tests/e2e/RunMQ.e2e.test.ts b/tests/e2e/RunMQ.e2e.test.ts index 41e3f17..0a83cbd 100644 --- a/tests/e2e/RunMQ.e2e.test.ts +++ b/tests/e2e/RunMQ.e2e.test.ts @@ -160,12 +160,11 @@ describe('RunMQ E2E Tests', () => { await testingConnection.disconnect(); }) - it("should throw error when publishing invalid message", async () => { + it("should reject when publishing invalid message", async () => { const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); - expect(() => { - runMQ.publish("user.created", "invalid message" as any); - }).toThrow(RunMQException); + await expect(runMQ.publish("user.created", "invalid message" as any)) + .rejects.toThrow(RunMQException); await runMQ.disconnect(); }); diff --git a/tests/mocks/MockedAMQPChannel.ts b/tests/mocks/MockedAMQPChannel.ts index e46936d..b75ec63 100644 --- a/tests/mocks/MockedAMQPChannel.ts +++ b/tests/mocks/MockedAMQPChannel.ts @@ -39,7 +39,7 @@ export class MockedAMQPChannel implements AMQPChannel { bindQueue = jest.fn, [string, string, string, Record?]>().mockResolvedValue(); - publish = jest.fn().mockReturnValue(true); + publish = jest.fn, [string, string, Buffer, AMQPPublishOptions?]>().mockResolvedValue(); consume = jest.fn, [string, (msg: ConsumeMessage | null) => void, AMQPConsumeOptions?]>().mockResolvedValue({ consumerTag: 'test-consumer-tag' @@ -51,6 +51,8 @@ export class MockedAMQPChannel implements AMQPChannel { prefetch = jest.fn, [number, boolean?]>().mockResolvedValue(); + confirmSelect = jest.fn, []>().mockResolvedValue(); + close = jest.fn, []>().mockResolvedValue(); } diff --git a/tests/mocks/MockedRabbitMQChannel.ts b/tests/mocks/MockedRabbitMQChannel.ts index 91af045..bfaa059 100644 --- a/tests/mocks/MockedRabbitMQChannel.ts +++ b/tests/mocks/MockedRabbitMQChannel.ts @@ -9,12 +9,13 @@ export class MockedRabbitMQChannel implements AMQPChannel { deleteExchange = jest.fn(); checkExchange = jest.fn(); assertExchange = jest.fn(); - publish = jest.fn(); + publish = jest.fn().mockResolvedValue(undefined); consume = jest.fn(); get = jest.fn(); ack = jest.fn(); nack = jest.fn(); prefetch = jest.fn(); + confirmSelect = jest.fn().mockResolvedValue(undefined); close = jest.fn(); connection: Connection = {} as Connection; on = jest.fn(); diff --git a/tests/mocks/MockedRunMQPublisher.ts b/tests/mocks/MockedRunMQPublisher.ts index eb31b35..030e926 100644 --- a/tests/mocks/MockedRunMQPublisher.ts +++ b/tests/mocks/MockedRunMQPublisher.ts @@ -1,5 +1,5 @@ import {RunMQPublisher} from "@src/types"; export class MockedRabbitMQPublisher implements RunMQPublisher { - public publish = jest.fn(); + public publish = jest.fn().mockResolvedValue(undefined); } \ No newline at end of file diff --git a/tests/unit/core/RunMQ.test.ts b/tests/unit/core/RunMQ.test.ts index 20a669e..b0b6a4b 100644 --- a/tests/unit/core/RunMQ.test.ts +++ b/tests/unit/core/RunMQ.test.ts @@ -38,7 +38,7 @@ describe('RunMQ Unit Tests', () => { const setupPublisherMock = () => { const mockPublisherCreator = RunMQPublisherCreator as jest.MockedClass; - const mockPublisher = {publish: jest.fn()}; + const mockPublisher = {publish: jest.fn().mockResolvedValue(undefined)}; mockPublisherCreator.prototype.createPublisher.mockReturnValue(mockPublisher as any); return {mockPublisherCreator, mockPublisher}; }; @@ -142,13 +142,12 @@ describe('RunMQ Unit Tests', () => { }); describe('producer', () => { - it('should throw error if message is not a valid record', async () => { + it('should reject if message is not a valid record', async () => { setupSuccessfulClientMock(); const runMQ = await RunMQ.start(validConfig); - expect(() => { - runMQ.publish('test.topic', "invalid message" as any); - }).toThrow(RunMQException); + await expect(runMQ.publish('test.topic', "invalid message" as any)) + .rejects.toThrow(RunMQException); }); it('should publish message correctly if valid record', async () => { @@ -156,7 +155,7 @@ describe('RunMQ Unit Tests', () => { const {mockPublisher} = setupPublisherMock(); const runMQ = await RunMQ.start(validConfig); - runMQ.publish('test.topic', MessageExample.person()); + await runMQ.publish('test.topic', MessageExample.person()); expect(mockPublisher.publish).toHaveBeenCalledWith('test.topic', expect.any(Object)); }); diff --git a/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts b/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts index 7674205..7f99f2a 100644 --- a/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts +++ b/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts @@ -28,11 +28,11 @@ describe('RunMQBaseProducer Unit Tests', () => { }); describe('publish', () => { - it('should create RunMQMessage with generated ID and current timestamp', () => { + it('should create RunMQMessage with generated ID and current timestamp', async () => { const testMessage = MockedRabbitMQMessage; const testTopic = 'test.topic'; - producer.publish(testTopic, testMessage); + await producer.publish(testTopic, testMessage); expect(RunMQMessage).toHaveBeenCalledWith( testMessage.message, @@ -44,11 +44,11 @@ describe('RunMQBaseProducer Unit Tests', () => { ); }); - it('should serialize the RunMQMessage', () => { + it('should serialize the RunMQMessage', async () => { const testMessage = MockedRabbitMQMessage; const testTopic = 'test.topic'; - producer.publish(testTopic, testMessage); + await producer.publish(testTopic, testMessage); expect(mockedSerializer.serialize).toHaveBeenCalledTimes(1); expect(mockedSerializer.serialize).toHaveBeenCalledWith( @@ -63,7 +63,7 @@ describe('RunMQBaseProducer Unit Tests', () => { ); }); - it('should publish to the correct exchange and routing key', () => { + it('should publish to the correct exchange and routing key', async () => { jest.useFakeTimers().setSystemTime(new Date('2025-10-10T00:00:00Z')); const message = RunMQMessageExample.person(); const testMessage = mockedRabbitMQMessageWithChannelAndMessage( @@ -75,7 +75,7 @@ describe('RunMQBaseProducer Unit Tests', () => { const testTopic = 'test.topic'; const serializedMessage = JSON.stringify(message); - producer.publish(testTopic, testMessage); + await producer.publish(testTopic, testMessage); expect(mockedChannel.publish).toHaveBeenCalledWith( Constants.ROUTER_EXCHANGE_NAME, diff --git a/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts b/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts index 168370e..4b55b7d 100644 --- a/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts +++ b/tests/unit/core/publisher/producers/RunMQFailureLoggerProducer.test.ts @@ -10,136 +10,83 @@ describe('RunMQFailureLoggerProducer Unit Tests', () => { MockedRunMQLogger ); + beforeEach(() => { + jest.clearAllMocks(); + mockProducer.publish.mockResolvedValue(undefined); + }); + describe('publish', () => { - it('should delegate to wrapped producer when publish succeeds', () => { + it('should delegate to wrapped producer when publish succeeds', async () => { const testTopic = 'test.topic'; const testMessage = MockedRabbitMQMessage; - failureLoggerProducer.publish(testTopic, testMessage); + await failureLoggerProducer.publish(testTopic, testMessage); expect(mockProducer.publish).toHaveBeenCalledWith(testTopic, testMessage); expect(MockedRunMQLogger.error).not.toHaveBeenCalled(); }); - it('should log error and rethrow when publish fails', () => { + it('should log error and rethrow when publish rejects', async () => { const testTopic = 'test.topic'; const testMessage = MockedRabbitMQMessage; const publishError = new Error('Publish failed'); + mockProducer.publish.mockRejectedValueOnce(publishError); - mockProducer.publish.mockImplementation(() => { - throw publishError; - }); - - expect(() => { - failureLoggerProducer.publish(testTopic, testMessage); - }).toThrow('Publish failed'); + await expect(failureLoggerProducer.publish(testTopic, testMessage)) + .rejects.toThrow('Publish failed'); expect(mockProducer.publish).toHaveBeenCalledWith(testTopic, testMessage); expect(MockedRunMQLogger.error).toHaveBeenCalledWith( 'Message publishing failed', - { - message: testMessage, + expect.objectContaining({ + topic: testTopic, + correlationId: testMessage.correlationId, error: 'Publish failed', stack: publishError.stack, - } + }) ); }); - it('should handle non-Error exceptions', () => { + it('should handle non-Error rejections', async () => { const testTopic = 'test.topic'; const testMessage = MockedRabbitMQMessage; - const publishError = 'String error'; - - mockProducer.publish.mockImplementation(() => { - throw publishError; - }); + mockProducer.publish.mockRejectedValueOnce('String error'); - expect(() => { - failureLoggerProducer.publish(testTopic, testMessage); - }).toThrow('String error'); + await expect(failureLoggerProducer.publish(testTopic, testMessage)) + .rejects.toBe('String error'); expect(MockedRunMQLogger.error).toHaveBeenCalledWith( 'Message publishing failed', - { - message: testMessage, - error: JSON.stringify(publishError), + expect.objectContaining({ + topic: testTopic, + correlationId: testMessage.correlationId, + error: JSON.stringify('String error'), stack: undefined, - } + }) ); }); - it('should handle complex message objects in error logging', () => { - const testTopic = 'test.topic'; - const testMessage = MockedRabbitMQMessage; - - const publishError = new Error('Complex message error'); - - mockProducer.publish.mockImplementation(() => { - throw publishError; - }); - - expect(() => { - failureLoggerProducer.publish(testTopic, testMessage); - }).toThrow('Complex message error'); - - expect(MockedRunMQLogger.error).toHaveBeenCalledWith( - 'Message publishing failed', - { - message: testMessage, - error: 'Complex message error', - stack: publishError.stack, - } - ); - }); - - it('should handle null/undefined message content in error logging', () => { - const testTopic = 'test.topic'; - const testMessage = MockedRabbitMQMessage; - - const publishError = new Error('Null message error'); - - mockProducer.publish.mockImplementation(() => { - throw publishError; - }); - - expect(() => { - failureLoggerProducer.publish(testTopic, testMessage); - }).toThrow('Null message error'); - - expect(MockedRunMQLogger.error).toHaveBeenCalledWith( - 'Message publishing failed', - { - message: testMessage, - error: 'Null message error', - stack: publishError.stack, - } - ); - }); - - it('should preserve original error when rethrowing', () => { + it('should preserve original error when rethrowing', async () => { const testTopic = 'test.topic'; const testMessage = MockedRabbitMQMessage; const originalError = new Error('Original error'); originalError.name = 'CustomError'; + mockProducer.publish.mockRejectedValueOnce(originalError); - mockProducer.publish.mockImplementation(() => { - throw originalError; - }); - - expect(() => { - failureLoggerProducer.publish(testTopic, testMessage); - }).toThrow(originalError); + await expect(failureLoggerProducer.publish(testTopic, testMessage)) + .rejects.toBe(originalError); expect(MockedRunMQLogger.error).toHaveBeenCalledWith( 'Message publishing failed', - { - message: testMessage, + expect.objectContaining({ + topic: testTopic, + correlationId: testMessage.correlationId, error: 'Original error', stack: originalError.stack, - } + }) ); }); }); From 84e1e47946fb49a157d364b8d1256b23dd1d3185 Mon Sep 17 00:00:00 2001 From: Fawzi Abdulfattah Date: Sun, 10 May 2026 03:01:53 +0200 Subject: [PATCH 2/4] docs: document publisher confirms default-on in README --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 2db2b6c..997d369 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,7 @@ What's happening here: ### Publish a message ```typescript -runMQ.publish('user.created', { +await runMQ.publish('user.created', { userId: '123', email: 'user@example.com', name: 'John Doe' @@ -108,6 +108,8 @@ runMQ.publish('user.created', { ✅ One publish, every subscribed processor receives the message — independently and atomically. +✅ **Confirmed delivery by default.** `runMQ.publish()` returns a promise that resolves only after RabbitMQ has accepted the message; if the broker rejects it (alarm state, mandatory routing failure, etc.), the promise rejects so your code can handle it. Set `usePublisherConfirms: false` in the connection config to opt out and fall back to fire-and-forget publishing if per-publish round-trip latency matters more to you than detecting silent drops. +
## Patterns RunMQ fits naturally From 65c527eb5740dcf6689baab9b48a1ce1f611912e Mon Sep 17 00:00:00 2001 From: Fawzi Abdulfattah Date: Sun, 10 May 2026 03:26:31 +0200 Subject: [PATCH 3/4] test: restore deflaked resubscribe e2e lost in merge The merge from origin/main reintroduced an older fixed-delay version of the resubscribe e2e (the publisher-confirms branch had deleted the file, and the merge picked up the pre-deflake copy). Restore the polling version from main so CI doesn't flake on the 8s/2s sleeps. Co-Authored-By: Claude Opus 4.7 --- tests/e2e/RunMQ.resubscribe.e2e.test.ts | 111 +++++++++++++++++------- 1 file changed, 78 insertions(+), 33 deletions(-) diff --git a/tests/e2e/RunMQ.resubscribe.e2e.test.ts b/tests/e2e/RunMQ.resubscribe.e2e.test.ts index a82750a..da5fe2b 100644 --- a/tests/e2e/RunMQ.resubscribe.e2e.test.ts +++ b/tests/e2e/RunMQ.resubscribe.e2e.test.ts @@ -2,7 +2,6 @@ import {RunMQ} from '@src/core/RunMQ'; import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter"; import {Constants} from "@src/core/constants"; import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers"; -import {RunMQUtils} from "@src/core/utils/RunMQUtils"; import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample"; import {RunMQProcessorConfigurationExample} from "@tests/Examples/RunMQProcessorConfigurationExample"; @@ -20,38 +19,73 @@ function authHeader(): string { return 'Basic ' + Buffer.from(`${cfg.username}:${cfg.password}`).toString('base64'); } -async function closeConnectionsForQueue(queueName: string, timeoutMs: number): Promise { +async function listConsumersForQueue(queueName: string): Promise { const cfg = RabbitMQManagementConfigExample.valid(); + const res = await fetch(`${cfg.url}/api/consumers`, { + headers: {Authorization: authHeader()}, + }); + if (!res.ok) throw new Error(`list consumers failed: ${res.status}`); + const consumers = (await res.json()) as ManagementConsumer[]; + return consumers.filter((c) => c.queue?.name === queueName); +} + +async function waitForConsumers(queueName: string, expectedCount: number, timeoutMs: number): Promise { const deadline = Date.now() + timeoutMs; + let last: ManagementConsumer[] = []; + while (Date.now() < deadline) { + last = await listConsumersForQueue(queueName); + if (last.length >= expectedCount) return last; + await new Promise((r) => setTimeout(r, 200)); + } + return last; +} + +async function closeConnectionsForQueue(queueName: string, timeoutMs: number): Promise<{ closed: Set; consumerTags: Set }> { + const cfg = RabbitMQManagementConfigExample.valid(); + const targets = await waitForConsumers(queueName, 1, timeoutMs); const closed = new Set(); + const consumerTags = new Set(); + for (const c of targets) { + const connName = c.channel_details?.connection_name; + if (connName && !closed.has(connName)) { + const del = await fetch( + `${cfg.url}/api/connections/${encodeURIComponent(connName)}`, + {method: 'DELETE', headers: {Authorization: authHeader()}} + ); + if (!del.ok && del.status !== 404) { + throw new Error(`close connection failed: ${del.status}`); + } + closed.add(connName); + } + const tag = (c as any).consumer_tag; + if (tag) consumerTags.add(tag); + } + return {closed, consumerTags}; +} +async function waitForFreshConsumer(queueName: string, originalTags: Set, timeoutMs: number): Promise { + const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { - const res = await fetch(`${cfg.url}/api/consumers`, { - headers: {Authorization: authHeader()}, + const consumers = await listConsumersForQueue(queueName); + const fresh = consumers.find((c) => { + const tag = (c as any).consumer_tag; + return tag && !originalTags.has(tag); }); - if (!res.ok) throw new Error(`list consumers failed: ${res.status}`); - const consumers = (await res.json()) as ManagementConsumer[]; - const targets = consumers.filter((c) => c.queue?.name === queueName); - - if (targets.length > 0) { - for (const c of targets) { - const connName = c.channel_details?.connection_name; - if (connName && !closed.has(connName)) { - const del = await fetch( - `${cfg.url}/api/connections/${encodeURIComponent(connName)}`, - {method: 'DELETE', headers: {Authorization: authHeader()}} - ); - if (!del.ok && del.status !== 404) { - throw new Error(`close connection failed: ${del.status}`); - } - closed.add(connName); - } - } - return closed.size; - } + if (fresh) return true; await new Promise((r) => setTimeout(r, 200)); } - return closed.size; + return false; +} + +async function waitFor(check: () => T | Promise, predicate: (v: T) => boolean, timeoutMs: number): Promise { + const deadline = Date.now() + timeoutMs; + let last: T; + do { + last = await check(); + if (predicate(last)) return last; + await new Promise((r) => setTimeout(r, 100)); + } while (Date.now() < deadline); + return last!; } describe('RunMQ Consumer Channel Resubscription E2E', () => { @@ -81,17 +115,24 @@ describe('RunMQ Consumer Channel Resubscription E2E', () => { topic, MessageTestUtils.buffer(RunMQMessageExample.random()) ); - await RunMQUtils.delay(500); + await waitFor( + () => received.length, + (n) => n >= 1, + 5000 + ); expect(received.length).toBe(1); // Force-close only the connection(s) holding consumers for our queue. // Scoping by queue avoids killing unrelated parallel-test connections. - const closed = await closeConnectionsForQueue(configuration.name, 5000); - expect(closed).toBeGreaterThan(0); + const {closed, consumerTags} = await closeConnectionsForQueue(configuration.name, 5000); + expect(closed.size).toBeGreaterThan(0); - // Wait long enough for the rabbitmq-client to reconnect and for our - // resubscription to fire (RECONNECT_DELAY is 5s). Add headroom. - await RunMQUtils.delay(8000); + // Wait until a NEW consumer (different tag from the one we killed) is + // registered against the queue — that proves the resubscription + // pipeline ran end-to-end. Polling beats fixed sleeps: it cuts the + // happy-path delay and removes the slow-CI flake from waiting too short. + const resubscribed = await waitForFreshConsumer(configuration.name, consumerTags, 20000); + expect(resubscribed).toBe(true); // Re-acquire a publishing channel; the previous one was closed too. const republishChannel = await testingConnection.getChannel(); @@ -101,9 +142,13 @@ describe('RunMQ Consumer Channel Resubscription E2E', () => { MessageTestUtils.buffer(RunMQMessageExample.random()) ); - await RunMQUtils.delay(2000); + await waitFor( + () => received.length, + (n) => n >= 2, + 5000 + ); expect(received.length).toBe(2); await runMQ.disconnect(); - }, 30000); + }, 40000); }); From 9b7f81678d3f2023af9ebf7cc958c316132b5aa1 Mon Sep 17 00:00:00 2001 From: Fawzi Abdulfattah Date: Sun, 10 May 2026 03:55:42 +0200 Subject: [PATCH 4/4] Fixing tests Signed-off-by: Fawzi Abdulfattah --- tests/unit/core/RunMQ.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/core/RunMQ.test.ts b/tests/unit/core/RunMQ.test.ts index 96f285c..a20d586 100644 --- a/tests/unit/core/RunMQ.test.ts +++ b/tests/unit/core/RunMQ.test.ts @@ -167,7 +167,7 @@ describe('RunMQ Unit Tests', () => { (RunMQUtils.generateUUID as jest.Mock).mockReturnValue('msg-uuid'); const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); - runMQ.publish('test.topic', MessageExample.person(), 'corr-1'); + await runMQ.publish('test.topic', MessageExample.person(), 'corr-1'); expect(MockedRunMQLogger.info).toHaveBeenCalledWith('Published message', { topic: 'test.topic', @@ -186,7 +186,7 @@ describe('RunMQ Unit Tests', () => { MockedRunMQLogger, ); const payload = MessageExample.person(); - runMQ.publish('test.topic', payload, 'corr-1'); + await runMQ.publish('test.topic', payload, 'corr-1'); expect(MockedRunMQLogger.info).toHaveBeenCalledWith('Published message', { topic: 'test.topic',