Skip to content

Fix batch acknowledgment for deserialized message IDs#359

Open
MatheusReichert wants to merge 1 commit into
fsprojects:developfrom
MatheusReichert:fix/deserialized-batch-ack
Open

Fix batch acknowledgment for deserialized message IDs#359
MatheusReichert wants to merge 1 commit into
fsprojects:developfrom
MatheusReichert:fix/deserialized-batch-ack

Conversation

@MatheusReichert

@MatheusReichert MatheusReichert commented Jun 11, 2026

Copy link
Copy Markdown

See also apache/pulsar#19030 and apache/pulsar#19031

Motivation

Currently, when a message ID of a batched message is serialized (e.g., stored in a database or outbox envelope) and later deserialized to be acknowledged manually via AcknowledgeAsync (or NegativeAcknowledge), the acknowledgment fails to propagate to the broker.

This happens because the deserialized MessageId gets its own newly instantiated, isolated instance of BatchMessageAcker. Since the individual messages in the batch no longer share the same acker instance, calling AckIndividual on each isolated acker never drives the outstanding count to 0 for any of the instances. Thus, the consumer never sends the final batch acknowledgment command to the broker, causing all messages in the batch to be redelivered once the acknowledgment timeout expires.

This PR fixes the issue by introducing a batchAckers map inside ConsumerImpl to track and share BatchMessageAcker instances across deserialized message IDs associated with the same batch entry (LedgerId, EntryId).

Modifications

  • Added the batchAckers dictionary in ConsumerImpl.fs to track active BatchMessageAckers.
  • Stored the batch acker in the batchAckers map during batch parsing in ReceiveIndividualMessagesFromBatch.
  • Updated trySendAcknowledge and doTransactionAcknowledgeForResponse to resolve the shared BatchMessageAcker from the map.
  • Added clearing of the batchAckers dictionary in closeConsumerTasks to release resources and prevent memory leaks.
  • Added a unit test in MessageTests.fs to verify batch message ID acker isolation behavior during serialization and deserialization.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses a Pulsar batch-acknowledgment correctness issue when MessageIds from batched messages are serialized and later deserialized for manual acknowledgment, by introducing shared BatchMessageAcker resolution inside ConsumerImpl keyed by (LedgerId, EntryId).

Changes:

  • Added batchAckers map to share BatchMessageAcker instances across deserialized MessageIds for the same batch entry.
  • Updated acknowledgment paths to resolve the shared acker from batchAckers, and clear the map during consumer shutdown.
  • Added a unit test documenting that MessageId (de)serialization yields isolated BatchMessageAcker instances.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.

File Description
src/Pulsar.Client/Internal/ConsumerImpl.fs Adds and uses a batchAckers map to coordinate batch-acker state across deserialized MessageIds; clears map on close; stores acker during batch parsing.
tests/UnitTests/Common/MessageTests.fs Adds a unit test around batch MessageId (de)serialization acker-instance isolation.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +415 to +422
let struct(batchIndex, batchAcker) = batchDetails
let key = struct(messageId.LedgerId, messageId.EntryId)
let acker =
match batchAckers.TryGetValue(key) with
| true, sharedAcker -> sharedAcker
| false, _ ->
batchAckers[key] <- batchAcker
batchAcker
Comment on lines 360 to 368
| Batch (batchIndex, batchAcker) ->
let key = struct(messageId.LedgerId, messageId.EntryId)
let acker =
match batchAckers.TryGetValue(key) with
| true, sharedAcker -> sharedAcker
| false, _ ->
batchAckers[key] <- batchAcker
batchAcker
let ackSet =
Comment on lines 1406 to 1409
let batchSize = rawMessage.Metadata.NumMessages
let acker = BatchMessageAcker(batchSize)
batchAckers[struct(rawMessage.MessageId.LedgerId, rawMessage.MessageId.EntryId)] <- acker
let mutable skippedMessages = 0

let negativeAcksTracker = NegativeAcksTracker(prefix, consumerConfig.NegativeAckRedeliveryDelay, negativeAcksRedeliver)

let batchAckers = Dictionary<struct(LedgerId*EntryId), BatchMessageAcker>()
Expect.equal "" msgId deserialized
}

test "Batch MessageId serialization acker isolation" {
@Lanayx

Lanayx commented Jun 12, 2026

Copy link
Copy Markdown
Member

Hi, thanks for raising the issue, however given the referenced Java issue and unmerged PR, I'm not sure that this should be fixed at all, it rather looks like unsupported case by pulsar.
As far as I know, the main use case for saving MessageId before consumer restart to start from it later, so the byte structure was designed particuarly for that. It doesn't seem like there is any other pulsar client that went above that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants