From 46b8d72c2c0e286c916390accd99b0fad4c3e680 Mon Sep 17 00:00:00 2001 From: WJL3333 Date: Thu, 29 Jun 2023 00:40:12 +0800 Subject: [PATCH] Avoid hold managedLedgerImpl synchronized lock when cursor init. --- .../mledger/impl/ManagedLedgerImpl.java | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 10f7948f553cb..8d55d7c6d8e58 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -358,7 +358,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper this.entryCache = factory.getEntryCacheManager().getEntryCache(this); this.waitingCursors = Queues.newConcurrentLinkedQueue(); this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue(); - this.uninitializedCursors = new HashMap(); + this.uninitializedCursors = new ConcurrentHashMap<>(); this.clock = config.getClock(); // Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time @@ -619,18 +619,14 @@ public void operationComplete() { log.info("[{}] Lazy recovery for cursor {} completed. pos={} -- todo={}", name, cursorName, cursor.getMarkDeletedPosition(), cursorCount.get() - 1); cursor.setActive(); - synchronized (ManagedLedgerImpl.this) { - addCursor(cursor); - uninitializedCursors.remove(cursor.getName()).complete(cursor); - } + addCursor(cursor); + uninitializedCursors.remove(cursor.getName()).complete(cursor); } @Override public void operationFailed(ManagedLedgerException exception) { log.warn("[{}] Lazy recovery for cursor {} failed", name, cursorName, exception); - synchronized (ManagedLedgerImpl.this) { - uninitializedCursors.remove(cursor.getName()).completeExceptionally(exception); - } + uninitializedCursors.remove(cursor.getName()).completeExceptionally(exception); } }); } @@ -1000,24 +996,25 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP public void operationComplete() { log.info("[{}] Opened new cursor: {}", name, cursor); cursor.setActive(); + synchronized (ManagedLedgerImpl.this) { // Update the ack position (ignoring entries that were written while the cursor was being created) cursor.initializeCursorPosition(InitialPosition.Earliest == initialPosition ? getFirstPositionAndCounter() : getLastPositionAndCounter()); - addCursor(cursor); - uninitializedCursors.remove(cursorName).complete(cursor); } + + addCursor(cursor); + uninitializedCursors.remove(cursorName).complete(cursor); + callback.openCursorComplete(cursor, ctx); } @Override public void operationFailed(ManagedLedgerException exception) { log.warn("[{}] Failed to open cursor: {}", name, cursor); + uninitializedCursors.remove(cursorName).completeExceptionally(exception); - synchronized (ManagedLedgerImpl.this) { - uninitializedCursors.remove(cursorName).completeExceptionally(exception); - } callback.openCursorFailed(exception, ctx); } }); @@ -1095,6 +1092,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { } @Override + @VisibleForTesting public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException { return newNonDurableCursor( startCursorPosition, @@ -1127,10 +1125,9 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu (PositionImpl) startCursorPosition, initialPosition, isReadCompacted); cursor.setActive(); - log.info("[{}] Opened new cursor: {}", name, cursor); - synchronized (this) { - addCursor(cursor); - } + log.info("[{}] Opened new non durable cursor: {}", name, cursor); + + addCursor(cursor); return cursor; } @@ -1191,17 +1188,15 @@ public long getEstimatedBacklogSize() { long size = 0; final long slowestConsumerLedgerId = pos.getLedgerId(); + LedgerInfo ledgerInfo; // Subtract size of ledgers that were already fully consumed but not trimmed yet synchronized (this) { size = getTotalSize(); - size -= ledgers.values().stream().filter(li -> li.getLedgerId() < slowestConsumerLedgerId) + size -= ledgers.values().stream().takeWhile(li -> li.getLedgerId() < slowestConsumerLedgerId) .mapToLong(LedgerInfo::getSize).sum(); - } - - LedgerInfo ledgerInfo = null; - synchronized (this) { ledgerInfo = ledgers.get(pos.getLedgerId()); } + if (ledgerInfo == null) { // ledger was removed if (pos.compareTo(getMarkDeletePositionOfSlowestConsumer()) == 0) {