From 1eb278ddd04857977137e986048f1057fa1e0832 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 22 Dec 2022 23:10:45 +0800 Subject: [PATCH 1/7] [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure Fixes https://github.com/apache/pulsar/issues/19030 ### Motivation When a `BatchMessageIdImpl` is created from a deserialization, the `BatchMessageAcker` object cannot be shared by all instances in the same batch, which leads to an acknowledgment failure when batch index ACK is disabled (by default). ### Modifications Maintain a map from the `(ledger id, entry id)` pair to the `BatchMessageAcker` in `ConsumerImpl`. If the `BatchMessageIdImpl` doesn't carry a valid `BatchMessageAcker`, create and cache a `BatchMessageAcker` instance and remove it when all messages in the batch are acknowledged. It requires a change in `MessageIdImpl#fromByteArray` that a `BatchMessageAckerDisabled` will be created to indicate there is no shared acker. To avoid making code more complicated, this patch refactors the existing code that many logics about consumer are moved from the ACK tracker to the consumer. It also removes the `AckType` parameter when acknowledging a list of messages. --- .../api/MessageIdSerializationTest.java | 105 ++++++++++++++ .../impl/AcknowledgmentsGroupingTracker.java | 8 +- .../pulsar/client/impl/ConsumerBase.java | 10 +- .../pulsar/client/impl/ConsumerImpl.java | 96 ++++++++++++- .../pulsar/client/impl/MessageIdImpl.java | 11 +- .../client/impl/MultiTopicsConsumerImpl.java | 2 +- ...rsistentAcknowledgmentGroupingTracker.java | 3 +- ...sistentAcknowledgmentsGroupingTracker.java | 134 ++++-------------- .../AcknowledgementsGroupingTrackerTest.java | 80 +++-------- 9 files changed, 262 insertions(+), 187 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java new file mode 100644 index 0000000000000..ae68a520044e6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Cleanup; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker-api") +public class MessageIdSerializationTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 30000) + public void testSerialization() throws Exception { + String topic = "test-serialization-origin"; + @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxMessages(100) + .batchingMaxPublishDelay(1, TimeUnit.DAYS) + .create(); + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .isAckReceiptEnabled(true) + .subscribe(); + + final int numMessages = 10; + for (int i = 0; i < numMessages; i++) { + producer.sendAsync(i); + } + producer.flush(); + final List msgIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + msgIds.add(consumer.receive().getMessageId()); + } + final AtomicLong ledgerId = new AtomicLong(-1L); + final AtomicLong entryId = new AtomicLong(-1L); + for (int i = 0; i < numMessages; i++) { + assertTrue(msgIds.get(i) instanceof BatchMessageIdImpl); + final BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgIds.get(i); + ledgerId.compareAndSet(-1L, batchMessageId.getLedgerId()); + assertEquals(batchMessageId.getLedgerId(), ledgerId.get()); + entryId.compareAndSet(-1L, batchMessageId.getEntryId()); + assertEquals(batchMessageId.getEntryId(), entryId.get()); + assertEquals(batchMessageId.getBatchSize(), numMessages); + } + + final List deserializedMsgIds = new ArrayList<>(); + for (MessageId msgId : msgIds) { + MessageId deserialized = MessageId.fromByteArray(msgId.toByteArray()); + assertTrue(deserialized instanceof BatchMessageIdImpl); + deserializedMsgIds.add(deserialized); + } + for (MessageId msgId : deserializedMsgIds) { + consumer.acknowledge(msgId); + } + consumer.close(); + + consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .isAckReceiptEnabled(true) + .subscribe(); + MessageId newMsgId = producer.send(0); + MessageId receivedMessageId = consumer.receive().getMessageId(); + assertEquals(newMsgId, receivedMessageId); + consumer.close(); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java index d46af1a99e7f0..d9e2f3e4c7bcd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java @@ -33,8 +33,12 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable { CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties); - CompletableFuture addListAcknowledgment(List messageIds, AckType ackType, - Map properties); + default CompletableFuture addBatchIndexAck(BatchMessageIdImpl msgId, AckType ackType, + Map properties) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture addListAcknowledgment(List messageIds, Map properties); void flush(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index dd4932ec2bca4..ef514f4389f99 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -542,12 +542,12 @@ public CompletableFuture acknowledgeAsync(Messages messages, Transactio @Override public CompletableFuture acknowledgeAsync(List messageIdList) { - return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), null); + return doAcknowledgeWithTxn(messageIdList, Collections.emptyMap(), null); } @Override public CompletableFuture acknowledgeAsync(List messageIdList, Transaction txn) { - return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), (TransactionImpl) txn); + return doAcknowledgeWithTxn(messageIdList, Collections.emptyMap(), (TransactionImpl) txn); } @Override @@ -655,17 +655,17 @@ public void negativeAcknowledge(Message message) { negativeAcknowledge(message.getMessageId()); } - protected CompletableFuture doAcknowledgeWithTxn(List messageIdList, AckType ackType, + protected CompletableFuture doAcknowledgeWithTxn(List messageIdList, Map properties, TransactionImpl txn) { CompletableFuture ackFuture; if (txn != null && this instanceof ConsumerImpl) { ackFuture = txn.registerAckedTopic(getTopic(), subscription) - .thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn)); + .thenCompose(ignored -> doAcknowledge(messageIdList, AckType.Individual, properties, txn)); // register the ackFuture as part of the transaction txn.registerAckOp(ackFuture); } else { - ackFuture = doAcknowledge(messageIdList, ackType, properties, txn); + ackFuture = doAcknowledge(messageIdList, AckType.Individual, properties, txn); } return ackFuture; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2680e70703022..ac79550aa317f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -62,9 +62,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nullable; import lombok.AccessLevel; import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.DeadLetterPolicy; @@ -204,6 +206,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final List previousExceptions = new CopyOnWriteArrayList(); + // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged + private final Map, BatchMessageAcker> batchMessageToAcker = new ConcurrentHashMap<>(); + static ConsumerImpl newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, @@ -529,6 +534,51 @@ protected CompletableFuture> internalBatchReceiveAsync() { return result; } + private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) { + if (ackType == AckType.Individual) { + stats.incrementNumAcksSent(numMessages); + unAckedMessageTracker.remove(messageId); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.remove(messageId); + } + } else { + stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId)); + } + } + + private @Nullable MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) { + final BatchMessageAcker acker; + if (messageId.getAcker() instanceof BatchMessageAckerDisabled) { + acker = batchMessageToAcker.computeIfAbsent( + Pair.of(messageId.getLedgerId(), messageId.getEntryId()), + __ -> BatchMessageAcker.newAcker(messageId.getOriginalBatchSize())); + } else { + acker = messageId.getAcker(); + } + if (ackType == AckType.Individual) { + if (acker.ackIndividual(messageId.getBatchIndex())) { + batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId())); + return messageId.toMessageIdImpl(); + } else { + return conf.isBatchIndexAckEnabled() ? messageId : null; + } + } else { + if (acker.ackCumulative(messageId.getBatchIndex())) { + batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId())); + return messageId.toMessageIdImpl(); + } else if (conf.isBatchIndexAckEnabled()) { + return messageId; + } else { + if (acker.isPrevBatchCumulativelyAcked()) { + return null; + } else { + acker.setPrevBatchCumulativelyAcked(true); + return messageId.prevBatchMessageId(); + } + } + } + } + @Override protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType, Map properties, @@ -549,13 +599,34 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); } - return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties); + if (ackType == AckType.Individual) { + onAcknowledge(messageId, null); + } else { + onAcknowledgeCumulative(messageId, null); + } + if (messageId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType); + if (messageIdImpl == null) { + return CompletableFuture.completedFuture(null); + } else if (messageIdImpl instanceof BatchMessageIdImpl) { + return acknowledgmentsGroupingTracker.addBatchIndexAck( + (BatchMessageIdImpl) messageIdImpl, ackType, properties); + } else { + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, batchMessageId.getOriginalBatchSize()); + return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties); + } + } else { + MessageIdImpl messageIdImpl = (MessageIdImpl) messageId; + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1); + return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties); + } } @Override protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { - + List messageIdListToAck = new ArrayList<>(); for (MessageId messageId : messageIdList) { checkArgument(messageId instanceof MessageIdImpl); } @@ -573,7 +644,26 @@ protected CompletableFuture doAcknowledge(List messageIdList, A return doTransactionAcknowledgeForResponse(messageIdList, ackType, null, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); } else { - return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, ackType, properties); + for (MessageId messageId : messageIdList) { + checkArgument(messageId instanceof MessageIdImpl); + onAcknowledge(messageId, null); + if (messageId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType); + if (messageIdImpl != null) { + if (!(messageIdImpl instanceof BatchMessageIdImpl)) { + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, + batchMessageId.getOriginalBatchSize()); + } // else: batch index ACK + messageIdListToAck.add(messageIdImpl); + } + } else { + MessageIdImpl messageIdImpl = (MessageIdImpl) messageId; + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1); + messageIdListToAck.add(messageIdImpl); + } + } + return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdListToAck, properties); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 02298e0f9d66d..1ffb5fb12385d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -95,14 +95,9 @@ public static MessageId fromByteArray(byte[] data) throws IOException { } MessageIdImpl messageId; - if (idData.hasBatchIndex()) { - if (idData.hasBatchSize()) { - messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), - idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize())); - } else { - messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), - idData.getBatchIndex()); - } + if (idData.hasBatchIndex() && idData.hasBatchSize()) { + messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), + idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAckerDisabled.INSTANCE); } else if (idData.hasFirstChunkMessageId()) { MessageIdData firstChunkIdData = idData.getFirstChunkMessageId(); messageId = new ChunkMessageIdImpl( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 224276ba5f08f..2105fec21589f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -493,7 +493,7 @@ protected CompletableFuture doAcknowledge(List messageIdList, } topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> { ConsumerImpl consumer = consumers.get(topicPartitionName); - resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) + resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, properties, txn) .thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove))); }); return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java index 32f8fb922304b..6faf95b5b6b36 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java @@ -50,8 +50,7 @@ public CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ac } @Override - public CompletableFuture addListAcknowledgment(List messageIds, - AckType ackType, + public CompletableFuture addListAcknowledgment(List messageIds, Map properties) { // no-op return CompletableFuture.completedFuture(null); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index fef0bcb8906f1..efd77e8a18221 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -18,18 +18,17 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; @@ -37,8 +36,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; -import javax.annotation.Nullable; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Triple; @@ -83,7 +80,6 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments private final ConcurrentHashMap pendingIndividualBatchIndexAcks; private final ScheduledFuture scheduledTask; - private final boolean batchIndexAckEnabled; private final boolean ackReceiptEnabled; public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, ConsumerConfigurationData conf, @@ -93,7 +89,6 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, Consum this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>(); this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros(); this.maxAckGroupSize = conf.getMaxAcknowledgmentGroupSize(); - this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled(); this.ackReceiptEnabled = conf.isAckReceiptEnabled(); this.currentIndividualAckFuture = new TimedCompletableFuture<>(); this.currentCumulativeAckFuture = new TimedCompletableFuture<>(); @@ -129,53 +124,31 @@ public boolean isDuplicate(MessageId messageId) { } @Override - public CompletableFuture addListAcknowledgment(List messageIds, - AckType ackType, Map properties) { - if (AckType.Cumulative.equals(ackType)) { - if (consumer.isAckReceiptEnabled()) { - Set> completableFutureSet = new HashSet<>(); - messageIds.forEach(messageId -> - completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties))); - return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet)); + public CompletableFuture addListAcknowledgment(List messageIds, + Map properties) { + Optional readLock = acquireReadLock(); + try { + if (messageIds.size() != 0) { + addListAcknowledgment(messageIds); + return readLock.map(__ -> currentIndividualAckFuture) + .orElse(CompletableFuture.completedFuture(null)); } else { - messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties)); return CompletableFuture.completedFuture(null); } - } else { - Optional readLock = acquireReadLock(); - try { - if (messageIds.size() != 0) { - addListAcknowledgment(messageIds); - return readLock.map(__ -> currentIndividualAckFuture) - .orElse(CompletableFuture.completedFuture(null)); - } else { - return CompletableFuture.completedFuture(null); - } - } finally { - readLock.ifPresent(Lock::unlock); - if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= maxAckGroupSize) { - flush(); - } + } finally { + readLock.ifPresent(Lock::unlock); + if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= maxAckGroupSize) { + flush(); } } } - private void addListAcknowledgment(List messageIds) { - for (MessageId messageId : messageIds) { + private void addListAcknowledgment(List messageIds) { + for (MessageIdImpl messageId : messageIds) { if (messageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - addIndividualAcknowledgment(batchMessageId.toMessageIdImpl(), - batchMessageId, - this::doIndividualAckAsync, - this::doIndividualBatchAckAsync); - } else if (messageId instanceof MessageIdImpl) { - addIndividualAcknowledgment((MessageIdImpl) messageId, - null, - this::doIndividualAckAsync, - this::doIndividualBatchAckAsync); + doIndividualBatchAckAsync((BatchMessageIdImpl) messageId); } else { - throw new IllegalStateException("Unsupported message id type in addListAcknowledgement: " - + messageId.getClass().getCanonicalName()); + doIndividualAckAsync(messageId); } } } @@ -183,67 +156,21 @@ private void addListAcknowledgment(List messageIds) { @Override public CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties) { - if (msgId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId; - return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId); + checkArgument(!(msgId instanceof BatchMessageIdImpl)); + if (ackType == AckType.Individual) { + return doIndividualAck(msgId, properties); } else { - return addAcknowledgment(msgId, ackType, properties, null); + return doCumulativeAck(msgId, properties, null); } } - private CompletableFuture addIndividualAcknowledgment( - MessageIdImpl msgId, - @Nullable BatchMessageIdImpl batchMessageId, - Function> individualAckFunction, - Function> batchAckFunction) { - if (batchMessageId != null) { - consumer.onAcknowledge(batchMessageId, null); - } else { - consumer.onAcknowledge(msgId, null); - } - if (batchMessageId == null || batchMessageId.ackIndividual()) { - consumer.getStats().incrementNumAcksSent((batchMessageId != null) ? batchMessageId.getBatchSize() : 1); - consumer.getUnAckedMessageTracker().remove(msgId); - if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) { - consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId); - } - return individualAckFunction.apply(msgId); - } else if (batchIndexAckEnabled) { - return batchAckFunction.apply(batchMessageId); + @Override + public CompletableFuture addBatchIndexAck(BatchMessageIdImpl msgId, AckType ackType, + Map properties) { + if (ackType == AckType.Individual) { + return doIndividualBatchAck(msgId, properties); } else { - return CompletableFuture.completedFuture(null); - } - } - - private CompletableFuture addAcknowledgment(MessageIdImpl msgId, - AckType ackType, - Map properties, - @Nullable BatchMessageIdImpl batchMessageId) { - switch (ackType) { - case Individual: - return addIndividualAcknowledgment(msgId, - batchMessageId, - __ -> doIndividualAck(__, properties), - __ -> doIndividualBatchAck(__, properties)); - case Cumulative: - if (batchMessageId != null) { - consumer.onAcknowledgeCumulative(batchMessageId, null); - } else { - consumer.onAcknowledgeCumulative(msgId, null); - } - if (batchMessageId == null || batchMessageId.ackCumulative()) { - return doCumulativeAck(msgId, properties, null); - } else if (batchIndexAckEnabled) { - return doCumulativeBatchIndexAck(batchMessageId, properties); - } else { - if (!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) { - doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null); - batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true); - } - return CompletableFuture.completedFuture(null); - } - default: - throw new IllegalStateException("Unknown AckType: " + ackType); + return doCumulativeBatchIndexAck(msgId, properties); } } @@ -267,10 +194,9 @@ private CompletableFuture doIndividualAck(MessageIdImpl messageId, Map doIndividualAckAsync(MessageIdImpl messageId) { + private void doIndividualAckAsync(MessageIdImpl messageId) { pendingIndividualAcks.add(messageId); pendingIndividualBatchIndexAcks.remove(messageId); - return CompletableFuture.completedFuture(null); } private CompletableFuture doIndividualBatchAck(BatchMessageIdImpl batchMessageId, @@ -298,7 +224,6 @@ private CompletableFuture doIndividualBatchAck(BatchMessageIdImpl batchMes private CompletableFuture doCumulativeAck(MessageIdImpl messageId, Map properties, BitSetRecyclable bitSet) { - consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId)); if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an // uncommon condition since it's only used for the compaction subscription. @@ -314,7 +239,7 @@ private CompletableFuture doCumulativeAck(MessageIdImpl messageId, Map doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { + private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( batchMessageId.toMessageIdImpl(), __ -> { ConcurrentBitSetRecyclable value; @@ -328,7 +253,6 @@ private CompletableFuture doIndividualBatchAckAsync(BatchMessageIdImpl bat return value; }); bitSet.clear(batchMessageId.getBatchIndex()); - return CompletableFuture.completedFuture(null); } private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index ddca6951e49e1..51e3a5099cc11 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -154,51 +154,30 @@ public void testBatchAckTracker(boolean isNeedReceipt) throws Exception { MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); MessageIdImpl msg3 = new MessageIdImpl(5, 3, 0); - MessageIdImpl msg4 = new MessageIdImpl(5, 4, 0); - MessageIdImpl msg5 = new MessageIdImpl(5, 5, 0); - MessageIdImpl msg6 = new MessageIdImpl(5, 6, 0); assertFalse(tracker.isDuplicate(msg1)); - tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg1), Collections.emptyMap()); assertTrue(tracker.isDuplicate(msg1)); assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg5), AckType.Cumulative, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); - // Flush while disconnected. the internal tracking will not change tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg6), AckType.Individual, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg6)); + tracker.addListAcknowledgment(Collections.singletonList(msg3), Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg3)); when(consumer.getClientCnx()).thenReturn(cnx); tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); + assertFalse(tracker.isDuplicate(msg3)); tracker.close(); } @@ -247,7 +226,7 @@ public void testImmediateBatchAckingTracker(boolean isNeedReceipt) throws Except when(consumer.getClientCnx()).thenReturn(null); - tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg1), Collections.emptyMap()); assertTrue(tracker.isDuplicate(msg1)); when(consumer.getClientCnx()).thenReturn(cnx); @@ -255,7 +234,7 @@ public void testImmediateBatchAckingTracker(boolean isNeedReceipt) throws Except tracker.flush(); assertFalse(tracker.isDuplicate(msg1)); - tracker.addListAcknowledgment(Collections.singletonList(msg2), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg2), Collections.emptyMap()); tracker.flush(); // Since we were connected, the ack went out immediately @@ -337,52 +316,31 @@ public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws Exception MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); - MessageIdImpl msg3 = new MessageIdImpl(5, 3, 0); - MessageIdImpl msg4 = new MessageIdImpl(5, 4, 0); - MessageIdImpl msg5 = new MessageIdImpl(5, 5, 0); - MessageIdImpl msg6 = new MessageIdImpl(5, 6, 0); + MessageIdImpl msg3 = new MessageIdImpl(5, 6, 0); assertFalse(tracker.isDuplicate(msg1)); - tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg1), Collections.emptyMap()); assertTrue(tracker.isDuplicate(msg1)); assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg5), AckType.Cumulative, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); - // Flush while disconnected. the internal tracking will not change tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg6), AckType.Individual, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg6)); + tracker.addListAcknowledgment(Collections.singletonList(msg3), Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg3)); when(consumer.getClientCnx()).thenReturn(cnx); tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); + assertFalse(tracker.isDuplicate(msg3)); tracker.close(); } From 3628ecc3b20749af966911aedfd074de57a0f927 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 23 Dec 2022 16:22:36 +0800 Subject: [PATCH 2/7] Fix getBatchSize() method --- .../pulsar/client/impl/BatchMessageIdImpl.java | 12 ------------ .../org/apache/pulsar/client/impl/ConsumerImpl.java | 7 +++---- .../PersistentAcknowledgmentsGroupingTracker.java | 2 +- 3 files changed, 4 insertions(+), 17 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index 7e3a143dff8e0..c4aada5ca199f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -105,23 +105,11 @@ public byte[] toByteArray() { return toByteArray(batchIndex, batchSize); } - public boolean ackIndividual() { - return acker.ackIndividual(batchIndex); - } - public boolean ackCumulative() { return acker.ackCumulative(batchIndex); } - public int getOutstandingAcksInSameBatch() { - return acker.getOutstandingAcks(); - } - public int getBatchSize() { - return acker.getBatchSize(); - } - - public int getOriginalBatchSize() { return this.batchSize; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ac79550aa317f..58b8c0195a697 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -551,7 +551,7 @@ private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType if (messageId.getAcker() instanceof BatchMessageAckerDisabled) { acker = batchMessageToAcker.computeIfAbsent( Pair.of(messageId.getLedgerId(), messageId.getEntryId()), - __ -> BatchMessageAcker.newAcker(messageId.getOriginalBatchSize())); + __ -> BatchMessageAcker.newAcker(messageId.getBatchSize())); } else { acker = messageId.getAcker(); } @@ -613,7 +613,7 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return acknowledgmentsGroupingTracker.addBatchIndexAck( (BatchMessageIdImpl) messageIdImpl, ackType, properties); } else { - processMessageIdBeforeAcknowledge(messageIdImpl, ackType, batchMessageId.getOriginalBatchSize()); + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, batchMessageId.getBatchSize()); return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties); } } else { @@ -652,8 +652,7 @@ protected CompletableFuture doAcknowledge(List messageIdList, A MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType); if (messageIdImpl != null) { if (!(messageIdImpl instanceof BatchMessageIdImpl)) { - processMessageIdBeforeAcknowledge(messageIdImpl, ackType, - batchMessageId.getOriginalBatchSize()); + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, batchMessageId.getBatchSize()); } // else: batch index ACK messageIdListToAck.add(messageIdImpl); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index efd77e8a18221..1fec87ffa02b6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -248,7 +248,7 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet()); } else { value = ConcurrentBitSetRecyclable.create(); - value.set(0, batchMessageId.getOriginalBatchSize()); + value.set(0, batchMessageId.getBatchSize()); } return value; }); From 4b3a840b68136c0254a01f93f88d3fcde479c72f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 27 Dec 2022 16:12:34 +0800 Subject: [PATCH 3/7] Avoid unnecessary initialization and argument check --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 58b8c0195a697..1f0578efd4599 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -626,7 +626,6 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack @Override protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { - List messageIdListToAck = new ArrayList<>(); for (MessageId messageId : messageIdList) { checkArgument(messageId instanceof MessageIdImpl); } @@ -644,8 +643,8 @@ protected CompletableFuture doAcknowledge(List messageIdList, A return doTransactionAcknowledgeForResponse(messageIdList, ackType, null, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); } else { + List messageIdListToAck = new ArrayList<>(); for (MessageId messageId : messageIdList) { - checkArgument(messageId instanceof MessageIdImpl); onAcknowledge(messageId, null); if (messageId instanceof BatchMessageIdImpl) { BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; From 5fb4b9b4390117aae2cabb25f4ee546d54fdd4bc Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 28 Dec 2022 09:25:54 +0800 Subject: [PATCH 4/7] fix tests --- .../java/org/apache/pulsar/client/impl/MessageIdImpl.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 1ffb5fb12385d..6f03da11ce156 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -95,9 +95,10 @@ public static MessageId fromByteArray(byte[] data) throws IOException { } MessageIdImpl messageId; - if (idData.hasBatchIndex() && idData.hasBatchSize()) { + if (idData.hasBatchIndex()) { + int batchSize = idData.hasBatchSize() ? idData.getBatchSize() : -1; messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), - idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAckerDisabled.INSTANCE); + idData.getBatchIndex(), batchSize, BatchMessageAckerDisabled.INSTANCE); } else if (idData.hasFirstChunkMessageId()) { MessageIdData firstChunkIdData = idData.getFirstChunkMessageId(); messageId = new ChunkMessageIdImpl( From f40926791001637f6d93680e7db865c205e555c0 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 28 Dec 2022 16:15:11 +0800 Subject: [PATCH 5/7] Adjust @Nullable position Co-authored-by: tison --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 27ab1b6af67d7..3fefed580efc1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -546,7 +546,8 @@ private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType } } - private @Nullable MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) { + @Nullable + private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) { final BatchMessageAcker acker; if (messageId.getAcker() instanceof BatchMessageAckerDisabled) { acker = batchMessageToAcker.computeIfAbsent( From 6ed02869bb206bad4718f9fe8740658596a179e6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 29 Dec 2022 13:53:48 +0800 Subject: [PATCH 6/7] Remove BatchMessageAcker for cumulative ACK --- .../api/MessageIdSerializationTest.java | 105 ------------- .../impl/MessageIdSerializationTest.java | 147 ++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 26 +++- 3 files changed, 170 insertions(+), 108 deletions(-) delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java deleted file mode 100644 index ae68a520044e6..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import lombok.Cleanup; -import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -@Test(groups = "broker-api") -public class MessageIdSerializationTest extends ProducerConsumerBase { - - @BeforeClass - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } - - @Test(timeOut = 30000) - public void testSerialization() throws Exception { - String topic = "test-serialization-origin"; - @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) - .topic(topic) - .batchingMaxMessages(100) - .batchingMaxPublishDelay(1, TimeUnit.DAYS) - .create(); - Consumer consumer = pulsarClient.newConsumer(Schema.INT32) - .topic(topic) - .subscriptionName("sub") - .isAckReceiptEnabled(true) - .subscribe(); - - final int numMessages = 10; - for (int i = 0; i < numMessages; i++) { - producer.sendAsync(i); - } - producer.flush(); - final List msgIds = new ArrayList<>(); - for (int i = 0; i < numMessages; i++) { - msgIds.add(consumer.receive().getMessageId()); - } - final AtomicLong ledgerId = new AtomicLong(-1L); - final AtomicLong entryId = new AtomicLong(-1L); - for (int i = 0; i < numMessages; i++) { - assertTrue(msgIds.get(i) instanceof BatchMessageIdImpl); - final BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgIds.get(i); - ledgerId.compareAndSet(-1L, batchMessageId.getLedgerId()); - assertEquals(batchMessageId.getLedgerId(), ledgerId.get()); - entryId.compareAndSet(-1L, batchMessageId.getEntryId()); - assertEquals(batchMessageId.getEntryId(), entryId.get()); - assertEquals(batchMessageId.getBatchSize(), numMessages); - } - - final List deserializedMsgIds = new ArrayList<>(); - for (MessageId msgId : msgIds) { - MessageId deserialized = MessageId.fromByteArray(msgId.toByteArray()); - assertTrue(deserialized instanceof BatchMessageIdImpl); - deserializedMsgIds.add(deserialized); - } - for (MessageId msgId : deserializedMsgIds) { - consumer.acknowledge(msgId); - } - consumer.close(); - - consumer = pulsarClient.newConsumer(Schema.INT32) - .topic(topic) - .subscriptionName("sub") - .isAckReceiptEnabled(true) - .subscribe(); - MessageId newMsgId = producer.send(0); - MessageId receivedMessageId = consumer.receive().getMessageId(); - assertEquals(newMsgId, receivedMessageId); - consumer.close(); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java new file mode 100644 index 0000000000000..515dbfa29952a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker-api") +public class MessageIdSerializationTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 30000) + public void testAcknowledge() throws Exception { + String topic = "test-acknowledge-" + System.currentTimeMillis(); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxMessages(100) + .batchingMaxPublishDelay(1, TimeUnit.DAYS) + .create(); + ConsumerImpl consumer = createConsumer(topic); + final int numMessages = 10; + for (int i = 0; i < numMessages; i++) { + producer.sendAsync(i); + } + producer.flush(); + final List msgIds = new ArrayList<>(); + final Set> positions = new TreeSet<>(); + receiveAtMost(consumer, numMessages, msgIds, positions); + assertEquals(positions.size(), 1); + + for (int i = 0; i < msgIds.size(); i++) { + consumer.acknowledge(msgIds.get(i)); + if (i < msgIds.size() - 1) { + assertEquals(consumer.getBatchMessageToAckerSize(), 1); + } else { + assertEquals(consumer.getBatchMessageToAckerSize(), 0); + } + } + consumer.close(); + + consumer = createConsumer(topic); + MessageId newMsgId = producer.send(0); + MessageId receivedMessageId = consumer.receive().getMessageId(); + assertEquals(newMsgId, receivedMessageId); + consumer.close(); + } + + @Test(timeOut = 30000) + public void testAcknowledgeCumulative() throws Exception { + String topic = "test-acknowledge-cumulative-" + System.currentTimeMillis(); + final int batchingMaxMessages = 10; + @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxMessages(batchingMaxMessages) + .batchingMaxPublishDelay(1, TimeUnit.DAYS) + .create(); + ConsumerImpl consumer = createConsumer(topic); + // send 3 batches + for (int i = 0; i < batchingMaxMessages * 2; i++) { + producer.sendAsync(i); + } + producer.flush(); + final List msgIds = new ArrayList<>(); + final Set> positions = new TreeSet<>(); + receiveAtMost(consumer, batchingMaxMessages * 2, msgIds, positions); + assertEquals(positions.size(), 2); + + consumer.acknowledgeCumulative(msgIds.get(2)); + assertEquals(consumer.getBatchMessageToAckerSize(), 1); + for (int i = batchingMaxMessages; i < batchingMaxMessages * 2; i++) { + consumer.acknowledgeCumulative(msgIds.get(i)); + if (i < batchingMaxMessages * 2 - 1) { + assertEquals(consumer.getBatchMessageToAckerSize(), 1); + } else { + assertEquals(consumer.getBatchMessageToAckerSize(), 0); + } + } + consumer.close(); + consumer = createConsumer(topic); + MessageId newMsgId = producer.send(0); + MessageId receivedMessageId = consumer.receive().getMessageId(); + assertEquals(newMsgId, receivedMessageId); + consumer.close(); + } + + private ConsumerImpl createConsumer(String topic) throws PulsarClientException { + return (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32).topic(topic) + .subscriptionName("sub").isAckReceiptEnabled(true).subscribe(); + } + + private static void receiveAtMost(Consumer consumer, int numMessages, List msgIds, + Set> positions) throws IOException { + for (int i = 0; i < numMessages; i++) { + final MessageIdImpl messageId = (MessageIdImpl) consumer.receive().getMessageId(); + MessageId deserialized = MessageId.fromByteArray(messageId.toByteArray()); + assertTrue(deserialized instanceof BatchMessageIdImpl); + msgIds.add(deserialized); + positions.add(Pair.of(messageId.getLedgerId(), messageId.getEntryId())); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 3fefed580efc1..5fd5f4e125e57 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -48,6 +48,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -207,7 +209,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final List previousExceptions = new CopyOnWriteArrayList(); // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged - private final Map, BatchMessageAcker> batchMessageToAcker = new ConcurrentHashMap<>(); + private final ConcurrentNavigableMap, BatchMessageAcker> batchMessageToAcker = + new ConcurrentSkipListMap<>(); static ConsumerImpl newConsumerImpl(PulsarClientImpl client, String topic, @@ -546,6 +549,16 @@ private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType } } + private void removeBatchMessageAckerUntil(Pair position) { + for (Pair key : batchMessageToAcker.keySet()) { + if (key.compareTo(position) <= 0) { + batchMessageToAcker.remove(key); + } else { + break; + } + } + } + @Nullable private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) { final BatchMessageAcker acker; @@ -565,7 +578,7 @@ private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, Ac } } else { if (acker.ackCumulative(messageId.getBatchIndex())) { - batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId())); + removeBatchMessageAckerUntil(Pair.of(messageId.getLedgerId(), messageId.getEntryId())); return messageId.toMessageIdImpl(); } else if (conf.isBatchIndexAckEnabled()) { return messageId; @@ -574,7 +587,9 @@ private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, Ac return null; } else { acker.setPrevBatchCumulativelyAcked(true); - return messageId.prevBatchMessageId(); + MessageIdImpl prevMessageId = messageId.prevBatchMessageId(); + removeBatchMessageAckerUntil(Pair.of(prevMessageId.getLedgerId(), prevMessageId.getEntryId())); + return prevMessageId; } } } @@ -2868,6 +2883,11 @@ boolean isAckReceiptEnabled() { && Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion()); } + @VisibleForTesting + int getBatchMessageToAckerSize() { + return batchMessageToAcker.size(); + } + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); } From 15774bc14cf120d01a2edcea0b35611a7ff4359c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 29 Dec 2022 14:04:51 +0800 Subject: [PATCH 7/7] Restore API compatibility --- .../impl/AcknowledgmentsGroupingTracker.java | 4 ++++ .../pulsar/client/impl/BatchMessageIdImpl.java | 15 +++++++++++++++ ...onPersistentAcknowledgmentGroupingTracker.java | 6 ++++++ .../PersistentAcknowledgmentsGroupingTracker.java | 11 +++++++++++ 4 files changed, 36 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java index d9e2f3e4c7bcd..1200eb5c63e8f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java @@ -38,6 +38,10 @@ default CompletableFuture addBatchIndexAck(BatchMessageIdImpl msgId, AckTy return CompletableFuture.completedFuture(null); } + @Deprecated + CompletableFuture addListAcknowledgment(List messageIds, AckType ackType, + Map properties); + CompletableFuture addListAcknowledgment(List messageIds, Map properties); void flush(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index c4aada5ca199f..3ac32c5665e24 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -105,14 +105,29 @@ public byte[] toByteArray() { return toByteArray(batchIndex, batchSize); } + @Deprecated + public boolean ackIndividual() { + return acker.ackIndividual(batchIndex); + } + public boolean ackCumulative() { return acker.ackCumulative(batchIndex); } + @Deprecated + public int getOutstandingAcksInSameBatch() { + return acker.getOutstandingAcks(); + } + public int getBatchSize() { return this.batchSize; } + @Deprecated + public int getOriginalBatchSize() { + return this.batchSize; + } + public MessageIdImpl prevBatchMessageId() { return new MessageIdImpl( ledgerId, entryId - 1, partitionIndex); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java index 6faf95b5b6b36..576fe73c516a6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java @@ -49,6 +49,12 @@ public CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ac return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture addListAcknowledgment(List messageIds, AckType ackType, + Map properties) { + return CompletableFuture.completedFuture(null); + } + @Override public CompletableFuture addListAcknowledgment(List messageIds, Map properties) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 1fec87ffa02b6..c20affc11bad6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -123,6 +123,17 @@ public boolean isDuplicate(MessageId messageId) { } } + @Override + public CompletableFuture addListAcknowledgment(List messageIds, AckType ackType, + Map properties) { + checkArgument(ackType == AckType.Individual); + final List messageIdImpls = new ArrayList<>(); + for (MessageId messageId : messageIds) { + messageIdImpls.add((MessageIdImpl) messageId); + } + return addListAcknowledgment(messageIdImpls, properties); + } + @Override public CompletableFuture addListAcknowledgment(List messageIds, Map properties) {