diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java new file mode 100644 index 0000000000000..515dbfa29952a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker-api") +public class MessageIdSerializationTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 30000) + public void testAcknowledge() throws Exception { + String topic = "test-acknowledge-" + System.currentTimeMillis(); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxMessages(100) + .batchingMaxPublishDelay(1, TimeUnit.DAYS) + .create(); + ConsumerImpl consumer = createConsumer(topic); + final int numMessages = 10; + for (int i = 0; i < numMessages; i++) { + producer.sendAsync(i); + } + producer.flush(); + final List msgIds = new ArrayList<>(); + final Set> positions = new TreeSet<>(); + receiveAtMost(consumer, numMessages, msgIds, positions); + assertEquals(positions.size(), 1); + + for (int i = 0; i < msgIds.size(); i++) { + consumer.acknowledge(msgIds.get(i)); + if (i < msgIds.size() - 1) { + assertEquals(consumer.getBatchMessageToAckerSize(), 1); + } else { + assertEquals(consumer.getBatchMessageToAckerSize(), 0); + } + } + consumer.close(); + + consumer = createConsumer(topic); + MessageId newMsgId = producer.send(0); + MessageId receivedMessageId = consumer.receive().getMessageId(); + assertEquals(newMsgId, receivedMessageId); + consumer.close(); + } + + @Test(timeOut = 30000) + public void testAcknowledgeCumulative() throws Exception { + String topic = "test-acknowledge-cumulative-" + System.currentTimeMillis(); + final int batchingMaxMessages = 10; + @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxMessages(batchingMaxMessages) + .batchingMaxPublishDelay(1, TimeUnit.DAYS) + .create(); + ConsumerImpl consumer = createConsumer(topic); + // send 3 batches + for (int i = 0; i < batchingMaxMessages * 2; i++) { + producer.sendAsync(i); + } + producer.flush(); + final List msgIds = new ArrayList<>(); + final Set> positions = new TreeSet<>(); + receiveAtMost(consumer, batchingMaxMessages * 2, msgIds, positions); + assertEquals(positions.size(), 2); + + consumer.acknowledgeCumulative(msgIds.get(2)); + assertEquals(consumer.getBatchMessageToAckerSize(), 1); + for (int i = batchingMaxMessages; i < batchingMaxMessages * 2; i++) { + consumer.acknowledgeCumulative(msgIds.get(i)); + if (i < batchingMaxMessages * 2 - 1) { + assertEquals(consumer.getBatchMessageToAckerSize(), 1); + } else { + assertEquals(consumer.getBatchMessageToAckerSize(), 0); + } + } + consumer.close(); + consumer = createConsumer(topic); + MessageId newMsgId = producer.send(0); + MessageId receivedMessageId = consumer.receive().getMessageId(); + assertEquals(newMsgId, receivedMessageId); + consumer.close(); + } + + private ConsumerImpl createConsumer(String topic) throws PulsarClientException { + return (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32).topic(topic) + .subscriptionName("sub").isAckReceiptEnabled(true).subscribe(); + } + + private static void receiveAtMost(Consumer consumer, int numMessages, List msgIds, + Set> positions) throws IOException { + for (int i = 0; i < numMessages; i++) { + final MessageIdImpl messageId = (MessageIdImpl) consumer.receive().getMessageId(); + MessageId deserialized = MessageId.fromByteArray(messageId.toByteArray()); + assertTrue(deserialized instanceof BatchMessageIdImpl); + msgIds.add(deserialized); + positions.add(Pair.of(messageId.getLedgerId(), messageId.getEntryId())); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java index d46af1a99e7f0..1200eb5c63e8f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java @@ -33,9 +33,17 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable { CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties); + default CompletableFuture addBatchIndexAck(BatchMessageIdImpl msgId, AckType ackType, + Map properties) { + return CompletableFuture.completedFuture(null); + } + + @Deprecated CompletableFuture addListAcknowledgment(List messageIds, AckType ackType, Map properties); + CompletableFuture addListAcknowledgment(List messageIds, Map properties); + void flush(); @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index 7e3a143dff8e0..3ac32c5665e24 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -105,6 +105,7 @@ public byte[] toByteArray() { return toByteArray(batchIndex, batchSize); } + @Deprecated public boolean ackIndividual() { return acker.ackIndividual(batchIndex); } @@ -113,14 +114,16 @@ public boolean ackCumulative() { return acker.ackCumulative(batchIndex); } + @Deprecated public int getOutstandingAcksInSameBatch() { return acker.getOutstandingAcks(); } public int getBatchSize() { - return acker.getBatchSize(); + return this.batchSize; } + @Deprecated public int getOriginalBatchSize() { return this.batchSize; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index dd4932ec2bca4..ef514f4389f99 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -542,12 +542,12 @@ public CompletableFuture acknowledgeAsync(Messages messages, Transactio @Override public CompletableFuture acknowledgeAsync(List messageIdList) { - return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), null); + return doAcknowledgeWithTxn(messageIdList, Collections.emptyMap(), null); } @Override public CompletableFuture acknowledgeAsync(List messageIdList, Transaction txn) { - return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), (TransactionImpl) txn); + return doAcknowledgeWithTxn(messageIdList, Collections.emptyMap(), (TransactionImpl) txn); } @Override @@ -655,17 +655,17 @@ public void negativeAcknowledge(Message message) { negativeAcknowledge(message.getMessageId()); } - protected CompletableFuture doAcknowledgeWithTxn(List messageIdList, AckType ackType, + protected CompletableFuture doAcknowledgeWithTxn(List messageIdList, Map properties, TransactionImpl txn) { CompletableFuture ackFuture; if (txn != null && this instanceof ConsumerImpl) { ackFuture = txn.registerAckedTopic(getTopic(), subscription) - .thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn)); + .thenCompose(ignored -> doAcknowledge(messageIdList, AckType.Individual, properties, txn)); // register the ackFuture as part of the transaction txn.registerAckOp(ackFuture); } else { - ackFuture = doAcknowledge(messageIdList, ackType, properties, txn); + ackFuture = doAcknowledge(messageIdList, AckType.Individual, properties, txn); } return ackFuture; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 8fef739983648..5fd5f4e125e57 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -48,6 +48,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -62,9 +64,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nullable; import lombok.AccessLevel; import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.DeadLetterPolicy; @@ -204,6 +208,10 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final List previousExceptions = new CopyOnWriteArrayList(); + // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged + private final ConcurrentNavigableMap, BatchMessageAcker> batchMessageToAcker = + new ConcurrentSkipListMap<>(); + static ConsumerImpl newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, @@ -529,6 +537,64 @@ protected CompletableFuture> internalBatchReceiveAsync() { return result; } + private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) { + if (ackType == AckType.Individual) { + stats.incrementNumAcksSent(numMessages); + unAckedMessageTracker.remove(messageId); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.remove(messageId); + } + } else { + stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId)); + } + } + + private void removeBatchMessageAckerUntil(Pair position) { + for (Pair key : batchMessageToAcker.keySet()) { + if (key.compareTo(position) <= 0) { + batchMessageToAcker.remove(key); + } else { + break; + } + } + } + + @Nullable + private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) { + final BatchMessageAcker acker; + if (messageId.getAcker() instanceof BatchMessageAckerDisabled) { + acker = batchMessageToAcker.computeIfAbsent( + Pair.of(messageId.getLedgerId(), messageId.getEntryId()), + __ -> BatchMessageAcker.newAcker(messageId.getBatchSize())); + } else { + acker = messageId.getAcker(); + } + if (ackType == AckType.Individual) { + if (acker.ackIndividual(messageId.getBatchIndex())) { + batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId())); + return messageId.toMessageIdImpl(); + } else { + return conf.isBatchIndexAckEnabled() ? messageId : null; + } + } else { + if (acker.ackCumulative(messageId.getBatchIndex())) { + removeBatchMessageAckerUntil(Pair.of(messageId.getLedgerId(), messageId.getEntryId())); + return messageId.toMessageIdImpl(); + } else if (conf.isBatchIndexAckEnabled()) { + return messageId; + } else { + if (acker.isPrevBatchCumulativelyAcked()) { + return null; + } else { + acker.setPrevBatchCumulativelyAcked(true); + MessageIdImpl prevMessageId = messageId.prevBatchMessageId(); + removeBatchMessageAckerUntil(Pair.of(prevMessageId.getLedgerId(), prevMessageId.getEntryId())); + return prevMessageId; + } + } + } + } + @Override protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType, Map properties, @@ -549,13 +615,33 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); } - return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties); + if (ackType == AckType.Individual) { + onAcknowledge(messageId, null); + } else { + onAcknowledgeCumulative(messageId, null); + } + if (messageId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType); + if (messageIdImpl == null) { + return CompletableFuture.completedFuture(null); + } else if (messageIdImpl instanceof BatchMessageIdImpl) { + return acknowledgmentsGroupingTracker.addBatchIndexAck( + (BatchMessageIdImpl) messageIdImpl, ackType, properties); + } else { + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, batchMessageId.getBatchSize()); + return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties); + } + } else { + MessageIdImpl messageIdImpl = (MessageIdImpl) messageId; + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1); + return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties); + } } @Override protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { - for (MessageId messageId : messageIdList) { checkArgument(messageId instanceof MessageIdImpl); } @@ -573,7 +659,25 @@ protected CompletableFuture doAcknowledge(List messageIdList, A return doTransactionAcknowledgeForResponse(messageIdList, ackType, null, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); } else { - return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, ackType, properties); + List messageIdListToAck = new ArrayList<>(); + for (MessageId messageId : messageIdList) { + onAcknowledge(messageId, null); + if (messageId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType); + if (messageIdImpl != null) { + if (!(messageIdImpl instanceof BatchMessageIdImpl)) { + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, batchMessageId.getBatchSize()); + } // else: batch index ACK + messageIdListToAck.add(messageIdImpl); + } + } else { + MessageIdImpl messageIdImpl = (MessageIdImpl) messageId; + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1); + messageIdListToAck.add(messageIdImpl); + } + } + return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdListToAck, properties); } } @@ -2779,6 +2883,11 @@ boolean isAckReceiptEnabled() { && Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion()); } + @VisibleForTesting + int getBatchMessageToAckerSize() { + return batchMessageToAcker.size(); + } + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 02298e0f9d66d..6f03da11ce156 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -96,13 +96,9 @@ public static MessageId fromByteArray(byte[] data) throws IOException { MessageIdImpl messageId; if (idData.hasBatchIndex()) { - if (idData.hasBatchSize()) { - messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), - idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize())); - } else { - messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), - idData.getBatchIndex()); - } + int batchSize = idData.hasBatchSize() ? idData.getBatchSize() : -1; + messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), + idData.getBatchIndex(), batchSize, BatchMessageAckerDisabled.INSTANCE); } else if (idData.hasFirstChunkMessageId()) { MessageIdData firstChunkIdData = idData.getFirstChunkMessageId(); messageId = new ChunkMessageIdImpl( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 224276ba5f08f..2105fec21589f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -493,7 +493,7 @@ protected CompletableFuture doAcknowledge(List messageIdList, } topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> { ConsumerImpl consumer = consumers.get(topicPartitionName); - resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) + resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, properties, txn) .thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove))); }); return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java index 32f8fb922304b..576fe73c516a6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java @@ -50,8 +50,13 @@ public CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ac } @Override - public CompletableFuture addListAcknowledgment(List messageIds, - AckType ackType, + public CompletableFuture addListAcknowledgment(List messageIds, AckType ackType, + Map properties) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture addListAcknowledgment(List messageIds, Map properties) { // no-op return CompletableFuture.completedFuture(null); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index fef0bcb8906f1..c20affc11bad6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -18,18 +18,17 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; @@ -37,8 +36,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; -import javax.annotation.Nullable; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Triple; @@ -83,7 +80,6 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments private final ConcurrentHashMap pendingIndividualBatchIndexAcks; private final ScheduledFuture scheduledTask; - private final boolean batchIndexAckEnabled; private final boolean ackReceiptEnabled; public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, ConsumerConfigurationData conf, @@ -93,7 +89,6 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, Consum this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>(); this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros(); this.maxAckGroupSize = conf.getMaxAcknowledgmentGroupSize(); - this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled(); this.ackReceiptEnabled = conf.isAckReceiptEnabled(); this.currentIndividualAckFuture = new TimedCompletableFuture<>(); this.currentCumulativeAckFuture = new TimedCompletableFuture<>(); @@ -129,53 +124,42 @@ public boolean isDuplicate(MessageId messageId) { } @Override - public CompletableFuture addListAcknowledgment(List messageIds, - AckType ackType, Map properties) { - if (AckType.Cumulative.equals(ackType)) { - if (consumer.isAckReceiptEnabled()) { - Set> completableFutureSet = new HashSet<>(); - messageIds.forEach(messageId -> - completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties))); - return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet)); + public CompletableFuture addListAcknowledgment(List messageIds, AckType ackType, + Map properties) { + checkArgument(ackType == AckType.Individual); + final List messageIdImpls = new ArrayList<>(); + for (MessageId messageId : messageIds) { + messageIdImpls.add((MessageIdImpl) messageId); + } + return addListAcknowledgment(messageIdImpls, properties); + } + + @Override + public CompletableFuture addListAcknowledgment(List messageIds, + Map properties) { + Optional readLock = acquireReadLock(); + try { + if (messageIds.size() != 0) { + addListAcknowledgment(messageIds); + return readLock.map(__ -> currentIndividualAckFuture) + .orElse(CompletableFuture.completedFuture(null)); } else { - messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties)); return CompletableFuture.completedFuture(null); } - } else { - Optional readLock = acquireReadLock(); - try { - if (messageIds.size() != 0) { - addListAcknowledgment(messageIds); - return readLock.map(__ -> currentIndividualAckFuture) - .orElse(CompletableFuture.completedFuture(null)); - } else { - return CompletableFuture.completedFuture(null); - } - } finally { - readLock.ifPresent(Lock::unlock); - if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= maxAckGroupSize) { - flush(); - } + } finally { + readLock.ifPresent(Lock::unlock); + if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= maxAckGroupSize) { + flush(); } } } - private void addListAcknowledgment(List messageIds) { - for (MessageId messageId : messageIds) { + private void addListAcknowledgment(List messageIds) { + for (MessageIdImpl messageId : messageIds) { if (messageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - addIndividualAcknowledgment(batchMessageId.toMessageIdImpl(), - batchMessageId, - this::doIndividualAckAsync, - this::doIndividualBatchAckAsync); - } else if (messageId instanceof MessageIdImpl) { - addIndividualAcknowledgment((MessageIdImpl) messageId, - null, - this::doIndividualAckAsync, - this::doIndividualBatchAckAsync); + doIndividualBatchAckAsync((BatchMessageIdImpl) messageId); } else { - throw new IllegalStateException("Unsupported message id type in addListAcknowledgement: " - + messageId.getClass().getCanonicalName()); + doIndividualAckAsync(messageId); } } } @@ -183,67 +167,21 @@ private void addListAcknowledgment(List messageIds) { @Override public CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties) { - if (msgId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId; - return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId); + checkArgument(!(msgId instanceof BatchMessageIdImpl)); + if (ackType == AckType.Individual) { + return doIndividualAck(msgId, properties); } else { - return addAcknowledgment(msgId, ackType, properties, null); + return doCumulativeAck(msgId, properties, null); } } - private CompletableFuture addIndividualAcknowledgment( - MessageIdImpl msgId, - @Nullable BatchMessageIdImpl batchMessageId, - Function> individualAckFunction, - Function> batchAckFunction) { - if (batchMessageId != null) { - consumer.onAcknowledge(batchMessageId, null); - } else { - consumer.onAcknowledge(msgId, null); - } - if (batchMessageId == null || batchMessageId.ackIndividual()) { - consumer.getStats().incrementNumAcksSent((batchMessageId != null) ? batchMessageId.getBatchSize() : 1); - consumer.getUnAckedMessageTracker().remove(msgId); - if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) { - consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId); - } - return individualAckFunction.apply(msgId); - } else if (batchIndexAckEnabled) { - return batchAckFunction.apply(batchMessageId); + @Override + public CompletableFuture addBatchIndexAck(BatchMessageIdImpl msgId, AckType ackType, + Map properties) { + if (ackType == AckType.Individual) { + return doIndividualBatchAck(msgId, properties); } else { - return CompletableFuture.completedFuture(null); - } - } - - private CompletableFuture addAcknowledgment(MessageIdImpl msgId, - AckType ackType, - Map properties, - @Nullable BatchMessageIdImpl batchMessageId) { - switch (ackType) { - case Individual: - return addIndividualAcknowledgment(msgId, - batchMessageId, - __ -> doIndividualAck(__, properties), - __ -> doIndividualBatchAck(__, properties)); - case Cumulative: - if (batchMessageId != null) { - consumer.onAcknowledgeCumulative(batchMessageId, null); - } else { - consumer.onAcknowledgeCumulative(msgId, null); - } - if (batchMessageId == null || batchMessageId.ackCumulative()) { - return doCumulativeAck(msgId, properties, null); - } else if (batchIndexAckEnabled) { - return doCumulativeBatchIndexAck(batchMessageId, properties); - } else { - if (!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) { - doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null); - batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true); - } - return CompletableFuture.completedFuture(null); - } - default: - throw new IllegalStateException("Unknown AckType: " + ackType); + return doCumulativeBatchIndexAck(msgId, properties); } } @@ -267,10 +205,9 @@ private CompletableFuture doIndividualAck(MessageIdImpl messageId, Map doIndividualAckAsync(MessageIdImpl messageId) { + private void doIndividualAckAsync(MessageIdImpl messageId) { pendingIndividualAcks.add(messageId); pendingIndividualBatchIndexAcks.remove(messageId); - return CompletableFuture.completedFuture(null); } private CompletableFuture doIndividualBatchAck(BatchMessageIdImpl batchMessageId, @@ -298,7 +235,6 @@ private CompletableFuture doIndividualBatchAck(BatchMessageIdImpl batchMes private CompletableFuture doCumulativeAck(MessageIdImpl messageId, Map properties, BitSetRecyclable bitSet) { - consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId)); if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an // uncommon condition since it's only used for the compaction subscription. @@ -314,7 +250,7 @@ private CompletableFuture doCumulativeAck(MessageIdImpl messageId, Map doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { + private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( batchMessageId.toMessageIdImpl(), __ -> { ConcurrentBitSetRecyclable value; @@ -323,12 +259,11 @@ private CompletableFuture doIndividualBatchAckAsync(BatchMessageIdImpl bat value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet()); } else { value = ConcurrentBitSetRecyclable.create(); - value.set(0, batchMessageId.getOriginalBatchSize()); + value.set(0, batchMessageId.getBatchSize()); } return value; }); bitSet.clear(batchMessageId.getBatchIndex()); - return CompletableFuture.completedFuture(null); } private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index ddca6951e49e1..51e3a5099cc11 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -154,51 +154,30 @@ public void testBatchAckTracker(boolean isNeedReceipt) throws Exception { MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); MessageIdImpl msg3 = new MessageIdImpl(5, 3, 0); - MessageIdImpl msg4 = new MessageIdImpl(5, 4, 0); - MessageIdImpl msg5 = new MessageIdImpl(5, 5, 0); - MessageIdImpl msg6 = new MessageIdImpl(5, 6, 0); assertFalse(tracker.isDuplicate(msg1)); - tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg1), Collections.emptyMap()); assertTrue(tracker.isDuplicate(msg1)); assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg5), AckType.Cumulative, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); - // Flush while disconnected. the internal tracking will not change tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg6), AckType.Individual, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg6)); + tracker.addListAcknowledgment(Collections.singletonList(msg3), Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg3)); when(consumer.getClientCnx()).thenReturn(cnx); tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); + assertFalse(tracker.isDuplicate(msg3)); tracker.close(); } @@ -247,7 +226,7 @@ public void testImmediateBatchAckingTracker(boolean isNeedReceipt) throws Except when(consumer.getClientCnx()).thenReturn(null); - tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg1), Collections.emptyMap()); assertTrue(tracker.isDuplicate(msg1)); when(consumer.getClientCnx()).thenReturn(cnx); @@ -255,7 +234,7 @@ public void testImmediateBatchAckingTracker(boolean isNeedReceipt) throws Except tracker.flush(); assertFalse(tracker.isDuplicate(msg1)); - tracker.addListAcknowledgment(Collections.singletonList(msg2), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg2), Collections.emptyMap()); tracker.flush(); // Since we were connected, the ack went out immediately @@ -337,52 +316,31 @@ public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws Exception MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); - MessageIdImpl msg3 = new MessageIdImpl(5, 3, 0); - MessageIdImpl msg4 = new MessageIdImpl(5, 4, 0); - MessageIdImpl msg5 = new MessageIdImpl(5, 5, 0); - MessageIdImpl msg6 = new MessageIdImpl(5, 6, 0); + MessageIdImpl msg3 = new MessageIdImpl(5, 6, 0); assertFalse(tracker.isDuplicate(msg1)); - tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg1), Collections.emptyMap()); assertTrue(tracker.isDuplicate(msg1)); assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg5), AckType.Cumulative, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); - // Flush while disconnected. the internal tracking will not change tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg6), AckType.Individual, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg6)); + tracker.addListAcknowledgment(Collections.singletonList(msg3), Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg3)); when(consumer.getClientCnx()).thenReturn(cnx); tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); + assertFalse(tracker.isDuplicate(msg3)); tracker.close(); }