Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class MessageIdSerializationTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 30000)
public void testAcknowledge() throws Exception {
String topic = "test-acknowledge-" + System.currentTimeMillis();
@Cleanup Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.batchingMaxMessages(100)
.batchingMaxPublishDelay(1, TimeUnit.DAYS)
.create();
ConsumerImpl<Integer> consumer = createConsumer(topic);
final int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
producer.sendAsync(i);
}
producer.flush();
final List<MessageId> msgIds = new ArrayList<>();
final Set<Pair<Long, Long>> positions = new TreeSet<>();
receiveAtMost(consumer, numMessages, msgIds, positions);
assertEquals(positions.size(), 1);

for (int i = 0; i < msgIds.size(); i++) {
consumer.acknowledge(msgIds.get(i));
if (i < msgIds.size() - 1) {
assertEquals(consumer.getBatchMessageToAckerSize(), 1);
} else {
assertEquals(consumer.getBatchMessageToAckerSize(), 0);
}
}
consumer.close();

consumer = createConsumer(topic);
MessageId newMsgId = producer.send(0);
MessageId receivedMessageId = consumer.receive().getMessageId();
assertEquals(newMsgId, receivedMessageId);
consumer.close();
}

@Test(timeOut = 30000)
public void testAcknowledgeCumulative() throws Exception {
String topic = "test-acknowledge-cumulative-" + System.currentTimeMillis();
final int batchingMaxMessages = 10;
@Cleanup Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.batchingMaxMessages(batchingMaxMessages)
.batchingMaxPublishDelay(1, TimeUnit.DAYS)
.create();
ConsumerImpl<Integer> consumer = createConsumer(topic);
// send 3 batches
for (int i = 0; i < batchingMaxMessages * 2; i++) {
producer.sendAsync(i);
}
producer.flush();
final List<MessageId> msgIds = new ArrayList<>();
final Set<Pair<Long, Long>> positions = new TreeSet<>();
receiveAtMost(consumer, batchingMaxMessages * 2, msgIds, positions);
assertEquals(positions.size(), 2);

consumer.acknowledgeCumulative(msgIds.get(2));
assertEquals(consumer.getBatchMessageToAckerSize(), 1);
for (int i = batchingMaxMessages; i < batchingMaxMessages * 2; i++) {
consumer.acknowledgeCumulative(msgIds.get(i));
if (i < batchingMaxMessages * 2 - 1) {
assertEquals(consumer.getBatchMessageToAckerSize(), 1);
} else {
assertEquals(consumer.getBatchMessageToAckerSize(), 0);
}
}
consumer.close();
consumer = createConsumer(topic);
MessageId newMsgId = producer.send(0);
MessageId receivedMessageId = consumer.receive().getMessageId();
assertEquals(newMsgId, receivedMessageId);
consumer.close();
}

private ConsumerImpl<Integer> createConsumer(String topic) throws PulsarClientException {
return (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32).topic(topic)
.subscriptionName("sub").isAckReceiptEnabled(true).subscribe();
}

private static void receiveAtMost(Consumer<Integer> consumer, int numMessages, List<MessageId> msgIds,
Set<Pair<Long, Long>> positions) throws IOException {
for (int i = 0; i < numMessages; i++) {
final MessageIdImpl messageId = (MessageIdImpl) consumer.receive().getMessageId();
MessageId deserialized = MessageId.fromByteArray(messageId.toByteArray());
assertTrue(deserialized instanceof BatchMessageIdImpl);
msgIds.add(deserialized);
positions.add(Pair.of(messageId.getLedgerId(), messageId.getEntryId()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,17 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {

CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);

default CompletableFuture<Void> addBatchIndexAck(BatchMessageIdImpl msgId, AckType ackType,
Map<String, Long> properties) {
return CompletableFuture.completedFuture(null);
}

@Deprecated
CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, AckType ackType,
Map<String, Long> properties);

CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds, Map<String, Long> properties);

void flush();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public byte[] toByteArray() {
return toByteArray(batchIndex, batchSize);
}

@Deprecated
public boolean ackIndividual() {
return acker.ackIndividual(batchIndex);
}
Expand All @@ -113,14 +114,16 @@ public boolean ackCumulative() {
return acker.ackCumulative(batchIndex);
}

@Deprecated
public int getOutstandingAcksInSameBatch() {
return acker.getOutstandingAcks();
}

public int getBatchSize() {
return acker.getBatchSize();
return this.batchSize;
}

@Deprecated
public int getOriginalBatchSize() {
return this.batchSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,12 +542,12 @@ public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transactio

@Override
public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList) {
return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), null);
return doAcknowledgeWithTxn(messageIdList, Collections.emptyMap(), null);
}

@Override
public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, Transaction txn) {
return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), (TransactionImpl) txn);
return doAcknowledgeWithTxn(messageIdList, Collections.emptyMap(), (TransactionImpl) txn);
}

@Override
Expand Down Expand Up @@ -655,17 +655,17 @@ public void negativeAcknowledge(Message<?> message) {
negativeAcknowledge(message.getMessageId());
}

protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageIdList, AckType ackType,
protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageIdList,
Map<String, Long> properties,
TransactionImpl txn) {
CompletableFuture<Void> ackFuture;
if (txn != null && this instanceof ConsumerImpl) {
ackFuture = txn.registerAckedTopic(getTopic(), subscription)
.thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));
.thenCompose(ignored -> doAcknowledge(messageIdList, AckType.Individual, properties, txn));
// register the ackFuture as part of the transaction
txn.registerAckOp(ackFuture);
} else {
ackFuture = doAcknowledge(messageIdList, ackType, properties, txn);
ackFuture = doAcknowledge(messageIdList, AckType.Individual, properties, txn);
}
return ackFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -62,9 +64,11 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
Expand Down Expand Up @@ -204,6 +208,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
// Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also consider cleaning up the batchMessageToAcker

  • After the seek operation
  • After the message has been redelivered(Nack or Ack timeout)
  • After the consumer reconnects to the broker? (I'm not 100% sure about this part, the cursor-reset might happened on the broker side)

@BewareMyPower BewareMyPower Dec 30, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get it. A new map entry could only be only added when a message was acknowledged.

    private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
        final BatchMessageAcker acker;
        if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
            acker = batchMessageToAcker.computeIfAbsent(

Why should we clean up the batchMessageToAcker after a seek operation?

Regarding the message redelivering or reconnection, if the batch message id was not created by deserialization, the acker inside the BatchMessageIdImpl would not be modified. If we only clean up the batchMessageToAcker for deserialized batch message id, the behavior would be different.

For example, assuming there are 2 batch message ids (id0 and id1) of the same entry and for the following steps:

  1. Acknowledge id0
  2. Reconnection
  3. Acknowledge id1

If they are retrieved from the deserialization, the entry will not be acknowledged because the 2nd step cleared the cache, and the BatchMessageAcker will be "XO" (X represents not acknowledged).

However, if they are just saved from the message.getMessageId(), the entry will be acknowledged because the shared BatchMessageAcker is "OO" and not affected by the reconnection.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batchMessageAckTracker before #1424 is updated for each message received:

    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf uncompressedPayload,
            MessageIdData messageId, ClientCnx cnx) {
        /* ... */
        batchMessageAckTracker.put(batchMessage, bitSet);

But the batchMessageToAcker in this PR will be updated only for acknowlegment.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should we clean up the batchMessageToAcker after a seek operation?

The seek API will lead the consumer to consume from a new start message ID. It can be an earlier position.
Suppose you have 3 batch messages.

0:(0,1,2,3),
1:(0,1,2),
2:(0,1,2,3,4,5)

The message 0, 1, and 2:(0,2,4) are acknowledged. Then the consumer seeks to the earliest position.
Then the consumer will receive messages 0, 1, and 2 again.
Due to we have 2:(0,2,4) in the batchMessageToAcker and If only 2:(1,3,5) is acked after the seek operation.
But the client will ack the whole message 2.

From a user perspective, it can be a data loss. We should guarantee the at-lease-once semantic after the seek operation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the message redelivering or reconnection, if the batch message id was not created by deserialization, the acker inside the BatchMessageIdImpl would not be modified. If we only clean up the batchMessageToAcker for deserialized batch message id, the behavior would be different.

If the user decides to nack the message, they should not continue to ack it. After the message redelivery, the newly received message with a new Acker. But this PR introduced a shared state. It looks like the newly received message with the different acker can also associate with the old ack state if we don't clean up the shared state.

@codelipenghui codelipenghui Dec 30, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users should not continue to process/ack the messages before the seek operation. It's not the same case as I provided which is normal usage.

And we should ensure after the seek operation, we should not return the cached messages before the seek operation to users. If users try to cache the messages or just inflight messages, users need to guarantee they will not ack the inflight or cached messages before the seek operation.

BTW, if users left one message ID (messageIdList.get(4)) to acknowledge after seek and they didn't acknowledge other message IDs, what will they expect? Did they expect messages could be received again?

Yes, they should receive all the messages after the new seek position no matter what happened before the seek operation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point I insisted on is, the following two code snippets should be equivalent.

        final List<BatchMessageIdImpl> msgIds = new ArrayList<>();
        for (int i = 0; i < numMessages; i++) {
            msgIds.add((BatchMessageIdImpl) consumer.receive().getMessageId());
        }
        final List<BatchMessageIdImpl> msgIds = new ArrayList<>();
        for (int i = 0; i < numMessages; i++) {
            final MessageIdImpl messageId = (MessageIdImpl) consumer.receive().getMessageId();
            MessageId deserialized = MessageId.fromByteArray(messageId.toByteArray());
            msgIds.add((BatchMessageIdImpl) deserialized);
        }

This PR works well for the assumption above.

Your solution to the corner cases you described is very hacky. You created the same (i.e. equals returns true) message ID and archive a different goal.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If users try to cache the messages or just inflight messages, users need to guarantee they will not ack the inflight or cached messages before the seek operation.

Just like this rule you've mentioned here, when message IDs from deserializations are used, there is a rule that all message IDs should be acknowledged (for simplicity, it does not count the cumulative ACK case). IMO, if users load a set of message IDs and he only acknowledges partial of them before seeking, it should be an incorrect use.

It's not the same case as I provided which is normal usage.

As I've explained above, it's not normal usage when you use message IDs from deserializations.

The wrong code users should not write:

var id0 = loadMessageId();
consumer.acknowledge(id0); // batch index: 0, batch size: 2
var id1 = loadMessageId(); // batch index: 1, batch size: 2
// User does not acknowledge id1 before seek
consumer.seek(MessageId.earliest);
// Instead, user acknowledges the outdated id1 after seek.
consumer.acknowledge(id1);

The correct code users should write:

var id0 = loadMessageId();
consumer.acknowledge(id0); // batch index: 0, batch size: 2
consumer.seek(MessageId.earliest);
storeMessageId(consumer.receive().getMessageId());  // batch index: 0, batch size: 2
storeMessageId(consumer.receive().getMessageId());  // batch index: 1, batch size: 2
var id1 = loadMessageId(); // batch index: 0, batch size: 2
var id2 = loadMessageId(); // batch index: 1, batch size: 2
consumer.acknowledge(id1);
consumer.acknowledge(id2);

Regarding the reconnection or ack failure, it's just the same. When acknowledgeAsync is called on all message IDs of a batch, the cache will be removed no matter if the acknowledgment succeeded. If only partial message IDs of a batch are acknowledged, when users received messages again, they should persist message IDs again and use the new message IDs, including the repeated positions, instead of the old message IDs.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The id2 can be acked first, but the client crashed while processing message 1 (Shared subscription and users multiple threads to process messages). I mean that users do not want to miss the ack intentionally. But we can't guarantee the messages of a batch will be processed in order of a Shared subscription. Even if it is a Failover or Exclusive subscription, users can also ack individually and process messages out of order.

Regarding the reconnection or ack failure

I point out this one because a server-side cursor reset can cause the reconnection. But it should not be a problem that this PR should fix.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just thought again. If we have to use completely new BatchMessageIdImpl objects in regular cases, i.e. retrieve MessageId from Message, in the same case when BatchMessageIdImpl objects are retrieved from deserializations, the cache should be cleared.

new ConcurrentSkipListMap<>();

static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
Expand Down Expand Up @@ -529,6 +537,64 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
return result;
}

private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) {
if (ackType == AckType.Individual) {
stats.incrementNumAcksSent(numMessages);
unAckedMessageTracker.remove(messageId);
if (possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.remove(messageId);
}
} else {
stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId));
}
}

private void removeBatchMessageAckerUntil(Pair<Long, Long> position) {
for (Pair<Long, Long> key : batchMessageToAcker.keySet()) {
if (key.compareTo(position) <= 0) {
batchMessageToAcker.remove(key);
} else {
break;
}
}
}

@Nullable
private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
final BatchMessageAcker acker;
if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
acker = batchMessageToAcker.computeIfAbsent(
Pair.of(messageId.getLedgerId(), messageId.getEntryId()),
__ -> BatchMessageAcker.newAcker(messageId.getBatchSize()));
Comment thread
BewareMyPower marked this conversation as resolved.
} else {
acker = messageId.getAcker();
}
if (ackType == AckType.Individual) {
if (acker.ackIndividual(messageId.getBatchIndex())) {
batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId()));
return messageId.toMessageIdImpl();
} else {
return conf.isBatchIndexAckEnabled() ? messageId : null;
}
} else {
if (acker.ackCumulative(messageId.getBatchIndex())) {
removeBatchMessageAckerUntil(Pair.of(messageId.getLedgerId(), messageId.getEntryId()));
return messageId.toMessageIdImpl();
} else if (conf.isBatchIndexAckEnabled()) {
return messageId;
} else {
if (acker.isPrevBatchCumulativelyAcked()) {
return null;
} else {
acker.setPrevBatchCumulativelyAcked(true);
MessageIdImpl prevMessageId = messageId.prevBatchMessageId();
removeBatchMessageAckerUntil(Pair.of(prevMessageId.getLedgerId(), prevMessageId.getEntryId()));
return prevMessageId;
}
}
}
}

@Override
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String, Long> properties,
Expand All @@ -549,13 +615,33 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
}
return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties);
if (ackType == AckType.Individual) {
onAcknowledge(messageId, null);
} else {
onAcknowledgeCumulative(messageId, null);
}
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to change the name of this method to AcknowledgeAndGet? Just like Atomic's IncreaseAndGet

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This method is to get the MessageIdImpl to acknowledge. increaseAndGet modifies the atomic variable itself, but getMessageIdToAcknowledge just calculated a MessageIdImpl to acknowledge from the BatchMessageIdImpl and does not modify the BatchMessageIdImpl itself.

if (messageIdImpl == null) {
return CompletableFuture.completedFuture(null);
} else if (messageIdImpl instanceof BatchMessageIdImpl) {
return acknowledgmentsGroupingTracker.addBatchIndexAck(
(BatchMessageIdImpl) messageIdImpl, ackType, properties);
} else {
processMessageIdBeforeAcknowledge(messageIdImpl, ackType, batchMessageId.getBatchSize());
return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties);
}
} else {
MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1);
return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties);
}
}

@Override
protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType,
Map<String, Long> properties, TransactionImpl txn) {

for (MessageId messageId : messageIdList) {
checkArgument(messageId instanceof MessageIdImpl);
}
Expand All @@ -573,7 +659,25 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, A
return doTransactionAcknowledgeForResponse(messageIdList, ackType, null,
properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
} else {
return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, ackType, properties);
List<MessageIdImpl> messageIdListToAck = new ArrayList<>();
for (MessageId messageId : messageIdList) {
onAcknowledge(messageId, null);
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType);
if (messageIdImpl != null) {
if (!(messageIdImpl instanceof BatchMessageIdImpl)) {
processMessageIdBeforeAcknowledge(messageIdImpl, ackType, batchMessageId.getBatchSize());
} // else: batch index ACK
messageIdListToAck.add(messageIdImpl);
}
} else {
MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1);
messageIdListToAck.add(messageIdImpl);
}
}
return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdListToAck, properties);
}
}

Expand Down Expand Up @@ -2779,6 +2883,11 @@ boolean isAckReceiptEnabled() {
&& Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
}

@VisibleForTesting
int getBatchMessageToAckerSize() {
return batchMessageToAcker.size();
}

private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

}
Loading