Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ public OffloadInProgressException(String msg) {
}
}

public static class CursorRecoveryInProgressException extends ManagedLedgerException {
public CursorRecoveryInProgressException(String msg) {
super(msg);
}
}

public static class CursorNotFoundException extends ManagedLedgerException {
public CursorNotFoundException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@SuppressWarnings("unused")
private volatile long totalSize = 0;

static final AtomicLongFieldUpdater<ManagedLedgerImpl> LAZY_RECOVERY_IN_PROCESS = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "lazyRecoveryInProcess");
@SuppressWarnings("unused")
private volatile long lazyRecoveryInProcess = 0;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the variable uninitializedCursors is tracing the cursor, which is in initialize progress. I think the new variable lazyRecoveryInProcess is not needed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for review, I agree with you. The idea to use this variable here is to avoid get the synchronized lock of the managedledger instance. current uninitializedCursors is not thread safe. so each time a call to get synchronized lock is needed. I submit another to make this map thread safe. #20674. Need some guide to handle this. If acquire synchronized lock is acceptable. I will change the code in this style.


// Cursors that are waiting to be notified when new entries are persisted
final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursors;

Expand Down Expand Up @@ -603,6 +608,8 @@ public void operationFailed(ManagedLedgerException exception) {
});
}
} else {
LAZY_RECOVERY_IN_PROCESS.addAndGet(ManagedLedgerImpl.this, consumers.size());

// Lazily recover cursors by put them to uninitializedCursors map.
for (final String cursorName : consumers) {
if (log.isDebugEnabled()) {
Expand All @@ -623,6 +630,7 @@ public void operationComplete() {
addCursor(cursor);
uninitializedCursors.remove(cursor.getName()).complete(cursor);
}
LAZY_RECOVERY_IN_PROCESS.decrementAndGet(ManagedLedgerImpl.this);
}

@Override
Expand All @@ -631,6 +639,7 @@ public void operationFailed(ManagedLedgerException exception) {
synchronized (ManagedLedgerImpl.this) {
uninitializedCursors.remove(cursor.getName()).completeExceptionally(exception);
}
LAZY_RECOVERY_IN_PROCESS.decrementAndGet(ManagedLedgerImpl.this);
}
});
}
Expand Down Expand Up @@ -2593,6 +2602,19 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
return;
}

long lazyRecoveryNumber = LAZY_RECOVERY_IN_PROCESS.get(this);
if (lazyRecoveryNumber > 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skip trim ledgers because has lazy recovery in process, current={}",
name, lazyRecoveryNumber);
}
promise.completeExceptionally(
new ManagedLedgerException.
CursorRecoveryInProgressException("Current ledger has lazy cursor recovery in process"));

return;
}

// Ensure only one trimming operation is active
if (!trimmerMutex.tryLock()) {
scheduleDeferredTrimming(isTruncate, promise);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2877,6 +2877,107 @@ public void testLazyRecoverCursor() throws Exception {
assertEquals(cursor.getMarkDeletedPosition(), p1);
}

@Test
public void onlyTrimCursorAfterLazyRecoverFinished() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(10);
managedLedgerConfig.setRetentionTime(0, TimeUnit.SECONDS);

// set for cursor to store state in ledger.
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(3);

String ledgerName = "testLedgerLazyRecover";
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);

String cursorNamePrefix = "testCursor_";

ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor(cursorNamePrefix + 1);
ManagedCursorImpl cursor2 = (ManagedCursorImpl) ledger.openCursor(cursorNamePrefix + 2);

int entryNumber = 30;
List<Position> entryPositions = new ArrayList<>(entryNumber);

for (int i = 0; i < entryNumber; i++) {
entryPositions.add(ledger.addEntry("entry-1".getBytes()));
}

for (ManagedCursorImpl cursor : Set.of(cursor1, cursor2)) {
// read all data
List<Entry> entries = cursor.readEntries(entryNumber);
entries.forEach(Entry::release);

// ack some record.
// 0, 1/4 pos, 1/2 pos, 3/4 pos, last pos
cursor.delete(List.of(
entryPositions.get(0),
entryPositions.get(entryNumber / 4),
entryPositions.get(entryNumber / 2),
entryPositions.get(entryNumber * 3 / 4),
entryPositions.get(entryNumber - 1))
);

assertEquals(entryPositions.get(0), cursor.getMarkDeletedPosition());

// trigger cursor persist state.
cursor.flush();
}

Position cursor2MarkDeletedPosition = cursor2.getMarkDeletedPosition();
long cursor2LedgerId = cursor2.getCursorLedger();

ledger.close();

// prepare finished.

// add delay when recover from ledger, to mock one of the cursor recover is slow
bkc.readEntryDelay(cursor2LedgerId, 1, TimeUnit.HOURS);


// use lazy recovery and open ledger again.
// cursor2 should not finish recover.
managedLedgerConfig.setLazyCursorRecovery(true);
ManagedLedgerImpl reOpenedLedger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);

int totalLedgerNumber = reOpenedLedger.ledgers.size();
ManagedCursorImpl reOpnedCursor1 = (ManagedCursorImpl)
reOpenedLedger.openCursor(cursorNamePrefix + 1);

// before cursor2 recover finished cursor1 move markDelete pos
reOpnedCursor1.markDelete(entryPositions.get(entryPositions.size() - 1));

// trigger ml trim
CompletableFuture<Void> trimCf = new CompletableFuture<>();
reOpenedLedger.trimConsumedLedgersInBackground(trimCf);

try {
trimCf.get();
} catch (Exception e) {
// trim should fail with exception
assertTrue(e.getCause() instanceof ManagedLedgerException.CursorRecoveryInProgressException);
}

int totalLedgerNumberAfterTrim = reOpenedLedger.ledgers.size();

// the ledger should not be trimmed
assertEquals(totalLedgerNumberAfterTrim, totalLedgerNumber);

reOpenedLedger.close();

// check LAZY_RECOVERY_IN_PROCESS will become 0 when recover finished.
managedLedgerConfig.setLazyCursorRecovery(true);
ManagedLedgerImpl reOpenedLedgerSecondTime = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);
ManagedCursorImpl reOpenedCursor2SecondTime =
(ManagedCursorImpl) reOpenedLedgerSecondTime.openCursor(cursorNamePrefix + 2);

assertEquals(cursor2MarkDeletedPosition, reOpenedCursor2SecondTime.getMarkDeletedPosition());

Thread.sleep(1000);
long lazyRecoveryInProcess = ManagedLedgerImpl.LAZY_RECOVERY_IN_PROCESS.get(reOpenedLedgerSecondTime);

// normal after recovery finished this variable should be zero.
assertEquals(lazyRecoveryInProcess, 0);
}

@Test
public void testConcurrentOpenCursor() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testConcurrentOpenCursor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -69,6 +71,8 @@ public class PulsarMockBookKeeper extends BookKeeper {

final OrderedExecutor orderedExecutor;
final ExecutorService executor;
// use for delay async operation
final ScheduledExecutorService scheduledExecutorService;

@Override
public ClientConfiguration getConf() {
Expand All @@ -89,12 +93,15 @@ public static Collection<BookieId> getMockEnsemble() {
}

final Queue<Long> addEntryDelaysMillis = new ConcurrentLinkedQueue<>();
final Map<Long, Queue<Long>> readEntryDelayMillis = new ConcurrentHashMap<>();

final List<CompletableFuture<Void>> failures = new ArrayList<>();
final List<CompletableFuture<Void>> addEntryFailures = new ArrayList<>();

public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception {
this.orderedExecutor = orderedExecutor;
this.executor = orderedExecutor.chooseThread();
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
}

@Override
Expand Down Expand Up @@ -285,6 +292,9 @@ public void shutdown() {
synchronized (this) {
defaultResponse = FutureUtils.exception(new BKException.BKClientClosedException());
}

scheduledExecutorService.shutdown();

for (PulsarMockLedgerHandle ledger : ledgers.values()) {
ledger.entries.clear();
}
Expand Down Expand Up @@ -367,6 +377,11 @@ public synchronized void addEntryDelay(long delay, TimeUnit unit) {
addEntryDelaysMillis.add(unit.toMillis(delay));
}

public synchronized void readEntryDelay(long entryId, long delayTime, TimeUnit unit) {
Queue<Long> delay = readEntryDelayMillis.computeIfAbsent(entryId, (__) -> new LinkedBlockingQueue<>());
delay.add(unit.toMillis(delayTime));
}

static int getExceptionCode(Throwable t) {
if (t instanceof BKException) {
return ((BKException) t).getCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
Expand Down Expand Up @@ -101,40 +102,62 @@ public void asyncClose(CloseCallback cb, Object ctx) {
@Override
public void asyncReadEntries(final long firstEntry, final long lastEntry, final ReadCallback cb, final Object ctx) {
bk.getProgrammedFailure().thenComposeAsync((res) -> {
log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>();
long entryId = firstEntry;
while (entryId <= lastEntry && entryId < entries.size()) {
seq.add(new LedgerEntry(entries.get((int) entryId++).duplicate()));
long delay = 0;
Queue<Long> delays = bk.readEntryDelayMillis.get(ledgerId);
if (delays != null) {
Long tmpDelay = delays.poll();
// null check to avoid NPE
if (tmpDelay != null) {
delay = tmpDelay;
}
}

log.debug("Entries read: {}", seq);
log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());

try {
Thread.sleep(1);
} catch (InterruptedException e) {
final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>();
long entryId = firstEntry;
while (entryId <= lastEntry && entryId < entries.size()) {
seq.add(new LedgerEntry(entries.get((int) entryId++).duplicate()));
}

log.debug("Entries read: {}", seq);

try {
Thread.sleep(1);
} catch (InterruptedException e) {
}

Enumeration<LedgerEntry> entries = new Enumeration<LedgerEntry>() {
@Override
public boolean hasMoreElements() {
return !seq.isEmpty();
}

Enumeration<LedgerEntry> entries = new Enumeration<LedgerEntry>() {
@Override
public boolean hasMoreElements() {
return !seq.isEmpty();
}

@Override
public LedgerEntry nextElement() {
return seq.remove();
}
};
@Override
public LedgerEntry nextElement() {
return seq.remove();
}
};

if (delay != 0) {
CompletableFuture<Enumeration<LedgerEntry>> ledgers = new CompletableFuture<>();

bk.scheduledExecutorService.schedule(() -> {
ledgers.complete(entries);
}, delay, TimeUnit.MILLISECONDS);

return ledgers;
} else {
return FutureUtils.value(entries);
}).whenCompleteAsync((res, exception) -> {
if (exception != null) {
cb.readComplete(PulsarMockBookKeeper.getExceptionCode(exception),
PulsarMockLedgerHandle.this, null, ctx);
} else {
cb.readComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, res, ctx);
}
}, bk.executor);
}
}).whenCompleteAsync((res, exception) -> {
if (exception != null) {
cb.readComplete(PulsarMockBookKeeper.getExceptionCode(exception),
PulsarMockLedgerHandle.this, null, ctx);
} else {
cb.readComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, res, ctx);
}
}, bk.executor);
}

@Override
Expand Down