Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import lombok.CustomLog;
import lombok.Getter;
Expand Down Expand Up @@ -95,10 +96,8 @@ public BacklogQuotaImpl getBacklogQuota(NamespaceName namespace, BacklogQuotaTyp
public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuotaType backlogQuotaType,
boolean preciseTimeBasedBacklogQuotaCheck) {
if (persistentTopic.isFenced() || persistentTopic.isClosingOrDeleting()) {
// Skip eviction work on a topic that is being torn down or transiently fenced.
// Mutating cursors here (skipEntries / markDeletePosition) contends with the
// delete path and can keep namespace force-delete from completing in time;
// the entries are about to be discarded anyway.
// Skip quota handling on a topic that is temporarily unavailable or being torn down.
// For close/delete, mutating cursors here can contend with the teardown path.
log.debug()
.attr("topic", persistentTopic.getName())
.attr("backlogQuotaType", backlogQuotaType)
Expand Down Expand Up @@ -196,15 +195,21 @@ private void dropBacklogForSizeLimit(PersistentTopic persistentTopic, BacklogQuo
log.debug().attr("slowestConsumer", slowestConsumer).log("no messages to skip for");
break;
}
// Skip messages on the slowest consumer
log.debug()
.attr("topic", persistentTopic.getName())
.attr("messagesToSkip", messagesToSkip)
.attr("consumer", slowestConsumer.getName())
.attr("entriesInBacklog", entriesInBacklog)
.log("Skipping messages on slowest consumer having backlog entries");
slowestConsumer.skipEntries(messagesToSkip, IndividualDeletedEntries.Include);
markDeletePositionMoveForward(persistentTopic, slowestConsumer);
beforeBacklogQuotaCursorMutation(persistentTopic);
if (!runCursorMutationIfTopicNotClosingOrDeleting(persistentTopic, () -> {
// Skip messages on the slowest consumer
log.debug()
.attr("topic", persistentTopic.getName())
.attr("messagesToSkip", messagesToSkip)
.attr("consumer", slowestConsumer.getName())
.attr("entriesInBacklog", entriesInBacklog)
.log("Skipping messages on slowest consumer having backlog entries");
slowestConsumer.skipEntries(messagesToSkip, IndividualDeletedEntries.Include);
markDeletePositionMoveForward(persistentTopic, slowestConsumer);
return null;
})) {
break;
}
} catch (Exception e) {
log.error()
.attr("topic", persistentTopic.getName())
Expand Down Expand Up @@ -267,8 +272,14 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
if (ledgerInfo == null) {
long ledgerId = mLedger.getLedgersInfo().ceilingKey(oldestPosition.getLedgerId() + 1);
Position nextPosition = PositionFactory.create(ledgerId, -1);
slowestConsumer.markDelete(nextPosition);
markDeletePositionMoveForward(persistentTopic, slowestConsumer);
beforeBacklogQuotaCursorMutation(persistentTopic);
if (!runCursorMutationIfTopicNotClosingOrDeleting(persistentTopic, () -> {
slowestConsumer.markDelete(nextPosition);
markDeletePositionMoveForward(persistentTopic, slowestConsumer);
return null;
})) {
break;
}
continue;
}
// Timestamp only > 0 if ledger has been closed
Expand All @@ -278,8 +289,14 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
long ledgerId = mLedger.getLedgersInfo().ceilingKey(oldestPosition.getLedgerId() + 1);
Position nextPosition = PositionFactory.create(ledgerId, -1);
if (!nextPosition.equals(oldestPosition)) {
slowestConsumer.markDelete(nextPosition);
markDeletePositionMoveForward(persistentTopic, slowestConsumer);
beforeBacklogQuotaCursorMutation(persistentTopic);
if (!runCursorMutationIfTopicNotClosingOrDeleting(persistentTopic, () -> {
slowestConsumer.markDelete(nextPosition);
markDeletePositionMoveForward(persistentTopic, slowestConsumer);
return null;
})) {
break;
}
continue;
}
}
Expand Down Expand Up @@ -366,6 +383,21 @@ private void markDeletePositionMoveForward(PersistentTopic persistentTopic, Mana
}
}

@VisibleForTesting
protected void beforeBacklogQuotaCursorMutation(PersistentTopic persistentTopic) {
// No-op.
}

private boolean runCursorMutationIfTopicNotClosingOrDeleting(PersistentTopic persistentTopic,
Callable<Void> mutation) throws Exception {
boolean didRun = persistentTopic.runWithTopicCloseReadLock(mutation);
if (!didRun) {
log.debug()
.attr("topic", persistentTopic.getName())
.log("Stopping backlog-quota eviction because topic is closing or deleting");
}
return didRun;
}

/**
* Compute the target value after backlog eviction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -1749,6 +1750,26 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
return close(true, closeWithoutWaitingClientDisconnect);
}

/**
* Executes an action while holding the read side of the topic close/delete lock.
* Topic close and delete acquire the write side before setting {@link #isClosingOrDeleting}, so they cannot start
* between the state check and the action.
*
* @return true if the action ran, or false if close/delete had already started
*/
public boolean runWithTopicCloseReadLock(Callable<Void> action) throws Exception {
lock.readLock().lock();
try {
if (isClosingOrDeleting) {
return false;
}
action.call();
return true;
} finally {
lock.readLock().unlock();
}
}

private enum CloseTypes {
transferring,
notWaitDisconnectClients,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -2225,6 +2226,139 @@ private void assertPendingAcks(org.apache.pulsar.broker.service.Consumer consume
assertThat(consumer.getUnackedMessages()).isEqualTo(expected);
}

@Test
public void testSizeBacklogEvictionRaceWithTopicCloseDoesNotSkipEntries() throws Exception {
final int msgSize = 1024;
final int quotaSizeLimit = 10 * 1024;
final int numMsgs = 20;
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(adminUrl.toString())
.build();
final String topicName =
BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic-closing-size");
final String subName = "closing-size-sub";

@Cleanup
Consumer<byte[]> consumer = client.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
@Cleanup
Producer<byte[]> producer = createProducer(client, topicName);
byte[] content = new byte[msgSize];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
consumer.receive();
}
PersistentTopic topic =
(PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription sub = topic.getSubscription(subName);
Position markDeleteBeforeEviction = sub.getCursor().getMarkDeletedPosition();
admin.namespaces().setBacklogQuota("prop/ns-quota",
BacklogQuota.builder()
.limitSize(quotaSizeLimit)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build());
Awaitility.await().untilAsserted(() ->
assertEquals(topic.getBacklogQuota(destination_storage).getLimitSize(), quotaSizeLimit));

BlockingBacklogQuotaManager backlogQuotaManager = new BlockingBacklogQuotaManager(pulsar);
CompletableFuture<Void> evictionFuture = CompletableFuture.runAsync(
() -> backlogQuotaManager.handleExceededBacklogQuota(topic, destination_storage, false));
backlogQuotaManager.awaitBeforeMutation();
CompletableFuture<Void> closeFuture = topic.close(false);
Awaitility.await().untilAsserted(() -> assertTrue(topic.isClosingOrDeleting()));
backlogQuotaManager.allowMutationPointToContinue();
evictionFuture.get(30, SECONDS);

assertEquals(sub.getCursor().getMarkDeletedPosition(), markDeleteBeforeEviction);
closeFuture.get(30, SECONDS);
}

@Test
public void testTimeBacklogEvictionRaceWithTopicCloseDoesNotMarkDelete() throws Exception {
final int numMsgs = 14;
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(adminUrl.toString())
.build();
final String topicName =
BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic-closing-time");
final String subName = "closing-time-sub";

@Cleanup
Consumer<byte[]> consumer = client.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
@Cleanup
Producer<byte[]> producer = createProducer(client, topicName);
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
consumer.receive();
}
PersistentTopic topic =
(PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription sub = topic.getSubscription(subName);
Position markDeleteBeforeEviction = sub.getCursor().getMarkDeletedPosition();
Thread.sleep(SECONDS.toMillis(2));
admin.namespaces().setBacklogQuota("prop/ns-quota",
BacklogQuota.builder()
.limitTime(1)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build(), message_age);
Awaitility.await().untilAsserted(() ->
assertEquals(topic.getBacklogQuota(message_age).getLimitTime(), 1));

BlockingBacklogQuotaManager backlogQuotaManager = new BlockingBacklogQuotaManager(pulsar);
CompletableFuture<Void> evictionFuture = CompletableFuture.runAsync(
() -> backlogQuotaManager.handleExceededBacklogQuota(topic, message_age, false));
backlogQuotaManager.awaitBeforeMutation();
CompletableFuture<Void> closeFuture = topic.close(false);
Awaitility.await().untilAsserted(() -> assertTrue(topic.isClosingOrDeleting()));
backlogQuotaManager.allowMutationPointToContinue();
evictionFuture.get(30, SECONDS);

assertEquals(sub.getCursor().getMarkDeletedPosition(), markDeleteBeforeEviction);
closeFuture.get(30, SECONDS);
}

private static class BlockingBacklogQuotaManager extends BacklogQuotaManager {
private final CountDownLatch beforeMutation = new CountDownLatch(1);
private final CountDownLatch continueMutation = new CountDownLatch(1);
private final AtomicBoolean blocked = new AtomicBoolean();

BlockingBacklogQuotaManager(PulsarService pulsar) {
super(pulsar);
}

@Override
protected void beforeBacklogQuotaCursorMutation(PersistentTopic persistentTopic) {
if (!blocked.compareAndSet(false, true)) {
return;
}
beforeMutation.countDown();
try {
assertTrue(continueMutation.await(30, SECONDS));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

void awaitBeforeMutation() throws InterruptedException {
assertTrue(beforeMutation.await(30, SECONDS));
}

void allowMutationPointToContinue() {
continueMutation.countDown();
}
}

@Test
public void testConsumerBacklogEvictionSizeQuotaCleansPendingAcks() throws Exception {
final int msgSize = 1024;
Expand Down