Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public int getBatchSize() {
return acker.getBatchSize();
}

public int getOriginalBatchSize() {
return this.batchSize;
}

public MessageIdImpl prevBatchMessageId() {
return new MessageIdImpl(
ledgerId, entryId - 1, partitionIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
} else {
value = ConcurrentBitSetRecyclable.create();
value.set(0, batchMessageId.getBatchIndex());
value.set(0, batchMessageId.getOriginalBatchSize());
}
return value;
});
Expand Down Expand Up @@ -553,7 +553,7 @@ private void flushAsync(ClientCnx cnx) {
if (log.isDebugEnabled()) {
log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}"
+ " -- individual-batch-index-acks: {}",
consumer, lastCumulativeAck, pendingIndividualAcks, pendingIndividualBatchIndexAcks);
consumer, lastCumulativeAck, pendingIndividualAcks, entriesToAck);
}
cnx.ctx().flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,27 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.BitSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -381,6 +386,36 @@ public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws Exception
tracker.close();
}

@Test
public void testDoIndividualBatchAckAsync() throws Exception{
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
AcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, BatchMessageAckerDisabled.INSTANCE);
BitSet bitSet = new BitSet(20);
for(int i = 0; i < 20; i ++) {
bitSet.set(i, true);
}
MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, BatchMessageAcker.newAcker(bitSet));
Method doIndividualBatchAckAsync = PersistentAcknowledgmentsGroupingTracker.class
.getDeclaredMethod("doIndividualBatchAckAsync", BatchMessageIdImpl.class);
doIndividualBatchAckAsync.setAccessible(true);
doIndividualBatchAckAsync.invoke(tracker, messageId1);
doIndividualBatchAckAsync.invoke(tracker, messageId2);
Field pendingIndividualBatchIndexAcks = PersistentAcknowledgmentsGroupingTracker.class.getDeclaredField("pendingIndividualBatchIndexAcks");
pendingIndividualBatchIndexAcks.setAccessible(true);
ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> batchIndexAcks =
(ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>) pendingIndividualBatchIndexAcks.get(tracker);
MessageIdImpl position1 = new MessageIdImpl(5, 1, 0);
MessageIdImpl position2 = new MessageIdImpl(3, 2, 0);
assertTrue(batchIndexAcks.containsKey(position1));
assertNotNull(batchIndexAcks.get(position1));
assertEquals(batchIndexAcks.get(position1).cardinality(), 9);
assertTrue(batchIndexAcks.containsKey(position2));
assertNotNull(batchIndexAcks.get(position2));
assertEquals(batchIndexAcks.get(position2).cardinality(), 19);
tracker.close();
}

public class ClientCnxTest extends ClientCnx {

public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
Expand Down