Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_MAX_MESSAGE_SIZE;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.Arrays;
Expand Down Expand Up @@ -91,6 +92,36 @@ public class ManagedLedgerConfig {
@Getter
@Setter
private boolean cacheEvictionByExpectedReadCount = true;

/**
* Enable batch read API when reading entries from bookkeeper.
* Batch read allows reading multiple entries in a single RPC call, reducing network overhead.
* Note: Batch read is only effective when ensembleSize equals writeQuorumSize (non-striped ledgers).
*/
@Setter
private boolean batchReadEnabled = false;

/**
* Max size in bytes for per-batch read request. If set to 0 or negative,
* uses the netty max frame size (default 5MB).
* Batch read may return fewer entries if total size exceeds this limit.
*/
@Getter
@Setter
private int batchReadMaxSizeBytes = DEFAULT_MAX_MESSAGE_SIZE;

/**
* Returns whether batch read is enabled for this managed ledger.
* Batch read is only enabled when both conditions are met:
* 1. batchReadEnabled is set to true
* 2. ensembleSize equals writeQuorumSize (non-striped ledger)
*
* @return true if batch read should be used
*/
public boolean isBatchReadEnabled() {
return ensembleSize == writeQuorumSize && batchReadEnabled;
}

@Getter
private long continueCachingAddedEntriesAfterLastActiveCursorLeavesMillis;
private int minimumBacklogCursorsForCaching = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import io.netty.util.Recycler;
import java.util.Iterator;
import java.util.List;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;

public class CompositeLedgerEntriesImpl implements LedgerEntries {
private List<LedgerEntry> entries;
private List<LedgerEntries> ledgerEntries;
private final Recycler.Handle<CompositeLedgerEntriesImpl> recyclerHandle;

private CompositeLedgerEntriesImpl(Recycler.Handle<CompositeLedgerEntriesImpl> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<CompositeLedgerEntriesImpl> RECYCLER = new Recycler<>() {
@Override
protected CompositeLedgerEntriesImpl newObject(Recycler.Handle<CompositeLedgerEntriesImpl> handle) {
return new CompositeLedgerEntriesImpl(handle);
}
};

public static LedgerEntries create(List<LedgerEntry> entries, List<LedgerEntries> ledgerEntries) {
checkArgument(!entries.isEmpty(), "entries for create should not be empty.");
checkArgument(!ledgerEntries.isEmpty(), "ledgerEntries for create should not be empty.");
CompositeLedgerEntriesImpl instance = RECYCLER.get();
instance.entries = entries;
instance.ledgerEntries = ledgerEntries;
return instance;
}

private void recycle() {
if (ledgerEntries == null) {
return;
}
ledgerEntries.forEach(LedgerEntries::close);
entries = null;
ledgerEntries = null;
recyclerHandle.recycle(this);
}

@Override
public LedgerEntry getEntry(long entryId) {
checkNotNull(entries, "entries has been recycled");
long firstId = entries.get(0).getEntryId();
long lastId = entries.get(entries.size() - 1).getEntryId();
if (entryId < firstId || entryId > lastId) {
throw new IndexOutOfBoundsException("required index: " + entryId
+ " is out of bounds: [ " + firstId + ", " + lastId + " ].");
}
int index = (int) (entryId - firstId);
LedgerEntry entry = entries.get(index);
if (entry.getEntryId() != entryId) {
throw new IllegalStateException("Non-contiguous entries detected: expected entryId "
+ entryId + " at index " + index + " but found entryId " + entry.getEntryId());
}
return entry;
}

@Override
public Iterator<LedgerEntry> iterator() {
checkNotNull(entries, "entries has been recycled");
return entries.iterator();
}

@Override
public void close() {
recycle();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
Expand Down Expand Up @@ -70,8 +71,11 @@ public void clear() {
@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount,
final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync(
ledgerEntries -> {
ManagedLedgerConfig config = ml.getConfig();
boolean isBatchReadEnabled = config.isBatchReadEnabled();
int batchReadMaxBytes = config.getBatchReadMaxSizeBytes();
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, isBatchReadEnabled, batchReadMaxBytes)
.thenAcceptAsync(ledgerEntries -> {
List<Entry> entries = new ArrayList<>();
long totalSize = 0;
try {
Expand Down Expand Up @@ -99,8 +103,12 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSu
@Override
public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback,
Object ctx) {
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
ManagedLedgerConfig config = ml.getConfig();
boolean isBatchReadEnabled = config.isBatchReadEnabled();
int batchReadMaxBytes = config.getBatchReadMaxSizeBytes();
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(),
isBatchReadEnabled, batchReadMaxBytes)
.whenCompleteAsync((ledgerEntries, exception) -> {
if (exception != null) {
ml.invalidateLedgerHandle(lh);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,9 @@ CompletableFuture<List<Entry>> readFromStorage(ReadHandle lh, long firstEntry, l
private CompletableFuture<List<Entry>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry,
IntSupplier expectedReadCount, boolean allowRetry) {
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
CompletableFuture<List<Entry>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
ManagedLedgerConfig mlConfig = ml.getConfig();
CompletableFuture<List<Entry>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry,
mlConfig.isBatchReadEnabled(), mlConfig.getBatchReadMaxSizeBytes())
.thenApply(
ledgerEntries -> {
requireNonNull(ml.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand All @@ -28,6 +31,11 @@ class ReadEntryUtils {

static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry,
long lastEntry) {
return readAsync(ml, handle, firstEntry, lastEntry, false, 0);
}

static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry,
long lastEntry, boolean batchReadEnabled, int batchReadMaxSize) {
if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) {
// The read handle comes from another managed ledger, in this case, we can only compare the entry range with
// the LAC of that read handle. Specifically, it happens when this method is called by a
Expand All @@ -49,6 +57,66 @@ static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle h
return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is "
+ lastConfirmedEntry + " when reading entry " + lastEntry));
}

int numberOfEntries = (int) (lastEntry - firstEntry + 1);

// Use batch read for multiple entries when enabled.
if (batchReadEnabled && numberOfEntries > 1 && batchReadMaxSize > 0) {
return batchReadUnconfirmed(handle, firstEntry, numberOfEntries, batchReadMaxSize);
}
return handle.readUnconfirmedAsync(firstEntry, lastEntry);
}

private static CompletableFuture<LedgerEntries> batchReadUnconfirmed(
ReadHandle handle, long firstEntry, int maxCount, int maxSize) {
CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
List<LedgerEntry> receivedEntries = new ArrayList<>(maxCount);
List<LedgerEntries> ledgerEntries = new ArrayList<>(4);
doBatchRead(handle, firstEntry, maxCount, maxSize, receivedEntries, ledgerEntries, future);
return future;
}

private static void doBatchRead(ReadHandle handle, long firstEntry, int maxCount, int maxSize,
List<LedgerEntry> receivedEntries, List<LedgerEntries> ledgerEntries,
CompletableFuture<LedgerEntries> future) {
handle.batchReadUnconfirmedAsync(firstEntry, maxCount - receivedEntries.size(), maxSize)
.whenComplete((entries, throwable) -> {
if (throwable != null) {
onBatchReadComplete(handle, firstEntry, maxCount, receivedEntries, ledgerEntries, future,
throwable);
return;
}
long lastReceivedEntry = -1;
int prevReceivedCount = receivedEntries.size();
for (LedgerEntry entry : entries) {
receivedEntries.add(entry);
lastReceivedEntry = entry.getEntryId();
}
ledgerEntries.add(entries);
if (receivedEntries.size() >= maxCount || prevReceivedCount == receivedEntries.size()) {
onBatchReadComplete(handle, firstEntry, maxCount, receivedEntries, ledgerEntries, future, null);
return;
}
doBatchRead(handle, lastReceivedEntry + 1, maxCount, maxSize,
receivedEntries, ledgerEntries, future);
});
}

private static void onBatchReadComplete(ReadHandle handle, long firstEntry, int maxCount,
List<LedgerEntry> receivedEntries, List<LedgerEntries> ledgerEntries,
CompletableFuture<LedgerEntries> future, Throwable error) {
if (error != null) {
ledgerEntries.forEach(LedgerEntries::close);
future.completeExceptionally(error);
return;
}
if (receivedEntries.isEmpty()) {
ledgerEntries.forEach(LedgerEntries::close);
future.completeExceptionally(new ManagedLedgerException(
"Batch read returned no entries for ledger " + handle.getId()
+ " starting from entry " + firstEntry));
return;
}
future.complete(CompositeLedgerEntriesImpl.create(receivedEntries, ledgerEntries));
}
}
Loading
Loading