Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
this.waitingCursors = Queues.newConcurrentLinkedQueue();
this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue();
this.uninitializedCursors = new HashMap();
this.uninitializedCursors = new ConcurrentHashMap<>();
this.clock = config.getClock();

// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
Expand Down Expand Up @@ -619,18 +619,14 @@ public void operationComplete() {
log.info("[{}] Lazy recovery for cursor {} completed. pos={} -- todo={}", name,
cursorName, cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
cursor.setActive();
synchronized (ManagedLedgerImpl.this) {
addCursor(cursor);
uninitializedCursors.remove(cursor.getName()).complete(cursor);
}
addCursor(cursor);
uninitializedCursors.remove(cursor.getName()).complete(cursor);
}

@Override
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Lazy recovery for cursor {} failed", name, cursorName, exception);
synchronized (ManagedLedgerImpl.this) {
uninitializedCursors.remove(cursor.getName()).completeExceptionally(exception);
}
uninitializedCursors.remove(cursor.getName()).completeExceptionally(exception);
}
});
}
Expand Down Expand Up @@ -1000,24 +996,25 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
public void operationComplete() {
log.info("[{}] Opened new cursor: {}", name, cursor);
cursor.setActive();

synchronized (ManagedLedgerImpl.this) {
// Update the ack position (ignoring entries that were written while the cursor was being created)
cursor.initializeCursorPosition(InitialPosition.Earliest == initialPosition
? getFirstPositionAndCounter()
: getLastPositionAndCounter());
addCursor(cursor);
uninitializedCursors.remove(cursorName).complete(cursor);
}

addCursor(cursor);
uninitializedCursors.remove(cursorName).complete(cursor);

callback.openCursorComplete(cursor, ctx);
}

@Override
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Failed to open cursor: {}", name, cursor);
uninitializedCursors.remove(cursorName).completeExceptionally(exception);

synchronized (ManagedLedgerImpl.this) {
uninitializedCursors.remove(cursorName).completeExceptionally(exception);
}
callback.openCursorFailed(exception, ctx);
}
});
Expand Down Expand Up @@ -1095,6 +1092,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
}

@Override
@VisibleForTesting
public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException {
return newNonDurableCursor(
startCursorPosition,
Expand Down Expand Up @@ -1127,10 +1125,9 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu
(PositionImpl) startCursorPosition, initialPosition, isReadCompacted);
cursor.setActive();

log.info("[{}] Opened new cursor: {}", name, cursor);
synchronized (this) {
addCursor(cursor);
}
log.info("[{}] Opened new non durable cursor: {}", name, cursor);

addCursor(cursor);

return cursor;
}
Expand Down Expand Up @@ -1191,17 +1188,15 @@ public long getEstimatedBacklogSize() {
long size = 0;
final long slowestConsumerLedgerId = pos.getLedgerId();

LedgerInfo ledgerInfo;
// Subtract size of ledgers that were already fully consumed but not trimmed yet
synchronized (this) {
size = getTotalSize();
size -= ledgers.values().stream().filter(li -> li.getLedgerId() < slowestConsumerLedgerId)
size -= ledgers.values().stream().takeWhile(li -> li.getLedgerId() < slowestConsumerLedgerId)
.mapToLong(LedgerInfo::getSize).sum();
}

LedgerInfo ledgerInfo = null;
synchronized (this) {
ledgerInfo = ledgers.get(pos.getLedgerId());
}

if (ledgerInfo == null) {
// ledger was removed
if (pos.compareTo(getMarkDeletePositionOfSlowestConsumer()) == 0) {
Expand Down