Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
cb78263
feat(delayed): optimize BucketDelayedDeliveryTracker with new lock st…
Denovo1998 Aug 15, 2025
ffbe45b
refactor(bucket): replace synchronized with explicit write lock in as…
Denovo1998 Aug 25, 2025
4e8b23e
fix(broker): adjust indentation in BucketDelayedDeliveryTracker
Denovo1998 Sep 17, 2025
cb5695e
Merge branch 'master' into bucket_delivery_tracker_optimize
Denovo1998 Sep 17, 2025
00fa1e3
refactor(delayed-delivery): replace synchronized with writeLock in Bu…
Denovo1998 Sep 17, 2025
47280a9
refactor(broker): Optimize the mixed read-write operations in getSche…
Denovo1998 Sep 19, 2025
2542478
refactor(broker): improve delayed delivery tracker close logic
Denovo1998 Sep 19, 2025
8c7e0ec
test(BucketDeliveredDeliveryTrackerTest): improve snapshot persistenc…
Denovo1998 Sep 19, 2025
5c942bc
Merge branch 'master' into bucket_delivery_tracker_optimize
Denovo1998 Sep 20, 2025
9db40eb
test(bucket): optimize BucketDelayedDeliveryTracker thread safety tests
Denovo1998 Sep 20, 2025
9f09e86
test(BucketDelayedDeliveryTracker): extend thread safety tests with n…
Denovo1998 Sep 20, 2025
c6dd286
feat(benchmark): add JMH benchmarks for BucketDelayedDeliveryTracker
Denovo1998 Sep 20, 2025
591c6d5
refactor(test): Make MockManagedCursor class public for testing
Denovo1998 Sep 20, 2025
fe5d98d
feat(delayed): introduce DelayedDeliveryContext abstraction for track…
Denovo1998 Sep 22, 2025
b83481c
Merge branch 'master' into bucket_delivery_tracker_optimize
Denovo1998 Oct 12, 2025
598f4e3
Merge branch 'master' into bucket_delivery_tracker_optimize
Denovo1998 Nov 30, 2025
738d0e2
feat(delayed): enhance bucket sealing and message routing logic for i…
Denovo1998 Nov 30, 2025
5a73d69
feat(delayed): improve topic name extraction and ensure sequential me…
Denovo1998 Dec 3, 2025
cc1a635
feat(delayed): optimize bucket merging logic and enhance benchmark pa…
Denovo1998 Dec 4, 2025
43d7faf
feat(delayed): streamline bucket snapshot handling and improve execut…
Denovo1998 Dec 7, 2025
f691fcc
Merge branch 'master' into bucket_delivery_tracker_optimize
Denovo1998 Jun 1, 2026
d9f2e3a
[improve][broker] Optimize fine-grained concurrency control for Bucke…
Denovo1998 Jun 1, 2026
67d4d99
refactor(bucket-delayed-delivery): optimize update timer logic
Denovo1998 Jun 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@
import org.openjdk.jmh.annotations.Warmup;

/**
* JMH benchmarks for {@link BucketDelayedDeliveryTracker}.
*
* <p>This benchmark measures tracker throughput under different read/write ratios
* and initial message counts without implying a specific lock implementation.
*
* <p>Run with: mvn exec:java -Dexec.mainClass="org.openjdk.jmh.Main"
* Enhanced JMH Benchmarks for BucketDelayedDeliveryTracker with ReentrantReadWriteLock.
* This benchmark tests the performance improvements made by transitioning from
* StampedLock to ReentrantReadWriteLock for fine-grained concurrency control.
* <p>
* Run with: mvn exec:java -Dexec.mainClass="org.openjdk.jmh.Main"
* -Dexec.args="BucketDelayedDeliveryTrackerBenchmark"
*/
@BenchmarkMode(Mode.Throughput)
Expand Down Expand Up @@ -170,6 +169,9 @@ private void preloadMessages() {

@Benchmark
public boolean benchmarkMixedOperations() {
String[] parts = readWriteRatio.split("_");
int readPercentage = Integer.parseInt(parts[0]);

if (ThreadLocalRandom.current().nextInt(100) < readPercentage) {
// Read operations
return performReadOperation();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/**
* Microbenchmarks for delayed message delivery bucket implementation.
*
* <p>This package contains JMH benchmarks for testing the performance
* characteristics of the BucketDelayedDeliveryTracker, particularly
* focusing on thread safety improvements with ReentrantReadWriteLock.
*/
package org.apache.pulsar.broker;
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public abstract class AbstractDelayedDeliveryTracker implements DelayedDeliveryT
protected final Clock clock;

private final boolean isDelayedDeliveryDeliverAtTimeStrict;
private final Object triggerLock;

private final Object timerStateLock = new Object();

public AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
Expand All @@ -76,7 +77,6 @@ public AbstractDelayedDeliveryTracker(DelayedDeliveryContext context, Timer time
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this.context = context;
this.triggerLock = context.getTriggerLock();
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.clock = clock;
Expand Down Expand Up @@ -107,67 +107,83 @@ public void resetTickTime(long tickTime) {

protected void updateTimer() {
if (getNumberOfDelayedMessages() == 0) {
if (timeout != null) {
currentTimeoutTarget = -1;
timeout.cancel();
timeout = null;
synchronized (timerStateLock) {
if (timeout != null) {
currentTimeoutTarget = -1;
timeout.cancel();
timeout = null;
}
}
return;
}
long timestamp = nextDeliveryTime();
if (timestamp == currentTimeoutTarget) {
// The timer is already set to the correct target time
return;
}
synchronized (timerStateLock) {
if (timestamp == currentTimeoutTarget) {
// The timer is already set to the correct target time
return;
}

if (timeout != null) {
timeout.cancel();
}
if (timeout != null) {
timeout.cancel();
}

long now = clock.millis();
long delayMillis = timestamp - now;

if (delayMillis < 0) {
// There are messages that are already ready to be delivered. If
// the dispatcher is not getting them is because the consumer is
// either not connected or slow.
// We don't need to keep retriggering the timer. When the consumer
// catches up, the dispatcher will do the readMoreEntries() and
// get these messages
return;
}

long now = clock.millis();
long delayMillis = timestamp - now;
// Compute the earliest time that we schedule the timer to run.
long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);

if (delayMillis < 0) {
// There are messages that are already ready to be delivered. If
// the dispatcher is not getting them is because the consumer is
// either not connected or slow.
// We don't need to keep retriggering the timer. When the consumer
// catches up, the dispatcher will do the readMoreEntries() and
// get these messages
return;
log.debug().attr("delayMillis", calculatedDelayMillis).log("Start timer");

// Even though we may delay longer than this timestamp because of the tick delay, we still track the
// current timeout with reference to the next message's timestamp.
currentTimeoutTarget = timestamp;
timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
}
}

// Compute the earliest time that we schedule the timer to run.
long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);
log.debug().attr("delayMillis", calculatedDelayMillis)
.log("Start timer");
// Even though we may delay longer than this timestamp because of the tick delay, we still track the
// current timeout with reference to the next message's timestamp.
currentTimeoutTarget = timestamp;
timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
protected final void scheduleImmediateRun() {
synchronized (timerStateLock) {
if (timeout != null) {
timeout.cancel();
}
timeout = timer.newTimeout(this, 0, TimeUnit.MILLISECONDS);
}
}

@Override
public void run(Timeout timeout) throws Exception {
log.debug("Timer triggered");
if (timeout == null || timeout.isCancelled()) {
log.debug().log("Timer triggered");
if (timeout == null || timeout.isCancelled()) {
return;
}

synchronized (triggerLock) {
synchronized (timerStateLock) {
lastTickRun = clock.millis();
currentTimeoutTarget = -1;
this.timeout = null;
context.triggerReadMoreEntries();
}
context.triggerReadMoreEntries();
}

@Override
public void close() {
if (timeout != null) {
timeout.cancel();
timeout = null;
synchronized (timerStateLock) {
if (timeout != null) {
timeout.cancel();
timeout = null;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,18 @@ public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleCon
@VisibleForTesting
BucketDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher)
throws RecoverDelayedDeliveryTrackerException {
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
DelayedDeliveryContext context = new DispatcherDelayedDeliveryContext(dispatcher);
return new BucketDelayedDeliveryTracker(context, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
}

@VisibleForTesting
BucketDelayedDeliveryTracker newTracker0(String dispatcherName, ManagedCursor cursor)
throws RecoverDelayedDeliveryTrackerException {
DelayedDeliveryContext context = new NoopDelayedDeliveryContext(dispatcherName, cursor);
return new BucketDelayedDeliveryTracker(context, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public Object getTriggerLock() {

@Override
public void triggerReadMoreEntries() {
dispatcher.readMoreEntriesAsync();
synchronized (dispatcher) {
dispatcher.readMoreEntriesAsync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
Expand Down Expand Up @@ -83,10 +84,26 @@ public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsum
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}

private InMemoryDelayedDeliveryTracker(DelayedDeliveryContext context, Timer timer,
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
public InMemoryDelayedDeliveryTracker(String dispatcherName, ManagedCursor cursor, Timer timer,
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this(new NoopDelayedDeliveryContext(dispatcherName, cursor), timer, tickTimeMillis, clock,
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}

public InMemoryDelayedDeliveryTracker(DelayedDeliveryContext context, Timer timer,
long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this(context, timer, tickTimeMillis, Clock.systemUTC(),
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}

public InMemoryDelayedDeliveryTracker(DelayedDeliveryContext context, Timer timer,
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
super(context, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.log = LOG.with().ctx(super.log).build();
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
Expand Down Expand Up @@ -131,8 +148,9 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
log.debug()
.attr("ledgerId", ledgerId)
.attr("entryId", entryId)
.attr("deliveryInMs", () -> deliverAt - clock.millis())
.attr("deliveryInMs", deliverAt - clock.millis())
.log("Add message");

long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);

Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, k -> new TreeMap<>())
Expand Down Expand Up @@ -226,10 +244,12 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
delayedMessageMap.remove(timestamp);
}
}
log.debug()
.attr("messagesCount", positions.size())
.log("Get scheduled messages");
if (delayedMessageMap.isEmpty()) {

log.debug()
.attr("messagesCount", positions.size())
.log("Get scheduled messages");

if (delayedMessageMap.isEmpty()) {
// Reset to initial state
highestDeliveryTimeTracked = 0;
messagesHaveFixedDelay = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.TimeUnit;
import lombok.CustomLog;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
Expand Down Expand Up @@ -68,7 +69,15 @@ public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleCon

@VisibleForTesting
InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
DelayedDeliveryContext context = new DispatcherDelayedDeliveryContext(dispatcher);
return new InMemoryDelayedDeliveryTracker(context, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}

@VisibleForTesting
InMemoryDelayedDeliveryTracker newTracker0(String dispatcherName, ManagedCursor cursor) {
DelayedDeliveryContext context = new NoopDelayedDeliveryContext(dispatcherName, cursor);
return new InMemoryDelayedDeliveryTracker(context, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
*/
package org.apache.pulsar.broker.delayed;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.ManagedCursor;

public class NoopDelayedDeliveryContext implements DelayedDeliveryContext {

private final String name;
private final ManagedCursor cursor;
private final Object triggerLock = new Object();
private final AtomicInteger triggerCount = new AtomicInteger();

public NoopDelayedDeliveryContext(String name, ManagedCursor cursor) {
this.name = name;
Expand All @@ -49,5 +51,10 @@ public Object getTriggerLock() {
@Override
public void triggerReadMoreEntries() {
// no-op; for tests/JMH
triggerCount.incrementAndGet();
}

public int getTriggerCount() {
return triggerCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,22 @@ CompletableFuture<Long> asyncSaveBucketSnapshot(
List<SnapshotSegment> bucketSnapshotSegments) {
final String bucketKey = bucket.bucketKey();
final String cursorName = Codec.decode(cursor.getName());
final String topicName = dispatcherName.substring(0, dispatcherName.lastIndexOf(" / " + cursorName));
final String suffix = " / " + cursorName;
final int suffixIndex = dispatcherName.lastIndexOf(suffix);
final String topicName;
if (suffixIndex >= 0) {
topicName = dispatcherName.substring(0, suffixIndex);
} else {
// Fallback for dispatcher names that don't follow the "<topic> / <cursor>" pattern.
// This can happen in tests or benchmarks that use a simplified dispatcher name.
// Using the full dispatcherName as topicName avoids StringIndexOutOfBoundsException
// while still providing a meaningful identifier to the snapshot storage.
topicName = dispatcherName;
log.debug()
.attr("dispatcher", dispatcherName)
.attr("suffix", suffix)
.log("Dispatcher name does not contain expected suffix, using full dispatcherName as topic name");
}
return executeWithRetry(
() -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey,
topicName, cursorName)
Expand Down
Loading