diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index b1d4e8cecbd14..00c2a0e868220 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -66,6 +66,9 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack // Count of delayed messages in the tracker. private final AtomicLong delayedMessagesCount = new AtomicLong(0); + // Cached memory usage of the delayed message bitmaps, maintained via delta on each mutation. + private final AtomicLong memoryUsage = new AtomicLong(0); + InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict, @@ -142,7 +145,9 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { boolean isNew = !bitmap.contains(entryId); if (isNew) { + long oldSize = bitmap.getLongSizeInBytes(); bitmap.add(entryId); + memoryUsage.addAndGet(bitmap.getLongSizeInBytes() - oldSize); delayedMessagesCount.incrementAndGet(); } @@ -205,13 +210,16 @@ public NavigableSet getScheduledMessages(int maxMessages) { }); n -= cardinality; delayedMessagesCount.addAndGet(-cardinality); + memoryUsage.addAndGet(-entryIds.getLongSizeInBytes()); ledgerIdToDelete.add(ledgerId); } else { + long oldSize = entryIds.getLongSizeInBytes(); long[] entryIdsArray = entryIds.toArray(); for (int i = 0; i < n; i++) { positions.add(PositionFactory.create(ledgerId, entryIdsArray[i])); entryIds.removeLong(entryIdsArray[i]); } + memoryUsage.addAndGet(entryIds.getLongSizeInBytes() - oldSize); delayedMessagesCount.addAndGet(-n); n = 0; } @@ -248,6 +256,7 @@ public NavigableSet getScheduledMessages(int maxMessages) { public CompletableFuture clear() { this.delayedMessageMap.clear(); this.delayedMessagesCount.set(0); + this.memoryUsage.set(0); return CompletableFuture.completedFuture(null); } @@ -264,9 +273,7 @@ public long getNumberOfDelayedMessages() { */ @Override public long getBufferMemoryUsage() { - return delayedMessageMap.values().stream().mapToLong( - ledgerMap -> ledgerMap.values().stream().mapToLong( - Roaring64Bitmap::getLongSizeInBytes).sum()).sum(); + return memoryUsage.get(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 19c774852c9cb..15e7d339417ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; -import com.google.common.collect.TreeRangeMap; import io.github.merlimat.slog.Logger; import io.netty.util.Timeout; import io.netty.util.Timer; @@ -107,7 +106,7 @@ public static record SnapshotKey(long ledgerId, long entryId) {} @Getter @VisibleForTesting - private final RangeMap immutableBuckets; + private final ImmutableBucketIndex immutableBuckets; private final ConcurrentHashMap snapshotSegmentLastIndexMap; @@ -154,7 +153,7 @@ public BucketDelayedDeliveryTracker(DelayedDeliveryContext context, this.maxIndexesPerBucketSnapshotSegment = maxIndexesPerBucketSnapshotSegment; this.maxNumBuckets = maxNumBuckets; this.sharedBucketPriorityQueue = new TripleLongPriorityQueue(); - this.immutableBuckets = TreeRangeMap.create(); + this.immutableBuckets = new ImmutableBucketIndex(); this.snapshotSegmentLastIndexMap = new ConcurrentHashMap<>(); this.lastMutableBucket = new MutableBucket(context.getName(), context.getCursor(), FutureUtil.Sequencer.create(), @@ -241,7 +240,7 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT for (Map.Entry, ImmutableBucket> mapEntry : toBeDeletedBucketMap.entrySet()) { Range key = mapEntry.getKey(); ImmutableBucket immutableBucket = mapEntry.getValue(); - immutableBucketMap.remove(key); + immutableBuckets.remove(key); // delete asynchronously without waiting for completion immutableBucket.asyncDeleteBucketSnapshot(stats); } @@ -251,6 +250,8 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT numberDelayedMessages.add(bucket.numberBucketDelayedMessages); }); + immutableBuckets.recomputeCounters(); + log.info() .attr("buckets", immutableBucketMap.size()) .attr("numberDelayedMessages", numberDelayedMessages.longValue()) @@ -341,7 +342,8 @@ private void afterCreateImmutableBucket(Pair immu CompletableFuture future = createFuture.handle((bucketId, ex) -> { if (ex == null) { immutableBucket.setSnapshotSegments(null); - immutableBucket.asyncUpdateSnapshotLength(); + immutableBucket.asyncUpdateSnapshotLength() + .thenRun(() -> immutableBuckets.recomputeCounters()); log.info() .attr("bucketKey", immutableBucket.bucketKey()) .log("Create bucket snapshot finish, bucketKey"); @@ -371,7 +373,7 @@ private void afterCreateImmutableBucket(Pair immu }); immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId); - immutableBuckets.asMapOfRanges().remove( + immutableBuckets.remove( Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId)); snapshotSegmentLastIndexMap.remove( new SnapshotKey(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId())); @@ -409,7 +411,7 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime); lastMutableBucket.resetLastMutableBucketRange(); - if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) { + if (maxNumBuckets > 0 && immutableBuckets.count() > maxNumBuckets) { asyncMergeBucketSnapshot(); } } @@ -507,7 +509,7 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot() { } else { log.info() .attr("bucketKeys", bucketsStr) - .attr("bucketNum", immutableBuckets.asMapOfRanges().size()) + .attr("bucketNum", immutableBuckets.count()) .log("Merge bucket snapshot finish"); stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge, @@ -577,8 +579,7 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot(List getScheduledMessages(int maxMessages) SnapshotKey snapshotKey = new SnapshotKey(ledgerId, entryId); ImmutableBucket bucket = snapshotSegmentLastIndexMap.get(snapshotKey); - if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { + if (bucket != null && immutableBuckets.containsValue(bucket)) { // All message of current snapshot segment are scheduled, try load next snapshot segment if (bucket.merging) { log.info() @@ -674,8 +675,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessages) synchronized (BucketDelayedDeliveryTracker.this) { this.snapshotSegmentLastIndexMap.remove(snapshotKey); if (CollectionUtils.isEmpty(indexList)) { - immutableBuckets.asMapOfRanges() - .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); + immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); bucket.asyncDeleteBucketSnapshot(stats); return; } @@ -807,13 +807,9 @@ public synchronized boolean containsMessage(long ledgerId, long entryId) { } public Map genTopicMetricMap() { - stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1); + stats.recordNumOfBuckets((int) (immutableBuckets.count() + 1)); stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + this.lastMutableBucket.size()); - MutableLong totalSnapshotLength = new MutableLong(); - immutableBuckets.asMapOfRanges().values().forEach(immutableBucket -> { - totalSnapshotLength.add(immutableBucket.getSnapshotLength()); - }); - stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue()); + stats.recordBucketSnapshotSizeBytes(immutableBuckets.totalSnapshotLength()); return stats.genTopicMetricMap(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index e76b225b9d158..de43f096f6da9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -130,9 +130,9 @@ private CompletableFuture> asyncLoadNextBucketSnapshotEntry(b .log("Failed to get bucket snapshot segment"); } }), BucketSnapshotPersistenceException.class, MaxRetryTimes) - .thenApply(bucketSnapshotSegments -> { + .thenCompose(bucketSnapshotSegments -> { if (CollectionUtils.isEmpty(bucketSnapshotSegments)) { - return Collections.emptyList(); + return CompletableFuture.completedFuture(Collections.emptyList()); } SnapshotSegment snapshotSegment = @@ -140,9 +140,10 @@ private CompletableFuture> asyncLoadNextBucketSnapshotEntry(b List indexList = snapshotSegment.getIndexesList(); this.setCurrentSegmentEntryId(nextSegmentEntryId); if (isRecover) { - this.asyncUpdateSnapshotLength(); + return this.asyncUpdateSnapshotLength() + .thenApply(__ -> indexList); } - return indexList; + return CompletableFuture.completedFuture(indexList); }); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucketIndex.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucketIndex.java new file mode 100644 index 0000000000000..074fd7faa8f55 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucketIndex.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed.bucket; + +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Index of {@link ImmutableBucket}s keyed by ledger-ID {@link Range}. + * + *

The underlying {@link TreeRangeMap} is not thread-safe. All mutations must be + * performed under an external lock (for example {@code synchronized(tracker)}). + * + *

To support lock-free stats reads, this class maintains cached counters for: + *

    + *
  • number of buckets ({@link #count()})
  • + *
  • total snapshot length ({@link #totalSnapshotLength()})
  • + *
+ * + *

Note that {@link TreeRangeMap#put} automatically removes any existing entries + * whose ranges overlap with the inserted range. Because these removals are implicit, + * counter updates cannot be derived solely from the caller's operation. Therefore + * {@link #put(Range, ImmutableBucket)} and {@link #recomputeCounters()} rebuild the + * counters from the actual map state. + */ +class ImmutableBucketIndex { + + private final TreeRangeMap map = TreeRangeMap.create(); + + private final AtomicLong count = new AtomicLong(); + private final AtomicLong totalSnapshotLength = new AtomicLong(); + + /** + * Adds a bucket to the index. + * + *

If the supplied range overlaps existing entries, {@link TreeRangeMap} + * removes them automatically. Counters are therefore recomputed from the + * resulting map state. + */ + void put(Range range, ImmutableBucket bucket) { + map.put(range, bucket); + recomputeCounters(); + } + + /** + * Removes the bucket associated with the given range. + * + *

The supplied range is expected to exactly match an existing entry. + */ + void remove(Range range) { + ImmutableBucket removed = map.asMapOfRanges().remove(range); + if (removed != null) { + count.decrementAndGet(); + totalSnapshotLength.addAndGet(-removed.getSnapshotLength()); + } + } + + long count() { + return count.get(); + } + + long totalSnapshotLength() { + return totalSnapshotLength.get(); + } + + ImmutableBucket get(long key) { + return map.get(key); + } + + RangeMap subRangeMap(Range range) { + return map.subRangeMap(range); + } + + Map, ImmutableBucket> asMapOfRanges() { + return map.asMapOfRanges(); + } + + boolean containsValue(ImmutableBucket bucket) { + return map.asMapOfRanges().containsValue(bucket); + } + + /** + * Rebuilds cached counters from the current map state. + * + *

This method is useful after bulk updates or whenever the map state is + * considered the source of truth. + */ + void recomputeCounters() { + Map, ImmutableBucket> ranges = map.asMapOfRanges(); + + count.set(ranges.size()); + + long snapshotLength = 0; + for (ImmutableBucket bucket : ranges.values()) { + snapshotLength += bucket.getSnapshotLength(); + } + totalSnapshotLength.set(snapshotLength); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 9bcf9153572a9..7e7b45a207ba2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -94,7 +94,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis protected final MessageRedeliveryController redeliveryMessages; protected final RedeliveryTracker redeliveryTracker; - private Optional delayedDeliveryTracker = Optional.empty(); + private volatile Optional delayedDeliveryTracker = Optional.empty(); protected volatile boolean havePendingRead = false; protected volatile boolean havePendingReplayRead = false; @@ -1372,13 +1372,12 @@ protected boolean isNormalReadAllowed() { } - - protected synchronized boolean shouldPauseDeliveryForDelayTracker() { - return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries(); + protected boolean shouldPauseDeliveryForDelayTracker() { + return delayedDeliveryTracker.map(DelayedDeliveryTracker::shouldPauseAllDeliveries).orElse(false); } @Override - public synchronized long getNumberOfDelayedMessages() { + public long getNumberOfDelayedMessages() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); } @@ -1464,20 +1463,15 @@ public PersistentTopic getTopic() { } - public synchronized long getDelayedTrackerMemoryUsage() { + public long getDelayedTrackerMemoryUsage() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L); } - public synchronized Map getBucketDelayedIndexStats() { - if (delayedDeliveryTracker.isEmpty()) { - return Collections.emptyMap(); - } - - if (delayedDeliveryTracker.get() instanceof BucketDelayedDeliveryTracker) { - return ((BucketDelayedDeliveryTracker) delayedDeliveryTracker.get()).genTopicMetricMap(); - } - - return Collections.emptyMap(); + public Map getBucketDelayedIndexStats() { + return delayedDeliveryTracker + .filter(BucketDelayedDeliveryTracker.class::isInstance) + .map(tracker -> ((BucketDelayedDeliveryTracker) tracker).genTopicMetricMap()) + .orElse(Collections.emptyMap()); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 276f8c038a67c..7016250e12d66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -96,7 +96,7 @@ public class PersistentDispatcherMultipleConsumersClassic extends AbstractPersis protected final MessageRedeliveryController redeliveryMessages; protected final RedeliveryTracker redeliveryTracker; - private Optional delayedDeliveryTracker = Optional.empty(); + private volatile Optional delayedDeliveryTracker = Optional.empty(); protected volatile boolean havePendingRead = false; protected volatile boolean havePendingReplayRead = false; @@ -1207,12 +1207,12 @@ protected boolean hasConsumersNeededNormalRead() { return true; } - protected synchronized boolean shouldPauseDeliveryForDelayTracker() { - return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries(); + protected boolean shouldPauseDeliveryForDelayTracker() { + return delayedDeliveryTracker.map(DelayedDeliveryTracker::shouldPauseAllDeliveries).orElse(false); } @Override - public synchronized long getNumberOfDelayedMessages() { + public long getNumberOfDelayedMessages() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); } @@ -1291,20 +1291,15 @@ public PersistentTopic getTopic() { } - public synchronized long getDelayedTrackerMemoryUsage() { + public long getDelayedTrackerMemoryUsage() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L); } - public synchronized Map getBucketDelayedIndexStats() { - if (delayedDeliveryTracker.isEmpty()) { - return Collections.emptyMap(); - } - - if (delayedDeliveryTracker.get() instanceof BucketDelayedDeliveryTracker) { - return ((BucketDelayedDeliveryTracker) delayedDeliveryTracker.get()).genTopicMetricMap(); - } - - return Collections.emptyMap(); + public Map getBucketDelayedIndexStats() { + return delayedDeliveryTracker + .filter(BucketDelayedDeliveryTracker.class::isInstance) + .map(tracker -> ((BucketDelayedDeliveryTracker) tracker).genTopicMetricMap()) + .orElse(Collections.emptyMap()); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucketIndexTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucketIndexTest.java new file mode 100644 index 0000000000000..e8b654bb8e81a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucketIndexTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed.bucket; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.google.common.collect.Range; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class ImmutableBucketIndexTest { + + private ImmutableBucketIndex index; + + @BeforeMethod + public void setUp() { + index = new ImmutableBucketIndex(); + } + + private ImmutableBucket mockBucket(long snapshotLength) { + ImmutableBucket bucket = mock(ImmutableBucket.class); + when(bucket.getSnapshotLength()).thenReturn(snapshotLength); + return bucket; + } + + @Test + public void testPutAndCount() { + assertEquals(index.count(), 0); + assertEquals(index.totalSnapshotLength(), 0); + + index.put(Range.closed(1L, 10L), mockBucket(100)); + index.put(Range.closed(11L, 20L), mockBucket(200)); + index.put(Range.closed(21L, 30L), mockBucket(300)); + + assertEquals(index.count(), 3); + assertEquals(index.totalSnapshotLength(), 600); + } + + @Test + public void testRemove() { + index.put(Range.closed(1L, 10L), mockBucket(100)); + index.put(Range.closed(11L, 20L), mockBucket(200)); + + index.remove(Range.closed(1L, 10L)); + + assertEquals(index.count(), 1); + assertEquals(index.totalSnapshotLength(), 200); + + index.remove(Range.closed(11L, 20L)); + + assertEquals(index.count(), 0); + assertEquals(index.totalSnapshotLength(), 0); + } + + @Test + public void testRemoveNonExistent() { + index.put(Range.closed(1L, 10L), mockBucket(100)); + + index.remove(Range.closed(99L, 100L)); + + assertEquals(index.count(), 1); + assertEquals(index.totalSnapshotLength(), 100); + } + + @Test + public void testPutSameRangeReplacesExisting() { + index.put(Range.closed(1L, 10L), mockBucket(100)); + index.put(Range.closed(1L, 10L), mockBucket(200)); + + assertEquals(index.count(), 1); + assertEquals(index.totalSnapshotLength(), 200); + } + + @Test + public void testPutOverlappingSplitsExisting() { + index.put(Range.closed(1L, 10L), mockBucket(100)); + index.put(Range.closed(5L, 15L), mockBucket(250)); + + assertEquals(index.count(), 2); + assertEquals(index.totalSnapshotLength(), 350); + } + + @Test + public void testPutOverlappingCreatesFragments() { + index.put(Range.closed(1L, 100L), mockBucket(100)); + index.put(Range.closed(20L, 30L), mockBucket(200)); + + assertEquals(index.count(), 3); + assertEquals(index.totalSnapshotLength(), 400); + } + + @Test + public void testPutEnclosingReplacesMultiple() { + index.put(Range.closed(1L, 10L), mockBucket(100)); + index.put(Range.closed(11L, 20L), mockBucket(200)); + + index.put(Range.closed(1L, 20L), mockBucket(500)); + + assertEquals(index.count(), 1); + assertEquals(index.totalSnapshotLength(), 500); + } + + @Test + public void testRecomputeCounters() { + index.put(Range.closed(1L, 10L), mockBucket(100)); + index.put(Range.closed(11L, 20L), mockBucket(200)); + + index.recomputeCounters(); + + assertEquals(index.count(), 2); + assertEquals(index.totalSnapshotLength(), 300); + + index.remove(Range.closed(1L, 10L)); + + index.recomputeCounters(); + + assertEquals(index.count(), 1); + assertEquals(index.totalSnapshotLength(), 200); + } + + @Test + public void testGet() { + ImmutableBucket bucket = mockBucket(100); + + index.put(Range.closed(1L, 10L), bucket); + + assertEquals(index.get(1L), bucket); + assertEquals(index.get(5L), bucket); + assertEquals(index.get(10L), bucket); + + assertNull(index.get(0L)); + assertNull(index.get(11L)); + } + + @Test + public void testContainsValue() { + ImmutableBucket bucket = mockBucket(100); + + index.put(Range.closed(1L, 10L), bucket); + + assertTrue(index.containsValue(bucket)); + assertFalse(index.containsValue(mockBucket(999))); + } + + @Test + public void testSubRangeMap() { + ImmutableBucket b1 = mockBucket(100); + ImmutableBucket b2 = mockBucket(200); + + index.put(Range.closed(1L, 10L), b1); + index.put(Range.closed(11L, 20L), b2); + + var subRangeMap = index.subRangeMap(Range.closed(5L, 15L)); + + assertEquals(subRangeMap.get(5L), b1); + assertEquals(subRangeMap.get(15L), b2); + assertEquals(subRangeMap.asMapOfRanges().size(), 2); + } +} \ No newline at end of file