diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 1fa565d6ec788..1b75fe070c1c3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -165,6 +165,15 @@ public OffloadInProgressException(String msg) {
         }
     }
 
+    /**
+     * Signals that an operation was rejected because lazy cursor recovery has not finished yet.
+     */
+    public static class CursorRecoveryInProgressException extends ManagedLedgerException {
+        public CursorRecoveryInProgressException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class CursorNotFoundException extends ManagedLedgerException {
         public CursorNotFoundException(String msg) {
             super(msg);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 10f7948f553cb..7902ecc34093b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -190,6 +190,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     @SuppressWarnings("unused")
     private volatile long totalSize = 0;
 
+    static final AtomicLongFieldUpdater<ManagedLedgerImpl> LAZY_RECOVERY_IN_PROCESS = AtomicLongFieldUpdater
+            .newUpdater(ManagedLedgerImpl.class, "lazyRecoveryInProcess");
+    @SuppressWarnings("unused")
+    private volatile long lazyRecoveryInProcess = 0; // lazy cursor recoveries that have not completed yet
+
     // Cursors that are waiting to be notified when new entries are persisted
     final ConcurrentLinkedQueue waitingCursors;
 
@@ -603,6 +608,8 @@ public void operationFailed(ManagedLedgerException exception) {
                         });
                     }
                 } else {
+                    LAZY_RECOVERY_IN_PROCESS.addAndGet(ManagedLedgerImpl.this, consumers.size());
+
                     // Lazily recover cursors by put them to uninitializedCursors map.
for (final String cursorName : consumers) { if (log.isDebugEnabled()) { @@ -623,6 +630,7 @@ public void operationComplete() { addCursor(cursor); uninitializedCursors.remove(cursor.getName()).complete(cursor); } + LAZY_RECOVERY_IN_PROCESS.decrementAndGet(ManagedLedgerImpl.this); } @Override @@ -631,6 +639,7 @@ public void operationFailed(ManagedLedgerException exception) { synchronized (ManagedLedgerImpl.this) { uninitializedCursors.remove(cursor.getName()).completeExceptionally(exception); } + LAZY_RECOVERY_IN_PROCESS.decrementAndGet(ManagedLedgerImpl.this); } }); } @@ -2593,6 +2602,19 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { return; } + long lazyRecoveryNumber = LAZY_RECOVERY_IN_PROCESS.get(this); + if (lazyRecoveryNumber > 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] Skip trim ledgers because has lazy recovery in process, current={}", + name, lazyRecoveryNumber); + } + promise.completeExceptionally( + new ManagedLedgerException. + CursorRecoveryInProgressException("Current ledger has lazy cursor recovery in process")); + + return; + } + // Ensure only one trimming operation is active if (!trimmerMutex.tryLock()) { scheduleDeferredTrimming(isTruncate, promise); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 70ddbb9998fd8..059d699b45af4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2877,6 +2877,107 @@ public void testLazyRecoverCursor() throws Exception { assertEquals(cursor.getMarkDeletedPosition(), p1); } + @Test + public void onlyTrimCursorAfterLazyRecoverFinished() throws Exception { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setMaxEntriesPerLedger(10); + 
managedLedgerConfig.setRetentionTime(0, TimeUnit.SECONDS); + + // set for cursor to store state in ledger. + managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(3); + + String ledgerName = "testLedgerLazyRecover"; + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig); + + String cursorNamePrefix = "testCursor_"; + + ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor(cursorNamePrefix + 1); + ManagedCursorImpl cursor2 = (ManagedCursorImpl) ledger.openCursor(cursorNamePrefix + 2); + + int entryNumber = 30; + List entryPositions = new ArrayList<>(entryNumber); + + for (int i = 0; i < entryNumber; i++) { + entryPositions.add(ledger.addEntry("entry-1".getBytes())); + } + + for (ManagedCursorImpl cursor : Set.of(cursor1, cursor2)) { + // read all data + List entries = cursor.readEntries(entryNumber); + entries.forEach(Entry::release); + + // ack some record. + // 0, 1/4 pos, 1/2 pos, 3/4 pos, last pos + cursor.delete(List.of( + entryPositions.get(0), + entryPositions.get(entryNumber / 4), + entryPositions.get(entryNumber / 2), + entryPositions.get(entryNumber * 3 / 4), + entryPositions.get(entryNumber - 1)) + ); + + assertEquals(entryPositions.get(0), cursor.getMarkDeletedPosition()); + + // trigger cursor persist state. + cursor.flush(); + } + + Position cursor2MarkDeletedPosition = cursor2.getMarkDeletedPosition(); + long cursor2LedgerId = cursor2.getCursorLedger(); + + ledger.close(); + + // prepare finished. + + // add delay when recover from ledger, to mock one of the cursor recover is slow + bkc.readEntryDelay(cursor2LedgerId, 1, TimeUnit.HOURS); + + + // use lazy recovery and open ledger again. + // cursor2 should not finish recover. 
+ managedLedgerConfig.setLazyCursorRecovery(true); + ManagedLedgerImpl reOpenedLedger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig); + + int totalLedgerNumber = reOpenedLedger.ledgers.size(); + ManagedCursorImpl reOpnedCursor1 = (ManagedCursorImpl) + reOpenedLedger.openCursor(cursorNamePrefix + 1); + + // before cursor2 recover finished cursor1 move markDelete pos + reOpnedCursor1.markDelete(entryPositions.get(entryPositions.size() - 1)); + + // trigger ml trim + CompletableFuture trimCf = new CompletableFuture<>(); + reOpenedLedger.trimConsumedLedgersInBackground(trimCf); + + try { + trimCf.get(); + } catch (Exception e) { + // trim should fail with exception + assertTrue(e.getCause() instanceof ManagedLedgerException.CursorRecoveryInProgressException); + } + + int totalLedgerNumberAfterTrim = reOpenedLedger.ledgers.size(); + + // the ledger should not be trimmed + assertEquals(totalLedgerNumberAfterTrim, totalLedgerNumber); + + reOpenedLedger.close(); + + // check LAZY_RECOVERY_IN_PROCESS will become 0 when recover finished. + managedLedgerConfig.setLazyCursorRecovery(true); + ManagedLedgerImpl reOpenedLedgerSecondTime = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig); + ManagedCursorImpl reOpenedCursor2SecondTime = + (ManagedCursorImpl) reOpenedLedgerSecondTime.openCursor(cursorNamePrefix + 2); + + assertEquals(cursor2MarkDeletedPosition, reOpenedCursor2SecondTime.getMarkDeletedPosition()); + + Thread.sleep(1000); + long lazyRecoveryInProcess = ManagedLedgerImpl.LAZY_RECOVERY_IN_PROCESS.get(reOpenedLedgerSecondTime); + + // normal after recovery finished this variable should be zero. 
+ assertEquals(lazyRecoveryInProcess, 0); + } + @Test public void testConcurrentOpenCursor() throws Exception { ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testConcurrentOpenCursor"); diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index f0d279ef25050..cd25d6dbca871 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -34,6 +34,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -69,6 +71,8 @@ public class PulsarMockBookKeeper extends BookKeeper { final OrderedExecutor orderedExecutor; final ExecutorService executor; + // use for delay async operation + final ScheduledExecutorService scheduledExecutorService; @Override public ClientConfiguration getConf() { @@ -89,12 +93,15 @@ public static Collection getMockEnsemble() { } final Queue addEntryDelaysMillis = new ConcurrentLinkedQueue<>(); + final Map> readEntryDelayMillis = new ConcurrentHashMap<>(); + final List> failures = new ArrayList<>(); final List> addEntryFailures = new ArrayList<>(); public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception { this.orderedExecutor = orderedExecutor; this.executor = orderedExecutor.chooseThread(); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); } @Override @@ -285,6 +292,9 @@ public void shutdown() { synchronized (this) { defaultResponse = FutureUtils.exception(new BKException.BKClientClosedException()); } + + 
+        scheduledExecutorService.shutdownNow(); // also drops delayed reads that are still pending
+
         for (PulsarMockLedgerHandle ledger : ledgers.values()) {
             ledger.entries.clear();
         }
@@ -367,6 +377,19 @@ public synchronized void addEntryDelay(long delay, TimeUnit unit) {
         addEntryDelaysMillis.add(unit.toMillis(delay));
     }
 
+    /**
+     * Queues a delay for a future read of the given ledger. Each {@code asyncReadEntries} call on that
+     * ledger consumes one queued delay before completing.
+     *
+     * @param ledgerId  id of the ledger whose reads are delayed
+     * @param delayTime how long the read completion is postponed
+     * @param unit      unit of {@code delayTime}
+     */
+    public synchronized void readEntryDelay(long ledgerId, long delayTime, TimeUnit unit) {
+        Queue<Long> delays = readEntryDelayMillis.computeIfAbsent(ledgerId, (__) -> new LinkedBlockingQueue<>());
+        delays.add(unit.toMillis(delayTime));
+    }
+
     static int getExceptionCode(Throwable t) {
         if (t instanceof BKException) {
             return ((BKException) t).getCode();
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index dea33a0e67662..f2363c37a1d0d 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -30,6 +30,7 @@
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -101,40 +102,65 @@ public void asyncClose(CloseCallback cb, Object ctx) {
     @Override
     public void asyncReadEntries(final long firstEntry, final long lastEntry, final ReadCallback cb, final Object ctx) {
         bk.getProgrammedFailure().thenComposeAsync((res) -> {
-                log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
-                final Queue seq = new ArrayDeque();
-                long entryId = firstEntry;
-                while (entryId <= lastEntry && entryId < entries.size()) {
-                    seq.add(new LedgerEntry(entries.get((int) entryId++).duplicate()));
+            // consume at most one delay programmed for this ledger via readEntryDelay
+            long delay = 0;
+            Queue<Long> delays = bk.readEntryDelayMillis.get(ledgerId);
+            if (delays != null) {
+                Long tmpDelay = delays.poll();
+                // poll() returns null when no delay is queued; unboxing it would throw an NPE
+                if (tmpDelay != null) {
+                    delay = tmpDelay;
                 }
+            }
 
-                log.debug("Entries read: {}", seq);
+            log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
 
-                try {
-                    Thread.sleep(1);
-                } catch (InterruptedException e) {
+            final Queue<LedgerEntry> seq = new ArrayDeque<>();
+            long entryId = firstEntry;
+            while (entryId <= lastEntry && entryId < entries.size()) {
+                seq.add(new LedgerEntry(entries.get((int) entryId++).duplicate()));
+            }
+
+            log.debug("Entries read: {}", seq);
+
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interruption visible to the thread owner
+            }
+
+            Enumeration<LedgerEntry> entries = new Enumeration<LedgerEntry>() {
+                @Override
+                public boolean hasMoreElements() {
+                    return !seq.isEmpty();
                 }
 
-                Enumeration entries = new Enumeration() {
-                        @Override
-                        public boolean hasMoreElements() {
-                            return !seq.isEmpty();
-                        }
-
-                        @Override
-                        public LedgerEntry nextElement() {
-                            return seq.remove();
-                        }
-                    };
+                @Override
+                public LedgerEntry nextElement() {
+                    return seq.remove();
+                }
+            };
+
+            if (delay != 0) {
+                // hand the result over only after the programmed delay has elapsed
+                CompletableFuture<Enumeration<LedgerEntry>> delayedEntries = new CompletableFuture<>();
+
+                bk.scheduledExecutorService.schedule(() -> {
+                    delayedEntries.complete(entries);
+                }, delay, TimeUnit.MILLISECONDS);
+
+                return delayedEntries;
+            } else {
                 return FutureUtils.value(entries);
-            }).whenCompleteAsync((res, exception) -> {
-                    if (exception != null) {
-                        cb.readComplete(PulsarMockBookKeeper.getExceptionCode(exception),
-                                PulsarMockLedgerHandle.this, null, ctx);
-                    } else {
-                        cb.readComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, res, ctx);
-                    }
-                }, bk.executor);
+            }
+        }).whenCompleteAsync((res, exception) -> {
+            if (exception != null) {
+                cb.readComplete(PulsarMockBookKeeper.getExceptionCode(exception),
+                        PulsarMockLedgerHandle.this, null, ctx);
+            } else {
+                cb.readComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, res, ctx);
+            }
+        }, bk.executor);
     }
 
     @Override