Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
// Count of delayed messages in the tracker.
private final AtomicLong delayedMessagesCount = new AtomicLong(0);

// Cached memory usage of the delayed message bitmaps, maintained via delta on each mutation.
private final AtomicLong memoryUsage = new AtomicLong(0);

InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
Expand Down Expand Up @@ -142,7 +145,9 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
boolean isNew = !bitmap.contains(entryId);

if (isNew) {
long oldSize = bitmap.getLongSizeInBytes();
bitmap.add(entryId);
memoryUsage.addAndGet(bitmap.getLongSizeInBytes() - oldSize);
delayedMessagesCount.incrementAndGet();
}

Expand Down Expand Up @@ -205,13 +210,16 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
});
n -= cardinality;
delayedMessagesCount.addAndGet(-cardinality);
memoryUsage.addAndGet(-entryIds.getLongSizeInBytes());
ledgerIdToDelete.add(ledgerId);
} else {
long oldSize = entryIds.getLongSizeInBytes();
long[] entryIdsArray = entryIds.toArray();
for (int i = 0; i < n; i++) {
positions.add(PositionFactory.create(ledgerId, entryIdsArray[i]));
entryIds.removeLong(entryIdsArray[i]);
}
memoryUsage.addAndGet(entryIds.getLongSizeInBytes() - oldSize);
delayedMessagesCount.addAndGet(-n);
n = 0;
}
Expand Down Expand Up @@ -248,6 +256,7 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
public CompletableFuture<Void> clear() {
this.delayedMessageMap.clear();
this.delayedMessagesCount.set(0);
this.memoryUsage.set(0);
return CompletableFuture.completedFuture(null);
}

Expand All @@ -264,9 +273,7 @@ public long getNumberOfDelayedMessages() {
*/
@Override
public long getBufferMemoryUsage() {
return delayedMessageMap.values().stream().mapToLong(
ledgerMap -> ledgerMap.values().stream().mapToLong(
Roaring64Bitmap::getLongSizeInBytes).sum()).sum();
return memoryUsage.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import io.github.merlimat.slog.Logger;
import io.netty.util.Timeout;
import io.netty.util.Timer;
Expand Down Expand Up @@ -107,7 +106,7 @@ public static record SnapshotKey(long ledgerId, long entryId) {}

@Getter
@VisibleForTesting
private final RangeMap<Long, ImmutableBucket> immutableBuckets;
private final ImmutableBucketIndex immutableBuckets;

private final ConcurrentHashMap<SnapshotKey, ImmutableBucket> snapshotSegmentLastIndexMap;

Expand Down Expand Up @@ -154,7 +153,7 @@ public BucketDelayedDeliveryTracker(DelayedDeliveryContext context,
this.maxIndexesPerBucketSnapshotSegment = maxIndexesPerBucketSnapshotSegment;
this.maxNumBuckets = maxNumBuckets;
this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
this.immutableBuckets = TreeRangeMap.create();
this.immutableBuckets = new ImmutableBucketIndex();
this.snapshotSegmentLastIndexMap = new ConcurrentHashMap<>();
this.lastMutableBucket =
new MutableBucket(context.getName(), context.getCursor(), FutureUtil.Sequencer.create(),
Expand Down Expand Up @@ -241,7 +240,7 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT
for (Map.Entry<Range<Long>, ImmutableBucket> mapEntry : toBeDeletedBucketMap.entrySet()) {
Range<Long> key = mapEntry.getKey();
ImmutableBucket immutableBucket = mapEntry.getValue();
immutableBucketMap.remove(key);
immutableBuckets.remove(key);
// delete asynchronously without waiting for completion
immutableBucket.asyncDeleteBucketSnapshot(stats);
}
Expand All @@ -251,6 +250,8 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT
numberDelayedMessages.add(bucket.numberBucketDelayedMessages);
});

immutableBuckets.recomputeCounters();

log.info()
.attr("buckets", immutableBucketMap.size())
.attr("numberDelayedMessages", numberDelayedMessages.longValue())
Expand Down Expand Up @@ -341,7 +342,8 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
CompletableFuture<Long> future = createFuture.handle((bucketId, ex) -> {
if (ex == null) {
immutableBucket.setSnapshotSegments(null);
immutableBucket.asyncUpdateSnapshotLength();
immutableBucket.asyncUpdateSnapshotLength()
.thenRun(() -> immutableBuckets.recomputeCounters());
log.info()
.attr("bucketKey", immutableBucket.bucketKey())
.log("Create bucket snapshot finish, bucketKey");
Expand Down Expand Up @@ -371,7 +373,7 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
});

immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId);
immutableBuckets.asMapOfRanges().remove(
immutableBuckets.remove(
Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId));
snapshotSegmentLastIndexMap.remove(
new SnapshotKey(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId()));
Expand Down Expand Up @@ -409,7 +411,7 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
lastMutableBucket.resetLastMutableBucketRange();

if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
if (maxNumBuckets > 0 && immutableBuckets.count() > maxNumBuckets) {
asyncMergeBucketSnapshot();
}
}
Expand Down Expand Up @@ -507,7 +509,7 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
} else {
log.info()
.attr("bucketKeys", bucketsStr)
.attr("bucketNum", immutableBuckets.asMapOfRanges().size())
.attr("bucketNum", immutableBuckets.count())
.log("Merge bucket snapshot finish");

stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge,
Expand Down Expand Up @@ -577,8 +579,7 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
});

for (ImmutableBucket bucket : buckets) {
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
}
}
});
Expand Down Expand Up @@ -644,7 +645,7 @@ public synchronized NavigableSet<Position> getScheduledMessages(int maxMessages)
SnapshotKey snapshotKey = new SnapshotKey(ledgerId, entryId);

ImmutableBucket bucket = snapshotSegmentLastIndexMap.get(snapshotKey);
if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) {
if (bucket != null && immutableBuckets.containsValue(bucket)) {
// All message of current snapshot segment are scheduled, try load next snapshot segment
if (bucket.merging) {
log.info()
Expand Down Expand Up @@ -674,8 +675,7 @@ public synchronized NavigableSet<Position> getScheduledMessages(int maxMessages)
synchronized (BucketDelayedDeliveryTracker.this) {
this.snapshotSegmentLastIndexMap.remove(snapshotKey);
if (CollectionUtils.isEmpty(indexList)) {
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
bucket.asyncDeleteBucketSnapshot(stats);
return;
}
Expand Down Expand Up @@ -807,13 +807,9 @@ public synchronized boolean containsMessage(long ledgerId, long entryId) {
}

public Map<String, TopicMetricBean> genTopicMetricMap() {
stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1);
stats.recordNumOfBuckets((int) (immutableBuckets.count() + 1));
stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + this.lastMutableBucket.size());
MutableLong totalSnapshotLength = new MutableLong();
immutableBuckets.asMapOfRanges().values().forEach(immutableBucket -> {
totalSnapshotLength.add(immutableBucket.getSnapshotLength());
});
stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue());
stats.recordBucketSnapshotSizeBytes(immutableBuckets.totalSnapshotLength());
return stats.genTopicMetricMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,20 @@ private CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(b
.log("Failed to get bucket snapshot segment");
}
}), BucketSnapshotPersistenceException.class, MaxRetryTimes)
.thenApply(bucketSnapshotSegments -> {
.thenCompose(bucketSnapshotSegments -> {
if (CollectionUtils.isEmpty(bucketSnapshotSegments)) {
return Collections.emptyList();
return CompletableFuture.completedFuture(Collections.emptyList());
}

SnapshotSegment snapshotSegment =
bucketSnapshotSegments.get(0);
List<DelayedIndex> indexList = snapshotSegment.getIndexesList();
this.setCurrentSegmentEntryId(nextSegmentEntryId);
if (isRecover) {
this.asyncUpdateSnapshotLength();
return this.asyncUpdateSnapshotLength()
.thenApply(__ -> indexList);
}
return indexList;
return CompletableFuture.completedFuture(indexList);
});
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed.bucket;

import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Index of {@link ImmutableBucket}s keyed by ledger-ID {@link Range}.
*
* <p>The underlying {@link TreeRangeMap} is not thread-safe. All mutations must be
* performed under an external lock (for example {@code synchronized(tracker)}).
*
* <p>To support lock-free stats reads, this class maintains cached counters for:
* <ul>
* <li>number of buckets ({@link #count()})</li>
* <li>total snapshot length ({@link #totalSnapshotLength()})</li>
* </ul>
*
* <p>Note that {@link TreeRangeMap#put} automatically removes any existing entries
* whose ranges overlap with the inserted range. Because these removals are implicit,
* counter updates cannot be derived solely from the caller's operation. Therefore
* {@link #put(Range, ImmutableBucket)} and {@link #recomputeCounters()} rebuild the
* counters from the actual map state.
*/
class ImmutableBucketIndex {

private final TreeRangeMap<Long, ImmutableBucket> map = TreeRangeMap.create();

private final AtomicLong count = new AtomicLong();
private final AtomicLong totalSnapshotLength = new AtomicLong();

/**
* Adds a bucket to the index.
*
* <p>If the supplied range overlaps existing entries, {@link TreeRangeMap}
* removes them automatically. Counters are therefore recomputed from the
* resulting map state.
*/
void put(Range<Long> range, ImmutableBucket bucket) {
map.put(range, bucket);
recomputeCounters();
}

/**
* Removes the bucket associated with the given range.
*
* <p>The supplied range is expected to exactly match an existing entry.
*/
void remove(Range<Long> range) {
ImmutableBucket removed = map.asMapOfRanges().remove(range);
if (removed != null) {
count.decrementAndGet();
totalSnapshotLength.addAndGet(-removed.getSnapshotLength());
}
}

long count() {
return count.get();
}

long totalSnapshotLength() {
return totalSnapshotLength.get();
}

ImmutableBucket get(long key) {
return map.get(key);
}

RangeMap<Long, ImmutableBucket> subRangeMap(Range<Long> range) {
return map.subRangeMap(range);
}

Map<Range<Long>, ImmutableBucket> asMapOfRanges() {
return map.asMapOfRanges();
}

boolean containsValue(ImmutableBucket bucket) {
return map.asMapOfRanges().containsValue(bucket);
}

/**
* Rebuilds cached counters from the current map state.
*
* <p>This method is useful after bulk updates or whenever the map state is
* considered the source of truth.
*/
void recomputeCounters() {
Map<Range<Long>, ImmutableBucket> ranges = map.asMapOfRanges();

count.set(ranges.size());

long snapshotLength = 0;
for (ImmutableBucket bucket : ranges.values()) {
snapshotLength += bucket.getSnapshotLength();
}
totalSnapshotLength.set(snapshotLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis
protected final MessageRedeliveryController redeliveryMessages;
protected final RedeliveryTracker redeliveryTracker;

private Optional<DelayedDeliveryTracker> delayedDeliveryTracker = Optional.empty();
private volatile Optional<DelayedDeliveryTracker> delayedDeliveryTracker = Optional.empty();

protected volatile boolean havePendingRead = false;
protected volatile boolean havePendingReplayRead = false;
Expand Down Expand Up @@ -1372,13 +1372,12 @@ protected boolean isNormalReadAllowed() {
}



protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries();
protected boolean shouldPauseDeliveryForDelayTracker() {
return delayedDeliveryTracker.map(DelayedDeliveryTracker::shouldPauseAllDeliveries).orElse(false);
}

@Override
public synchronized long getNumberOfDelayedMessages() {
public long getNumberOfDelayedMessages() {
Comment thread
nodece marked this conversation as resolved.
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
}

Expand Down Expand Up @@ -1464,20 +1463,15 @@ public PersistentTopic getTopic() {
}


public synchronized long getDelayedTrackerMemoryUsage() {
public long getDelayedTrackerMemoryUsage() {
Comment on lines -1467 to +1466

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the implementation of getBufferMemoryUsage isn't thread safe in BucketDelayedDeliveryTracker or in InMemoryDelayedDeliveryTracker.

One particular detail about RoaringBitmaps is that it's not thread safe for even plain concurrent reads and data races aren't the only problem. Reads can mutate RoaringBitmap internal state. I learned that quite recently and there are other bugs in Pulsar impacted by this too since StampedLock/ReadWriteLocks aren't safe with RoaringBitmap methods. There has been changes in the project to prevent internal datastructure corruption caused by concurrent reads, but I don't think that it has been fully resolved. I checked one of the recent changes and it looks invalid although it was merged to the project. Making a single method synchronized is useless if reads or writes happen outside of the same object monitor.
Thread safety of RoaringBitmap is documented here: https://github.com/RoaringBitmap/RoaringBitmap?tab=readme-ov-file#thread-safety (there is no commitment for thread safety).

Since BucketDelayedDeliveryTracker heavily uses RoaringBitmaps, I think that we need a different solution to avoid blocking when stats are requested. One possibility would be to have a background thread refresh stats after there has been activity since the last operation. The instances would have to be immutable for this to work properly.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reported #25991 about the RoaringBitmap thread-safety issue across the code base. The solution would be to use ReadWriteLock and ensure that read lock is only used for methods that never mutate internal state. It's requires internal knowledge of the RoaringBitmap implementation to do this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari Fixed

return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
}

public synchronized Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
if (delayedDeliveryTracker.isEmpty()) {
return Collections.emptyMap();
}

if (delayedDeliveryTracker.get() instanceof BucketDelayedDeliveryTracker) {
return ((BucketDelayedDeliveryTracker) delayedDeliveryTracker.get()).genTopicMetricMap();
}

return Collections.emptyMap();
public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
return delayedDeliveryTracker
.filter(BucketDelayedDeliveryTracker.class::isInstance)
.map(tracker -> ((BucketDelayedDeliveryTracker) tracker).genTopicMetricMap())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BucketDelayedDeliveryTracker.genTopicMetricMap is not thread safe. It should be made thread safe.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari Fixed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I think this still needs serialization for correctness. This method is now reachable from an unsynchronized stats path, but it is not a pure read: it updates shared stats, calls stats.genTopicMetricMap() which drains counters with sumThenReset() / StatsBuckets.refresh(), and also reads sharedBucketPriorityQueue.size() / lastMutableBucket.size() from non-thread-safe queues. Concurrent stats scrapes can therefore race with each other and export incorrect samples. The minimal fix would be to keep this collection serialized, e.g. make genTopicMetricMap() synchronized; if we want it lock-free, the method needs to become a pure snapshot read from cached/atomic values.

.orElse(Collections.emptyMap());
}

@Override
Expand Down
Loading
Loading