Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1547,13 +1547,19 @@ public synchronized void asyncTerminate(TerminateCallback callback, Object ctx)
if (rc != BKException.Code.OK) {
callback.terminateFailed(createManagedLedgerException(rc), ctx);
} else {
lastConfirmedEntry = PositionFactory.create(lh.getId(), lh.getLastAddConfirmed());
ManagedLedgerInfo managedLedgerInfo;
synchronized (ManagedLedgerImpl.this) {
lastConfirmedEntry = PositionFactory.create(lh.getId(), lh.getLastAddConfirmed());
updateClosedLedgerInfo(lh, lh.getLastAddConfirmed(), false);
managedLedgerInfo = getManagedLedgerInfo();
}
// Store the new state in metadata
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
store.asyncUpdateLedgerIds(name, managedLedgerInfo, ledgersStat, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
ledgersStat = stat;
log.info().attr("lastConfirmedEntry", lastConfirmedEntry).log("Terminated managed ledger");
maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER);
callback.terminateComplete(lastConfirmedEntry, ctx);
}

Expand Down Expand Up @@ -1966,23 +1972,27 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) {
return;
}

updateClosedLedgerInfo(lh, lastAddConfirmed, true);

trimConsumedLedgersInBackground();

maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER);

createLedgerAfterClosed();
}

private void updateClosedLedgerInfo(final LedgerHandle lh, Long lastAddConfirmed, boolean removeEmptyLedger) {
long entriesInLedger = lastAddConfirmed != null ? lastAddConfirmed + 1 : lh.getLastAddConfirmed() + 1;
log.debug().attr("ledgerId", lh.getId()).attr("entries", entriesInLedger).log("Ledger has been closed");
if (entriesInLedger > 0) {
LedgerInfo info = new LedgerInfo().setLedgerId(lh.getId()).setEntries(entriesInLedger)
.setSize(lh.getLength()).setTimestamp(clock.millis());
ledgers.put(lh.getId(), info);
} else {
} else if (removeEmptyLedger) {
// The last ledger was empty, so we can discard it
ledgers.remove(lh.getId());
mbean.startDataLedgerDeleteOp();
}

trimConsumedLedgersInBackground();

maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER);

createLedgerAfterClosed();
}

@Override
Expand Down Expand Up @@ -3682,23 +3692,27 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
}

long current = ledgers.lastKey();
boolean includeLastLedger = STATE_UPDATER.get(this) == State.Terminated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs one more guard for the in-progress termination window. asyncTerminate() sets state = State.Terminated before the BookKeeper close callback refreshes the final LedgerInfo. If asyncOffloadPrefix() runs in that window, includeLastLedger becomes true, but the current ledger still has the active-ledger placeholder metadata (entries/size are still 0 and timestamp is not refreshed). The loop can then find no ledgers to offload and return lastConfirmedEntry.getNext(), which reports the prefix as fully offloaded even though the final ledger was never offloaded.

Could we gate includeLastLedger on the final LedgerInfo actually being closed/refreshed, or introduce a separate terminating state so manual offload does not treat an in-flight termination as completed? A regression test can block the BK close with PulsarMockBookKeeper.promiseAfter(0), call asyncTerminate(), call offloadPrefix() before releasing the close, and assert it does not report lastConfirmedEntry.getNext() without offloading the final ledger.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommended fix: Gate includeLastLedger on the final ledger actually being closed (metadata refreshed). The simplest reliable signal is whether the last ledger's LedgerInfo has been populated:

LedgerInfo lastLedgerInfo = ledgers.get(current);
boolean lastLedgerClosed = lastLedgerInfo != null && lastLedgerInfo.getEntries() > 0;
boolean includeLastLedger = STATE_UPDATER.get(this) == State.Terminated
&& requestOffloadTo.compareTo(lastConfirmedEntry) >= 0
&& lastLedgerClosed;

&& requestOffloadTo.compareTo(lastConfirmedEntry) >= 0;

// the first ledger which will not be offloaded. Defaults to current,
// in the case that the whole headmap is offloaded. Otherwise, it will
// be set as we iterate through the headmap values
long firstLedgerRetained = current;
for (LedgerInfo ls : ledgers.headMap(current).values()) {
if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
Position firstPositionRetained = includeLastLedger ? lastConfirmedEntry.getNext()
: PositionFactory.create(current, 0);
for (LedgerInfo ls : ledgers.headMap(current, includeLastLedger).values()) {
if (requestOffloadTo.getLedgerId() > ls.getLedgerId()
|| (includeLastLedger && requestOffloadTo.getLedgerId() == ls.getLedgerId())) {
// don't offload if ledger has already been offloaded, or is empty
if (!ls.getOffloadContext().isComplete() && ls.getSize() > 0) {
ledgersToOffload.add(ls);
}
} else {
firstLedgerRetained = ls.getLedgerId();
firstPositionRetained = PositionFactory.create(ls.getLedgerId(), 0);
break;
}
}
firstUnoffloaded = PositionFactory.create(firstLedgerRetained, 0);
firstUnoffloaded = firstPositionRetained;
}

if (ledgersToOffload.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,125 @@ public void offloadAsSoonAsClosed(Long sizeThreshold, Long timeThreshold) throws
ledger.getLedgersInfoAsList().get(1).getLedgerId()));
}

@Test
public void offloadTerminatedLastLedger() throws Exception {
MockLedgerOffloader offloader = new MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config);

for (int i = 0; i < 5; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}

long lastLedgerId = ledger.getLastConfirmedEntry().getLedgerId();
Position lastPosition = ledger.terminate();

Position firstUnoffloaded = ledger.offloadPrefix(lastPosition);

assertEquals(firstUnoffloaded, lastPosition.getNext());
assertEquals(offloader.offloadedLedgers(), Set.of(lastLedgerId));
LedgerInfo lastLedgerInfo = ledger.getLedgersInfoAsList().get(0);
assertEquals(lastLedgerInfo.getLedgerId(), lastLedgerId);
assertTrue(lastLedgerInfo.getEntries() > 0);
assertTrue(lastLedgerInfo.getSize() > 0);
assertTrue(lastLedgerInfo.getTimestamp() > 0);
assertTrue(lastLedgerInfo.getOffloadContext().isComplete());
}

@Test
public void offloadTerminatedLastLedgerOnlyWhenPositionCoversTerminatedPosition() throws Exception {
MockLedgerOffloader offloader = new MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config);

for (int i = 0; i < 5; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}

long lastLedgerId = ledger.getLastConfirmedEntry().getLedgerId();
Position lastPosition = ledger.terminate();
Position beforeTerminatedPosition = PositionFactory.create(lastLedgerId, lastPosition.getEntryId() - 1);

Position firstUnoffloaded = ledger.offloadPrefix(beforeTerminatedPosition);

assertEquals(firstUnoffloaded, PositionFactory.create(lastLedgerId, 0));
assertEquals(offloader.offloadedLedgers().size(), 0);
assertFalse(ledger.getLedgersInfoAsList().get(0).getOffloadContext().isComplete());
}

@Test
public void offloadTerminatedLastLedgerIsIdempotent() throws Exception {
AtomicInteger offloadCalls = new AtomicInteger();
MockLedgerOffloader offloader = new MockLedgerOffloader() {
@Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uuid,
Map<String, String> extraMetadata) {
offloadCalls.incrementAndGet();
return super.offload(ledger, uuid, extraMetadata);
}
};
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config);

for (int i = 0; i < 5; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}

long lastLedgerId = ledger.getLastConfirmedEntry().getLedgerId();
Position lastPosition = ledger.terminate();

Position firstUnoffloaded = ledger.offloadPrefix(lastPosition);
Position firstUnoffloadedAgain = ledger.offloadPrefix(lastPosition);

assertEquals(firstUnoffloaded, lastPosition.getNext());
assertEquals(firstUnoffloadedAgain, lastPosition.getNext());
assertEquals(offloader.offloadedLedgers(), Set.of(lastLedgerId));
assertEquals(offloadCalls.get(), 1);
}

@Test
public void terminateTriggersAutomaticOffloadForLastLedger() throws Exception {
MockLedgerOffloader offloader = new MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config);

for (int i = 0; i < 5; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}

long lastLedgerId = ledger.getLastConfirmedEntry().getLedgerId();
Position lastPosition = ledger.terminate();

assertEquals(lastPosition.getLedgerId(), lastLedgerId);
assertEventuallyTrue(() -> offloader.offloadedLedgers().equals(Set.of(lastLedgerId)));
assertTrue(ledger.isTerminated());
assertEquals(ledger.getLedgersInfoAsList().size(), 1);
assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().isComplete());
}


static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception {
// wait up to 3 seconds
Expand Down
Loading