[improve][broker] Trim orphaned bucket snapshots when ledgers are deleted #25984

Open
dao-jun wants to merge 2 commits into apache:master from dao-jun:fix/trim_bucket_tracker

dao-jun commented Jun 9, 2026

Member

Motivation

When a topic's managed ledger trims old ledgers (via retention or compaction), the immutable bucket snapshots in BucketDelayedDeliveryTracker that cover those deleted ledger ranges become orphaned — their underlying message data no longer exists, yet they:

  1. Continue to occupy storage as bucket snapshots.
  2. Count against the maxNumBuckets limit, potentially triggering unnecessary and expensive merge operations.

Prior to this change, the tracker only merged buckets when the count exceeded the limit; it never cleaned up buckets whose ledger range had been fully trimmed. This change introduces a trim step that runs before merge, deleting orphaned buckets and decrementing the delayed-message counter accordingly.

Modifications

Source (BucketDelayedDeliveryTracker.java)

  • Added trimFuture: A volatile CompletableFuture that represents the currently in-flight trim/merge/clear operation. It acts as both a gate (preventing concurrent trim chains) and a synchronization point for clear().
  • Added asyncTrimImmutableBuckets(): Scans the managed ledger's surviving ledger range. Buckets whose upperEndpoint falls entirely before the earliest surviving ledger are selected for deletion — these are orphaned and safe to remove.
  • Added deleteBucketSnapshot(): Removes a single bucket from immutableBuckets, asynchronously deletes its snapshot, and re-adds the bucket on failure (rolling back the message count). Failures propagate via CompletionException to stop the sequential deletion chain, avoiding wasted attempts when storage is unavailable.
  • Changed the merge trigger in addMessage(): Before merging, the tracker now runs asyncTrimImmutableBuckets() first. The trimFuture gate ensures only one trim/merge chain is in-flight at a time.
  • Added early-return to asyncMergeBucketSnapshot(): If the bucket count is already within the limit, merge returns immediately without scanning for merge candidates.
  • Rewrote clear(): Instead of immediately clearing state (which could race with an in-flight trim's failure callback), clear() now chains itself after any pending trimFuture. It also sets trimFuture to the clear chain, blocking new trim triggers until the clear completes.
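
The trim step described above can be sketched in isolation. This is a minimal model, not the PR's code: the `Bucket` class, its field names, and the `trim` helper are hypothetical stand-ins for the tracker's range map and counter, and the strict `<` comparison against the first surviving ledger ID is an assumption based on the wording "falls entirely before".

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class TrimSketch {
    /** Hypothetical stand-in for an immutable bucket and its ledger range. */
    static final class Bucket {
        final long lowerLedgerId;
        final long upperLedgerId;
        final long messageCount;

        Bucket(long lowerLedgerId, long upperLedgerId, long messageCount) {
            this.lowerLedgerId = lowerLedgerId;
            this.upperLedgerId = upperLedgerId;
            this.messageCount = messageCount;
        }
    }

    /**
     * Removes every bucket whose upper endpoint lies below the first surviving
     * ledger and subtracts its messages from the delayed-message counter.
     * Returns the number of buckets removed.
     */
    static int trim(List<Bucket> buckets, AtomicLong delayedCount, long firstLedgerId) {
        int removed = 0;
        Iterator<Bucket> it = buckets.iterator();
        while (it.hasNext()) {
            Bucket b = it.next();
            // Assumption: a bucket is orphaned only if its whole range is gone.
            if (b.upperLedgerId < firstLedgerId) {
                it.remove();
                delayedCount.addAndGet(-b.messageCount);
                removed++;
            }
        }
        return removed;
    }
}
```

A bucket that straddles the first surviving ledger (for example, a range of 40 to 60 with `firstLedgerId` = 50) is kept in this sketch, because part of its data may still exist.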

Test (BucketDelayedDeliveryTrackerTest.java)

Added 4 tests using a helper that injects a mocked ManagedLedger:

| Test | Scenario |
| --- | --- |
| testTrimRemovesOrphanedBuckets | Buckets covering ledgers below firstLedgerId are removed, remaining ones have ranges ≥ firstLedgerId |
| testTrimHandlesDeleteFailure | Injected delete exception is consumed, tracker remains operational |
| testTrimWithNoOrphanedBuckets | With firstLedgerId=0, trim is a no-op, merge runs normally |
| testMergeEarlyReturnWhenWithinLimit | With maxNumBuckets far above actual count, merge returns early without compaction |

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

dao-jun added this to the 5.0.0-M1 milestone Jun 9, 2026
dao-jun self-assigned this Jun 9, 2026
dao-jun requested review from coderzc, lhotari and nodece June 9, 2026 19:19
Comment on lines +878 to +881
synchronized (this) {
    immutableBuckets.remove(range);
    numberDelayedMessages.addAndGet(-bucket.getNumberBucketDelayedMessages());
}

Contributor

Removing a bucket from immutableBuckets might not be enough. When a mutable bucket is sealed, the first snapshot segment has already been loaded into sharedBucketPriorityQueue, and snapshotSegmentLastIndexMap can still point to this bucket. After this trim removes the range and decrements numberDelayedMessages, getScheduledMessages() can still pop those queued entries and return positions from ledgers that have already been deleted, then decrement numberDelayedMessages again.

Could we either purge the in-memory state for the trimmed bucket under the same lock, such as rebuilding sharedBucketPriorityQueue and removing matching snapshotSegmentLastIndexMap entries, or otherwise avoid trimming buckets that still have loaded segments? Please also add a test that advances the clock after trim and asserts no positions below firstLedgerId are returned and the delayed-message counter remains consistent.
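
To illustrate the concern, here is a toy model of the double decrement and of the "purge under the same lock" option. It is a sketch under simplifying assumptions, not the tracker's implementation: `PurgeSketch`, `Entry`, and the `purgeQueue` flag are hypothetical, and every delayed message is assumed to be already loaded into the in-memory queue.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class PurgeSketch {
    /** Hypothetical queued entry: delivery time plus the ledger it lives in. */
    static final class Entry {
        final long deliverAt;
        final long ledgerId;

        Entry(long deliverAt, long ledgerId) {
            this.deliverAt = deliverAt;
            this.ledgerId = ledgerId;
        }
    }

    private final PriorityQueue<Entry> queue =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deliverAt));
    long delayedCount;

    synchronized void add(long deliverAt, long ledgerId) {
        queue.add(new Entry(deliverAt, ledgerId));
        delayedCount++;
    }

    /** Decrements the counter for trimmed ledgers; optionally purges their queued entries. */
    synchronized void trim(long firstLedgerId, boolean purgeQueue) {
        long trimmed = queue.stream().filter(e -> e.ledgerId < firstLedgerId).count();
        delayedCount -= trimmed;
        if (purgeQueue) {
            // Same lock as the counter update, so readers never see a half-trimmed state.
            queue.removeIf(e -> e.ledgerId < firstLedgerId);
        }
    }

    /** Pops all entries due at or before `now`, decrementing the counter for each. */
    synchronized int drain(long now) {
        int popped = 0;
        while (!queue.isEmpty() && queue.peek().deliverAt <= now) {
            queue.poll();
            delayedCount--;
            popped++;
        }
        return popped;
    }
}
```

Without the purge, entries from trimmed ledgers are counted once in `trim` and again in `drain`, so the counter drifts below zero; with the purge, the counter reaches exactly zero once the surviving entries are drained.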

Member Author

Not clearing sharedBucketPriorityQueue is intentional, because ManagedCursorImpl.asyncReplayEntries will filter out invalid positions.

I did miss cleaning up snapshotSegmentLastIndexMap; that is a problem.

Decrementing numberDelayedMessages twice is indeed a problem, good catch.

Comment on lines +774 to +776
CompletableFuture<Void> before = trimFuture != null && !trimFuture.isDone()
        ? trimFuture : CompletableFuture.completedFuture(null);
trimFuture = before.thenCompose(__ -> {

Contributor

If clear() is called while trimFuture is still in flight, and that trim/merge chain later completes exceptionally, this before.thenCompose(...) will not execute the clear block. The whenComplete in addMessage only logs the failure and preserves the exceptional completion, so clear() can return exceptionally while leaving immutableBuckets, sharedBucketPriorityQueue, lastMutableBucket, snapshotSegmentLastIndexMap, and numberDelayedMessages uncleared.

Could we normalize the previous future before chaining clear, for example with handle/exceptionally, so clear always runs after the in-flight trim/merge settles?
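
A minimal sketch of that normalization, using only standard `CompletableFuture` calls. The class `ClearChainSketch` and the helper `chainClear` are hypothetical names for illustration; the real `clear()` would run its state reset where `clearAction` is invoked here.

```java
import java.util.concurrent.CompletableFuture;

public class ClearChainSketch {
    /**
     * Runs clearAction once `pending` has settled, whether it completed
     * normally or exceptionally. handle(...) swallows the earlier failure,
     * so the returned future fails only if clearAction itself throws.
     */
    static CompletableFuture<Void> chainClear(CompletableFuture<Void> pending,
                                              Runnable clearAction) {
        CompletableFuture<Void> settled = pending.handle((ignored, error) -> null);
        return settled.thenRun(clearAction);
    }
}
```

By contrast, chaining with `thenCompose` directly on a future that completes exceptionally skips the supplied function and propagates the exception, which is the failure mode described in this comment.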

Member Author

Good catch! That’s a bug for sure.

Comment on lines +522 to +539
public void testTrimRemovesOrphanedBuckets() throws Exception {
    TrackerWithStorage ts = createTrackerWithMockLedger(50L, 5);

    for (int i = 1; i <= 31; i++) {
        ts.tracker.addMessage(i, i, i * 10);
    }
    Awaitility.await().untilAsserted(() ->
            Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream()
                    .noneMatch(x -> x.merging)));

    int bucketCount = ts.tracker.getImmutableBuckets().asMapOfRanges().size();
    assertTrue(bucketCount <= 5,
            "Bucket count " + bucketCount + " should be <= maxNumBuckets=5 after trim+merge");

    ts.tracker.getImmutableBuckets().asMapOfRanges().forEach((range, bucket) ->
            assertTrue(range.lowerEndpoint() >= 50L,
                    "Remaining bucket range " + range + " should be >= 50"));

Contributor

This test only checks that immutableBuckets no longer contains ranges below firstLedgerId. It does not verify the externally visible behavior after trim. Please advance the mock clock and call getScheduledMessages(), then assert that no position from ledgers below firstLedgerId is returned and that getNumberOfDelayedMessages() remains consistent. That would catch stale entries left in sharedBucketPriorityQueue after the bucket range is removed.
