From 758d3ff088f751b891fc32af982d40306d3b0abb Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 11 Jun 2026 14:59:22 +0800 Subject: [PATCH 1/6] [fix][broker] Fix compacted read getting stuck forever or losing messages due to delayed acknowledgment --- .../persistent/PersistentSubscription.java | 12 +++ .../pulsar/compaction/CompactionTest.java | 84 +++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 236a55162347b..8771c8167094a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -58,6 +58,7 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.AbstractSubscription; import org.apache.pulsar.broker.service.AnalyzeBacklogResult; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -471,6 +472,17 @@ public CompletableFuture acknowledgeMessageAsync(List positions, .attr("position", position) .log("Cumulative ack on"); AckCallback callback = new AckCallback(previousMarkDeletePosition, future); + if (dispatcher instanceof AbstractDispatcherSingleActiveConsumer singleConsumerDispatcher) { + // For compacted consumer, we should ignore the position that does not exist in the managed ledger, + // otherwise, the `asyncMarkDelete` call could jump the read position to the active ledger, which will + // skip all entries present in the compacted ledger but not present in the managed ledger. 
+ final var consumer = singleConsumerDispatcher.getActiveConsumer(); + if (consumer != null + && consumer.readCompacted() + && cursor.getManagedLedger().getOptionalLedgerInfo(position.getLedgerId()).isEmpty()) { + return CompletableFuture.completedFuture(null); + } + } cursor.asyncMarkDelete(position, mergeCursorProperties(properties), callback, callback); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 6b625d1cd65c4..5be478cd0b0d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -56,6 +56,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; @@ -70,12 +71,15 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.intercept.MockBrokerInterceptor; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -87,6 +91,7 @@ import 
org.apache.pulsar.client.api.EncryptionKeyInfo; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; @@ -124,10 +129,12 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { protected ScheduledExecutorService compactionScheduler; protected BookKeeper bk; private PublishingOrderCompactor compactor; + private volatile java.util.function.Consumer consumerCreated = __ -> {}; @Override protected void doInitConf() throws Exception { super.doInitConf(); + conf.setSystemTopicEnabled(false); conf.setDispatcherMaxReadBatchSize(1); } @@ -135,6 +142,14 @@ protected void doInitConf() throws Exception { @Override public void setup() throws Exception { super.internalSetup(); + pulsar.getBrokerService().setInterceptor(new MockBrokerInterceptor() { + + @Override + public void consumerCreated(ServerCnx cnx, org.apache.pulsar.broker.service.Consumer consumer, + Map metadata) { + consumerCreated.accept(consumer); + } + }); admin.clusters().createCluster(configClusterName, ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); @@ -2648,4 +2663,73 @@ private void triggerAndWaitCompaction(String topic) throws Exception { Awaitility.await().untilAsserted(() -> assertEquals( admin.topics().compactionStatus(topic).status, LongRunningProcessStatus.Status.SUCCESS)); } + + @Test + public void testReceiveAckAfterReconnectionOnEmptyLedger() throws Exception { + final var topic = "persistent://my-tenant/my-ns/receive-ack-after-reconnection-on-empty-ledger"; + try (final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) { + for (int i = 0; i < 3; i++) { + producer.newMessage().key("key-" + i).value("value-" + i).send(); + } + } + // Trigger the ledger rollover + var ml = (ManagedLedgerImpl) 
((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get() + .orElseThrow()).getManagedLedger(); + ml.getConfig().setMaxEntriesPerLedger(1); + ml.getConfig().setMaxSizePerLedgerMb(0); + ml.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS); + ml.rollCurrentLedgerIfFull(); + Awaitility.await().untilAsserted(() -> assertEquals(ml.getLedgersInfo().size(), 2)); + + @Cleanup final var reader = pulsarClient.newReader(Schema.STRING).readCompacted(true).topic(topic) + .subscriptionName("reader") + .startMessageId(MessageId.earliest).create(); + + // Slow down the pre-fetching + pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(500); + + // Receive 1 message so that the startMessageId will be reset to ledger_id:0 after reconnection + assertTrue(reader.hasMessageAvailable()); + final var firstMsg = reader.readNext(3, TimeUnit.SECONDS); + assertNotNull(firstMsg); + + triggerAndWaitCompaction(topic); + + // Trigger the reconnection and trim the first ledger. + admin.namespaces().unload("my-tenant/my-ns"); + // Simulate the pending cumulative acknowledgment is flushed after the consumer is created + // We don't need such interception if we can support controlling the acknowledgment flush for reader. 
+ final var firstTime = new AtomicBoolean(true); + consumerCreated = serverConsumer -> { + final var subscription = serverConsumer.getSubscription(); + if (subscription.getName().contains("reader") && firstTime.compareAndSet(true, false)) { + final var msgId = (MessageIdAdv) firstMsg.getMessageId(); + subscription.acknowledgeMessageAsync(List.of(PositionFactory.create(msgId.getLedgerId(), + msgId.getEntryId())), CommandAck.AckType.Cumulative, Map.of()); + } + }; + + admin.lookups().lookupTopic(topic); + final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, true).get() + .orElseThrow(); + final var trimFuture = new CompletableFuture(); + persistentTopic.getManagedLedger().trimConsumedLedgersInBackground(trimFuture); + trimFuture.get(); + assertEquals(persistentTopic.getManagedLedger().getLedgersInfo().size(), 1); + + pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(1); + + while (reader.hasMessageAvailable()) { + final var msg = reader.readNextAsync().get(3, TimeUnit.SECONDS); + log.info().attr("id", msg.getMessageId()).attr("key", msg.getKey()) + .attr("value", msg.getValue()).log("read"); + } + + final var serverConsumer = persistentTopic.getSubscription("reader").getDispatcher().getConsumers().get(0); + assertEquals(((MessageIdAdv) serverConsumer.getStartMessageId()).getEntryId(), 0L); + + final var emptyLedgerId = persistentTopic.getManagedLedger().getLedgersInfo().lastEntry().getKey(); + assertEquals(persistentTopic.getTopicCompactionService().getLastCompactedPosition().get(), + PositionFactory.create(emptyLedgerId, -1L)); + } } From bd8ebf303be4fcf54c7e1e4759424f7251d56f36 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 11 Jun 2026 17:56:52 +0800 Subject: [PATCH 2/6] revert change on topic policies --- .../test/java/org/apache/pulsar/compaction/CompactionTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 5be478cd0b0d2..bb7fefd8599de 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -134,7 +134,6 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { @Override protected void doInitConf() throws Exception { super.doInitConf(); - conf.setSystemTopicEnabled(false); conf.setDispatcherMaxReadBatchSize(1); } From b451e45dc5474d294c161d28374292e9a3f19e6e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 11 Jun 2026 19:20:03 +0800 Subject: [PATCH 3/6] add warn logs for invalid ledger id in ack position --- .../broker/service/persistent/PersistentSubscription.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 8771c8167094a..d47cde486c6ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -477,9 +477,14 @@ public CompletableFuture acknowledgeMessageAsync(List positions, // otherwise, the `asyncMarkDelete` call could jump the read position to the active ledger, which will // skip all entries present in the compacted ledger but not present in the managed ledger. 
final var consumer = singleConsumerDispatcher.getActiveConsumer(); + final var ml = cursor.getManagedLedger(); if (consumer != null && consumer.readCompacted() - && cursor.getManagedLedger().getOptionalLedgerInfo(position.getLedgerId()).isEmpty()) { + && ml.getOptionalLedgerInfo(position.getLedgerId()).isEmpty()) { + if (ml.getFirstPosition() == null || position.getLedgerId() > ml.getFirstPosition().getLedgerId()) { + log.warn("Received an ACK whose position is " + position + ", valid ledgers: " + + ml.getLedgersInfo().keySet()); + } return CompletableFuture.completedFuture(null); } } From cea81240ae81741c19e1b90103207bce57d21154 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 11 Jun 2026 20:18:04 +0800 Subject: [PATCH 4/6] improve test when system topic is enabled --- .../apache/pulsar/compaction/CompactionTest.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index bb7fefd8599de..04445e2eabcda 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -178,6 +178,8 @@ public void beforeMethod() throws Exception { admin.namespaces().removeRetention("my-tenant/my-ns"); AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {}; AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = RawReader::seekAsync; + consumerCreated = __ -> {}; + pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(1); } protected long compact(String topic) throws ExecutionException, InterruptedException { @@ -2664,8 +2666,8 @@ private void triggerAndWaitCompaction(String topic) throws Exception { } @Test - public void testReceiveAckAfterReconnectionOnEmptyLedger() throws Exception { - final var topic = 
"persistent://my-tenant/my-ns/receive-ack-after-reconnection-on-empty-ledger"; + public void testReaderReadOnDeletedLedger() throws Exception { + final var topic = "persistent://my-tenant/my-ns/reader-read-on-deleted-ledger"; try (final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) { for (int i = 0; i < 3; i++) { producer.newMessage().key("key-" + i).value("value-" + i).send(); @@ -2680,8 +2682,9 @@ public void testReceiveAckAfterReconnectionOnEmptyLedger() throws Exception { ml.rollCurrentLedgerIfFull(); Awaitility.await().untilAsserted(() -> assertEquals(ml.getLedgersInfo().size(), 2)); + final var subName = "sub-" + System.currentTimeMillis(); @Cleanup final var reader = pulsarClient.newReader(Schema.STRING).readCompacted(true).topic(topic) - .subscriptionName("reader") + .subscriptionName(subName) .startMessageId(MessageId.earliest).create(); // Slow down the pre-fetching @@ -2701,7 +2704,7 @@ public void testReceiveAckAfterReconnectionOnEmptyLedger() throws Exception { final var firstTime = new AtomicBoolean(true); consumerCreated = serverConsumer -> { final var subscription = serverConsumer.getSubscription(); - if (subscription.getName().contains("reader") && firstTime.compareAndSet(true, false)) { + if (subscription.getName().contains(subName) && firstTime.compareAndSet(true, false)) { final var msgId = (MessageIdAdv) firstMsg.getMessageId(); subscription.acknowledgeMessageAsync(List.of(PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId())), CommandAck.AckType.Cumulative, Map.of()); @@ -2724,7 +2727,7 @@ public void testReceiveAckAfterReconnectionOnEmptyLedger() throws Exception { .attr("value", msg.getValue()).log("read"); } - final var serverConsumer = persistentTopic.getSubscription("reader").getDispatcher().getConsumers().get(0); + final var serverConsumer = persistentTopic.getSubscription(subName).getDispatcher().getConsumers().get(0); assertEquals(((MessageIdAdv) 
serverConsumer.getStartMessageId()).getEntryId(), 0L); final var emptyLedgerId = persistentTopic.getManagedLedger().getLedgersInfo().lastEntry().getKey(); From 9c07dc868760c0355386a66762cb4dc738fb72fa Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 11 Jun 2026 21:32:50 +0800 Subject: [PATCH 5/6] only apply it for non-durable cursor --- .../pulsar/broker/service/persistent/PersistentSubscription.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index d47cde486c6ff..c7278257f8aaf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -480,6 +480,7 @@ public CompletableFuture acknowledgeMessageAsync(List positions, final var ml = cursor.getManagedLedger(); if (consumer != null && consumer.readCompacted() + && !cursor.isDurable() && ml.getOptionalLedgerInfo(position.getLedgerId()).isEmpty()) { if (ml.getFirstPosition() == null || position.getLedgerId() > ml.getFirstPosition().getLedgerId()) { log.warn("Received an ACK whose position is " + position + ", valid ledgers: " From d19ea223a109f555c6d100c1bb9c9c8a6086752c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 12 Jun 2026 09:42:22 +0800 Subject: [PATCH 6/6] advance consumerCreated before unload --- .../java/org/apache/pulsar/compaction/CompactionTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 04445e2eabcda..b9aff08119ba5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -2697,8 +2697,6 @@ public void testReaderReadOnDeletedLedger() throws Exception { triggerAndWaitCompaction(topic); - // Trigger the reconnection and trim the first ledger. - admin.namespaces().unload("my-tenant/my-ns"); // Simulate the pending cumulative acknowledgment is flushed after the consumer is created // We don't need such interception if we can support controlling the acknowledgment flush for reader. final var firstTime = new AtomicBoolean(true); @@ -2711,6 +2709,8 @@ public void testReaderReadOnDeletedLedger() throws Exception { } }; + // Trigger the reconnection and trim the first ledger. + admin.namespaces().unload("my-tenant/my-ns"); admin.lookups().lookupTopic(topic); final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, true).get() .orElseThrow();