From 5b759d6b085f670796c147c4f143d6178a142d7f Mon Sep 17 00:00:00 2001 From: technoboy Date: Sun, 13 Feb 2022 21:05:25 +0800 Subject: [PATCH] Fix PersistentAcknowledgmentsGroupingTracker set bitSet issue. --- .../client/impl/BatchMessageIdImpl.java | 4 ++ ...sistentAcknowledgmentsGroupingTracker.java | 4 +- .../AcknowledgementsGroupingTrackerTest.java | 41 +++++++++++++++++-- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index dc177235c4f6a..104d36b4b2fae 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -138,6 +138,10 @@ public int getBatchSize() { return acker.getBatchSize(); } + public int getOriginalBatchSize() { + return this.batchSize; + } + public MessageIdImpl prevBatchMessageId() { return new MessageIdImpl( ledgerId, entryId - 1, partitionIndex); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index b705453cb022c..a1831e13e0941 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -364,7 +364,7 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet()); } else { value = ConcurrentBitSetRecyclable.create(); - value.set(0, batchMessageId.getBatchIndex()); + value.set(0, batchMessageId.getOriginalBatchSize()); } return value; }); @@ -553,7 +553,7 @@ private void flushAsync(ClientCnx cnx) { if (log.isDebugEnabled()) { log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}" + " -- individual-batch-index-acks: {}", - consumer, lastCumulativeAck, pendingIndividualAcks, pendingIndividualBatchIndexAcks); + consumer, lastCumulativeAck, pendingIndividualAcks, entriesToAck); } cnx.ctx().flush(); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index 9632a88793d20..c0b952a281a8e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -22,22 +22,27 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; - +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.BitSet; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; - +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.TimedCompletableFuture; import org.apache.pulsar.common.api.proto.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.testng.annotations.AfterClass; @@ -381,6 +386,36 @@ public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws Exception tracker.close(); } + @Test + public void testDoIndividualBatchAckAsync() throws Exception{ + ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + AcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, BatchMessageAckerDisabled.INSTANCE); + BitSet bitSet = new BitSet(20); + for(int i = 0; i < 20; i ++) { + bitSet.set(i, true); + } + MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, BatchMessageAcker.newAcker(bitSet)); + Method doIndividualBatchAckAsync = PersistentAcknowledgmentsGroupingTracker.class + .getDeclaredMethod("doIndividualBatchAckAsync", BatchMessageIdImpl.class); + doIndividualBatchAckAsync.setAccessible(true); + doIndividualBatchAckAsync.invoke(tracker, messageId1); + doIndividualBatchAckAsync.invoke(tracker, messageId2); + Field pendingIndividualBatchIndexAcks = PersistentAcknowledgmentsGroupingTracker.class.getDeclaredField("pendingIndividualBatchIndexAcks"); + pendingIndividualBatchIndexAcks.setAccessible(true); + ConcurrentHashMap batchIndexAcks = + (ConcurrentHashMap) pendingIndividualBatchIndexAcks.get(tracker); + MessageIdImpl position1 = new MessageIdImpl(5, 1, 0); + MessageIdImpl position2 = new MessageIdImpl(3, 2, 0); + assertTrue(batchIndexAcks.containsKey(position1)); + assertNotNull(batchIndexAcks.get(position1)); + assertEquals(batchIndexAcks.get(position1).cardinality(), 9); + assertTrue(batchIndexAcks.containsKey(position2)); + assertNotNull(batchIndexAcks.get(position2)); + assertEquals(batchIndexAcks.get(position2).cardinality(), 19); + tracker.close(); + } + public class ClientCnxTest extends ClientCnx { public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {