diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 19c774852c9cb..c652b5d9c8dec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -39,6 +39,7 @@ import java.util.Optional; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -48,6 +49,7 @@ import javax.annotation.concurrent.ThreadSafe; import lombok.Getter; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.commons.collections4.CollectionUtils; @@ -115,6 +117,8 @@ public static record SnapshotKey(long ledgerId, long entryId) {} private CompletableFuture pendingLoad = null; + private volatile CompletableFuture trimFuture; + public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict, @@ -409,8 +413,15 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime); lastMutableBucket.resetLastMutableBucketRange(); - if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) { - asyncMergeBucketSnapshot(); + if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets + && (trimFuture == null || trimFuture.isDone())) { + trimFuture = asyncTrimImmutableBuckets() + .thenCompose(ignore -> asyncMergeBucketSnapshot()) + .whenComplete((ignore, t) -> { + if (t != null) { + log.warn().exception(t).log("Failed to trim or merge bucket snapshots"); + } + }); } } @@ -473,6 +484,10 @@ private synchronized List selectMergedBuckets(final List asyncMergeBucketSnapshot() { List immutableBucketList = immutableBuckets.asMapOfRanges().values().stream().toList(); + if (maxNumBuckets <= 0 || immutableBucketList.size() <= maxNumBuckets) { + return CompletableFuture.completedFuture(null); + } + List toBeMergeImmutableBuckets = selectMergedBuckets(immutableBucketList, MAX_MERGE_NUM); if (toBeMergeImmutableBuckets.isEmpty()) { @@ -626,6 +641,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessages) } long cutoffTime = getCutoffTime(); + Long firstLiveLedgerId = firstActiveLedgerId(); lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue); @@ -634,13 +650,16 @@ public synchronized NavigableSet getScheduledMessages(int maxMessages) while (n > 0 && !sharedBucketPriorityQueue.isEmpty()) { long timestamp = sharedBucketPriorityQueue.peekN1(); + long ledgerId = sharedBucketPriorityQueue.peekN2(); + long entryId = sharedBucketPriorityQueue.peekN3(); + if (firstLiveLedgerId != null && ledgerId < firstLiveLedgerId && !containsMessage(ledgerId, entryId)) { + sharedBucketPriorityQueue.pop(); + continue; + } if (timestamp > cutoffTime) { break; } - long ledgerId = sharedBucketPriorityQueue.peekN2(); - long entryId = sharedBucketPriorityQueue.peekN3(); - SnapshotKey snapshotKey = new SnapshotKey(ledgerId, entryId); ImmutableBucket bucket = snapshotSegmentLastIndexMap.get(snapshotKey); @@ -754,12 +773,26 @@ public boolean shouldPauseAllDeliveries() { @Override public synchronized CompletableFuture clear() { - CompletableFuture future = cleanImmutableBuckets(); - sharedBucketPriorityQueue.clear(); - lastMutableBucket.clear(); - snapshotSegmentLastIndexMap.clear(); - numberDelayedMessages.set(0); - return future; + // Wait for any in-flight trim+merge to settle, then clear. + // Reuse trimFuture to block new triggers until the clear chain completes. + CompletableFuture before = trimFuture != null && !trimFuture.isDone() + ? trimFuture : CompletableFuture.completedFuture(null); + trimFuture = before + .exceptionally(t -> { + log.warn().exception(t).log("Trim/merge buckets failed, but still clear"); + return null; + }) + .thenCompose(__ -> { + synchronized (BucketDelayedDeliveryTracker.this) { + CompletableFuture future = cleanImmutableBuckets(); + sharedBucketPriorityQueue.clear(); + lastMutableBucket.clear(); + snapshotSegmentLastIndexMap.clear(); + numberDelayedMessages.set(0); + return future; + } + }); + return trimFuture; } @Override @@ -816,4 +849,60 @@ public Map genTopicMetricMap() { stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue()); return stats.genTopicMetricMap(); } + + /** + * Delete orphaned bucket snapshots whose ledger range lies entirely before the earliest + * surviving ledger. Buckets are deleted sequentially; the chain stops on first failure + * to avoid wasted work when storage is unavailable. + */ + private synchronized CompletableFuture asyncTrimImmutableBuckets() { + Long firstLedgerId = firstActiveLedgerId(); + if (null == firstLedgerId) { + return CompletableFuture.completedFuture(null); + } + ManagedLedger ledger = context.getCursor().getManagedLedger(); + + Map, ImmutableBucket> toBeDeletedBuckets = immutableBuckets.asMapOfRanges().entrySet().stream() + .filter(e -> e.getKey().upperEndpoint() < firstLedgerId) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (toBeDeletedBuckets.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + String ledgerName = ledger.getName(); + CompletableFuture chain = CompletableFuture.completedFuture(null); + for (Map.Entry, ImmutableBucket> entry : toBeDeletedBuckets.entrySet()) { + chain = chain.thenCompose(__ -> + deleteBucketSnapshot(ledgerName, entry.getKey(), entry.getValue())); + } + return chain; + } + + private CompletableFuture deleteBucketSnapshot(String ledgerName, + Range range, ImmutableBucket bucket) { + return bucket.asyncDeleteBucketSnapshot(stats) + .handle((__, t) -> { + if (t != null) { + log.warn().attr("LedgerName", ledgerName) + .attr("BucketKey", bucket.bucketKey()) + .log("Failed to delete bucket snapshot"); + throw new CompletionException(t); + } + synchronized (this) { + snapshotSegmentLastIndexMap.entrySet().removeIf(entry -> entry.getValue() == bucket); + immutableBuckets.remove(range); + numberDelayedMessages.addAndGet(-bucket.getNumberBucketDelayedMessages()); + } + return null; + }); + } + + private Long firstActiveLedgerId() { + ManagedLedger ledger = context.getCursor().getManagedLedger(); + if (ledger == null || ledger.getLedgersInfo().isEmpty()) { + return null; + } + return ledger.getLedgersInfo().firstKey(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index efce0f1d8e51f..8e97b3942ed29 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -40,13 +40,16 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo.LedgerInfo; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.pulsar.broker.delayed.AbstractDeliveryTrackerTest; import org.apache.pulsar.broker.delayed.MockBucketSnapshotStorage; @@ -469,4 +472,206 @@ public void testClear(BucketDelayedDeliveryTracker tracker) tracker.close(); } + + private static class TrackerWithStorage { + final BucketDelayedDeliveryTracker tracker; + final MockBucketSnapshotStorage storage; + final AtomicLong clockTime; + + TrackerWithStorage(BucketDelayedDeliveryTracker tracker, MockBucketSnapshotStorage storage, + AtomicLong clockTime) { + this.tracker = tracker; + this.storage = storage; + this.clockTime = clockTime; + } + + void close() throws Exception { + tracker.close(); + storage.close(); + } + } + + private static class BlockingDeleteStorage extends MockBucketSnapshotStorage { + final CompletableFuture firstDeleteFuture = new CompletableFuture<>(); + final AtomicLong deleteCalls = new AtomicLong(); + + @Override + public CompletableFuture deleteBucketSnapshot(long bucketId) { + if (deleteCalls.incrementAndGet() <= 4) { + return firstDeleteFuture; + } + return super.deleteBucketSnapshot(bucketId); + } + } + + private TrackerWithStorage createTrackerWithMockLedger(long firstLedgerId, int maxNumBuckets) + throws Exception { + return createTrackerWithMockLedger(firstLedgerId, maxNumBuckets, new MockBucketSnapshotStorage()); + } + + private TrackerWithStorage createTrackerWithMockLedger(long firstLedgerId, int maxNumBuckets, + MockBucketSnapshotStorage storage) + throws Exception { + storage.start(); + + ManagedLedger mockLedger = mock(ManagedLedger.class); + NavigableMap ledgerInfo = new TreeMap<>(); + ledgerInfo.put(firstLedgerId, mock(LedgerInfo.class)); + when(mockLedger.getLedgersInfo()).thenReturn(ledgerInfo); + when(mockLedger.getName()).thenReturn("test-ledger"); + + ManagedCursor mockCursor = new MockManagedCursor("test-cursor") { + @Override + public ManagedLedger getManagedLedger() { + return mockLedger; + } + }; + + AbstractPersistentDispatcherMultipleConsumers disp = + mock(AbstractPersistentDispatcherMultipleConsumers.class); + Clock mockClock = mock(Clock.class); + AtomicLong mockClockTime = new AtomicLong(); + when(mockClock.millis()).then(x -> mockClockTime.get()); + doReturn(mockCursor).when(disp).getCursor(); + doReturn("persistent://public/default/testDelay" + " / " + mockCursor.getName()).when(disp).getName(); + + BucketDelayedDeliveryTracker tracker = new BucketDelayedDeliveryTracker(disp, mock(Timer.class), + 100000, mockClock, true, storage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, maxNumBuckets); + return new TrackerWithStorage(tracker, storage, mockClockTime); + } + + @Test + public void testTrimRemovesOrphanedBuckets() throws Exception { + long firstLedgerId = 31L; + int messageCount = 36; + TrackerWithStorage ts = createTrackerWithMockLedger(firstLedgerId, 5); + + for (int i = 1; i <= messageCount; i++) { + ts.tracker.addMessage(i, i, i * 10); + } + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream() + .noneMatch(x -> x.merging))); + + int bucketCount = ts.tracker.getImmutableBuckets().asMapOfRanges().size(); + assertTrue(bucketCount <= 5, + "Bucket count " + bucketCount + " should be <= maxNumBuckets=5 after trim+merge"); + + ts.tracker.getImmutableBuckets().asMapOfRanges().forEach((range, bucket) -> + assertTrue(range.lowerEndpoint() >= firstLedgerId, + "Remaining bucket range " + range + " should be >= " + firstLedgerId)); + + long messagesAfterTrim = ts.tracker.getNumberOfDelayedMessages(); + ts.clockTime.set(messageCount * 10); + NavigableSet scheduledMessages = ts.tracker.getScheduledMessages(1); + assertTrue(scheduledMessages.stream().noneMatch(position -> position.getLedgerId() < firstLedgerId), + "Trimmed ledgers should not be returned from the loaded shared queue"); + assertEquals(ts.tracker.getNumberOfDelayedMessages(), messagesAfterTrim - scheduledMessages.size()); + + ts.close(); + } + + @Test + public void testTrimHandlesDeleteFailure() throws Exception { + long firstLedgerId = 50L; + int messageCount = 31; + TrackerWithStorage ts = createTrackerWithMockLedger(firstLedgerId, 5); + + for (int i = 0; i < 4; i++) { + ts.storage.injectDeleteException( + new BucketSnapshotPersistenceException("Delete failed")); + } + + for (int i = 1; i <= messageCount; i++) { + ts.tracker.addMessage(i, i, i * 10); + } + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream() + .noneMatch(x -> x.merging))); + + Awaitility.await().untilAsserted(() -> + assertTrue(ts.storage.deleteExceptionQueue.isEmpty(), + "Delete exception should have been consumed")); + + long messagesBeforeSchedule = ts.tracker.getNumberOfDelayedMessages(); + ts.clockTime.set(messageCount * 10); + Awaitility.await().untilAsserted(() -> { + NavigableSet scheduledMessages = ts.tracker.getScheduledMessages(1); + assertEquals(scheduledMessages.size(), 1); + assertTrue(scheduledMessages.first().getLedgerId() < firstLedgerId); + assertEquals(ts.tracker.getNumberOfDelayedMessages(), messagesBeforeSchedule - 1); + }); + + ts.close(); + } + + @Test + public void testClearRunsAfterInFlightTrimFailure() throws Exception { + long firstLedgerId = 50L; + int messageCount = 31; + BlockingDeleteStorage storage = new BlockingDeleteStorage(); + TrackerWithStorage ts = createTrackerWithMockLedger(firstLedgerId, 5, storage); + + for (int i = 1; i <= messageCount; i++) { + ts.tracker.addMessage(i, i, i * 10); + } + Awaitility.await().untilAsserted(() -> + assertTrue(storage.deleteCalls.get() > 0, "Trim delete should be in flight")); + + CompletableFuture clearFuture = ts.tracker.clear(); + storage.firstDeleteFuture.completeExceptionally(new BucketSnapshotPersistenceException("Delete failed")); + + clearFuture.get(1, TimeUnit.MINUTES); + assertEquals(ts.tracker.getNumberOfDelayedMessages(), 0); + assertEquals(ts.tracker.getImmutableBuckets().asMapOfRanges().size(), 0); + assertEquals(ts.tracker.getLastMutableBucket().size(), 0); + assertEquals(ts.tracker.getSharedBucketPriorityQueue().size(), 0); + + ts.close(); + } + + @Test + public void testTrimWithNoOrphanedBuckets() throws Exception { + TrackerWithStorage ts = createTrackerWithMockLedger(0L, 5); + + for (int i = 1; i <= 31; i++) { + ts.tracker.addMessage(i, i, i * 10); + } + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream() + .noneMatch(x -> x.merging))); + + int bucketCount = ts.tracker.getImmutableBuckets().asMapOfRanges().size(); + assertTrue(bucketCount <= 5, + "Bucket count " + bucketCount + " should be <= maxNumBuckets=5"); + assertTrue(bucketCount > 0, "Should have at least one bucket after merge"); + + ts.close(); + } + + @Test + public void testMergeEarlyReturnWhenWithinLimit() throws Exception { + TrackerWithStorage ts = createTrackerWithMockLedger(0L, 50); + + for (int i = 1; i <= 30; i++) { + ts.tracker.addMessage(i, i, i * 10); + } + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream() + .noneMatch(x -> x.merging))); + + int bucketCount = ts.tracker.getImmutableBuckets().asMapOfRanges().size(); + assertTrue(bucketCount < 50, + "Bucket count " + bucketCount + " should be well below maxNumBuckets=50"); + + long msgsBefore = ts.tracker.getNumberOfDelayedMessages(); + ts.tracker.addMessage(200, 200, 200 * 10); + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream() + .noneMatch(x -> x.merging))); + + assertEquals(ts.tracker.getNumberOfDelayedMessages(), msgsBefore + 1); + + ts.close(); + } }