diff --git a/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java
index 08d02195c8781..1c501f6c2da99 100644
--- a/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java
+++ b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java
@@ -48,12 +48,11 @@
import org.openjdk.jmh.annotations.Warmup;
/**
- * JMH benchmarks for {@link BucketDelayedDeliveryTracker}.
- *
- *
This benchmark measures tracker throughput under different read/write ratios
- * and initial message counts without implying a specific lock implementation.
- *
- *
Run with: mvn exec:java -Dexec.mainClass="org.openjdk.jmh.Main"
+ * Enhanced JMH Benchmarks for BucketDelayedDeliveryTracker with ReentrantReadWriteLock.
+ * This benchmark tests the performance improvements made by transitioning from
+ * StampedLock to ReentrantReadWriteLock for fine-grained concurrency control.
+ *
+ * Run with: mvn exec:java -Dexec.mainClass="org.openjdk.jmh.Main"
* -Dexec.args="BucketDelayedDeliveryTrackerBenchmark"
*/
@BenchmarkMode(Mode.Throughput)
@@ -170,6 +169,9 @@ private void preloadMessages() {
@Benchmark
public boolean benchmarkMixedOperations() {
+ String[] parts = readWriteRatio.split("_");
+ int readPercentage = Integer.parseInt(parts[0]);
+
if (ThreadLocalRandom.current().nextInt(100) < readPercentage) {
// Read operations
return performReadOperation();
diff --git a/microbench/src/main/java/org/apache/pulsar/broker/package-info.java b/microbench/src/main/java/org/apache/pulsar/broker/package-info.java
new file mode 100644
index 0000000000000..a7620134f2f84
--- /dev/null
+++ b/microbench/src/main/java/org/apache/pulsar/broker/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Microbenchmarks for delayed message delivery bucket implementation.
+ *
+ *
This package contains JMH benchmarks for testing the performance
+ * characteristics of the BucketDelayedDeliveryTracker, particularly
+ * focusing on thread safety improvements with ReentrantReadWriteLock.
+ */
+package org.apache.pulsar.broker;
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
index 2caf71a6eda30..4e00986a5064d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
@@ -50,7 +50,8 @@ public abstract class AbstractDelayedDeliveryTracker implements DelayedDeliveryT
protected final Clock clock;
private final boolean isDelayedDeliveryDeliverAtTimeStrict;
- private final Object triggerLock;
+
+ private final Object timerStateLock = new Object();
public AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
@@ -76,7 +77,6 @@ public AbstractDelayedDeliveryTracker(DelayedDeliveryContext context, Timer time
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this.context = context;
- this.triggerLock = context.getTriggerLock();
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.clock = clock;
@@ -107,67 +107,83 @@ public void resetTickTime(long tickTime) {
protected void updateTimer() {
if (getNumberOfDelayedMessages() == 0) {
- if (timeout != null) {
- currentTimeoutTarget = -1;
- timeout.cancel();
- timeout = null;
+ synchronized (timerStateLock) {
+ if (timeout != null) {
+ currentTimeoutTarget = -1;
+ timeout.cancel();
+ timeout = null;
+ }
}
return;
}
long timestamp = nextDeliveryTime();
- if (timestamp == currentTimeoutTarget) {
- // The timer is already set to the correct target time
- return;
- }
+ synchronized (timerStateLock) {
+ if (timestamp == currentTimeoutTarget) {
+ // The timer is already set to the correct target time
+ return;
+ }
- if (timeout != null) {
- timeout.cancel();
- }
+ if (timeout != null) {
+ timeout.cancel();
+ }
+
+ long now = clock.millis();
+ long delayMillis = timestamp - now;
+
+ if (delayMillis < 0) {
+ // There are messages that are already ready to be delivered. If
+ // the dispatcher is not getting them is because the consumer is
+ // either not connected or slow.
+ // We don't need to keep retriggering the timer. When the consumer
+ // catches up, the dispatcher will do the readMoreEntries() and
+ // get these messages
+ return;
+ }
- long now = clock.millis();
- long delayMillis = timestamp - now;
+ // Compute the earliest time that we schedule the timer to run.
+ long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+ long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);
- if (delayMillis < 0) {
- // There are messages that are already ready to be delivered. If
- // the dispatcher is not getting them is because the consumer is
- // either not connected or slow.
- // We don't need to keep retriggering the timer. When the consumer
- // catches up, the dispatcher will do the readMoreEntries() and
- // get these messages
- return;
+ log.debug().attr("delayMillis", calculatedDelayMillis).log("Start timer");
+
+ // Even though we may delay longer than this timestamp because of the tick delay, we still track the
+ // current timeout with reference to the next message's timestamp.
+ currentTimeoutTarget = timestamp;
+ timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
}
+ }
- // Compute the earliest time that we schedule the timer to run.
- long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
- long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);
- log.debug().attr("delayMillis", calculatedDelayMillis)
- .log("Start timer");
- // Even though we may delay longer than this timestamp because of the tick delay, we still track the
- // current timeout with reference to the next message's timestamp.
- currentTimeoutTarget = timestamp;
- timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
+ protected final void scheduleImmediateRun() {
+ synchronized (timerStateLock) {
+ if (timeout != null) {
+ timeout.cancel();
+ }
+ timeout = timer.newTimeout(this, 0, TimeUnit.MILLISECONDS);
+ }
}
@Override
public void run(Timeout timeout) throws Exception {
- log.debug("Timer triggered");
- if (timeout == null || timeout.isCancelled()) {
+ log.debug().log("Timer triggered");
+ if (timeout == null || timeout.isCancelled()) {
return;
}
- synchronized (triggerLock) {
+ synchronized (timerStateLock) {
lastTickRun = clock.millis();
currentTimeoutTarget = -1;
this.timeout = null;
- context.triggerReadMoreEntries();
}
+ context.triggerReadMoreEntries();
}
@Override
public void close() {
- if (timeout != null) {
- timeout.cancel();
- timeout = null;
+ synchronized (timerStateLock) {
+ if (timeout != null) {
+ timeout.cancel();
+ timeout = null;
+ }
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index 80d1076c1c8f4..5e6e8cafcdbeb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -101,7 +101,18 @@ public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleCon
@VisibleForTesting
BucketDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher)
throws RecoverDelayedDeliveryTrackerException {
- return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
+ DelayedDeliveryContext context = new DispatcherDelayedDeliveryContext(dispatcher);
+ return new BucketDelayedDeliveryTracker(context, timer, tickTimeMillis,
+ isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
+ TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
+ delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
+ }
+
+ @VisibleForTesting
+ BucketDelayedDeliveryTracker newTracker0(String dispatcherName, ManagedCursor cursor)
+ throws RecoverDelayedDeliveryTrackerException {
+ DelayedDeliveryContext context = new NoopDelayedDeliveryContext(dispatcherName, cursor);
+ return new BucketDelayedDeliveryTracker(context, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DispatcherDelayedDeliveryContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DispatcherDelayedDeliveryContext.java
index 2ea388d504c24..889d519c12b8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DispatcherDelayedDeliveryContext.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DispatcherDelayedDeliveryContext.java
@@ -46,6 +46,8 @@ public Object getTriggerLock() {
@Override
public void triggerReadMoreEntries() {
- dispatcher.readMoreEntriesAsync();
+ synchronized (dispatcher) {
+ dispatcher.readMoreEntriesAsync();
+ }
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index b1d4e8cecbd14..b68efdcf62bf7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -28,6 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
@@ -83,10 +84,26 @@ public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsum
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}
- private InMemoryDelayedDeliveryTracker(DelayedDeliveryContext context, Timer timer,
- long tickTimeMillis, Clock clock,
- boolean isDelayedDeliveryDeliverAtTimeStrict,
- long fixedDelayDetectionLookahead) {
+ public InMemoryDelayedDeliveryTracker(String dispatcherName, ManagedCursor cursor, Timer timer,
+ long tickTimeMillis, Clock clock,
+ boolean isDelayedDeliveryDeliverAtTimeStrict,
+ long fixedDelayDetectionLookahead) {
+ this(new NoopDelayedDeliveryContext(dispatcherName, cursor), timer, tickTimeMillis, clock,
+ isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
+ }
+
+ public InMemoryDelayedDeliveryTracker(DelayedDeliveryContext context, Timer timer,
+ long tickTimeMillis,
+ boolean isDelayedDeliveryDeliverAtTimeStrict,
+ long fixedDelayDetectionLookahead) {
+ this(context, timer, tickTimeMillis, Clock.systemUTC(),
+ isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
+ }
+
+ public InMemoryDelayedDeliveryTracker(DelayedDeliveryContext context, Timer timer,
+ long tickTimeMillis, Clock clock,
+ boolean isDelayedDeliveryDeliverAtTimeStrict,
+ long fixedDelayDetectionLookahead) {
super(context, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.log = LOG.with().ctx(super.log).build();
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
@@ -131,8 +148,9 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
log.debug()
.attr("ledgerId", ledgerId)
.attr("entryId", entryId)
- .attr("deliveryInMs", () -> deliverAt - clock.millis())
+ .attr("deliveryInMs", deliverAt - clock.millis())
.log("Add message");
+
long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, k -> new TreeMap<>())
@@ -226,10 +244,12 @@ public NavigableSet getScheduledMessages(int maxMessages) {
delayedMessageMap.remove(timestamp);
}
}
- log.debug()
- .attr("messagesCount", positions.size())
- .log("Get scheduled messages");
- if (delayedMessageMap.isEmpty()) {
+
+ log.debug()
+ .attr("messagesCount", positions.size())
+ .log("Get scheduled messages");
+
+ if (delayedMessageMap.isEmpty()) {
// Reset to initial state
highestDeliveryTimeTracked = 0;
messagesHaveFixedDelay = true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
index 4f31c9a1972d1..e50c181e36fb3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
@@ -24,6 +24,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.TimeUnit;
import lombok.CustomLog;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
@@ -68,7 +69,15 @@ public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleCon
@VisibleForTesting
InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
- return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
+ DelayedDeliveryContext context = new DispatcherDelayedDeliveryContext(dispatcher);
+ return new InMemoryDelayedDeliveryTracker(context, timer, tickTimeMillis,
+ isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
+ }
+
+ @VisibleForTesting
+ InMemoryDelayedDeliveryTracker newTracker0(String dispatcherName, ManagedCursor cursor) {
+ DelayedDeliveryContext context = new NoopDelayedDeliveryContext(dispatcherName, cursor);
+ return new InMemoryDelayedDeliveryTracker(context, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java
index 826e127df005a..a8378432f0189 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.delayed;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.ManagedCursor;
public class NoopDelayedDeliveryContext implements DelayedDeliveryContext {
@@ -25,6 +26,7 @@ public class NoopDelayedDeliveryContext implements DelayedDeliveryContext {
private final String name;
private final ManagedCursor cursor;
private final Object triggerLock = new Object();
+ private final AtomicInteger triggerCount = new AtomicInteger();
public NoopDelayedDeliveryContext(String name, ManagedCursor cursor) {
this.name = name;
@@ -49,5 +51,10 @@ public Object getTriggerLock() {
@Override
public void triggerReadMoreEntries() {
// no-op; for tests/JMH
+ triggerCount.incrementAndGet();
+ }
+
+ public int getTriggerCount() {
+ return triggerCount.get();
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index 2aad122752fd6..a7d84d6ff4c3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -135,7 +135,22 @@ CompletableFuture asyncSaveBucketSnapshot(
List bucketSnapshotSegments) {
final String bucketKey = bucket.bucketKey();
final String cursorName = Codec.decode(cursor.getName());
- final String topicName = dispatcherName.substring(0, dispatcherName.lastIndexOf(" / " + cursorName));
+ final String suffix = " / " + cursorName;
+ final int suffixIndex = dispatcherName.lastIndexOf(suffix);
+ final String topicName;
+ if (suffixIndex >= 0) {
+ topicName = dispatcherName.substring(0, suffixIndex);
+ } else {
+ // Fallback for dispatcher names that don't follow the " / " pattern.
+ // This can happen in tests or benchmarks that use a simplified dispatcher name.
+ // Using the full dispatcherName as topicName avoids StringIndexOutOfBoundsException
+ // while still providing a meaningful identifier to the snapshot storage.
+ topicName = dispatcherName;
+ log.debug()
+ .attr("dispatcher", dispatcherName)
+ .attr("suffix", suffix)
+ .log("Dispatcher name does not contain expected suffix, using full dispatcherName as topic name");
+ }
return executeWithRetry(
() -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey,
topicName, cursorName)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 19c774852c9cb..09a4fbb0d8e45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -41,9 +41,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import lombok.Getter;
@@ -57,9 +62,11 @@
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.DelayedDeliveryContext;
import org.apache.pulsar.broker.delayed.DispatcherDelayedDeliveryContext;
+import org.apache.pulsar.broker.delayed.NoopDelayedDeliveryContext;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
@@ -99,7 +106,7 @@ public static record SnapshotKey(long ledgerId, long entryId) {}
@Getter
@VisibleForTesting
- private final MutableBucket lastMutableBucket;
+ private volatile MutableBucket lastMutableBucket;
@Getter
@VisibleForTesting
@@ -115,6 +122,34 @@ public static record SnapshotKey(long ledgerId, long entryId) {}
private CompletableFuture pendingLoad = null;
+ private volatile CompletableFuture pendingBucketSnapshot = CompletableFuture.completedFuture(null);
+
+ private final ExecutorService bucketSnapshotExecutor;
+ private final AtomicBoolean bucketSnapshotInProgress = new AtomicBoolean(false);
+ /**
+ * Bucket that is currently being sealed into an immutable bucket.
+ *
+ * Lifecycle:
+ *
+ * - When a mutable bucket reaches the threshold to be persisted, it is assigned to
+ * {@code bucketBeingSealed} under the {@link #writeLock} and a fresh {@link #lastMutableBucket}
+ * is created for new messages.
+ * - While a bucket is being sealed, messages whose ledger id falls into its range are not written
+ * to this bucket anymore. Instead they are routed directly to {@link #sharedBucketPriorityQueue}
+ * and de-duplication is done using this sealed bucket bitmap plus the current
+ * {@link #lastMutableBucket} bitmap.
+ * - Once {@link #createBucketSnapshotAsync(MutableBucket)} finishes (successfully or exceptionally),
+ * this field is cleared. This transition also happens under the {@link #writeLock}.
+ *
+ *
+ * All access to this field must be done under {@link #writeLock} to keep bucket routing and sealing
+ * consistent across threads.
+ */
+ private volatile MutableBucket bucketBeingSealed = null;
+
+ private final Lock readLock;
+ private final Lock writeLock;
+
public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
@@ -139,6 +174,30 @@ public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumer
timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}
+ public BucketDelayedDeliveryTracker(String dispatcherName, ManagedCursor cursor,
+ Timer timer, long tickTimeMillis,
+ boolean isDelayedDeliveryDeliverAtTimeStrict,
+ BucketSnapshotStorage bucketSnapshotStorage,
+ long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
+ int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
+ throws RecoverDelayedDeliveryTrackerException {
+ this(new NoopDelayedDeliveryContext(dispatcherName, cursor), timer, tickTimeMillis, Clock.systemUTC(),
+ isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, minIndexCountPerBucket,
+ timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
+ }
+
+ public BucketDelayedDeliveryTracker(DelayedDeliveryContext context,
+ Timer timer, long tickTimeMillis,
+ boolean isDelayedDeliveryDeliverAtTimeStrict,
+ BucketSnapshotStorage bucketSnapshotStorage,
+ long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
+ int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
+ throws RecoverDelayedDeliveryTrackerException {
+ this(context, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
+ bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis,
+ maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
+ }
+
@VisibleForTesting
public BucketDelayedDeliveryTracker(DelayedDeliveryContext context,
Timer timer, long tickTimeMillis, Clock clock,
@@ -160,8 +219,12 @@ public BucketDelayedDeliveryTracker(DelayedDeliveryContext context,
new MutableBucket(context.getName(), context.getCursor(), FutureUtil.Sequencer.create(),
bucketSnapshotStorage);
this.stats = new BucketDelayedMessageIndexStats();
+ ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ this.readLock = rwLock.readLock();
+ this.writeLock = rwLock.writeLock();
+ bucketSnapshotExecutor = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory(context.getName() + "bucket-creation"));
- // Close the tracker if failed to recover.
try {
long recoveredMessages = recoverBucketSnapshot();
this.numberDelayedMessages.set(recoveredMessages);
@@ -175,7 +238,7 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT
ManagedCursor cursor = this.lastMutableBucket.getCursor();
Map cursorProperties = cursor.getCursorProperties();
if (MapUtils.isEmpty(cursorProperties)) {
- log.info("Recover delayed message index bucket snapshot finish, don't find bucket snapshot");
+ log.info().log("Recover delayed message index bucket snapshot finish, don't find bucket snapshot");
return 0;
}
FutureUtil.Sequencer sequencer = this.lastMutableBucket.getSequencer();
@@ -195,7 +258,7 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT
Map, ImmutableBucket> immutableBucketMap = immutableBuckets.asMapOfRanges();
if (immutableBucketMap.isEmpty()) {
- log.info("Recover delayed message index bucket snapshot finish, don't find bucket snapshot");
+ log.info().log("Recover delayed message index bucket snapshot finish, don't find bucket snapshot");
return 0;
}
@@ -308,11 +371,14 @@ private synchronized void putAndCleanOverlapRange(Range range, ImmutableBu
@Override
public void run(Timeout timeout) throws Exception {
- synchronized (this) {
+ writeLock.lock();
+ try {
if (timeout == null || timeout.isCancelled()) {
return;
}
lastMutableBucket.moveScheduledMessageToSharedQueue(getCutoffTime(), sharedBucketPriorityQueue);
+ } finally {
+ writeLock.unlock();
}
super.run(timeout);
}
@@ -344,7 +410,7 @@ private void afterCreateImmutableBucket(Pair immu
immutableBucket.asyncUpdateSnapshotLength();
log.info()
.attr("bucketKey", immutableBucket.bucketKey())
- .log("Create bucket snapshot finish, bucketKey");
+ .log("Create bucket snapshot finish");
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
System.currentTimeMillis() - startTime);
@@ -359,7 +425,8 @@ private void afterCreateImmutableBucket(Pair immu
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.create);
// Put indexes back into the shared queue and downgrade to memory mode
- synchronized (BucketDelayedDeliveryTracker.this) {
+ writeLock.lock();
+ try {
immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
for (SnapshotSegment snapshotSegment : snapshotSegments) {
for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) {
@@ -375,6 +442,8 @@ private void afterCreateImmutableBucket(Pair immu
Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId));
snapshotSegmentLastIndexMap.remove(
new SnapshotKey(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId()));
+ } finally {
+ writeLock.unlock();
}
return INVALID_BUCKET_ID;
});
@@ -383,58 +452,122 @@ private void afterCreateImmutableBucket(Pair immu
}
}
+ /**
+ * Add a delayed message to the tracker.
+ *
+ * Routing rules (under {@link #writeLock} for the mutating part):
+ *
+ * - If the message already exists, it is ignored.
+ * - If the delivery time is before the cutoff, it is dropped.
+ * - If the current {@link #lastMutableBucket} has accumulated enough indexes and the message
+ * ledger id is strictly after its range, a snapshot is triggered:
+ *
+ * - The current {@code lastMutableBucket} is moved to {@link #bucketBeingSealed}.
+ * - A fresh {@code lastMutableBucket} is created for subsequent messages.
+ * - Sealing and persistence happen asynchronously in
+ * {@link #createBucketSnapshotAsync(MutableBucket)}.
+ *
+ *
+ * - Messages whose ledger id falls into the range of a bucket that is currently being sealed, or
+ * into an already immutable bucket, or before the current {@code lastMutableBucket} range,
+ * are routed directly to {@link #sharedBucketPriorityQueue} and tracked only via the bitmap
+ * of the current {@code lastMutableBucket}. The sealed bucket bitmap is still consulted for
+ * de-duplication until the immutable bucket is registered.
+ * - All remaining messages are added to {@code lastMutableBucket}.
+ *
+ */
@Override
- public synchronized boolean addMessage(long ledgerId, long entryId, long deliverAt) {
- if (containsMessage(ledgerId, entryId)) {
- return true;
- }
+ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
+ readLock.lock();
+ try {
+ if (containsMessageUnsafe(ledgerId, entryId)) {
+ return true;
+ }
- if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
- return false;
+ if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
+ return false;
+ }
+ } finally {
+ readLock.unlock();
}
- boolean existBucket = findImmutableBucket(ledgerId).isPresent();
+ writeLock.lock();
+ try {
+ // Double check
+ if (containsMessageUnsafe(ledgerId, entryId)) {
+ return true;
+ }
- // Create bucket snapshot
- if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
- && lastMutableBucket.size() >= minIndexCountPerBucket
- && !lastMutableBucket.isEmpty()) {
- long createStartTime = System.currentTimeMillis();
- stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
- Pair immutableBucketDelayedIndexPair =
- lastMutableBucket.sealBucketAndAsyncPersistent(
- this.timeStepPerBucketSnapshotSegmentInMillis,
- this.maxIndexesPerBucketSnapshotSegment,
- this.sharedBucketPriorityQueue);
- afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
- lastMutableBucket.resetLastMutableBucketRange();
+ if (deliverAt <= getCutoffTime()) {
+ return false;
+ }
- if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
- asyncMergeBucketSnapshot();
+ final MutableBucket sealingBucket = this.bucketBeingSealed;
+ final boolean sealingBucketContainsLedger = sealingBucket != null
+ && ledgerId >= sealingBucket.startLedgerId
+ && ledgerId <= sealingBucket.endLedgerId;
+
+ boolean existBucket = findImmutableBucket(ledgerId).isPresent();
+
+ if (!sealingBucketContainsLedger
+ && !existBucket
+ && ledgerId > lastMutableBucket.endLedgerId
+ && lastMutableBucket.size() >= minIndexCountPerBucket
+ && !lastMutableBucket.isEmpty()
+ && bucketSnapshotInProgress.compareAndSet(false, true)) {
+ // Create bucket snapshot using current lastMutableBucket as the bucket to seal.
+ final MutableBucket bucketToSeal = this.lastMutableBucket;
+ final CompletableFuture snapshotFuture = new CompletableFuture<>();
+ this.bucketBeingSealed = bucketToSeal;
+ this.pendingBucketSnapshot = snapshotFuture;
+ this.lastMutableBucket = new MutableBucket(context.getName(), context.getCursor(),
+ FutureUtil.Sequencer.create(), bucketToSeal.getBucketSnapshotStorage());
+ bucketSnapshotExecutor.execute(() -> {
+ try {
+ createBucketSnapshotAsync(bucketToSeal);
+ snapshotFuture.complete(null);
+ } catch (Throwable t) {
+ snapshotFuture.completeExceptionally(t);
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+ if (t instanceof Error) {
+ throw (Error) t;
+ }
+ throw new RuntimeException(t);
+ } finally {
+ bucketSnapshotInProgress.set(false);
+ }
+ });
}
- }
- if (ledgerId >= lastMutableBucket.endLedgerId && !existBucket) {
- lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
- } else {
- // Message index belongs to previous bucket range or the current mutable bucket range,
- // enter sharedBucketPriorityQueue directly
- sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
- lastMutableBucket.putIndexBit(ledgerId, entryId);
- }
+ if (sealingBucketContainsLedger || ledgerId < lastMutableBucket.startLedgerId || existBucket) {
+ // If message belongs to a bucket currently being sealed, or an existing immutable bucket,
+ // or has ledgerId smaller than current lastMutableBucket range, we put it directly into
+ // the shared queue and track its index bit in the current mutable bucket.
+ sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
+ lastMutableBucket.putIndexBit(ledgerId, entryId);
+ } else {
+ checkArgument(lastMutableBucket.startLedgerId == -1L || ledgerId >= lastMutableBucket.startLedgerId);
+ lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
+ }
+
+ numberDelayedMessages.incrementAndGet();
- numberDelayedMessages.incrementAndGet();
log.debug()
.attr("ledgerId", ledgerId)
.attr("entryId", entryId)
.attr("deliveryInMs", deliverAt - clock.millis())
.log("Add message");
- updateTimer();
- return true;
+ updateTimer();
+ return true;
+ } finally {
+ writeLock.unlock();
+ }
}
- private synchronized List selectMergedBuckets(final List values, int mergeNum) {
+ private List selectMergedBuckets(final List values, int mergeNum) {
checkArgument(mergeNum < values.size());
long minNumberMessages = Long.MAX_VALUE;
long minScheduleTimestamp = Long.MAX_VALUE;
@@ -452,7 +585,7 @@ private synchronized List selectMergedBuckets(final List bucket.firstScheduleTimestamps.get(bucket.currentSegmentEntryId + 1))
+ .mapToLong(bucket -> bucket.firstScheduleTimestamps.get(bucket.currentSegmentEntryId))
.min().getAsLong();
if (scheduleTimestamp < minScheduleTimestamp) {
minScheduleTimestamp = scheduleTimestamp;
@@ -471,52 +604,61 @@ private synchronized List selectMergedBuckets(final List asyncMergeBucketSnapshot() {
- List immutableBucketList = immutableBuckets.asMapOfRanges().values().stream().toList();
- List toBeMergeImmutableBuckets = selectMergedBuckets(immutableBucketList, MAX_MERGE_NUM);
+ private CompletableFuture asyncMergeBucketSnapshot() {
+ writeLock.lock();
+ try {
+ List immutableBucketList = immutableBuckets.asMapOfRanges().values().stream().toList();
+ List toBeMergeImmutableBuckets = selectMergedBuckets(immutableBucketList, MAX_MERGE_NUM);
- if (toBeMergeImmutableBuckets.isEmpty()) {
- log.warn("Can't find able merged buckets");
- return CompletableFuture.completedFuture(null);
- }
+ if (toBeMergeImmutableBuckets.isEmpty()) {
+ log.warn().log("Can't find able merged buckets");
+ return CompletableFuture.completedFuture(null);
+ }
- final String bucketsStr = toBeMergeImmutableBuckets.stream().map(Bucket::bucketKey).collect(
- Collectors.joining(",")).replaceAll(DELAYED_BUCKET_KEY_PREFIX + "_", "");
+ final String bucketsStr = toBeMergeImmutableBuckets.stream().map(Bucket::bucketKey).collect(
+ Collectors.joining(",")).replaceAll(DELAYED_BUCKET_KEY_PREFIX + "_", "");
log.info()
.attr("bucketKeys", bucketsStr)
- .log("Merging bucket snapshot, bucketKeys");
- for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
- immutableBucket.merging = true;
- }
+ .log("Merging bucket snapshot");
- long mergeStartTime = System.currentTimeMillis();
- stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.merge);
- return asyncMergeBucketSnapshot(toBeMergeImmutableBuckets).whenComplete((__, ex) -> {
- synchronized (this) {
- for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
- immutableBucket.merging = false;
- }
+ for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
+ immutableBucket.merging = true;
}
- if (ex != null) {
- log.error()
- .attr("bucketKeys", bucketsStr)
- .exception(ex)
- .log("Failed to merge bucket snapshot, bucketKeys");
- stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge);
- } else {
- log.info()
- .attr("bucketKeys", bucketsStr)
- .attr("bucketNum", immutableBuckets.asMapOfRanges().size())
- .log("Merge bucket snapshot finish");
+ long mergeStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.merge);
+ return asyncMergeBucketSnapshot(toBeMergeImmutableBuckets).whenComplete((__, ex) -> {
+ writeLock.lock();
+ try {
+ for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
+ immutableBucket.merging = false;
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ if (ex != null) {
+ log.error()
+ .attr("bucketKeys", bucketsStr)
+ .exception(ex)
+ .log("Failed to merge bucket snapshot");
- stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge,
- System.currentTimeMillis() - mergeStartTime);
- }
- });
+ stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge);
+ } else {
+ log.info()
+ .attr("bucketKeys", bucketsStr)
+ .attr("bucketNum", immutableBuckets.asMapOfRanges().size())
+ .log("Merge bucket snapshot finish");
+
+ stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge,
+ System.currentTimeMillis() - mergeStartTime);
+ }
+ });
+ } finally {
+ writeLock.unlock();
+ }
}
- private synchronized CompletableFuture asyncMergeBucketSnapshot(List buckets) {
+ private CompletableFuture asyncMergeBucketSnapshot(List buckets) {
List> createFutures =
buckets.stream().map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE))
.toList();
@@ -535,7 +677,8 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot(List {
- synchronized (BucketDelayedDeliveryTracker.this) {
+ writeLock.lock();
+ try {
long createStartTime = System.currentTimeMillis();
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
Pair immutableBucketDelayedIndexPair =
@@ -546,6 +689,12 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot(List delayedIndexBitMap =
new HashMap<>(buckets.get(0).getDelayedIndexBitMap());
@@ -580,24 +729,145 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot(ListThis method is always executed on {@link #bucketSnapshotExecutor} and never mutates the
+ * actively written {@link #lastMutableBucket}. The {@code bucketToSeal} instance is detached
+ * from writes before this method is scheduled. Snapshot generation writes the first segment to
+ * a local queue first; it is drained into {@link #sharedBucketPriorityQueue} only while holding
+ * {@link #writeLock}.
+ *
+ * On success, {@link #afterCreateImmutableBucket(Pair, long)} is called under the
+ * {@link #writeLock}, the new immutable bucket is registered in {@link #immutableBuckets},
+ * and {@link #bucketBeingSealed} is cleared. On failure or empty bucket, only the
+ * {@code bucketBeingSealed} state is cleared.
+ *
+ * @param bucketToSeal the mutable bucket that was selected under the write lock when the
+ * snapshot was triggered; it must not be modified by callers afterwards
+ */
+ private void createBucketSnapshotAsync(MutableBucket bucketToSeal) {
+ if (bucketToSeal == null) {
+ return;
+ }
+
+ TripleLongPriorityQueue firstSegmentQueue = new TripleLongPriorityQueue();
+ try {
+ if (bucketToSeal.isEmpty()) {
+ clearBucketBeingSealed(bucketToSeal);
+ return;
+ }
+
+ long createStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
+ Pair immutableBucketDelayedIndexPair =
+ bucketToSeal.sealBucketAndAsyncPersistent(
+ this.timeStepPerBucketSnapshotSegmentInMillis,
+ this.maxIndexesPerBucketSnapshotSegment,
+ firstSegmentQueue);
+ if (immutableBucketDelayedIndexPair == null) {
+ clearBucketBeingSealed(bucketToSeal);
+ return;
+ }
+
+ writeLock.lock();
+ try {
+ if (this.bucketBeingSealed != bucketToSeal) {
+ return;
+ }
+ drainQueue(firstSegmentQueue, sharedBucketPriorityQueue);
+ afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
+ clearBucketBeingSealedUnsafe(bucketToSeal);
+ if (!sharedBucketPriorityQueue.isEmpty() && sharedBucketPriorityQueue.peekN1() <= getCutoffTime()) {
+ scheduleImmediateRun();
+ } else {
+ updateTimer();
+ }
+
+ if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
+ asyncMergeBucketSnapshot();
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ } catch (Throwable t) {
+ log.error()
+ .exception(t)
+ .log("Failed to create bucket snapshot");
+ clearBucketBeingSealed(bucketToSeal);
+ } finally {
+ firstSegmentQueue.close();
+ }
+ }
+
+ private void drainQueue(TripleLongPriorityQueue source, TripleLongPriorityQueue target) {
+ while (!source.isEmpty()) {
+ target.add(source.peekN1(), source.peekN2(), source.peekN3());
+ source.pop();
+ }
+ }
+
+ /**
+ * Clear {@link #bucketBeingSealed} if it still refers to the given bucket.
+ *
+ * This method acquires the {@link #writeLock}. Use
+ * {@link #clearBucketBeingSealedUnsafe(MutableBucket)} instead when the caller already
+ * holds the write lock.
+ */
+ private void clearBucketBeingSealed(MutableBucket bucketToSeal) {
+ writeLock.lock();
+ try {
+ clearBucketBeingSealedUnsafe(bucketToSeal);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Clear {@link #bucketBeingSealed} without acquiring the {@link #writeLock}.
+ *
+ * Callers must already hold {@link #writeLock} when invoking this method.
+ */
+ private void clearBucketBeingSealedUnsafe(MutableBucket bucketToSeal) {
+ if (this.bucketBeingSealed == bucketToSeal) {
+ this.bucketBeingSealed = null;
+ }
+ }
+
@Override
- public synchronized boolean hasMessageAvailable() {
- long cutoffTime = getCutoffTime();
+ public boolean hasMessageAvailable() {
+ readLock.lock();
+ try {
+ long cutoffTime = getCutoffTime();
- boolean hasMessageAvailable = getNumberOfDelayedMessages() > 0 && nextDeliveryTime() <= cutoffTime;
- if (!hasMessageAvailable) {
- updateTimer();
+ boolean hasMessageAvailable = getNumberOfDelayedMessages() > 0 && nextDeliveryTimeUnsafe() <= cutoffTime;
+ if (!hasMessageAvailable) {
+ updateTimer();
+ }
+ return hasMessageAvailable;
+ } finally {
+ readLock.unlock();
}
- return hasMessageAvailable;
}
@Override
- protected synchronized long nextDeliveryTime() {
+ protected long nextDeliveryTime() {
+ readLock.lock();
+ try {
+ return nextDeliveryTimeUnsafe();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private long nextDeliveryTimeUnsafe() {
if (lastMutableBucket.isEmpty() && !sharedBucketPriorityQueue.isEmpty()) {
return sharedBucketPriorityQueue.peekN1();
} else if (sharedBucketPriorityQueue.isEmpty() && !lastMutableBucket.isEmpty()) {
@@ -619,83 +889,109 @@ public long getBufferMemoryUsage() {
}
@Override
- public synchronized NavigableSet getScheduledMessages(int maxMessages) {
- if (!checkPendingLoadDone()) {
- log.debug("Skip getScheduledMessages to wait for bucket snapshot load finish");
- return Collections.emptyNavigableSet();
- }
-
+ public NavigableSet getScheduledMessages(int maxMessages) {
long cutoffTime = getCutoffTime();
- lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue);
-
NavigableSet positions = new TreeSet<>();
- int n = maxMessages;
-
- while (n > 0 && !sharedBucketPriorityQueue.isEmpty()) {
- long timestamp = sharedBucketPriorityQueue.peekN1();
- if (timestamp > cutoffTime) {
- break;
+ writeLock.lock();
+ try {
+ if (pendingLoad != null) {
+ if (!pendingLoad.isDone()) {
+ log.debug().log("Skip getScheduledMessages to wait for bucket snapshot load finish");
+ return Collections.emptyNavigableSet();
+ }
+ pendingLoad = null;
}
- long ledgerId = sharedBucketPriorityQueue.peekN2();
- long entryId = sharedBucketPriorityQueue.peekN3();
-
- SnapshotKey snapshotKey = new SnapshotKey(ledgerId, entryId);
-
- ImmutableBucket bucket = snapshotSegmentLastIndexMap.get(snapshotKey);
- if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) {
- // All message of current snapshot segment are scheduled, try load next snapshot segment
- if (bucket.merging) {
- log.info()
- .attr("bucketKey", bucket.bucketKey())
- .log("Skip load to wait for bucket snapshot merge finish");
+ lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue);
+ while (positions.size() < maxMessages && !sharedBucketPriorityQueue.isEmpty()) {
+ if (sharedBucketPriorityQueue.peekN1() > cutoffTime) {
+ // All remaining messages are scheduled for the future
break;
}
+ long ledgerId = sharedBucketPriorityQueue.peekN2();
+ long entryId = sharedBucketPriorityQueue.peekN3();
+ // Check if this message is a trigger to load the next snapshot segment
+ SnapshotKey snapshotKey = new SnapshotKey(ledgerId, entryId);
+ ImmutableBucket bucket = snapshotSegmentLastIndexMap.get(snapshotKey);
+ if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) {
+ // All message of current snapshot segment are scheduled, try load next snapshot segment
+ if (bucket.merging) {
+ log.info()
+ .attr("bucketKey", bucket.bucketKey())
+ .log("Skip load to wait for bucket snapshot merge finish");
+ break;
+ }
- final int preSegmentEntryId = bucket.currentSegmentEntryId;
- log.debug()
- .attr("bucketKey", bucket.bucketKey())
- .attr("nextSegmentEntryId", preSegmentEntryId + 1)
- .log("Loading next bucket snapshot segment");
- boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)
- .isDone();
- if (!createFutureDone) {
- log.info()
- .attr("bucketKey", bucket.bucketKey())
- .log("Skip load to wait for bucket snapshot create finish");
+ // This is the last message of a segment. We need to load the next one.
+ // Trigger the load and stop processing more messages in this run.
+ // The positions collected so far will be returned.
+ triggerAsyncLoadBucketSnapshot(bucket, snapshotKey);
break;
}
+ // If it's a regular message (or in memory-only mode), process it.
+ sharedBucketPriorityQueue.pop(); // Consume the message from the queue
+ positions.add(PositionFactory.create(ledgerId, entryId));
+ removeIndexBit(ledgerId, entryId);
+ }
+ if (!positions.isEmpty()) {
+ numberDelayedMessages.addAndGet(-positions.size());
+ updateTimer();
+ }
+ } finally {
+ writeLock.unlock();
+ }
+
+ return positions;
+ }
+
+ private void triggerAsyncLoadBucketSnapshot(ImmutableBucket bucketToLoad, SnapshotKey snapshotKeyToLoad) {
+ final int preSegmentEntryId = bucketToLoad.currentSegmentEntryId;
+ log.debug()
+ .attr("bucketKey", bucketToLoad.bucketKey())
+ .attr("nextSegmentEntryId", preSegmentEntryId + 1)
+ .log("Loading next bucket snapshot segment");
+
+ boolean createFutureDone = bucketToLoad.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
+ if (!createFutureDone) {
+ log.info()
+ .attr("bucketKey", bucketToLoad.bucketKey())
+ .log("Skip load to wait for bucket snapshot create finish");
+ return;
+ }
- long loadStartTime = System.currentTimeMillis();
- stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
- CompletableFuture loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry()
- .thenAccept(indexList -> {
- synchronized (BucketDelayedDeliveryTracker.this) {
- this.snapshotSegmentLastIndexMap.remove(snapshotKey);
+ long loadStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
+ pendingLoad = bucketToLoad.asyncLoadNextBucketSnapshotEntry()
+ .thenAccept(indexList -> {
+ writeLock.lock();
+ try {
+ this.snapshotSegmentLastIndexMap.remove(snapshotKeyToLoad);
if (CollectionUtils.isEmpty(indexList)) {
immutableBuckets.asMapOfRanges()
- .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
- bucket.asyncDeleteBucketSnapshot(stats);
+ .remove(Range.closed(bucketToLoad.startLedgerId, bucketToLoad.endLedgerId));
+ bucketToLoad.asyncDeleteBucketSnapshot(stats);
return;
}
DelayedIndex
lastDelayedIndex = indexList.get(indexList.size() - 1);
this.snapshotSegmentLastIndexMap.put(
new SnapshotKey(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId()),
- bucket);
+ bucketToLoad);
for (DelayedIndex index : indexList) {
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
+ } finally {
+ writeLock.unlock();
}
}).whenComplete((__, ex) -> {
if (ex != null) {
// Back bucket state
- bucket.setCurrentSegmentEntryId(preSegmentEntryId);
+ bucketToLoad.setCurrentSegmentEntryId(preSegmentEntryId);
log.error()
- .attr("bucketKey", bucket.bucketKey())
+ .attr("bucketKey", bucketToLoad.bucketKey())
.attr("segmentEntryId", preSegmentEntryId + 1)
.exception(ex)
.log("Failed to load bucket snapshot segment");
@@ -703,76 +999,116 @@ public synchronized NavigableSet getScheduledMessages(int maxMessages)
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
} else {
log.info()
- .attr("bucketKey", bucket.bucketKey())
+ .attr("bucketKey", bucketToLoad.bucketKey())
.attr("segmentEntryId",
- (preSegmentEntryId == bucket.lastSegmentEntryId) ? "-1" : preSegmentEntryId
- + 1)
+ (preSegmentEntryId == bucketToLoad.lastSegmentEntryId)
+ ? "-1" : preSegmentEntryId + 1)
.log("Load next bucket snapshot segment finish");
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
System.currentTimeMillis() - loadStartTime);
}
- synchronized (this) {
- if (timeout != null) {
- timeout.cancel();
- }
- timeout = timer.newTimeout(this, 0, TimeUnit.MILLISECONDS);
- }
+ scheduleImmediateRun();
});
+ }
- if (!checkPendingLoadDone() || loadFuture.isCompletedExceptionally()) {
- break;
- }
- }
-
- positions.add(PositionFactory.create(ledgerId, entryId));
-
- sharedBucketPriorityQueue.pop();
- removeIndexBit(ledgerId, entryId);
+ @Override
+ public boolean shouldPauseAllDeliveries() {
+ return false;
+ }
- --n;
- numberDelayedMessages.decrementAndGet();
+ @Override
+ public CompletableFuture clear() {
+ CompletableFuture snapshotFuture;
+ CompletableFuture loadFuture;
+ writeLock.lock();
+ try {
+ snapshotFuture = pendingBucketSnapshot;
+ loadFuture = pendingLoad;
+ } finally {
+ writeLock.unlock();
}
- updateTimer();
+ return FutureUtil.waitForAll(List.of(ignoreFailure(snapshotFuture), ignoreFailure(loadFuture)))
+ .thenCompose(__ -> clearAfterPendingOperations());
+ }
- return positions;
+ private static CompletableFuture ignoreFailure(CompletableFuture future) {
+ if (future == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return future.handle((__, ex) -> null);
}
- private synchronized boolean checkPendingLoadDone() {
- if (pendingLoad == null || pendingLoad.isDone()) {
+ private CompletableFuture clearAfterPendingOperations() {
+ writeLock.lock();
+ try {
+ CompletableFuture future = cleanImmutableBuckets();
+ sharedBucketPriorityQueue.clear();
+ lastMutableBucket.clear();
+ clearBucketBeingSealedUnsafe(bucketBeingSealed);
+ snapshotSegmentLastIndexMap.clear();
pendingLoad = null;
- return true;
+ numberDelayedMessages.set(0);
+ return future;
+ } finally {
+ writeLock.unlock();
}
- return false;
}
@Override
- public boolean shouldPauseAllDeliveries() {
- return false;
- }
+ public void close() {
+ writeLock.lock();
+ try {
+ super.close();
+ } finally {
+ writeLock.unlock();
+ }
- @Override
- public synchronized CompletableFuture clear() {
- CompletableFuture future = cleanImmutableBuckets();
- sharedBucketPriorityQueue.clear();
- lastMutableBucket.clear();
- snapshotSegmentLastIndexMap.clear();
- numberDelayedMessages.set(0);
- return future;
- }
+ bucketSnapshotExecutor.shutdown();
+ try {
+ if (!bucketSnapshotExecutor.awaitTermination(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
+ log.warn().log("bucketSnapshotExecutor did not terminate in the specified time");
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ log.warn()
+ .exception(ie)
+ .log("Interrupted while waiting for bucketSnapshotExecutor to terminate");
+ }
+
+ List> completableFutures = Collections.emptyList();
+ writeLock.lock();
+ try {
+ try {
+ completableFutures = immutableBuckets.asMapOfRanges().values().stream()
+ .map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)).toList();
+ } catch (Exception e) {
+ log.warn()
+ .exception(e)
+ .log("Failed wait to snapshot generate");
+ }
+ } finally {
+ writeLock.unlock();
+ }
- @Override
- public synchronized void close() {
- super.close();
- lastMutableBucket.close();
- sharedBucketPriorityQueue.close();
try {
- List> completableFutures = immutableBuckets.asMapOfRanges().values().stream()
- .map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)).toList();
- FutureUtil.waitForAll(completableFutures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+ if (!completableFutures.isEmpty()) {
+ FutureUtil.waitForAll(completableFutures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+ }
} catch (Exception e) {
- log.warn().exception(e).log("Failed wait to snapshot generate");
+ log.warn()
+ .exception(e)
+ .log("Failed wait to snapshot generate");
+ }
+
+ writeLock.lock();
+ try {
+ super.close();
+ lastMutableBucket.close();
+ sharedBucketPriorityQueue.close();
+ } finally {
+ writeLock.unlock();
}
}
@@ -797,11 +1133,28 @@ private boolean removeIndexBit(long ledgerId, long entryId) {
.orElse(false);
}
- public synchronized boolean containsMessage(long ledgerId, long entryId) {
+ public boolean containsMessage(long ledgerId, long entryId) {
+ readLock.lock();
+ try {
+ return containsMessageUnsafe(ledgerId, entryId);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private boolean containsMessageUnsafe(long ledgerId, long entryId) {
if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
return true;
}
+ MutableBucket sealingBucket = this.bucketBeingSealed;
+ if (sealingBucket != null
+ && ledgerId >= sealingBucket.startLedgerId
+ && ledgerId <= sealingBucket.endLedgerId
+ && sealingBucket.containsMessage(ledgerId, entryId)) {
+ return true;
+ }
+
return findImmutableBucket(ledgerId).map(bucket -> bucket.containsMessage(ledgerId, entryId))
.orElse(false);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
index 006938e9ed271..526b7d7532729 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
@@ -48,7 +48,23 @@ static class Node {
private CombinedSegmentDelayedIndexQueue(List> segmentLists) {
this.kpq = new PriorityQueue<>(segmentLists.size(), COMPARATOR_NODE);
for (List segmentList : segmentLists) {
- Node node = new Node(segmentList, 0, 0);
+ if (segmentList == null || segmentList.isEmpty()) {
+ // Skip empty segment lists, there is nothing to merge from them.
+ continue;
+ }
+
+ // Advance to the first non-empty segment in this list.
+ int segmentListCursor = 0;
+ while (segmentListCursor < segmentList.size()
+ && segmentList.get(segmentListCursor).getIndexesCount() == 0) {
+ segmentListCursor++;
+ }
+ if (segmentListCursor >= segmentList.size()) {
+ // All segments are empty, skip this list entirely.
+ continue;
+ }
+
+ Node node = new Node(segmentList, segmentListCursor, 0);
kpq.offer(node);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index 75a3fad65892a..93209584c74af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -54,19 +54,27 @@ Pair sealBucketAndAsyncPersistent(
TripleLongPriorityQueue sharedQueue) {
return createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment,
maxIndexesPerBucketSnapshotSegment, sharedQueue,
- TripleLongPriorityDelayedIndexQueue.wrap(priorityQueue), startLedgerId, endLedgerId);
+ TripleLongPriorityDelayedIndexQueue.wrap(priorityQueue), startLedgerId, endLedgerId, false);
}
Pair createImmutableBucketAndAsyncPersistent(
final long timeStepPerBucketSnapshotSegment, final int maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue, DelayedIndexQueue delayedIndexQueue, final long startLedgerId,
final long endLedgerId) {
- log.debug()
- .attr("dispatcher", dispatcherName)
- .attr("startLedgerId", startLedgerId)
- .attr("endLedgerId", endLedgerId)
- .log("Creating bucket snapshot");
- if (delayedIndexQueue.isEmpty()) {
+ return createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment,
+ maxIndexesPerBucketSnapshotSegment, sharedQueue, delayedIndexQueue, startLedgerId, endLedgerId, true);
+ }
+
+ private Pair createImmutableBucketAndAsyncPersistent(
+ final long timeStepPerBucketSnapshotSegment, final int maxIndexesPerBucketSnapshotSegment,
+ TripleLongPriorityQueue sharedQueue, DelayedIndexQueue delayedIndexQueue, final long startLedgerId,
+ final long endLedgerId, boolean removeSourceIndexBits) {
+ log.debug()
+ .attr("dispatcher", dispatcherName)
+ .attr("startLedgerId", startLedgerId)
+ .attr("endLedgerId", endLedgerId)
+ .log("Creating bucket snapshot");
+ if (delayedIndexQueue.isEmpty()) {
return null;
}
long numMessages = 0;
@@ -96,7 +104,9 @@ Pair createImmutableBucketAndAsyncPersistent(
final long ledgerId = delayedIndex.getLedgerId();
final long entryId = delayedIndex.getEntryId();
- removeIndexBit(ledgerId, entryId);
+ if (removeSourceIndexBits) {
+ removeIndexBit(ledgerId, entryId);
+ }
checkArgument(ledgerId >= startLedgerId && ledgerId <= endLedgerId);
@@ -146,7 +156,9 @@ Pair createImmutableBucketAndAsyncPersistent(
// optimize bm
immutableBucketBitMap.values().forEach(RoaringBitmap::runOptimize);
- this.delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
+ if (removeSourceIndexBits) {
+ this.delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
+ }
SnapshotMetadata bucketSnapshotMetadata = new SnapshotMetadata();
for (SnapshotSegmentMetadata sm : segmentMetadataList) {
@@ -231,7 +243,7 @@ void addMessage(long ledgerId, long entryId, long deliverAt) {
if (startLedgerId == -1L) {
this.startLedgerId = ledgerId;
}
- this.endLedgerId = ledgerId;
+ this.endLedgerId = Math.max(endLedgerId, ledgerId);
putIndexBit(ledgerId, entryId);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index efce0f1d8e51f..777210078fd38 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -40,6 +40,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -188,28 +189,34 @@ public void testContainsMessage(BucketDelayedDeliveryTracker tracker) {
@Test(dataProvider = "delayedTracker", invocationCount = 10)
public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) throws Exception {
- for (int i = 1; i <= 100; i++) {
+ final int minIndexCountPerBucket = 5;
+ final int messagesToSnapshot = 100;
+ final int triggerMessageCount = messagesToSnapshot + 1;
+ for (int i = 1; i <= triggerMessageCount; i++) {
tracker.addMessage(i, i, i * 10);
+ boolean isTriggerPoint = i > minIndexCountPerBucket && (i - 1) % minIndexCountPerBucket == 0;
+ if (isTriggerPoint) {
+ final int expectedBuckets = (i - 1) / minIndexCountPerBucket;
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .pollInterval(10, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ // 1. Confirm the number of immutable buckets matches the expected
+ assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), expectedBuckets,
+ "Expected number of immutable buckets did not match.");
+ // 2. Confirm that the persistence Future for
+ // all snapshots of created immutable buckets have all completed
+ assertTrue(tracker.getImmutableBuckets().asMapOfRanges().values().stream()
+ .allMatch(bucket -> bucket.getSnapshotCreateFuture()
+ .map(CompletableFuture::isDone)
+ .orElse(true)),
+ "Not all snapshot creation futures were completed.");
+ });
+ }
}
- assertEquals(tracker.getNumberOfDelayedMessages(), 100);
-
- clockTime.set(1 * 10);
-
- Awaitility.await().untilAsserted(() -> {
- Assert.assertTrue(
- tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x -> x.merging
- || !x.getSnapshotCreateFuture().get().isDone()));
- });
-
- assertTrue(tracker.hasMessageAvailable());
- Set scheduledMessages = new TreeSet<>();
- Awaitility.await().untilAsserted(() -> {
- scheduledMessages.addAll(tracker.getScheduledMessages(100));
- assertEquals(scheduledMessages.size(), 1);
- });
-
- tracker.addMessage(101, 101, 101 * 10);
+ assertEquals(tracker.getNumberOfDelayedMessages(), 101);
+ assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 20);
+ assertEquals(tracker.getLastMutableBucket().size(), 1);
tracker.close();
@@ -311,7 +318,7 @@ public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) throws
clockTime.set(110 * 10);
NavigableSet scheduledMessages = new TreeSet<>();
- Awaitility.await().untilAsserted(() -> {
+ Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
scheduledMessages.addAll(tracker2.getScheduledMessages(110));
assertEquals(scheduledMessages.size(), 110);
});
@@ -388,7 +395,7 @@ public void testWithBkException(final BucketDelayedDeliveryTracker tracker) thro
assertEquals(tracker2.getScheduledMessages(100).size(), 0);
Set scheduledMessages = new TreeSet<>();
- Awaitility.await().untilAsserted(() -> {
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
scheduledMessages.addAll(tracker2.getScheduledMessages(100));
assertEquals(scheduledMessages.size(), delayedMessagesInSnapshotValue);
});
@@ -436,11 +443,33 @@ public void testWithCreateFailDowngrade(BucketDelayedDeliveryTracker tracker) {
@Test(dataProvider = "delayedTracker")
public void testMaxIndexesPerSegment(BucketDelayedDeliveryTracker tracker) {
- for (int i = 1; i <= 101; i++) {
+ final int minIndexCountPerBucket = 20;
+ final int totalMessages = 101;
+ final int expectedFinalBucketCount = (totalMessages - 1) / minIndexCountPerBucket;
+ for (int i = 1; i <= totalMessages; i++) {
tracker.addMessage(i, i, i * 10);
+
+ // The trigger point is the next message after the bucket is full.
+ // For example, i=21 triggers the 1st bucket, i=41 triggers the 2nd, ..., i=101 triggers the 5th
+ boolean isTriggerPoint = i > minIndexCountPerBucket && (i - 1) % minIndexCountPerBucket == 0;
+
+ if (isTriggerPoint) {
+ final int expectedBuckets = (i - 1) / minIndexCountPerBucket;
+
+ // Wait until the background bucket creation task is completed
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .pollInterval(10, TimeUnit.MILLISECONDS)
+ .untilAsserted(() ->
+ assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), expectedBuckets)
+ );
+ }
}
- assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 5);
+ // After the loop ends, we expect to have 5 immutable buckets
+ assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), expectedFinalBucketCount);
+
+ // And, the lastMutableBucket should only have the last message (101) left.
+ assertEquals(tracker.getLastMutableBucket().size(), 1);
tracker.getImmutableBuckets().asMapOfRanges().forEach((k, bucket) -> {
assertEquals(bucket.getLastSegmentEntryId(), 4);
@@ -452,21 +481,23 @@ public void testMaxIndexesPerSegment(BucketDelayedDeliveryTracker tracker) {
@Test(dataProvider = "delayedTracker")
public void testClear(BucketDelayedDeliveryTracker tracker)
throws ExecutionException, InterruptedException, TimeoutException {
- for (int i = 1; i <= 1001; i++) {
+ for (int i = 1; i <= 1001; i++) {
tracker.addMessage(i, i, i * 10);
- }
+ }
- assertEquals(tracker.getNumberOfDelayedMessages(), 1001);
- assertTrue(tracker.getImmutableBuckets().asMapOfRanges().size() > 0);
- assertEquals(tracker.getLastMutableBucket().size(), 1);
+ assertEquals(tracker.getNumberOfDelayedMessages(), 1001);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
+ Assert.assertFalse(tracker.getImmutableBuckets().asMapOfRanges().isEmpty())
+ );
+ assertEquals(tracker.getLastMutableBucket().size(), 1);
- tracker.clear().get(1, TimeUnit.MINUTES);
+ tracker.clear().get(1, TimeUnit.MINUTES);
- assertEquals(tracker.getNumberOfDelayedMessages(), 0);
- assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 0);
- assertEquals(tracker.getLastMutableBucket().size(), 0);
- assertEquals(tracker.getSharedBucketPriorityQueue().size(), 0);
+ assertEquals(tracker.getNumberOfDelayedMessages(), 0);
+ assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 0);
+ assertEquals(tracker.getLastMutableBucket().size(), 0);
+ assertEquals(tracker.getSharedBucketPriorityQueue().size(), 0);
- tracker.close();
+ tracker.close();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
index dca8f2d6fb67b..fefa14ef788d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.delayed.bucket;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -27,17 +26,20 @@
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.Timer;
import java.time.Clock;
+import java.util.NavigableSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.delayed.MockBucketSnapshotStorage;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -45,14 +47,14 @@
/**
* Thread safety tests for BucketDelayedDeliveryTracker.
- * These tests verify that the hybrid approach with StampedLock and concurrent data structures
+ * These tests verify that the hybrid approach with ReentrantReadWriteLock and concurrent data structures
* correctly handles concurrent access patterns without deadlocks, race conditions, or data corruption.
*/
+@Slf4j
public class BucketDelayedDeliveryTrackerThreadSafetyTest {
private BucketDelayedDeliveryTracker tracker;
private AbstractPersistentDispatcherMultipleConsumers dispatcher;
- private ManagedCursor cursor;
private Timer timer;
private BucketSnapshotStorage storage;
private ExecutorService executorService;
@@ -60,9 +62,9 @@ public class BucketDelayedDeliveryTrackerThreadSafetyTest {
@BeforeMethod
public void setUp() throws Exception {
dispatcher = mock(AbstractPersistentDispatcherMultipleConsumers.class);
- cursor = mock(ManagedCursor.class);
+ final ManagedCursor cursor = mock(ManagedCursor.class);
timer = mock(Timer.class);
- storage = mock(BucketSnapshotStorage.class);
+ storage = new MockBucketSnapshotStorage();
when(dispatcher.getName()).thenReturn("persistent://public/default/test-topic / test-cursor");
when(dispatcher.getCursor()).thenReturn(cursor);
@@ -75,18 +77,6 @@ public void setUp() throws Exception {
when(cursor.removeCursorProperty(any()))
.thenReturn(CompletableFuture.completedFuture(null));
- // Mock storage operations to avoid NullPointerException
- when(storage.createBucketSnapshot(any(), any(), any(), any(), any()))
- .thenReturn(CompletableFuture.completedFuture(1L));
- when(storage.getBucketSnapshotMetadata(anyLong()))
- .thenReturn(CompletableFuture.completedFuture(null));
- when(storage.getBucketSnapshotSegment(anyLong(), anyLong(), anyLong()))
- .thenReturn(CompletableFuture.completedFuture(java.util.Collections.emptyList()));
- when(storage.getBucketSnapshotLength(anyLong()))
- .thenReturn(CompletableFuture.completedFuture(0L));
- when(storage.deleteBucketSnapshot(anyLong()))
- .thenReturn(CompletableFuture.completedFuture(null));
-
tracker = new BucketDelayedDeliveryTracker(
dispatcher, timer, 1000, Clock.systemUTC(), true, storage,
100, 1000, 100, 10 // Restore original minIndexCountPerBucket for proper testing
@@ -97,83 +87,105 @@ public void setUp() throws Exception {
@AfterMethod
public void tearDown() throws Exception {
+ // First shutdown executor to stop all threads
+ if (executorService != null) {
+ assertTrue(MoreExecutors.shutdownAndAwaitTermination(executorService, 5, TimeUnit.SECONDS),
+ "Executor should shutdown cleanly");
+ }
+ // Then close tracker safely after all threads stopped
if (tracker != null) {
tracker.close();
}
- if (executorService != null) {
- assertTrue(MoreExecutors.shutdownAndAwaitTermination(executorService, 5, TimeUnit.SECONDS),
- "Executor should shutdown cleanly");
+ if (storage != null) {
+ storage.close();
}
}
/**
- * Test concurrent containsMessage() calls while adding messages.
- * This tests the StampedLock optimistic read performance under contention.
+ * Test concurrent containsMessage() calls while adding messages sequentially.
+ * This tests the ReentrantReadWriteLock read performance under contention.
+ * addMessage is executed sequentially (as in real scenarios), while containsMessage is concurrent.
*/
@Test
public void testConcurrentContainsMessageWithWrites() throws Exception {
- final int numThreads = 16;
- final int operationsPerThread = 1000; // Restore to test bucket creation properly
+ final int numReadThreads = 8;
+ final int readsPerThread = 1000;
+ final int totalMessages = 5000;
final CountDownLatch startLatch = new CountDownLatch(1);
- final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+ final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+ final CountDownLatch writerDone = new CountDownLatch(1);
final AtomicInteger errors = new AtomicInteger(0);
final AtomicReference firstException = new AtomicReference<>();
+ final AtomicInteger messagesAdded = new AtomicInteger(0);
- // Start reader threads
- for (int i = 0; i < numThreads / 2; i++) {
- final int threadId = i;
+ // Start reader threads - these will run concurrently
+ for (int i = 0; i < numReadThreads; i++) {
executorService.submit(() -> {
try {
startLatch.await();
- for (int j = 0; j < operationsPerThread; j++) {
- long ledgerId = threadId * 1000 + j;
- long entryId = j;
+ // Continuously read for a period while messages are being added
+ long endTime = System.currentTimeMillis() + 10000;
+ int readCount = 0;
+ while (System.currentTimeMillis() < endTime && readCount < readsPerThread) {
+ // Check for messages across the range that might be added
+ long ledgerId = 1000 + (readCount % totalMessages);
+ long entryId = readCount % 100;
// This should not throw exceptions or block indefinitely
tracker.containsMessage(ledgerId, entryId);
+ readCount++;
+ if (readCount % 100 == 0) {
+ Thread.sleep(1);
+ }
}
} catch (Exception e) {
errors.incrementAndGet();
firstException.compareAndSet(null, e);
e.printStackTrace();
} finally {
- doneLatch.countDown();
+ readersDone.countDown();
}
});
}
- // Start writer threads
- for (int i = numThreads / 2; i < numThreads; i++) {
- final int threadId = i;
- executorService.submit(() -> {
- try {
- startLatch.await();
- for (int j = 0; j < operationsPerThread; j++) {
- long ledgerId = threadId * 1000 + j;
- long entryId = j;
- long deliverAt = System.currentTimeMillis() + 10000; // 10s delay
- tracker.addMessage(ledgerId, entryId, deliverAt);
+ // Start the single writer thread - sequential addMessage calls
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ for (int i = 0; i < totalMessages; i++) {
+ long ledgerId = 1000 + i;
+ long entryId = i % 100;
+ long deliverAt = System.currentTimeMillis() + 10000;
+ boolean added = tracker.addMessage(ledgerId, entryId, deliverAt);
+ if (added) {
+ messagesAdded.incrementAndGet();
+ }
+ // Small delay to simulate real processing time
+ if (i % 100 == 0) {
+ Thread.sleep(1);
}
- } catch (Exception e) {
- errors.incrementAndGet();
- firstException.compareAndSet(null, e);
- e.printStackTrace();
- } finally {
- doneLatch.countDown();
}
- });
- }
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ writerDone.countDown();
+ }
+ });
startLatch.countDown();
- assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds");
+ assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete within 30 seconds");
+ assertTrue(writerDone.await(30, TimeUnit.SECONDS), "Writer should complete within 30 seconds");
if (errors.get() > 0) {
Exception exception = firstException.get();
if (exception != null) {
- System.err.println("First exception caught: " + exception.getMessage());
+ log.error("First exception caught: " + exception.getMessage());
exception.printStackTrace();
}
}
assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations");
+ assertTrue(messagesAdded.get() > 0, "Some messages should have been added");
}
/**
@@ -199,7 +211,7 @@ public void testAddMessageWithNonMonotonicLedgerIds() {
/**
* Test concurrent nextDeliveryTime() calls.
- * This verifies the StampedLock implementation for read-heavy operations.
+ * This verifies the ReentrantReadWriteLock implementation for read-heavy operations.
*/
@Test
public void testConcurrentNextDeliveryTime() throws Exception {
@@ -245,75 +257,149 @@ public void testConcurrentNextDeliveryTime() throws Exception {
*/
@Test
public void testDeadlockDetection() throws Exception {
- final int numThreads = 32;
- final int operationsPerThread = 100;
- // Use Phaser for better concurrency coordination
- final Phaser startPhaser = new Phaser(numThreads + 1); // +1 for main thread
- final Phaser endPhaser = new Phaser(numThreads + 1); // +1 for main thread
+ final int numReadThreads = 30;
+ final int operationsPerThread = 200;
+ final int writeOperations = 1000;
+
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch writerDone = new CountDownLatch(1);
+ final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
final AtomicBoolean deadlockDetected = new AtomicBoolean(false);
final AtomicInteger completedOperations = new AtomicInteger(0);
+ final AtomicInteger writesCompleted = new AtomicInteger(0);
+ final AtomicReference firstException = new AtomicReference<>();
+ // Single writer thread - executes addMessage sequentially
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ for (int i = 0; i < writeOperations; i++) {
+ try {
+ long ledgerId = 50000 + i;
+ long entryId = i;
+ tracker.addMessage(ledgerId, entryId, System.currentTimeMillis() + 10000);
+ writesCompleted.incrementAndGet();
+ completedOperations.incrementAndGet();
- // Mixed workload: reads, writes, and metric queries
- for (int i = 0; i < numThreads; i++) {
+ // Small delay to allow read threads to interleave
+ if (i % 50 == 0) {
+ Thread.sleep(1);
+ }
+ } catch (Exception e) {
+ if (!(e instanceof IllegalArgumentException)) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ break;
+ }
+ completedOperations.incrementAndGet();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ deadlockDetected.set(true);
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ writerDone.countDown();
+ }
+ });
+ // Multiple reader threads - execute read operations concurrently
+ for (int i = 0; i < numReadThreads; i++) {
final int threadId = i;
- final int workloadType = i % 4;
-
+ final int readOperationType = i % 3;
executorService.submit(() -> {
try {
- // Wait for all threads to be ready
- startPhaser.arriveAndAwaitAdvance();
+ startLatch.await();
- for (int j = 0; j < operationsPerThread; j++) {
+ // Continue reading until writer is done, plus some extra operations
+ int operationCount = 0;
+ while ((writerDone.getCount() > 0 || operationCount < operationsPerThread)) {
try {
- switch (workloadType) {
- case 0: // containsMessage calls
- tracker.containsMessage(threadId * 1000 + j, j);
+ switch (readOperationType) {
+ case 0:
+ // Check both existing and potentially non-existing messages
+ long ledgerId = 50000 + (operationCount % (writeOperations + 100));
+ long entryId = operationCount % 1000;
+ tracker.containsMessage(ledgerId, entryId);
break;
- case 1: // addMessage calls
- tracker.addMessage(threadId * 1000 + j, j, System.currentTimeMillis() + 5000);
- break;
- case 2: // nextDeliveryTime calls
+ case 1:
tracker.nextDeliveryTime();
break;
- case 3: // getNumberOfDelayedMessages calls
+ case 2:
tracker.getNumberOfDelayedMessages();
break;
}
completedOperations.incrementAndGet();
+ operationCount++;
+
+ // Small delay to prevent excessive CPU usage
+ if (operationCount % 100 == 0) {
+ Thread.sleep(1);
+ }
} catch (IllegalArgumentException e) {
- // IllegalArgumentException is expected for some operations
- // (e.g., calling nextDeliveryTime on empty queue, invalid ledger IDs)
- // This is not a deadlock, just normal validation
+ // Expected for some operations (e.g., nextDeliveryTime on empty queue)
completedOperations.incrementAndGet();
+ operationCount++;
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ break;
+ }
+
+ // Break if we've done enough operations and writer is done
+ if (writerDone.getCount() == 0 && operationCount >= operationsPerThread) {
+ break;
}
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ deadlockDetected.set(true);
} catch (Exception e) {
- // Only unexpected exceptions indicate potential deadlocks
- if (!(e instanceof IllegalArgumentException)) {
- deadlockDetected.set(true);
- e.printStackTrace();
- }
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
} finally {
- // Signal completion
- endPhaser.arriveAndDeregister();
+ readersDone.countDown();
}
});
}
+ // Start all threads
+ startLatch.countDown();
+ // Wait for completion with timeout to detect deadlocks
+ boolean writerCompleted = false;
+ boolean readersCompleted = false;
- // Start all threads at once
- startPhaser.arriveAndAwaitAdvance();
-
- // Wait for all threads to complete with timeout to detect potential deadlocks
try {
- endPhaser.awaitAdvanceInterruptibly(endPhaser.arrive(), 60, TimeUnit.SECONDS);
- } catch (Exception e) {
- // Timeout or interrupt indicates potential deadlock
+ writerCompleted = writerDone.await(30, TimeUnit.SECONDS);
+ readersCompleted = readersDone.await(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
deadlockDetected.set(true);
- e.printStackTrace();
}
-
- assertTrue(!deadlockDetected.get(), "No deadlocks should be detected");
+ // Check for deadlock indicators
+ if (!writerCompleted || !readersCompleted) {
+ deadlockDetected.set(true);
+ log.error("Test timed out - potential deadlock detected. Writer completed: {}, Readers completed: {}",
+ writerCompleted, readersCompleted);
+ }
+ // Assert results
+ if (deadlockDetected.get()) {
+ Exception e = firstException.get();
+ if (e != null) {
+ throw new AssertionError("Deadlock or exception detected during test execution", e);
+ } else {
+ throw new AssertionError("Deadlock detected - test did not complete within timeout");
+ }
+ }
+ // Verify that operations actually completed
assertTrue(completedOperations.get() > 0, "Some operations should complete");
+ assertTrue(writesCompleted.get() > 0, "Some write operations should complete");
+
+ log.info("Deadlock test completed successfully. Total operations: {}, Writes completed: {}",
+ completedOperations.get(), writesCompleted.get());
}
/**
@@ -322,85 +408,100 @@ public void testDeadlockDetection() throws Exception {
*/
@Test
public void testDataConsistencyUnderConcurrency() throws Exception {
- final int numWriteThreads = 8;
final int numReadThreads = 16;
- final int messagesPerWriter = 500;
+ final int totalMessages = 4000;
+ final int readsPerThread = 1000;
final CountDownLatch startLatch = new CountDownLatch(1);
- final CountDownLatch writersDone = new CountDownLatch(numWriteThreads);
final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+ final AtomicInteger errors = new AtomicInteger(0);
+ final AtomicReference firstException = new AtomicReference<>();
final AtomicInteger foundMessages = new AtomicInteger(0);
final AtomicInteger totalMessagesAdded = new AtomicInteger(0);
-
- // Writer threads add messages
- for (int i = 0; i < numWriteThreads; i++) {
- final int writerId = i;
- executorService.submit(() -> {
- try {
- startLatch.await();
- for (int j = 0; j < messagesPerWriter; j++) {
- long ledgerId = writerId * 10000 + j;
- long entryId = j;
- boolean added = tracker.addMessage(ledgerId, entryId, System.currentTimeMillis() + 30000);
- if (added) {
- totalMessagesAdded.incrementAndGet();
- }
- }
- } catch (Exception e) {
- // Ignore exceptions for this test
- } finally {
- writersDone.countDown();
- }
- });
- }
-
- // Reader threads check for messages
+ // Start reader threads - these will run concurrently
for (int i = 0; i < numReadThreads; i++) {
final int readerId = i;
executorService.submit(() -> {
try {
startLatch.await();
+ // Continuously read for a period while messages are being added
+ long endTime = System.currentTimeMillis() + 12000;
+ int readCount = 0;
+ while (System.currentTimeMillis() < endTime && readCount < readsPerThread) {
+ // Check for messages across the range that might be added
+ int messageIndex = readCount % totalMessages;
+ long ledgerId = 10000 + messageIndex;
+ long entryId = messageIndex % 100;
- // Read for a while to catch messages being added
- long endTime = System.currentTimeMillis() + 5000; // Read for 5 seconds
- while (System.currentTimeMillis() < endTime) {
- for (int writerId = 0; writerId < numWriteThreads; writerId++) {
- for (int j = 0; j < messagesPerWriter; j++) {
- long ledgerId = writerId * 10000 + j;
- long entryId = j;
- if (tracker.containsMessage(ledgerId, entryId)) {
- foundMessages.incrementAndGet();
- }
- }
+ if (tracker.containsMessage(ledgerId, entryId)) {
+ foundMessages.incrementAndGet();
+ }
+ readCount++;
+
+ if (readCount % 200 == 0) {
+ Thread.sleep(1);
}
- Thread.sleep(10); // Small delay to allow writes
}
} catch (Exception e) {
- // Ignore exceptions for this test
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
} finally {
readersDone.countDown();
}
});
}
-
+ // Start the single writer thread - sequential addMessage calls
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ for (int i = 0; i < totalMessages; i++) {
+ long ledgerId = 10000 + i;
+ long entryId = i % 100;
+ long deliverAt = System.currentTimeMillis() + 30000;
+ boolean added = tracker.addMessage(ledgerId, entryId, deliverAt);
+ if (added) {
+ totalMessagesAdded.incrementAndGet();
+ }
+ // Small delay to simulate real processing time and allow reads
+ if (i % 200 == 0) {
+ Thread.sleep(2);
+ }
+ }
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ }
+ });
startLatch.countDown();
-
- assertTrue(writersDone.await(30, TimeUnit.SECONDS), "Writers should complete");
- assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete");
-
+ assertTrue(readersDone.await(40, TimeUnit.SECONDS), "Readers should complete within 40 seconds");
+ // Check for errors during concurrent operations
+ if (errors.get() > 0) {
+ Exception exception = firstException.get();
+ if (exception != null) {
+ log.error("First exception caught: " + exception.getMessage());
+ exception.printStackTrace();
+ }
+ }
+ assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations");
// Verify final consistency
long finalMessageCount = tracker.getNumberOfDelayedMessages();
assertTrue(finalMessageCount >= 0, "Message count should be non-negative");
-
- // The exact counts may vary due to timing, but we should have some successful operations
+ // The exact counts may vary due to timing, but we should have successful operations
assertTrue(totalMessagesAdded.get() > 0, "Some messages should have been added");
+ assertTrue(foundMessages.get() >= 0, "Found messages count should be non-negative");
+
+ // Log results for analysis
+ log.info("Total messages added: {}, Found messages: {}, Final message count: {}",
+ totalMessagesAdded.get(), foundMessages.get(), finalMessageCount);
}
/**
- * Test optimistic read performance under varying contention levels.
- * This helps validate that the StampedLock optimistic reads are working efficiently.
+ * Test read performance under varying contention levels.
+ * This helps validate that the ReentrantReadWriteLock reads are working efficiently.
*/
@Test
- public void testOptimisticReadPerformance() throws Exception {
+ public void testReadPerformanceUnderContention() throws Exception {
// Add baseline messages
for (int i = 0; i < 1000; i++) {
tracker.addMessage(i, i, System.currentTimeMillis() + 60000);
@@ -451,4 +552,478 @@ public void testOptimisticReadPerformance() throws Exception {
assertTrue(throughput > 10000, "Should achieve at least 10K ops/sec with " + numThreads + " threads");
}
}
-}
\ No newline at end of file
+
+ /**
+ * Test concurrent getScheduledMessages() calls with read operations.
+ * getScheduledMessages() uses write lock while read operations use read lock.
+ * Messages are added beforehand to avoid concurrent addMessage calls.
+ */
+ @Test
+ public void testConcurrentGetScheduledMessagesWithReads() throws Exception {
+ // Add messages that will be ready for delivery after a short delay
+ final long baseTime = System.currentTimeMillis();
+ final int totalMessages = 500;
+
+ // Add messages with delivery time slightly in the future, then wait for them to become ready
+ for (int i = 0; i < totalMessages; i++) {
+ tracker.addMessage(i, i, baseTime + 1000);
+ }
+ assertEquals(tracker.getNumberOfDelayedMessages(), totalMessages, "All messages should be added");
+ // Wait for messages to become ready for delivery
+ Thread.sleep(3000);
+ final int numReadThreads = 12;
+ final int numScheduleThreads = 4;
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+ final CountDownLatch schedulersDone = new CountDownLatch(numScheduleThreads);
+ final AtomicInteger errors = new AtomicInteger(0);
+ final AtomicReference firstException = new AtomicReference<>();
+ final AtomicInteger totalMessagesRetrieved = new AtomicInteger(0);
+ // Start read threads (containsMessage and nextDeliveryTime)
+ for (int i = 0; i < numReadThreads; i++) {
+ final int threadId = i;
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ for (int j = 0; j < 1000; j++) {
+ if (threadId % 2 == 0) {
+ tracker.containsMessage(j % totalMessages, j % totalMessages);
+ } else {
+ try {
+ tracker.nextDeliveryTime();
+ } catch (IllegalArgumentException e) {
+ // Expected when no messages available
+ }
+ }
+ if (j % 100 == 0) {
+ Thread.sleep(1);
+ }
+ }
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ readersDone.countDown();
+ }
+ });
+ }
+ // Start getScheduledMessages threads - continue until all messages are retrieved
+ for (int i = 0; i < numScheduleThreads; i++) {
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(20);
+
+ while (totalMessagesRetrieved.get() < totalMessages
+ && System.nanoTime() < deadlineNanos) {
+ NavigableSet messages = tracker.getScheduledMessages(50);
+ int retrieved = messages.size();
+ totalMessagesRetrieved.addAndGet(retrieved);
+
+ if (retrieved == 0) {
+ Thread.sleep(10);
+ } else {
+ Thread.sleep(5);
+ }
+ }
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ schedulersDone.countDown();
+ }
+ });
+ }
+ startLatch.countDown();
+ assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete within 30 seconds");
+ assertTrue(schedulersDone.await(30, TimeUnit.SECONDS), "Schedulers should complete within 30 seconds");
+ if (errors.get() > 0) {
+ Exception exception = firstException.get();
+ if (exception != null) {
+ throw new AssertionError("Concurrent getScheduledMessages test failed", exception);
+ }
+ }
+ assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations");
+
+ // Verify that most or all messages were retrieved
+ assertEquals(totalMessagesRetrieved.get(), 500, "All messages should be retrieved");
+
+ log.info("Total messages retrieved: {} out of {}", totalMessagesRetrieved.get(), totalMessages);
+ }
+
+ /**
+ * Test concurrent clear() operations with read operations.
+ * This verifies that clear() properly coordinates with ongoing read operations.
+ * Messages are added beforehand, then clear() is tested with concurrent reads.
+ */
+ @Test
+ public void testConcurrentClearWithReads() throws Exception {
+ final int initialMessages = 1000;
+ final int numReadThreads = 10;
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+ final AtomicInteger errors = new AtomicInteger(0);
+ final AtomicReference firstException = new AtomicReference<>();
+ final AtomicBoolean clearCompleted = new AtomicBoolean(false);
+ // Add initial messages (single thread)
+ for (int i = 0; i < initialMessages; i++) {
+ tracker.addMessage(i, i, System.currentTimeMillis() + 60000);
+ }
+ // Start read threads that will run during clear operation
+ for (int i = 0; i < numReadThreads; i++) {
+ final int threadId = i;
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ while (!clearCompleted.get()) {
+ switch (threadId % 3) {
+ case 0:
+ tracker.containsMessage(threadId, threadId);
+ break;
+ case 1:
+ try {
+ tracker.nextDeliveryTime();
+ } catch (IllegalArgumentException e) {
+ // Expected when no messages available
+ }
+ break;
+ case 2:
+ tracker.getNumberOfDelayedMessages();
+ break;
+ }
+ Thread.sleep(1);
+ }
+ // Continue reading for a bit after clear
+ for (int j = 0; j < 100; j++) {
+ tracker.containsMessage(j, j);
+ Thread.sleep(1);
+ }
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ readersDone.countDown();
+ }
+ });
+ }
+ // Start clear operation after a short delay
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ Thread.sleep(100);
+ tracker.clear().get(30, TimeUnit.SECONDS);
+ clearCompleted.set(true);
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ clearCompleted.set(true);
+ }
+ });
+ startLatch.countDown();
+ assertTrue(readersDone.await(60, TimeUnit.SECONDS), "Readers should complete within 60 seconds");
+ if (errors.get() > 0) {
+ Exception exception = firstException.get();
+ if (exception != null) {
+ throw new AssertionError("Concurrent clear test failed", exception);
+ }
+ }
+ assertEquals(errors.get(), 0, "No exceptions should occur during concurrent clear operations");
+ assertEquals(tracker.getNumberOfDelayedMessages(), 0, "All messages should be cleared");
+ }
+
+ /**
+ * Test concurrent close() operations.
+ * This verifies that close() properly handles concurrent access and shuts down cleanly.
+ * Messages are added beforehand to test close() behavior with existing data.
+ */
+ @Test
+ public void testConcurrentClose() throws Exception {
+ final int numReadThreads = 8;
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+ final AtomicInteger errors = new AtomicInteger(0);
+ final AtomicReference firstException = new AtomicReference<>();
+ final AtomicBoolean closeInitiated = new AtomicBoolean(false);
+ // Add some messages first (single thread)
+ for (int i = 0; i < 100; i++) {
+ tracker.addMessage(i, i, System.currentTimeMillis() + 60000);
+ }
+ // Start read threads that will be interrupted by close
+ for (int i = 0; i < numReadThreads; i++) {
+ final int threadId = i;
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ while (!closeInitiated.get()) {
+ try {
+ switch (threadId % 4) {
+ case 0:
+ tracker.containsMessage(threadId, threadId);
+ break;
+ case 1:
+ tracker.nextDeliveryTime();
+ break;
+ case 2:
+ tracker.getNumberOfDelayedMessages();
+ break;
+ case 3:
+ tracker.getScheduledMessages(10);
+ break;
+ }
+ } catch (IllegalArgumentException e) {
+ // Expected for some operations when tracker is being closed
+ }
+ Thread.sleep(1);
+ }
+ } catch (Exception e) {
+ // Some exceptions may be expected during close
+ if (!closeInitiated.get()) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ }
+ } finally {
+ readersDone.countDown();
+ }
+ });
+ }
+ // Start close operation after a short delay
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ Thread.sleep(100);
+ closeInitiated.set(true);
+ tracker.close();
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ }
+ });
+ startLatch.countDown();
+ assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete within 30 seconds");
+ if (errors.get() > 0) {
+ Exception exception = firstException.get();
+ if (exception != null) {
+ log.warn("Exception during concurrent close test (may be expected): " + exception.getMessage());
+ }
+ }
+ // Create a new tracker for the next test since this one is closed
+ tracker = new BucketDelayedDeliveryTracker(
+ dispatcher, timer, 1000, Clock.systemUTC(), true, storage,
+ 100, 1000, 100, 10
+ );
+ }
+
+ /**
+ * Test mixed read operations with sequential addMessage and concurrent getScheduledMessages.
+ * This tests the ReentrantReadWriteLock behavior when read and write operations are mixed.
+ * addMessage is executed in single thread, while reads and getScheduledMessages are concurrent.
+ * Ensures all deliverable messages are retrieved before test completion.
+ */
+ @Test
+ public void testMixedReadWriteOperationsDeadlockDetection() throws Exception {
+ final int numReadThreads = 16;
+ final int numScheduleThreads = 4;
+ final int totalMessages = 2000;
+ final int readsPerThread = 500;
+
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+ final CountDownLatch schedulersDone = new CountDownLatch(numScheduleThreads);
+ final CountDownLatch writerDone = new CountDownLatch(1);
+ final AtomicBoolean deadlockDetected = new AtomicBoolean(false);
+ final AtomicInteger completedOperations = new AtomicInteger(0);
+ final AtomicReference firstException = new AtomicReference<>();
+ final AtomicInteger messagesAdded = new AtomicInteger(0);
+ final AtomicInteger deliverableMessagesCount = new AtomicInteger(0);
+ final AtomicInteger totalMessagesRetrieved = new AtomicInteger(0);
+ // Single writer thread for addMessage (sequential execution)
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ final long baseTime = System.currentTimeMillis();
+
+ for (int i = 0; i < totalMessages; i++) {
+ try {
+ long ledgerId = 10000 + i;
+ long entryId = i % 1000;
+
+ // Create mix of messages: some ready for delivery, some delayed
+ long deliverAt;
+ if (i % 3 == 0) {
+ // Messages that will be ready for delivery after a short delay
+ deliverAt = baseTime + 500;
+ deliverableMessagesCount.incrementAndGet();
+ } else {
+ // Messages for future delivery (much later)
+ deliverAt = baseTime + 30000;
+ }
+
+ boolean added = tracker.addMessage(ledgerId, entryId, deliverAt);
+ if (added) {
+ messagesAdded.incrementAndGet();
+ }
+ completedOperations.incrementAndGet();
+
+ if (i % 200 == 0) {
+ Thread.sleep(1);
+ }
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ deadlockDetected.set(true);
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ writerDone.countDown();
+ }
+ });
+ // Start read threads (using read locks)
+ for (int i = 0; i < numReadThreads; i++) {
+ final int threadId = i;
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ // Continue reading until writer is done, plus some extra operations
+ int operationCount = 0;
+ while ((writerDone.getCount() > 0 || operationCount < readsPerThread)) {
+ try {
+ switch (threadId % 3) {
+ case 0:
+ long ledgerId = 10000 + (operationCount % (totalMessages + 100));
+ long entryId = operationCount % 1000;
+ tracker.containsMessage(ledgerId, entryId);
+ break;
+ case 1:
+ tracker.nextDeliveryTime();
+ break;
+ case 2:
+ tracker.getNumberOfDelayedMessages();
+ break;
+ }
+ completedOperations.incrementAndGet();
+ operationCount++;
+ if (operationCount % 100 == 0) {
+ Thread.sleep(1);
+ }
+ } catch (IllegalArgumentException e) {
+ // Expected for some operations
+ completedOperations.incrementAndGet();
+ operationCount++;
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ break;
+ }
+ if (writerDone.getCount() == 0 && operationCount >= readsPerThread) {
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ deadlockDetected.set(true);
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ readersDone.countDown();
+ }
+ });
+ }
+ // Start getScheduledMessages threads (using write locks)
+ for (int i = 0; i < numScheduleThreads; i++) {
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+
+ // Wait for writer to finish and messages to become deliverable
+ writerDone.await();
+ Thread.sleep(1000); // Wait 1 second for messages to become ready for delivery
+
+ final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+
+ // Continue until all deliverable messages are retrieved or the deadline is reached. Empty returns
+ // are valid while a bucket snapshot or snapshot segment load is still completing.
+ while (totalMessagesRetrieved.get() < deliverableMessagesCount.get()
+ && System.nanoTime() < deadlineNanos) {
+ try {
+ NavigableSet messages = tracker.getScheduledMessages(50);
+ int retrieved = messages.size();
+ totalMessagesRetrieved.addAndGet(retrieved);
+ completedOperations.incrementAndGet();
+
+ if (retrieved == 0) {
+ Thread.sleep(5); // Short wait for more messages
+ } else {
+ Thread.sleep(2); // Short processing delay
+ }
+
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ deadlockDetected.set(true);
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ schedulersDone.countDown();
+ }
+ });
+ }
+ // Start all threads
+ startLatch.countDown();
+
+ // Wait for completion with reasonable timeout to detect deadlocks
+ boolean writerCompleted = writerDone.await(10, TimeUnit.SECONDS);
+ boolean readersCompleted = readersDone.await(15, TimeUnit.SECONDS);
+ boolean schedulersCompleted = schedulersDone.await(20, TimeUnit.SECONDS);
+ if (!writerCompleted || !readersCompleted || !schedulersCompleted) {
+ deadlockDetected.set(true);
+ log.error("Test timed out - potential deadlock detected. Writer: {}, Readers: {}, Schedulers: {}",
+ writerCompleted, readersCompleted, schedulersCompleted);
+ }
+ if (deadlockDetected.get()) {
+ Exception e = firstException.get();
+ if (e != null) {
+ throw new AssertionError("Deadlock or exception detected during mixed operations test", e);
+ } else {
+ throw new AssertionError("Deadlock detected - test did not complete within timeout");
+ }
+ }
+
+ // Verify operations completed successfully
+ assertTrue(completedOperations.get() > 0, "Some operations should complete");
+ assertTrue(messagesAdded.get() > 0, "Some messages should have been added");
+ assertTrue(deliverableMessagesCount.get() > 0, "Some messages should be deliverable");
+
+ // Verify that all deliverable messages were retrieved
+ assertEquals(totalMessagesRetrieved.get(), deliverableMessagesCount.get(),
+ "All deliverable messages should be retrieved");
+ log.info("Mixed operations test completed successfully. Total operations: {}, Messages added: {}, "
+ + "Deliverable messages: {}, Retrieved messages: {}",
+ completedOperations.get(), messagesAdded.get(),
+ deliverableMessagesCount.get(), totalMessagesRetrieved.get());
+ }
+}