diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 236a55162347b..c7278257f8aaf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -58,6 +58,7 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.AbstractSubscription; import org.apache.pulsar.broker.service.AnalyzeBacklogResult; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -471,6 +472,23 @@ public CompletableFuture acknowledgeMessageAsync(List positions, .attr("position", position) .log("Cumulative ack on"); AckCallback callback = new AckCallback(previousMarkDeletePosition, future); + if (dispatcher instanceof AbstractDispatcherSingleActiveConsumer singleConsumerDispatcher) { + // For compacted consumer, we should ignore the position that does not exist in the managed ledger, + // otherwise, the `asyncMarkDelete` call could jump the read position to the active ledger, which will + // skip all entries present in the compacted ledger but not present in the managed ledger. + final var consumer = singleConsumerDispatcher.getActiveConsumer(); + final var ml = cursor.getManagedLedger(); + if (consumer != null + && consumer.readCompacted() + && !cursor.isDurable() + && ml.getOptionalLedgerInfo(position.getLedgerId()).isEmpty()) { + if (ml.getFirstPosition() == null || position.getLedgerId() > ml.getFirstPosition().getLedgerId()) { + log.warn("Received an ACK whose position is " + position + ", valid ledgers: " + + ml.getLedgersInfo().keySet()); + } + return CompletableFuture.completedFuture(null); + } + } cursor.asyncMarkDelete(position, mergeCursorProperties(properties), callback, callback); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 6b625d1cd65c4..b9aff08119ba5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -56,6 +56,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; @@ -70,12 +71,15 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.intercept.MockBrokerInterceptor; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -87,6 +91,7 @@ import org.apache.pulsar.client.api.EncryptionKeyInfo; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; @@ -124,6 +129,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { protected ScheduledExecutorService compactionScheduler; protected BookKeeper bk; private PublishingOrderCompactor compactor; + private volatile java.util.function.Consumer consumerCreated = __ -> {}; @Override protected void doInitConf() throws Exception { @@ -135,6 +141,14 @@ protected void doInitConf() throws Exception { @Override public void setup() throws Exception { super.internalSetup(); + pulsar.getBrokerService().setInterceptor(new MockBrokerInterceptor() { + + @Override + public void consumerCreated(ServerCnx cnx, org.apache.pulsar.broker.service.Consumer consumer, + Map metadata) { + consumerCreated.accept(consumer); + } + }); admin.clusters().createCluster(configClusterName, ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); @@ -164,6 +178,8 @@ public void beforeMethod() throws Exception { admin.namespaces().removeRetention("my-tenant/my-ns"); AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {}; AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = RawReader::seekAsync; + consumerCreated = __ -> {}; + pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(1); } protected long compact(String topic) throws ExecutionException, InterruptedException { @@ -2648,4 +2664,74 @@ private void triggerAndWaitCompaction(String topic) throws Exception { Awaitility.await().untilAsserted(() -> assertEquals( admin.topics().compactionStatus(topic).status, LongRunningProcessStatus.Status.SUCCESS)); } + + @Test + public void testReaderReadOnDeletedLedger() throws Exception { + final var topic = "persistent://my-tenant/my-ns/reader-read-on-deleted-ledger"; + try (final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) { + for (int i = 0; i < 3; i++) { + producer.newMessage().key("key-" + i).value("value-" + i).send(); + } + } + // Trigger the ledger rollover + var ml = (ManagedLedgerImpl) ((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get() + .orElseThrow()).getManagedLedger(); + ml.getConfig().setMaxEntriesPerLedger(1); + ml.getConfig().setMaxSizePerLedgerMb(0); + ml.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS); + ml.rollCurrentLedgerIfFull(); + Awaitility.await().untilAsserted(() -> assertEquals(ml.getLedgersInfo().size(), 2)); + + final var subName = "sub-" + System.currentTimeMillis(); + @Cleanup final var reader = pulsarClient.newReader(Schema.STRING).readCompacted(true).topic(topic) + .subscriptionName(subName) + .startMessageId(MessageId.earliest).create(); + + // Slow down the pre-fetching + pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(500); + + // Receive 1 message so that the startMessageId will be reset to ledger_id:0 after reconnection + assertTrue(reader.hasMessageAvailable()); + final var firstMsg = reader.readNext(3, TimeUnit.SECONDS); + assertNotNull(firstMsg); + + triggerAndWaitCompaction(topic); + + // Simulate the pending cumulative acknowledgment is flushed after the consumer is created + // We don't need such interception if we can support controlling the acknowledgment flush for reader. + final var firstTime = new AtomicBoolean(true); + consumerCreated = serverConsumer -> { + final var subscription = serverConsumer.getSubscription(); + if (subscription.getName().contains(subName) && firstTime.compareAndSet(true, false)) { + final var msgId = (MessageIdAdv) firstMsg.getMessageId(); + subscription.acknowledgeMessageAsync(List.of(PositionFactory.create(msgId.getLedgerId(), + msgId.getEntryId())), CommandAck.AckType.Cumulative, Map.of()); + } + }; + + // Trigger the reconnection and trim the first ledger. + admin.namespaces().unload("my-tenant/my-ns"); + admin.lookups().lookupTopic(topic); + final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, true).get() + .orElseThrow(); + final var trimFuture = new CompletableFuture(); + persistentTopic.getManagedLedger().trimConsumedLedgersInBackground(trimFuture); + trimFuture.get(); + assertEquals(persistentTopic.getManagedLedger().getLedgersInfo().size(), 1); + + pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(1); + + while (reader.hasMessageAvailable()) { + final var msg = reader.readNextAsync().get(3, TimeUnit.SECONDS); + log.info().attr("id", msg.getMessageId()).attr("key", msg.getKey()) + .attr("value", msg.getValue()).log("read"); + } + + final var serverConsumer = persistentTopic.getSubscription(subName).getDispatcher().getConsumers().get(0); + assertEquals(((MessageIdAdv) serverConsumer.getStartMessageId()).getEntryId(), 0L); + + final var emptyLedgerId = persistentTopic.getManagedLedger().getLedgersInfo().lastEntry().getKey(); + assertEquals(persistentTopic.getTopicCompactionService().getLastCompactedPosition().get(), + PositionFactory.create(emptyLedgerId, -1L)); + } }