Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -48,6 +49,7 @@
import javax.annotation.concurrent.ThreadSafe;
import lombok.Getter;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -115,6 +117,8 @@ public static record SnapshotKey(long ledgerId, long entryId) {}

private CompletableFuture<Void> pendingLoad = null;

private volatile CompletableFuture<Void> trimFuture;

public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
Expand Down Expand Up @@ -409,8 +413,15 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
lastMutableBucket.resetLastMutableBucketRange();

if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
asyncMergeBucketSnapshot();
if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets
&& (trimFuture == null || trimFuture.isDone())) {
trimFuture = asyncTrimImmutableBuckets()
.thenCompose(ignore -> asyncMergeBucketSnapshot())
.whenComplete((ignore, t) -> {
if (t != null) {
log.warn().exception(t).log("Failed to trim or merge bucket snapshots");
}
});
}
}

Expand Down Expand Up @@ -473,6 +484,10 @@ private synchronized List<ImmutableBucket> selectMergedBuckets(final List<Immuta

private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
List<ImmutableBucket> immutableBucketList = immutableBuckets.asMapOfRanges().values().stream().toList();
if (maxNumBuckets <= 0 || immutableBucketList.size() <= maxNumBuckets) {
return CompletableFuture.completedFuture(null);
}

List<ImmutableBucket> toBeMergeImmutableBuckets = selectMergedBuckets(immutableBucketList, MAX_MERGE_NUM);

if (toBeMergeImmutableBuckets.isEmpty()) {
Expand Down Expand Up @@ -626,6 +641,7 @@ public synchronized NavigableSet<Position> getScheduledMessages(int maxMessages)
}

long cutoffTime = getCutoffTime();
Long firstLiveLedgerId = firstActiveLedgerId();

lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue);

Expand All @@ -634,13 +650,16 @@ public synchronized NavigableSet<Position> getScheduledMessages(int maxMessages)

while (n > 0 && !sharedBucketPriorityQueue.isEmpty()) {
long timestamp = sharedBucketPriorityQueue.peekN1();
long ledgerId = sharedBucketPriorityQueue.peekN2();
long entryId = sharedBucketPriorityQueue.peekN3();
if (firstLiveLedgerId != null && ledgerId < firstLiveLedgerId && !containsMessage(ledgerId, entryId)) {
sharedBucketPriorityQueue.pop();
continue;
}
if (timestamp > cutoffTime) {
break;
}

long ledgerId = sharedBucketPriorityQueue.peekN2();
long entryId = sharedBucketPriorityQueue.peekN3();

SnapshotKey snapshotKey = new SnapshotKey(ledgerId, entryId);

ImmutableBucket bucket = snapshotSegmentLastIndexMap.get(snapshotKey);
Expand Down Expand Up @@ -754,12 +773,26 @@ public boolean shouldPauseAllDeliveries() {

/**
 * Clears the delayed-delivery state held by this tracker.
 *
 * <p>In the chained form below, the clear step is composed after any trim/merge future that is
 * still in flight in {@code trimFuture}; a failure of that earlier future is logged and then
 * ignored. The clear step itself runs while holding this tracker's monitor: it calls
 * {@code cleanImmutableBuckets()}, empties the shared priority queue, the last mutable bucket and
 * the snapshot segment index map, and resets the delayed-message counter to zero.
 *
 * @return the future stored in {@code trimFuture}, i.e. the whole wait-then-clear chain
 */
@Override
public synchronized CompletableFuture<Void> clear() {
// NOTE(review): the hunk header (-754,12 +773,26) indicates the six statements that follow are the
// pre-change body rendered as removed lines; the chained implementation after them is the added body.
CompletableFuture<Void> future = cleanImmutableBuckets();
sharedBucketPriorityQueue.clear();
lastMutableBucket.clear();
snapshotSegmentLastIndexMap.clear();
numberDelayedMessages.set(0);
return future;
// Wait for any in-flight trim+merge to settle, then clear.
// Reuse trimFuture to block new triggers until the clear chain completes.
CompletableFuture<Void> before = trimFuture != null && !trimFuture.isDone()
? trimFuture : CompletableFuture.completedFuture(null);
trimFuture = before
// Swallow the earlier failure so the clear stage below always runs.
.exceptionally(t -> {
log.warn().exception(t).log("Trim/merge buckets failed, but still clear");
return null;
})
.thenCompose(__ -> {
// The callback may run on whichever thread completes the previous stage, so the
// tracker monitor is taken explicitly here.
synchronized (BucketDelayedDeliveryTracker.this) {
CompletableFuture<Void> future = cleanImmutableBuckets();
sharedBucketPriorityQueue.clear();
lastMutableBucket.clear();
snapshotSegmentLastIndexMap.clear();
numberDelayedMessages.set(0);
return future;
}
});
return trimFuture;
}

@Override
Expand Down Expand Up @@ -816,4 +849,60 @@ public Map<String, TopicMetricBean> genTopicMetricMap() {
stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue());
return stats.genTopicMetricMap();
}

/**
 * Removes immutable buckets whose whole ledger range precedes the first ledger still listed by
 * the managed ledger.
 *
 * <p>The selected buckets are deleted one after another through a composed future chain, so a
 * failed deletion fails the returned future and the deletions queued behind it never start.
 *
 * @return a future completing once every selected bucket has been deleted; it is already
 *         completed when no first ledger id is available or when no bucket qualifies
 */
private synchronized CompletableFuture<Void> asyncTrimImmutableBuckets() {
    final Long oldestLedgerId = firstActiveLedgerId();
    if (oldestLedgerId == null) {
        return CompletableFuture.completedFuture(null);
    }
    final ManagedLedger managedLedger = context.getCursor().getManagedLedger();

    // Copy the matching entries out of the live range-map view before anything is deleted.
    final Map<Range<Long>, ImmutableBucket> orphanedBuckets =
            immutableBuckets.asMapOfRanges().entrySet().stream()
                    .filter(candidate -> candidate.getKey().upperEndpoint() < oldestLedgerId)
                    .collect(Collectors.toMap(candidate -> candidate.getKey(),
                            candidate -> candidate.getValue()));

    if (orphanedBuckets.isEmpty()) {
        return CompletableFuture.completedFuture(null);
    }

    final String managedLedgerName = managedLedger.getName();
    CompletableFuture<Void> deletions = CompletableFuture.completedFuture(null);
    for (Map.Entry<Range<Long>, ImmutableBucket> orphan : orphanedBuckets.entrySet()) {
        // Each deletion starts only after the previous one completed successfully.
        deletions = deletions.thenCompose(ignored ->
                deleteBucketSnapshot(managedLedgerName, orphan.getKey(), orphan.getValue()));
    }
    return deletions;
}

/**
 * Deletes the stored snapshot of {@code bucket} and, on success, removes the bucket from the
 * tracker's in-memory bookkeeping.
 *
 * @param ledgerName name of the managed ledger, used only for logging
 * @param range      key under which the bucket is registered in {@code immutableBuckets}
 * @param bucket     the bucket whose snapshot is deleted
 * @return a future that completes normally after the in-memory state was updated, or
 *         exceptionally with a {@link CompletionException} wrapping the deletion failure
 */
private CompletableFuture<Void> deleteBucketSnapshot(String ledgerName,
                                                     Range<Long> range, ImmutableBucket bucket) {
    return bucket.asyncDeleteBucketSnapshot(stats)
            .handle((__, t) -> {
                if (t != null) {
                    // Attach the throwable so the log line carries the cause of the failed delete,
                    // matching the other warn calls in this class.
                    log.warn().attr("LedgerName", ledgerName)
                            .attr("BucketKey", bucket.bucketKey())
                            .exception(t)
                            .log("Failed to delete bucket snapshot");
                    // Propagate the failure so a composed deletion chain stops here.
                    throw new CompletionException(t);
                }
                // The callback is not guaranteed to run on a thread holding the tracker monitor.
                synchronized (this) {
                    // Identity comparison: drop only index entries that point at this exact bucket.
                    snapshotSegmentLastIndexMap.entrySet().removeIf(entry -> entry.getValue() == bucket);
                    immutableBuckets.remove(range);
                    numberDelayedMessages.addAndGet(-bucket.getNumberBucketDelayedMessages());
                }
                return null;
            });
}

/**
 * Returns the id of the first ledger currently listed by the cursor's managed ledger.
 *
 * @return the lowest ledger id, or {@code null} when the cursor has no managed ledger or the
 *         managed ledger lists no ledgers
 */
private Long firstActiveLedgerId() {
    ManagedLedger ledger = context.getCursor().getManagedLedger();
    if (ledger == null) {
        return null;
    }
    // Single lookup: firstEntry() yields null for an empty map, whereas an isEmpty() check followed
    // by firstKey() can throw NoSuchElementException if the map is emptied in between.
    Map.Entry<Long, ?> firstLedger = ledger.getLedgersInfo().firstEntry();
    return firstLedger == null ? null : firstLedger.getKey();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo.LedgerInfo;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.broker.delayed.AbstractDeliveryTrackerTest;
import org.apache.pulsar.broker.delayed.MockBucketSnapshotStorage;
Expand Down Expand Up @@ -469,4 +472,206 @@ public void testClear(BucketDelayedDeliveryTracker tracker)

tracker.close();
}

/**
 * Bundles a tracker under test with the snapshot storage and the adjustable clock value it was
 * built with, so a test can drive and tear down all three together.
 */
private static class TrackerWithStorage {
    final BucketDelayedDeliveryTracker tracker;
    final MockBucketSnapshotStorage storage;
    final AtomicLong clockTime;

    TrackerWithStorage(BucketDelayedDeliveryTracker tracker, MockBucketSnapshotStorage storage,
                       AtomicLong clockTime) {
        this.tracker = tracker;
        this.storage = storage;
        this.clockTime = clockTime;
    }

    /**
     * Closes the tracker and then the storage.
     *
     * @throws Exception if either close call fails
     */
    void close() throws Exception {
        try {
            tracker.close();
        } finally {
            // Always release the storage, even when closing the tracker throws.
            storage.close();
        }
    }
}

/**
 * Snapshot storage whose first four delete calls all return the same caller-controlled future;
 * later calls fall through to the mock storage's own implementation.
 */
private static class BlockingDeleteStorage extends MockBucketSnapshotStorage {
    /** Shared result of the intercepted delete calls; the test completes it explicitly. */
    final CompletableFuture<Void> firstDeleteFuture = new CompletableFuture<>();
    /** Number of delete calls seen so far, intercepted or not. */
    final AtomicLong deleteCalls = new AtomicLong();

    @Override
    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
        final long callNumber = deleteCalls.incrementAndGet();
        // NOTE(review): the threshold of 4 presumably matches a retry budget of the delete path - confirm.
        final boolean passThrough = callNumber > 4;
        return passThrough ? super.deleteBucketSnapshot(bucketId) : firstDeleteFuture;
    }
}

/**
 * Convenience overload that backs the tracker with a fresh {@link MockBucketSnapshotStorage}.
 *
 * @param firstLedgerId the only ledger id the mocked managed ledger will list
 * @param maxNumBuckets bucket limit handed to the tracker
 * @return the tracker together with its storage and clock handle
 */
private TrackerWithStorage createTrackerWithMockLedger(long firstLedgerId, int maxNumBuckets)
        throws Exception {
    MockBucketSnapshotStorage defaultStorage = new MockBucketSnapshotStorage();
    return createTrackerWithMockLedger(firstLedgerId, maxNumBuckets, defaultStorage);
}

/**
 * Builds a tracker wired to mocks: a managed ledger that lists exactly one ledger, a cursor that
 * exposes that managed ledger, a dispatcher returning the cursor, and a clock whose time is read
 * from an {@link AtomicLong} the test can change.
 *
 * @param firstLedgerId the only ledger id the mocked managed ledger will list
 * @param maxNumBuckets bucket limit handed to the tracker
 * @param storage       snapshot storage to start and hand to the tracker
 * @return the tracker together with its storage and clock handle
 */
private TrackerWithStorage createTrackerWithMockLedger(long firstLedgerId, int maxNumBuckets,
                                                       MockBucketSnapshotStorage storage)
        throws Exception {
    storage.start();

    // Managed ledger listing a single ledger with the requested id.
    NavigableMap<Long, LedgerInfo> ledgers = new TreeMap<>();
    ledgers.put(firstLedgerId, mock(LedgerInfo.class));
    ManagedLedger managedLedger = mock(ManagedLedger.class);
    when(managedLedger.getLedgersInfo()).thenReturn(ledgers);
    when(managedLedger.getName()).thenReturn("test-ledger");

    ManagedCursor cursor = new MockManagedCursor("test-cursor") {
        @Override
        public ManagedLedger getManagedLedger() {
            return managedLedger;
        }
    };

    // Clock whose current time is whatever the returned AtomicLong holds.
    AtomicLong clockTime = new AtomicLong();
    Clock clock = mock(Clock.class);
    when(clock.millis()).then(invocation -> clockTime.get());

    AbstractPersistentDispatcherMultipleConsumers dispatcher =
            mock(AbstractPersistentDispatcherMultipleConsumers.class);
    doReturn(cursor).when(dispatcher).getCursor();
    doReturn("persistent://public/default/testDelay" + " / " + cursor.getName()).when(dispatcher).getName();

    BucketDelayedDeliveryTracker tracker = new BucketDelayedDeliveryTracker(dispatcher, mock(Timer.class),
            100000, clock, true, storage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, maxNumBuckets);
    return new TrackerWithStorage(tracker, storage, clockTime);
}

/**
 * Adds 36 messages, one per ledger id 1..36, to a tracker whose managed ledger lists only ledger
 * 31 and whose bucket limit is 5, then checks that the remaining bucket ranges all start at or
 * after ledger 31 and that scheduling returns no position from an earlier ledger.
 */
@Test
public void testTrimRemovesOrphanedBuckets() throws Exception {
long firstLedgerId = 31L;
int messageCount = 36;
TrackerWithStorage ts = createTrackerWithMockLedger(firstLedgerId, 5);

// Message i lives in ledger i, entry i, and is due at time i * 10.
for (int i = 1; i <= messageCount; i++) {
ts.tracker.addMessage(i, i, i * 10);
}
// Wait until no bucket is flagged as merging any more.
Awaitility.await().untilAsserted(() ->
Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream()
.noneMatch(x -> x.merging)));

int bucketCount = ts.tracker.getImmutableBuckets().asMapOfRanges().size();
assertTrue(bucketCount <= 5,
"Bucket count " + bucketCount + " should be <= maxNumBuckets=5 after trim+merge");

ts.tracker.getImmutableBuckets().asMapOfRanges().forEach((range, bucket) ->
assertTrue(range.lowerEndpoint() >= firstLedgerId,
"Remaining bucket range " + range + " should be >= " + firstLedgerId));

// Move the clock to the due time of the last message so every remaining message is due.
long messagesAfterTrim = ts.tracker.getNumberOfDelayedMessages();
ts.clockTime.set(messageCount * 10);
NavigableSet<Position> scheduledMessages = ts.tracker.getScheduledMessages(1);
assertTrue(scheduledMessages.stream().noneMatch(position -> position.getLedgerId() < firstLedgerId),
"Trimmed ledgers should not be returned from the loaded shared queue");
// The counter must drop by exactly the number of positions handed out.
assertEquals(ts.tracker.getNumberOfDelayedMessages(), messagesAfterTrim - scheduledMessages.size());

Comment on lines +544 to +570

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only checks that immutableBuckets no longer contains ranges below firstLedgerId. It does not verify the externally visible behavior after trim. Please advance the mock clock and call getScheduledMessages(), then assert that no position from ledgers below firstLedgerId is returned and that getNumberOfDelayedMessages() remains consistent. That would catch stale entries left in sharedBucketPriorityQueue after the bucket range is removed.

ts.close();
}

/**
 * Injects four delete failures into the storage, adds 31 messages (ledger ids 1..31) to a tracker
 * whose managed ledger lists only ledger 50, and checks that once the injected failures have been
 * consumed a message from a ledger below 50 is still handed out and the delayed-message counter
 * drops by exactly one.
 */
@Test
public void testTrimHandlesDeleteFailure() throws Exception {
    long firstLedgerId = 50L;
    int messageCount = 31;
    TrackerWithStorage ts = createTrackerWithMockLedger(firstLedgerId, 5);
    try {
        for (int i = 0; i < 4; i++) {
            ts.storage.injectDeleteException(
                    new BucketSnapshotPersistenceException("Delete failed"));
        }

        for (int i = 1; i <= messageCount; i++) {
            ts.tracker.addMessage(i, i, i * 10);
        }
        // Wait until no bucket is flagged as merging any more.
        Awaitility.await().untilAsserted(() ->
                Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream()
                        .noneMatch(x -> x.merging)));

        Awaitility.await().untilAsserted(() ->
                assertTrue(ts.storage.deleteExceptionQueue.isEmpty(),
                        "Delete exception should have been consumed"));

        long messagesBeforeSchedule = ts.tracker.getNumberOfDelayedMessages();
        ts.clockTime.set(messageCount * 10);
        Awaitility.await().untilAsserted(() -> {
            NavigableSet<Position> scheduledMessages = ts.tracker.getScheduledMessages(1);
            assertEquals(scheduledMessages.size(), 1);
            assertTrue(scheduledMessages.first().getLedgerId() < firstLedgerId);
            assertEquals(ts.tracker.getNumberOfDelayedMessages(), messagesBeforeSchedule - 1);
        });
    } finally {
        // Release the tracker and storage even when an assertion above fails.
        ts.close();
    }
}

/**
 * Calls {@code clear()} while a delete issued through {@link BlockingDeleteStorage} is still
 * pending, then fails that pending delete and checks that the clear future still completes and
 * leaves the tracker with no delayed messages, no immutable buckets, an empty mutable bucket and
 * an empty shared queue.
 */
@Test
public void testClearRunsAfterInFlightTrimFailure() throws Exception {
    long firstLedgerId = 50L;
    int messageCount = 31;
    BlockingDeleteStorage storage = new BlockingDeleteStorage();
    TrackerWithStorage ts = createTrackerWithMockLedger(firstLedgerId, 5, storage);
    try {
        for (int i = 1; i <= messageCount; i++) {
            ts.tracker.addMessage(i, i, i * 10);
        }
        Awaitility.await().untilAsserted(() ->
                assertTrue(storage.deleteCalls.get() > 0, "Trim delete should be in flight"));

        // Request the clear first, then fail the delete it has to wait for.
        CompletableFuture<Void> clearFuture = ts.tracker.clear();
        storage.firstDeleteFuture.completeExceptionally(new BucketSnapshotPersistenceException("Delete failed"));

        clearFuture.get(1, TimeUnit.MINUTES);
        assertEquals(ts.tracker.getNumberOfDelayedMessages(), 0);
        assertEquals(ts.tracker.getImmutableBuckets().asMapOfRanges().size(), 0);
        assertEquals(ts.tracker.getLastMutableBucket().size(), 0);
        assertEquals(ts.tracker.getSharedBucketPriorityQueue().size(), 0);
    } finally {
        // Release the tracker and storage even when an assertion above fails.
        ts.close();
    }
}

/**
 * Adds 31 messages (ledger ids 1..31) to a tracker whose managed ledger lists ledger 0, so no
 * bucket range lies below the first ledger, and checks that the bucket count ends up between 1
 * and the limit of 5.
 */
@Test
public void testTrimWithNoOrphanedBuckets() throws Exception {
    TrackerWithStorage ts = createTrackerWithMockLedger(0L, 5);
    try {
        for (int i = 1; i <= 31; i++) {
            ts.tracker.addMessage(i, i, i * 10);
        }
        // Wait until no bucket is flagged as merging any more.
        Awaitility.await().untilAsserted(() ->
                Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream()
                        .noneMatch(x -> x.merging)));

        int bucketCount = ts.tracker.getImmutableBuckets().asMapOfRanges().size();
        assertTrue(bucketCount <= 5,
                "Bucket count " + bucketCount + " should be <= maxNumBuckets=5");
        assertTrue(bucketCount > 0, "Should have at least one bucket after merge");
    } finally {
        // Release the tracker and storage even when an assertion above fails.
        ts.close();
    }
}

/**
 * Uses a bucket limit of 50 with 30 messages so the bucket count stays below the limit, then
 * adds one more message and checks that the delayed-message counter grows by exactly one.
 */
@Test
public void testMergeEarlyReturnWhenWithinLimit() throws Exception {
    TrackerWithStorage ts = createTrackerWithMockLedger(0L, 50);
    try {
        for (int i = 1; i <= 30; i++) {
            ts.tracker.addMessage(i, i, i * 10);
        }
        // Wait until no bucket is flagged as merging any more.
        Awaitility.await().untilAsserted(() ->
                Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream()
                        .noneMatch(x -> x.merging)));

        int bucketCount = ts.tracker.getImmutableBuckets().asMapOfRanges().size();
        assertTrue(bucketCount < 50,
                "Bucket count " + bucketCount + " should be well below maxNumBuckets=50");

        long msgsBefore = ts.tracker.getNumberOfDelayedMessages();
        ts.tracker.addMessage(200, 200, 200 * 10);
        Awaitility.await().untilAsserted(() ->
                Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream()
                        .noneMatch(x -> x.merging)));

        assertEquals(ts.tracker.getNumberOfDelayedMessages(), msgsBefore + 1);
    } finally {
        // Release the tracker and storage even when an assertion above fails.
        ts.close();
    }
}
}
Loading