From b217a55f437236f1466d563d3cb1230ecb6c394e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 21 Mar 2023 15:25:32 +0800 Subject: [PATCH 1/2] [fix][client] Fix NPE when acknowledging multiple messages ### Motivation For a multi-topics consumer, when it acknowledges a single message, it will first find the owner topic from its message ID. If the owner topic is not subscribed by the consumer, `NotConnectedException` will be thrown. However, when it acknowledges multiple messages, if any of them is the message whose owner topic is not subscribed by the consumer, NPE will happen instead. ### Modifications When acknowledging multiple messages, ignore the message IDs whose owner topic is not subscribed. `testAckMessageInAnotherTopic` is added to cover this case. ### TODO There are many other places that do not check if `consumers.get` returns `null`, like `doReconsumeLater`, `negativeAcknowledge`, etc. This patch does not cover them. --- .../client/api/ConsumerAckListTest.java | 36 +++++++++++++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 9 +++-- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java index d01e5b764a7a1..64c1a4a67e1fe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @Test(groups = "broker-api") public class ConsumerAckListTest extends ProducerConsumerBase { @@ -111,4 +112,39 @@ private void sendMessagesAsyncAndWait(Producer producer, int messages) t latch.await(); } + @Test(timeOut = 30000) + public void testAckMessageInAnotherTopic() throws Exception { + final String[] topics = { + "persistent://my-property/my-ns/test-ack-message-in-other-topic1" + UUID.randomUUID(), + "persistent://my-property/my-ns/test-ack-message-in-other-topic2" + UUID.randomUUID(), + "persistent://my-property/my-ns/test-ack-message-in-other-topic3" + UUID.randomUUID() + }; + @Cleanup final Consumer allTopicsConsumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topics) + .subscriptionName("sub1") + .subscribe(); + Consumer partialTopicsConsumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topics[0], topics[1]) + .subscriptionName("sub2") + .subscribe(); + for (String topic : topics) { + final Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + producer.send("msg"); + producer.close(); + } + final List> messages = new ArrayList<>(); + for (int i = 0; i < topics.length; i++) { + messages.add(allTopicsConsumer.receive()); + } + partialTopicsConsumer.acknowledge(messages.stream().map(Message::getMessageId).collect(Collectors.toList())); + pulsarClient.newProducer(Schema.STRING).topic(topics[0]).create().send("done"); + partialTopicsConsumer.close(); + partialTopicsConsumer = pulsarClient.newConsumer(Schema.STRING).topic(topics[0]) + .subscriptionName("sub2").subscribe(); + final Message msg = partialTopicsConsumer.receive(); + Assert.assertEquals(msg.getValue(), "done"); + partialTopicsConsumer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 341a91e97348e..03d782588c62e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -496,8 +496,13 @@ protected CompletableFuture doAcknowledge(List messageIdList, } topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> { ConsumerImpl consumer = consumers.get(topicPartitionName); - resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) - .thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove))); + if (consumer != null) { + resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) + .thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove))); + } else { + log.warn("MessageIds whose owner topic is {} will be discard because the consumer is not connected", + topicPartitionName); + } }); } return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); From ef9f00db1dc6a07dd86ccc1136883182646a22d3 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 23 Mar 2023 15:34:07 +0800 Subject: [PATCH 2/2] Fail the acknowledge if any message id is invalid --- .../client/api/ConsumerAckListTest.java | 21 +++++++++++-------- .../client/impl/MultiTopicsConsumerImpl.java | 20 +++++++++++------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java index 64c1a4a67e1fe..baf0000be6e4f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java @@ -31,7 +31,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; @Test(groups = "broker-api") public class ConsumerAckListTest extends ProducerConsumerBase { @@ -127,24 +126,28 @@ public void testAckMessageInAnotherTopic() throws Exception { .topic(topics[0], topics[1]) .subscriptionName("sub2") .subscribe(); - for (String topic : topics) { + for (int i = 0; i < topics.length; i++) { final Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic(topic) + .topic(topics[i]) .create(); - producer.send("msg"); + producer.send("msg-" + i); producer.close(); } - final List> messages = new ArrayList<>(); + final List messageIdList = new ArrayList<>(); for (int i = 0; i < topics.length; i++) { - messages.add(allTopicsConsumer.receive()); + messageIdList.add(allTopicsConsumer.receive().getMessageId()); + } + try { + partialTopicsConsumer.acknowledge(messageIdList); + Assert.fail(); + } catch (PulsarClientException.NotConnectedException ignored) { } - partialTopicsConsumer.acknowledge(messages.stream().map(Message::getMessageId).collect(Collectors.toList())); - pulsarClient.newProducer(Schema.STRING).topic(topics[0]).create().send("done"); partialTopicsConsumer.close(); partialTopicsConsumer = pulsarClient.newConsumer(Schema.STRING).topic(topics[0]) .subscriptionName("sub2").subscribe(); + pulsarClient.newProducer(Schema.STRING).topic(topics[0]).create().send("done"); final Message msg = partialTopicsConsumer.receive(); - Assert.assertEquals(msg.getValue(), "done"); + Assert.assertEquals(msg.getValue(), "msg-0"); partialTopicsConsumer.close(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 03d782588c62e..f993304b0780a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -32,6 +32,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -494,15 +495,18 @@ protected CompletableFuture doAcknowledge(List messageIdList, topicToMessageIdMap.get(topicMessageId.getOwnerTopic()) .add(MessageIdImpl.convertToMessageIdImpl(topicMessageId)); } - topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> { - ConsumerImpl consumer = consumers.get(topicPartitionName); - if (consumer != null) { - resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) - .thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove))); - } else { - log.warn("MessageIds whose owner topic is {} will be discard because the consumer is not connected", - topicPartitionName); + final Map, List> consumerToMessageIds = new IdentityHashMap<>(); + for (Map.Entry> entry : topicToMessageIdMap.entrySet()) { + ConsumerImpl consumer = consumers.get(entry.getKey()); + if (consumer == null) { + return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); } + // Trigger the acknowledgment later to avoid sending partial acknowledgments + consumerToMessageIds.put(consumer, entry.getValue()); + } + consumerToMessageIds.forEach((consumer, messageIds) -> { + resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) + .thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove))); }); } return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0]));