diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 8fbf1cfda0d2b..a9b2fcc4e01b5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.pulsar.common.protocol.Commands.DEFAULT_MAX_MESSAGE_SIZE; import java.nio.charset.StandardCharsets; import java.time.Clock; import java.util.Arrays; @@ -91,6 +92,36 @@ public class ManagedLedgerConfig { @Getter @Setter private boolean cacheEvictionByExpectedReadCount = true; + + /** + * Enable batch read API when reading entries from bookkeeper. + * Batch read allows reading multiple entries in a single RPC call, reducing network overhead. + * Note: Batch read is only effective when ensembleSize equals writeQuorumSize (non-striped ledgers). + */ + @Setter + private boolean batchReadEnabled = false; + + /** + * Max size in bytes for per-batch read request. If set to 0 or negative, + * uses the netty max frame size (default 5MB). + * Batch read may return fewer entries if total size exceeds this limit. + */ + @Getter + @Setter + private int batchReadMaxSizeBytes = DEFAULT_MAX_MESSAGE_SIZE; + + /** + * Returns whether batch read is enabled for this managed ledger. + * Batch read is only enabled when both conditions are met: + * 1. batchReadEnabled is set to true + * 2. ensembleSize equals writeQuorumSize (non-striped ledger) + * + * @return true if batch read should be used + */ + public boolean isBatchReadEnabled() { + return ensembleSize == writeQuorumSize && batchReadEnabled; + } + @Getter private long continueCachingAddedEntriesAfterLastActiveCursorLeavesMillis; private int minimumBacklogCursorsForCaching = 0; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java new file mode 100644 index 0000000000000..3df6f1c639fa4 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import io.netty.util.Recycler; +import java.util.Iterator; +import java.util.List; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; + +public class CompositeLedgerEntriesImpl implements LedgerEntries { + private List entries; + private List ledgerEntries; + private final Recycler.Handle recyclerHandle; + + private CompositeLedgerEntriesImpl(Recycler.Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + private static final Recycler RECYCLER = new Recycler<>() { + @Override + protected CompositeLedgerEntriesImpl newObject(Recycler.Handle handle) { + return new CompositeLedgerEntriesImpl(handle); + } + }; + + public static LedgerEntries create(List entries, List ledgerEntries) { + checkArgument(!entries.isEmpty(), "entries for create should not be empty."); + checkArgument(!ledgerEntries.isEmpty(), "ledgerEntries for create should not be empty."); + CompositeLedgerEntriesImpl instance = RECYCLER.get(); + instance.entries = entries; + instance.ledgerEntries = ledgerEntries; + return instance; + } + + private void recycle() { + if (ledgerEntries == null) { + return; + } + ledgerEntries.forEach(LedgerEntries::close); + entries = null; + ledgerEntries = null; + recyclerHandle.recycle(this); + } + + @Override + public LedgerEntry getEntry(long entryId) { + checkNotNull(entries, "entries has been recycled"); + long firstId = entries.get(0).getEntryId(); + long lastId = entries.get(entries.size() - 1).getEntryId(); + if (entryId < firstId || entryId > lastId) { + throw new IndexOutOfBoundsException("required index: " + entryId + + " is out of bounds: [ " + firstId + ", " + lastId + " ]."); + } + int index = (int) (entryId - firstId); + LedgerEntry entry = entries.get(index); + if (entry.getEntryId() != entryId) { + throw new IllegalStateException("Non-contiguous entries detected: expected entryId " + + entryId + " at index " + index + " but found entryId " + entry.getEntryId()); + } + return entry; + } + + @Override + public Iterator iterator() { + checkNotNull(entries, "entries has been recycled"); + return entries.iterator(); + } + + @Override + public void close() { + recycle(); + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java index b5a45415a4fe1..99b3c7c14b62b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -27,6 +27,7 @@ import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; @@ -70,8 +71,11 @@ public void clear() { @Override public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { - ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync( - ledgerEntries -> { + ManagedLedgerConfig config = ml.getConfig(); + boolean isBatchReadEnabled = config.isBatchReadEnabled(); + int batchReadMaxBytes = config.getBatchReadMaxSizeBytes(); + ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, isBatchReadEnabled, batchReadMaxBytes) + .thenAcceptAsync(ledgerEntries -> { List entries = new ArrayList<>(); long totalSize = 0; try { @@ -99,8 +103,12 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSu @Override public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { - ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync( - (ledgerEntries, exception) -> { + ManagedLedgerConfig config = ml.getConfig(); + boolean isBatchReadEnabled = config.isBatchReadEnabled(); + int batchReadMaxBytes = config.getBatchReadMaxSizeBytes(); + ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), + isBatchReadEnabled, batchReadMaxBytes) + .whenCompleteAsync((ledgerEntries, exception) -> { if (exception != null) { ml.invalidateLedgerHandle(lh); callback.readEntryFailed(createManagedLedgerException(exception), ctx); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 2fb20efb0bd03..85532c125cf5b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -535,7 +535,9 @@ CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, l private CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, boolean allowRetry) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; - CompletableFuture> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry) + ManagedLedgerConfig mlConfig = ml.getConfig(); + CompletableFuture> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, + mlConfig.isBatchReadEnabled(), mlConfig.getBatchReadMaxSizeBytes()) .thenApply( ledgerEntries -> { requireNonNull(ml.getName()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index 5cf5f053f0ce7..8d689ef5292ab 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -18,8 +18,11 @@ */ package org.apache.bookkeeper.mledger.impl.cache; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -28,6 +31,11 @@ class ReadEntryUtils { static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, long lastEntry) { + return readAsync(ml, handle, firstEntry, lastEntry, false, 0); + } + + static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, + long lastEntry, boolean batchReadEnabled, int batchReadMaxSize) { if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) { // The read handle comes from another managed ledger, in this case, we can only compare the entry range with // the LAC of that read handle. Specifically, it happens when this method is called by a @@ -49,6 +57,66 @@ static CompletableFuture readAsync(ManagedLedger ml, ReadHandle h return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is " + lastConfirmedEntry + " when reading entry " + lastEntry)); } + + int numberOfEntries = (int) (lastEntry - firstEntry + 1); + + // Use batch read for multiple entries when enabled. + if (batchReadEnabled && numberOfEntries > 1 && batchReadMaxSize > 0) { + return batchReadUnconfirmed(handle, firstEntry, numberOfEntries, batchReadMaxSize); + } return handle.readUnconfirmedAsync(firstEntry, lastEntry); } + + private static CompletableFuture batchReadUnconfirmed( + ReadHandle handle, long firstEntry, int maxCount, int maxSize) { + CompletableFuture future = new CompletableFuture<>(); + List receivedEntries = new ArrayList<>(maxCount); + List ledgerEntries = new ArrayList<>(4); + doBatchRead(handle, firstEntry, maxCount, maxSize, receivedEntries, ledgerEntries, future); + return future; + } + + private static void doBatchRead(ReadHandle handle, long firstEntry, int maxCount, int maxSize, + List receivedEntries, List ledgerEntries, + CompletableFuture future) { + handle.batchReadUnconfirmedAsync(firstEntry, maxCount - receivedEntries.size(), maxSize) + .whenComplete((entries, throwable) -> { + if (throwable != null) { + onBatchReadComplete(handle, firstEntry, maxCount, receivedEntries, ledgerEntries, future, + throwable); + return; + } + long lastReceivedEntry = -1; + int prevReceivedCount = receivedEntries.size(); + for (LedgerEntry entry : entries) { + receivedEntries.add(entry); + lastReceivedEntry = entry.getEntryId(); + } + ledgerEntries.add(entries); + if (receivedEntries.size() >= maxCount || prevReceivedCount == receivedEntries.size()) { + onBatchReadComplete(handle, firstEntry, maxCount, receivedEntries, ledgerEntries, future, null); + return; + } + doBatchRead(handle, lastReceivedEntry + 1, maxCount, maxSize, + receivedEntries, ledgerEntries, future); + }); + } + + private static void onBatchReadComplete(ReadHandle handle, long firstEntry, int maxCount, + List receivedEntries, List ledgerEntries, + CompletableFuture future, Throwable error) { + if (error != null) { + ledgerEntries.forEach(LedgerEntries::close); + future.completeExceptionally(error); + return; + } + if (receivedEntries.isEmpty()) { + ledgerEntries.forEach(LedgerEntries::close); + future.completeExceptionally(new ManagedLedgerException( + "Batch read returned no entries for ledger " + handle.getId() + + " starting from entry " + firstEntry)); + return; + } + future.complete(CompositeLedgerEntriesImpl.create(receivedEntries, ledgerEntries)); + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImplTest.java new file mode 100644 index 0000000000000..3bad19c04e5d9 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImplTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.testng.annotations.Test; + +public class CompositeLedgerEntriesImplTest { + + private LedgerEntryImpl createEntry(long ledgerId, long entryId, byte[] data) { + return LedgerEntryImpl.create(ledgerId, entryId, data.length, Unpooled.wrappedBuffer(data)); + } + + @Test + public void testCreateAndIterate() { + LedgerEntryImpl e0 = createEntry(1L, 0L, new byte[]{0}); + LedgerEntryImpl e1 = createEntry(1L, 1L, new byte[]{1}); + LedgerEntryImpl e2 = createEntry(1L, 2L, new byte[]{2}); + + List entries = new ArrayList<>(); + entries.add(e0); + entries.add(e1); + entries.add(e2); + + List containers = new ArrayList<>(); + // Wrap in a simple LedgerEntries mock-like container using a real impl + // For test, we use a list-based approach + containers.add(new MockLedgerEntries(entries)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + entries, containers); + + // Verify iteration + Iterator it = combined.iterator(); + assertThat(it.hasNext()).isTrue(); + assertThat(it.next().getEntryId()).isEqualTo(0L); + assertThat(it.next().getEntryId()).isEqualTo(1L); + assertThat(it.next().getEntryId()).isEqualTo(2L); + assertThat(it.hasNext()).isFalse(); + + combined.close(); + } + + @Test + public void testGetEntry() { + LedgerEntryImpl e0 = createEntry(1L, 5L, new byte[]{0}); + LedgerEntryImpl e1 = createEntry(1L, 6L, new byte[]{1}); + LedgerEntryImpl e2 = createEntry(1L, 7L, new byte[]{2}); + + List entries = new ArrayList<>(); + entries.add(e0); + entries.add(e1); + entries.add(e2); + + List containers = new ArrayList<>(); + containers.add(new MockLedgerEntries(entries)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + entries, containers); + + assertThat(combined.getEntry(5L).getEntryId()).isEqualTo(5L); + assertThat(combined.getEntry(6L).getEntryId()).isEqualTo(6L); + assertThat(combined.getEntry(7L).getEntryId()).isEqualTo(7L); + + combined.close(); + } + + @Test + public void testGetEntryOutOfRange() { + LedgerEntryImpl e0 = createEntry(1L, 0L, new byte[]{0}); + LedgerEntryImpl e1 = createEntry(1L, 1L, new byte[]{1}); + + List entries = new ArrayList<>(); + entries.add(e0); + entries.add(e1); + + List containers = new ArrayList<>(); + containers.add(new MockLedgerEntries(entries)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + entries, containers); + + // Out of lower bound + assertThatThrownBy(() -> combined.getEntry(-1L)) + .isInstanceOf(IndexOutOfBoundsException.class); + // Out of upper bound + assertThatThrownBy(() -> combined.getEntry(2L)) + .isInstanceOf(IndexOutOfBoundsException.class); + + combined.close(); + } + + @Test + public void testCloseAndRecycle() { + LedgerEntryImpl e0 = createEntry(1L, 0L, new byte[]{0}); + + List entries = new ArrayList<>(); + entries.add(e0); + + List containers = new ArrayList<>(); + containers.add(new MockLedgerEntries(entries)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + entries, containers); + + // Should be usable before close + assertThat(combined.iterator().hasNext()).isTrue(); + + combined.close(); + + // After close, iterator should throw + assertThatThrownBy(combined::iterator) + .isInstanceOf(NullPointerException.class); + } + + @Test + public void testMultipleLedgerEntriesContainers() { + // Simulate entries from 2 separate batch reads + LedgerEntryImpl e0 = createEntry(1L, 0L, new byte[]{0}); + LedgerEntryImpl e1 = createEntry(1L, 1L, new byte[]{1}); + LedgerEntryImpl e2 = createEntry(1L, 2L, new byte[]{2}); + LedgerEntryImpl e3 = createEntry(1L, 3L, new byte[]{3}); + + List batch1 = new ArrayList<>(); + batch1.add(e0); + batch1.add(e1); + + List batch2 = new ArrayList<>(); + batch2.add(e2); + batch2.add(e3); + + List allEntries = new ArrayList<>(); + allEntries.addAll(batch1); + allEntries.addAll(batch2); + + List containers = new ArrayList<>(); + containers.add(new MockLedgerEntries(batch1)); + containers.add(new MockLedgerEntries(batch2)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + allEntries, containers); + + // Verify all entries accessible + assertThat(combined.getEntry(0L).getEntryId()).isEqualTo(0L); + assertThat(combined.getEntry(1L).getEntryId()).isEqualTo(1L); + assertThat(combined.getEntry(2L).getEntryId()).isEqualTo(2L); + assertThat(combined.getEntry(3L).getEntryId()).isEqualTo(3L); + + combined.close(); + } + + @Test + public void testGetEntryWithNonContiguousEntries() { + // Create entries with a gap: IDs 5, 6, 8 (missing 7) + LedgerEntryImpl e5 = createEntry(1L, 5L, new byte[]{0}); + LedgerEntryImpl e6 = createEntry(1L, 6L, new byte[]{1}); + LedgerEntryImpl e8 = createEntry(1L, 8L, new byte[]{2}); + + List entries = new ArrayList<>(); + entries.add(e5); + entries.add(e6); + entries.add(e8); + + List containers = new ArrayList<>(); + containers.add(new MockLedgerEntries(entries)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + entries, containers); + + // Valid entries should still work + assertThat(combined.getEntry(5L).getEntryId()).isEqualTo(5L); + assertThat(combined.getEntry(6L).getEntryId()).isEqualTo(6L); + // Entry 7 computes index 2 (7-5=2), but entries.get(2) has ID 8, not 7 — should throw + assertThatThrownBy(() -> combined.getEntry(7L)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Non-contiguous"); + + combined.close(); + } + + /** + * Simple LedgerEntries implementation for testing. + */ + private static class MockLedgerEntries implements LedgerEntries { + private final List entries; + private boolean closed = false; + + MockLedgerEntries(List entries) { + this.entries = entries; + } + + @Override + public LedgerEntry getEntry(long entryId) { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + return entries.iterator(); + } + + @Override + public void close() { + closed = true; + entries.forEach(LedgerEntry::close); + } + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java index 42ae262e8ddb8..2984635b772a2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java @@ -24,23 +24,29 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.netty.buffer.Unpooled; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntSupplier; +import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -193,6 +199,10 @@ public void testReadFromStorageRetriesWhenHandleClosed() { ManagedLedgerFactoryMBeanImpl mlFactoryMBean = mock(ManagedLedgerFactoryMBeanImpl.class); when(mockEntryCacheManager.getMlFactoryMBean()).thenReturn(mlFactoryMBean); ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class); + ManagedLedgerConfig conf = mock(ManagedLedgerConfig.class); + when(conf.isBatchReadEnabled()).thenReturn(false); + when(conf.getBatchReadMaxSizeBytes()).thenReturn(0); + when(mockManagedLedger.getConfig()).thenReturn(conf); ManagedLedgerMBeanImpl mockManagedLedgerMBean = mock(ManagedLedgerMBeanImpl.class); when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean); when(mockManagedLedger.getName()).thenReturn("testManagedLedger"); @@ -239,6 +249,86 @@ public void testReadFromStorageRetriesWhenHandleClosed() { assertThat(readAttempts.get()).isEqualTo(2); } + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testReadFromStorageWithBatchReadEnabled() { + RangeEntryCacheManagerImpl mockEntryCacheManager = mock(RangeEntryCacheManagerImpl.class); + ManagedLedgerFactoryMBeanImpl mlFactoryMBean = mock(ManagedLedgerFactoryMBeanImpl.class); + when(mockEntryCacheManager.getMlFactoryMBean()).thenReturn(mlFactoryMBean); + + ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class); + ManagedLedgerConfig conf = mock(ManagedLedgerConfig.class); + when(conf.isBatchReadEnabled()).thenReturn(true); + when(conf.getBatchReadMaxSizeBytes()).thenReturn(1024 * 1024); + when(mockManagedLedger.getConfig()).thenReturn(conf); + + ManagedLedgerMBeanImpl mockManagedLedgerMBean = mock(ManagedLedgerMBeanImpl.class); + when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean); + when(mockManagedLedger.getName()).thenReturn("testManagedLedger"); + when(mockManagedLedger.getExecutor()).thenReturn(mock(java.util.concurrent.ExecutorService.class)); + Position lastConfirmedEntry = PositionFactory.create(1L, 99L); + when(mockManagedLedger.getLastConfirmedEntry()).thenReturn(lastConfirmedEntry); + when(mockManagedLedger.getOptionalLedgerInfo(1L)).thenReturn((Optional) Optional.of(new Object())); + + RangeCacheRemovalQueue mockRangeCacheRemovalQueue = mock(RangeCacheRemovalQueue.class); + when(mockRangeCacheRemovalQueue.addEntry(any())).thenReturn(true); + InflightReadsLimiter inflightReadsLimiter = mock(InflightReadsLimiter.class); + when(mockEntryCacheManager.getInflightReadsLimiter()).thenReturn(inflightReadsLimiter); + doAnswer(invocation -> { + long permits = invocation.getArgument(0); + InflightReadsLimiter.Handle handle = + new InflightReadsLimiter.Handle(permits, System.currentTimeMillis(), true); + return Optional.of(handle); + }).when(inflightReadsLimiter).acquire(anyLong(), any()); + + RangeEntryCacheImpl cache = new RangeEntryCacheImpl(mockEntryCacheManager, mockManagedLedger, false, + mockRangeCacheRemovalQueue, EntryLengthFunction.DEFAULT, mock(PendingReadsManager.class)); + + // Use LedgerHandle mock so batch read path is taken + LedgerHandle ledgerHandle = mock(LedgerHandle.class); + when(ledgerHandle.getId()).thenReturn(1L); + + // Create test entries for batch read + List entryList = new ArrayList<>(); + for (long i = 0; i <= 4; i++) { + entryList.add(LedgerEntryImpl.create(1L, i, 1, Unpooled.wrappedBuffer(new byte[]{(byte) i}))); + } + LedgerEntries batchEntries = new LedgerEntries() { + @Override + public LedgerEntry getEntry(long entryId) { + for (LedgerEntry e : entryList) { + if (e.getEntryId() == entryId) { + return e; + } + } + throw new IndexOutOfBoundsException("Entry " + entryId + " not found"); + } + + @Override + public Iterator iterator() { + return entryList.iterator(); + } + + @Override + public void close() { + entryList.forEach(LedgerEntry::close); + } + }; + when(ledgerHandle.batchReadUnconfirmedAsync(eq(0L), eq(5), anyLong())) + .thenReturn(CompletableFuture.completedFuture(batchEntries)); + + CompletableFuture> future = cache.readFromStorage(ledgerHandle, 0L, 4L, () -> 1); + + assertThat(future).isCompleted(); + List entries = future.getNow(null); + assertThat(entries).hasSize(5); + for (int i = 0; i < 5; i++) { + assertThat(entries.get(i).getEntryId()).isEqualTo(i); + } + // Verify batch read was used, not readUnconfirmedAsync + verify(ledgerHandle, never()).readUnconfirmedAsync(anyLong(), anyLong()); + } + private void performReadAndValidateResult() { CompletableFuture> future = new CompletableFuture<>(); rangeEntryCache.asyncReadEntry(lh, 0, 99, expectedReadCount, new AsyncCallbacks.ReadEntriesCallback() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java new file mode 100644 index 0000000000000..a9e67db63ca19 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class ReadEntryUtilsTest { + + private ManagedLedger ml; + private LedgerHandle lh; + + @BeforeMethod + @SuppressWarnings({"unchecked", "rawtypes"}) + public void setup() { + ml = mock(ManagedLedger.class); + lh = mock(LedgerHandle.class); + when(lh.getId()).thenReturn(1L); + Position lastConfirmedEntry = PositionFactory.create(1L, 99L); + when(ml.getLastConfirmedEntry()).thenReturn(lastConfirmedEntry); + when(ml.getOptionalLedgerInfo(1L)).thenReturn((Optional) Optional.of(new Object())); + } + + @Test + public void testBatchReadSingleBatch() { + LedgerEntries entries = createLedgerEntries(1L, 0, 1, 2, 3, 4); + when(lh.batchReadUnconfirmedAsync(eq(0L), eq(5), eq(1024L))) + .thenReturn(CompletableFuture.completedFuture(entries)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompleted(); + try (LedgerEntries result = future.getNow(null)) { + List entryIds = new ArrayList<>(); + for (LedgerEntry e : result) { + entryIds.add(e.getEntryId()); + } + assertThat(entryIds).containsExactly(0L, 1L, 2L, 3L, 4L); + } + + verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); + } + + @Test + public void testBatchReadMultipleBatches() { + LedgerEntries firstBatch = createLedgerEntries(1L, 0, 1, 2); + when(lh.batchReadUnconfirmedAsync(eq(0L), eq(5), eq(1024L))) + .thenReturn(CompletableFuture.completedFuture(firstBatch)); + LedgerEntries secondBatch = createLedgerEntries(1L, 3, 4); + when(lh.batchReadUnconfirmedAsync(eq(3L), eq(2), eq(1024L))) + .thenReturn(CompletableFuture.completedFuture(secondBatch)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompleted(); + LedgerEntries result = future.getNow(null); + try { + List entryIds = new ArrayList<>(); + for (LedgerEntry e : result) { + entryIds.add(e.getEntryId()); + } + assertThat(entryIds).containsExactly(0L, 1L, 2L, 3L, 4L); + } finally { + result.close(); + } + } + + @Test + public void testBatchReadReturnsEmptyEntries() { + LedgerEntries emptyEntries = wrapLedgerEntries(new ArrayList<>()); + when(lh.batchReadUnconfirmedAsync(eq(0L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(emptyEntries)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompletedExceptionally(); + assertThatThrownBy(future::get) + .hasCauseInstanceOf(ManagedLedgerException.class); + verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); + } + + @Test + public void testBatchReadFailure() { + CompletableFuture failedFuture = + CompletableFuture.failedFuture(new BKException.BKBookieHandleNotAvailableException()); + when(lh.batchReadUnconfirmedAsync(eq(0L), anyInt(), anyLong())) + .thenReturn(failedFuture); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompletedExceptionally(); + assertThatThrownBy(future::get) + .hasCauseInstanceOf(BKException.class); + verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); + } + + @Test + public void testBatchReadDisabledFallback() { + LedgerEntries mockEntries = createLedgerEntries(1L, 0, 1, 2, 3, 4); + when(lh.readUnconfirmedAsync(0L, 4L)) + .thenReturn(CompletableFuture.completedFuture(mockEntries)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, false, 1024); + + assertThat(future).isCompleted(); + verify(lh).readUnconfirmedAsync(0L, 4L); + verify(lh, never()).batchReadUnconfirmedAsync(anyLong(), anyInt(), anyLong()); + + future.getNow(null).close(); + } + + @Test + public void testBatchReadSingleEntryFallback() { + LedgerEntries mockEntries = createLedgerEntries(1L, 0); + when(lh.readUnconfirmedAsync(0L, 0L)) + .thenReturn(CompletableFuture.completedFuture(mockEntries)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 0L, true, 1024); + + assertThat(future).isCompleted(); + verify(lh).readUnconfirmedAsync(0L, 0L); + verify(lh, never()).batchReadUnconfirmedAsync(anyLong(), anyInt(), anyLong()); + + future.getNow(null).close(); + } + + @Test + public void testBatchReadWithNonLedgerHandle() { + ReadHandle rh = mock(ReadHandle.class); + when(rh.getId()).thenReturn(1L); + LedgerEntries mockEntries = createLedgerEntries(1L, 0, 1, 2); + // ReadHandle.batchReadUnconfirmedAsync is a default method that delegates to + // readUnconfirmedAsync for non-LedgerHandle implementations + when(rh.batchReadUnconfirmedAsync(eq(0L), eq(3), eq(1024L))) + .thenReturn(CompletableFuture.completedFuture(mockEntries)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, rh, 0L, 2L, true, 1024); + + assertThat(future).isCompleted(); + try (LedgerEntries result = future.getNow(null)) { + List entryIds = new ArrayList<>(); + for (LedgerEntry e : result) { + entryIds.add(e.getEntryId()); + } + assertThat(entryIds).containsExactly(0L, 1L, 2L); + } + } + + @Test + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testReadOnlyManagedLedgerFallback() { + when(ml.getOptionalLedgerInfo(1L)).thenReturn((Optional) Optional.empty()); + + ReadHandle rh = mock(ReadHandle.class); + when(rh.getId()).thenReturn(1L); + LedgerEntries mockEntries = createLedgerEntries(1L, 0, 1); + when(rh.readAsync(0L, 1L)).thenReturn(CompletableFuture.completedFuture(mockEntries)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, rh, 0L, 1L, true, 1024); + + assertThat(future).isCompleted(); + verify(rh).readAsync(0L, 1L); + + future.getNow(null).close(); + } + + @Test + public void testAutoRefillWithSizeLimitedReturns() { + // First batch returns only entries 0-1 (size-limited) + LedgerEntries firstBatch = createLedgerEntries(1L, 0, 1); + when(lh.batchReadUnconfirmedAsync(eq(0L), eq(5), eq(1024L))) + .thenReturn(CompletableFuture.completedFuture(firstBatch)); + // Second batch returns entries 2-4 to complete the read + LedgerEntries secondBatch = createLedgerEntries(1L, 2, 3, 4); + when(lh.batchReadUnconfirmedAsync(eq(2L), eq(3), eq(1024L))) + .thenReturn(CompletableFuture.completedFuture(secondBatch)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompleted(); + LedgerEntries result = future.getNow(null); + try { + List entryIds = new ArrayList<>(); + for (LedgerEntry e : result) { + entryIds.add(e.getEntryId()); + } + assertThat(entryIds).containsExactly(0L, 1L, 2L, 3L, 4L); + } finally { + result.close(); + } + } + + @Test + public void testBatchReadFailureWithPartialDataDoesNotFallback() { + // First batch succeeds with entries 0-2 + LedgerEntries firstBatch = createLedgerEntries(1L, 0, 1, 2); + when(lh.batchReadUnconfirmedAsync(eq(0L), eq(5), eq(1024L))) + .thenReturn(CompletableFuture.completedFuture(firstBatch)); + // Second batch fails + CompletableFuture failedFuture = + CompletableFuture.failedFuture(new BKException.BKBookieHandleNotAvailableException()); + when(lh.batchReadUnconfirmedAsync(eq(3L), eq(2), eq(1024L))) + .thenReturn(failedFuture); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompletedExceptionally(); + verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); + } + + @Test + public void testBatchReadMidBatchFailurePreservesOriginalException() { + // First batch succeeds with entries 0-2 + LedgerEntries firstBatch = createLedgerEntries(1L, 0, 1, 2); + when(lh.batchReadUnconfirmedAsync(eq(0L), eq(5), eq(1024L))) + .thenReturn(CompletableFuture.completedFuture(firstBatch)); + // Second batch fails + CompletableFuture failedFuture = + CompletableFuture.failedFuture(new BKException.BKBookieHandleNotAvailableException()); + when(lh.batchReadUnconfirmedAsync(eq(3L), eq(2), eq(1024L))) + .thenReturn(failedFuture); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompletedExceptionally(); + assertThatThrownBy(future::get) + .hasCauseInstanceOf(BKException.class); + } + + // --- helpers --- + + private static LedgerEntries createLedgerEntries(long ledgerId, long... entryIds) { + List entries = new ArrayList<>(); + for (long entryId : entryIds) { + entries.add(LedgerEntryImpl.create(ledgerId, entryId, 1, + Unpooled.wrappedBuffer(new byte[]{(byte) entryId}))); + } + return wrapLedgerEntries(entries); + } + + private static LedgerEntries wrapLedgerEntries(List entries) { + return new LedgerEntries() { + @Override + public LedgerEntry getEntry(long eid) { + for (LedgerEntry e : entries) { + if (e.getEntryId() == eid) { + return e; + } + } + throw new IndexOutOfBoundsException("Entry " + eid + " not found"); + } + + @Override + public Iterator iterator() { + return entries.iterator(); + } + + @Override + public void close() { + entries.forEach(LedgerEntry::close); + } + }; + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index ae4ec5777de3e..df8b399aa0ca3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2424,6 +2424,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed = true; + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "Enable batch read API when reading entries from bookkeeper. " + + "Batch read allows reading multiple entries in a single RPC call, " + + "reducing network overhead for sequential reads.") + private boolean managedLedgerBatchReadEnabled = false; + @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'" + " and thus should be set as inactive.\n" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d180048b1b316..2ae6613df1bf1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2252,6 +2252,9 @@ public CompletableFuture getManagedLedgerConfig(@NonNull To serviceConfig.isCacheEvictionByMarkDeletedPosition()); managedLedgerConfig.setCacheEvictionByExpectedReadCount(false); } + managedLedgerConfig.setBatchReadEnabled( + serviceConfig.isBookkeeperUseV2WireProtocol() && serviceConfig.isManagedLedgerBatchReadEnabled()); + managedLedgerConfig.setBatchReadMaxSizeBytes(serviceConfig.getMaxMessageSize()); managedLedgerConfig.setMinimumBacklogCursorsForCaching( serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching()); managedLedgerConfig.setMinimumBacklogEntriesForCaching( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index d5cbb9ebece3d..fffeb27bb7253 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -605,6 +605,7 @@ protected ServiceConfiguration getDefaultConf() { configuration.setNumExecutorThreadPoolSize(5); configuration.setBrokerMaxConnections(0); configuration.setBrokerMaxConnectionsPerIp(0); + configuration.setManagedLedgerBatchReadEnabled(true); return configuration; } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 0e1e75248e48e..0d026bf29e402 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -79,6 +79,7 @@ public class PulsarMockBookKeeper extends BookKeeper { final ScheduledExecutorService scheduler; private volatile long defaultAddEntryDelayMillis = 1L; private volatile long defaultReadEntriesDelayMillis = 1L; + private final EnsemblePlacementPolicy defaultPlacementPolicy = new DefaultEnsemblePlacementPolicy(); @Override public ClientConfiguration getConf() { @@ -112,6 +113,13 @@ public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception { scheduler = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("mock-bk-scheduler")); } + public EnsemblePlacementPolicy getPlacementPolicy() { + if (placementPolicy == null) { + return defaultPlacementPolicy; + } + return placementPolicy; + } + @Override public OrderedExecutor getMainWorkerPool() { return orderedExecutor; diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index 2b11b12b4284b..f9381690cb458 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -279,6 +279,16 @@ public CompletableFuture readLastAddConfirmedAndEntryAsyn return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + return readHandle.batchReadAsync(startEntry, maxCount, maxSize); + } + + @Override + public CompletableFuture batchReadUnconfirmedAsync(long startEntry, int maxCount, long maxSize) { + return readHandle.batchReadUnconfirmedAsync(startEntry, maxCount, maxSize); + } + private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd, Map customMetadata) { List ensemble = new ArrayList<>(PulsarMockBookKeeper.getMockEnsemble()); diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index 97dd0c3d1ad1d..42150b2137cdc 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -83,6 +83,33 @@ public CompletableFuture readUnconfirmedAsync(long firstEntry, lo return readAsync(firstEntry, lastEntry); } + @Override + public CompletableFuture batchReadAsync(long firstEntry, int maxCount, long maxSize) { + long lastEntryByCount = Math.min(firstEntry + maxCount - 1, getLastAddConfirmed()); + if (lastEntryByCount < firstEntry) { + return readAsync(firstEntry, firstEntry - 1); + } + long accumulatedSize = 0; + long lastEntry = firstEntry - 1; + for (long eid = firstEntry; eid <= lastEntryByCount; eid++) { + long entrySize = entries.get((int) eid).getLength(); + if (accumulatedSize > 0 && accumulatedSize + entrySize > maxSize) { + break; + } + accumulatedSize += entrySize; + lastEntry = eid; + } + if (lastEntry < firstEntry) { + lastEntry = firstEntry; + } + return readAsync(firstEntry, lastEntry); + } + + @Override + public CompletableFuture batchReadUnconfirmedAsync(long startEntry, int maxCount, long maxSize) { + return batchReadAsync(startEntry, maxCount, maxSize); + } + @Override public CompletableFuture readLastAddConfirmedAsync() { return CompletableFuture.completedFuture(getLastAddConfirmed());