diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 686e2244b560a..6b1c50c98d438 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import lombok.CustomLog; import lombok.Getter; @@ -95,10 +96,8 @@ public BacklogQuotaImpl getBacklogQuota(NamespaceName namespace, BacklogQuotaTyp public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuotaType backlogQuotaType, boolean preciseTimeBasedBacklogQuotaCheck) { if (persistentTopic.isFenced() || persistentTopic.isClosingOrDeleting()) { - // Skip eviction work on a topic that is being torn down or transiently fenced. - // Mutating cursors here (skipEntries / markDeletePosition) contends with the - // delete path and can keep namespace force-delete from completing in time; - // the entries are about to be discarded anyway. + // Skip quota handling on a topic that is temporarily unavailable or being torn down. + // For close/delete, mutating cursors here can contend with the teardown path. log.debug() .attr("topic", persistentTopic.getName()) .attr("backlogQuotaType", backlogQuotaType) @@ -196,15 +195,21 @@ private void dropBacklogForSizeLimit(PersistentTopic persistentTopic, BacklogQuo log.debug().attr("slowestConsumer", slowestConsumer).log("no messages to skip for"); break; } - // Skip messages on the slowest consumer - log.debug() - .attr("topic", persistentTopic.getName()) - .attr("messagesToSkip", messagesToSkip) - .attr("consumer", slowestConsumer.getName()) - .attr("entriesInBacklog", entriesInBacklog) - .log("Skipping messages on slowest consumer having backlog entries"); - slowestConsumer.skipEntries(messagesToSkip, IndividualDeletedEntries.Include); - markDeletePositionMoveForward(persistentTopic, slowestConsumer); + beforeBacklogQuotaCursorMutation(persistentTopic); + if (!runCursorMutationIfTopicNotClosingOrDeleting(persistentTopic, () -> { + // Skip messages on the slowest consumer + log.debug() + .attr("topic", persistentTopic.getName()) + .attr("messagesToSkip", messagesToSkip) + .attr("consumer", slowestConsumer.getName()) + .attr("entriesInBacklog", entriesInBacklog) + .log("Skipping messages on slowest consumer having backlog entries"); + slowestConsumer.skipEntries(messagesToSkip, IndividualDeletedEntries.Include); + markDeletePositionMoveForward(persistentTopic, slowestConsumer); + return null; + })) { + break; + } } catch (Exception e) { log.error() .attr("topic", persistentTopic.getName()) @@ -267,8 +272,14 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo if (ledgerInfo == null) { long ledgerId = mLedger.getLedgersInfo().ceilingKey(oldestPosition.getLedgerId() + 1); Position nextPosition = PositionFactory.create(ledgerId, -1); - slowestConsumer.markDelete(nextPosition); - markDeletePositionMoveForward(persistentTopic, slowestConsumer); + beforeBacklogQuotaCursorMutation(persistentTopic); + if (!runCursorMutationIfTopicNotClosingOrDeleting(persistentTopic, () -> { + slowestConsumer.markDelete(nextPosition); + markDeletePositionMoveForward(persistentTopic, slowestConsumer); + return null; + })) { + break; + } continue; } // Timestamp only > 0 if ledger has been closed @@ -278,8 +289,14 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo long ledgerId = mLedger.getLedgersInfo().ceilingKey(oldestPosition.getLedgerId() + 1); Position nextPosition = PositionFactory.create(ledgerId, -1); if (!nextPosition.equals(oldestPosition)) { - slowestConsumer.markDelete(nextPosition); - markDeletePositionMoveForward(persistentTopic, slowestConsumer); + beforeBacklogQuotaCursorMutation(persistentTopic); + if (!runCursorMutationIfTopicNotClosingOrDeleting(persistentTopic, () -> { + slowestConsumer.markDelete(nextPosition); + markDeletePositionMoveForward(persistentTopic, slowestConsumer); + return null; + })) { + break; + } continue; } } @@ -366,6 +383,21 @@ private void markDeletePositionMoveForward(PersistentTopic persistentTopic, Mana } } + @VisibleForTesting + protected void beforeBacklogQuotaCursorMutation(PersistentTopic persistentTopic) { + // No-op. + } + + private boolean runCursorMutationIfTopicNotClosingOrDeleting(PersistentTopic persistentTopic, + Callable mutation) throws Exception { + boolean didRun = persistentTopic.runWithTopicCloseReadLock(mutation); + if (!didRun) { + log.debug() + .attr("topic", persistentTopic.getName()) + .log("Stopping backlog-quota eviction because topic is closing or deleting"); + } + return didRun; + } /** * Compute the target value after backlog eviction. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c9bcad341fcd7..ea9f1d97ae34d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -43,6 +43,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -1749,6 +1750,26 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect return close(true, closeWithoutWaitingClientDisconnect); } + /** + * Executes an action while holding the read side of the topic close/delete lock. + * Topic close and delete acquire the write side before setting {@link #isClosingOrDeleting}, so they cannot start + * between the state check and the action. + * + * @return true if the action ran, or false if close/delete had already started + */ + public boolean runWithTopicCloseReadLock(Callable action) throws Exception { + lock.readLock().lock(); + try { + if (isClosingOrDeleting) { + return false; + } + action.call(); + return true; + } finally { + lock.readLock().unlock(); + } + } + private enum CloseTypes { transferring, notWaitDisconnectClients, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index cbc066bd99afa..b58ed8a34b913 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; @@ -2225,6 +2226,139 @@ private void assertPendingAcks(org.apache.pulsar.broker.service.Consumer consume assertThat(consumer.getUnackedMessages()).isEqualTo(expected); } + @Test + public void testSizeBacklogEvictionRaceWithTopicCloseDoesNotSkipEntries() throws Exception { + final int msgSize = 1024; + final int quotaSizeLimit = 10 * 1024; + final int numMsgs = 20; + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(adminUrl.toString()) + .build(); + final String topicName = + BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic-closing-size"); + final String subName = "closing-size-sub"; + + @Cleanup + Consumer consumer = client.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe(); + @Cleanup + Producer producer = createProducer(client, topicName); + byte[] content = new byte[msgSize]; + for (int i = 0; i < numMsgs; i++) { + producer.send(content); + consumer.receive(); + } + PersistentTopic topic = + (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + PersistentSubscription sub = topic.getSubscription(subName); + Position markDeleteBeforeEviction = sub.getCursor().getMarkDeletedPosition(); + admin.namespaces().setBacklogQuota("prop/ns-quota", + BacklogQuota.builder() + .limitSize(quotaSizeLimit) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build()); + Awaitility.await().untilAsserted(() -> + assertEquals(topic.getBacklogQuota(destination_storage).getLimitSize(), quotaSizeLimit)); + + BlockingBacklogQuotaManager backlogQuotaManager = new BlockingBacklogQuotaManager(pulsar); + CompletableFuture evictionFuture = CompletableFuture.runAsync( + () -> backlogQuotaManager.handleExceededBacklogQuota(topic, destination_storage, false)); + backlogQuotaManager.awaitBeforeMutation(); + CompletableFuture closeFuture = topic.close(false); + Awaitility.await().untilAsserted(() -> assertTrue(topic.isClosingOrDeleting())); + backlogQuotaManager.allowMutationPointToContinue(); + evictionFuture.get(30, SECONDS); + + assertEquals(sub.getCursor().getMarkDeletedPosition(), markDeleteBeforeEviction); + closeFuture.get(30, SECONDS); + } + + @Test + public void testTimeBacklogEvictionRaceWithTopicCloseDoesNotMarkDelete() throws Exception { + final int numMsgs = 14; + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(adminUrl.toString()) + .build(); + final String topicName = + BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic-closing-time"); + final String subName = "closing-time-sub"; + + @Cleanup + Consumer consumer = client.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe(); + @Cleanup + Producer producer = createProducer(client, topicName); + byte[] content = new byte[1024]; + for (int i = 0; i < numMsgs; i++) { + producer.send(content); + consumer.receive(); + } + PersistentTopic topic = + (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + PersistentSubscription sub = topic.getSubscription(subName); + Position markDeleteBeforeEviction = sub.getCursor().getMarkDeletedPosition(); + Thread.sleep(SECONDS.toMillis(2)); + admin.namespaces().setBacklogQuota("prop/ns-quota", + BacklogQuota.builder() + .limitTime(1) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(), message_age); + Awaitility.await().untilAsserted(() -> + assertEquals(topic.getBacklogQuota(message_age).getLimitTime(), 1)); + + BlockingBacklogQuotaManager backlogQuotaManager = new BlockingBacklogQuotaManager(pulsar); + CompletableFuture evictionFuture = CompletableFuture.runAsync( + () -> backlogQuotaManager.handleExceededBacklogQuota(topic, message_age, false)); + backlogQuotaManager.awaitBeforeMutation(); + CompletableFuture closeFuture = topic.close(false); + Awaitility.await().untilAsserted(() -> assertTrue(topic.isClosingOrDeleting())); + backlogQuotaManager.allowMutationPointToContinue(); + evictionFuture.get(30, SECONDS); + + assertEquals(sub.getCursor().getMarkDeletedPosition(), markDeleteBeforeEviction); + closeFuture.get(30, SECONDS); + } + + private static class BlockingBacklogQuotaManager extends BacklogQuotaManager { + private final CountDownLatch beforeMutation = new CountDownLatch(1); + private final CountDownLatch continueMutation = new CountDownLatch(1); + private final AtomicBoolean blocked = new AtomicBoolean(); + + BlockingBacklogQuotaManager(PulsarService pulsar) { + super(pulsar); + } + + @Override + protected void beforeBacklogQuotaCursorMutation(PersistentTopic persistentTopic) { + if (!blocked.compareAndSet(false, true)) { + return; + } + beforeMutation.countDown(); + try { + assertTrue(continueMutation.await(30, SECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + void awaitBeforeMutation() throws InterruptedException { + assertTrue(beforeMutation.await(30, SECONDS)); + } + + void allowMutationPointToContinue() { + continueMutation.countDown(); + } + } + @Test public void testConsumerBacklogEvictionSizeQuotaCleansPendingAcks() throws Exception { final int msgSize = 1024;