diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 22d1655b718ec..bfecb5c50c683 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1547,13 +1547,19 @@ public synchronized void asyncTerminate(TerminateCallback callback, Object ctx) if (rc != BKException.Code.OK) { callback.terminateFailed(createManagedLedgerException(rc), ctx); } else { - lastConfirmedEntry = PositionFactory.create(lh.getId(), lh.getLastAddConfirmed()); + ManagedLedgerInfo managedLedgerInfo; + synchronized (ManagedLedgerImpl.this) { + lastConfirmedEntry = PositionFactory.create(lh.getId(), lh.getLastAddConfirmed()); + updateClosedLedgerInfo(lh, lh.getLastAddConfirmed(), false); + managedLedgerInfo = getManagedLedgerInfo(); + } // Store the new state in metadata - store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback() { + store.asyncUpdateLedgerIds(name, managedLedgerInfo, ledgersStat, new MetaStoreCallback() { @Override public void operationComplete(Void result, Stat stat) { ledgersStat = stat; log.info().attr("lastConfirmedEntry", lastConfirmedEntry).log("Terminated managed ledger"); + maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER); callback.terminateComplete(lastConfirmedEntry, ctx); } @@ -1966,23 +1972,27 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) { return; } + updateClosedLedgerInfo(lh, lastAddConfirmed, true); + + trimConsumedLedgersInBackground(); + + maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER); + + createLedgerAfterClosed(); + } + + private void updateClosedLedgerInfo(final LedgerHandle lh, Long lastAddConfirmed, boolean removeEmptyLedger) { long entriesInLedger = lastAddConfirmed != null ? lastAddConfirmed + 1 : lh.getLastAddConfirmed() + 1; log.debug().attr("ledgerId", lh.getId()).attr("entries", entriesInLedger).log("Ledger has been closed"); if (entriesInLedger > 0) { LedgerInfo info = new LedgerInfo().setLedgerId(lh.getId()).setEntries(entriesInLedger) .setSize(lh.getLength()).setTimestamp(clock.millis()); ledgers.put(lh.getId(), info); - } else { + } else if (removeEmptyLedger) { // The last ledger was empty, so we can discard it ledgers.remove(lh.getId()); mbean.startDataLedgerDeleteOp(); } - - trimConsumedLedgersInBackground(); - - maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER); - - createLedgerAfterClosed(); } @Override @@ -3682,23 +3692,27 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct } long current = ledgers.lastKey(); + boolean includeLastLedger = STATE_UPDATER.get(this) == State.Terminated + && requestOffloadTo.compareTo(lastConfirmedEntry) >= 0; // the first ledger which will not be offloaded. Defaults to current, // in the case that the whole headmap is offloaded. Otherwise, it will // be set as we iterate through the headmap values - long firstLedgerRetained = current; - for (LedgerInfo ls : ledgers.headMap(current).values()) { - if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) { + Position firstPositionRetained = includeLastLedger ? lastConfirmedEntry.getNext() + : PositionFactory.create(current, 0); + for (LedgerInfo ls : ledgers.headMap(current, includeLastLedger).values()) { + if (requestOffloadTo.getLedgerId() > ls.getLedgerId() + || (includeLastLedger && requestOffloadTo.getLedgerId() == ls.getLedgerId())) { // don't offload if ledger has already been offloaded, or is empty if (!ls.getOffloadContext().isComplete() && ls.getSize() > 0) { ledgersToOffload.add(ls); } } else { - firstLedgerRetained = ls.getLedgerId(); + firstPositionRetained = PositionFactory.create(ls.getLedgerId(), 0); break; } } - firstUnoffloaded = PositionFactory.create(firstLedgerRetained, 0); + firstUnoffloaded = firstPositionRetained; } if (ledgersToOffload.isEmpty()) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 10f72cb8d53b7..f5cef4808bd05 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -1347,6 +1347,125 @@ public void offloadAsSoonAsClosed(Long sizeThreshold, Long timeThreshold) throws ledger.getLedgersInfoAsList().get(1).getLedgerId())); } + @Test + public void offloadTerminatedLastLedger() throws Exception { + MockLedgerOffloader offloader = new MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config); + + for (int i = 0; i < 5; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + + long lastLedgerId = ledger.getLastConfirmedEntry().getLedgerId(); + Position lastPosition = ledger.terminate(); + + Position firstUnoffloaded = ledger.offloadPrefix(lastPosition); + + assertEquals(firstUnoffloaded, lastPosition.getNext()); + assertEquals(offloader.offloadedLedgers(), Set.of(lastLedgerId)); + LedgerInfo lastLedgerInfo = ledger.getLedgersInfoAsList().get(0); + assertEquals(lastLedgerInfo.getLedgerId(), lastLedgerId); + assertTrue(lastLedgerInfo.getEntries() > 0); + assertTrue(lastLedgerInfo.getSize() > 0); + assertTrue(lastLedgerInfo.getTimestamp() > 0); + assertTrue(lastLedgerInfo.getOffloadContext().isComplete()); + } + + @Test + public void offloadTerminatedLastLedgerOnlyWhenPositionCoversTerminatedPosition() throws Exception { + MockLedgerOffloader offloader = new MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config); + + for (int i = 0; i < 5; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + + long lastLedgerId = ledger.getLastConfirmedEntry().getLedgerId(); + Position lastPosition = ledger.terminate(); + Position beforeTerminatedPosition = PositionFactory.create(lastLedgerId, lastPosition.getEntryId() - 1); + + Position firstUnoffloaded = ledger.offloadPrefix(beforeTerminatedPosition); + + assertEquals(firstUnoffloaded, PositionFactory.create(lastLedgerId, 0)); + assertEquals(offloader.offloadedLedgers().size(), 0); + assertFalse(ledger.getLedgersInfoAsList().get(0).getOffloadContext().isComplete()); + } + + @Test + public void offloadTerminatedLastLedgerIsIdempotent() throws Exception { + AtomicInteger offloadCalls = new AtomicInteger(); + MockLedgerOffloader offloader = new MockLedgerOffloader() { + @Override + public CompletableFuture offload(ReadHandle ledger, + UUID uuid, + Map extraMetadata) { + offloadCalls.incrementAndGet(); + return super.offload(ledger, uuid, extraMetadata); + } + }; + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config); + + for (int i = 0; i < 5; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + + long lastLedgerId = ledger.getLastConfirmedEntry().getLedgerId(); + Position lastPosition = ledger.terminate(); + + Position firstUnoffloaded = ledger.offloadPrefix(lastPosition); + Position firstUnoffloadedAgain = ledger.offloadPrefix(lastPosition); + + assertEquals(firstUnoffloaded, lastPosition.getNext()); + assertEquals(firstUnoffloadedAgain, lastPosition.getNext()); + assertEquals(offloader.offloadedLedgers(), Set.of(lastLedgerId)); + assertEquals(offloadCalls.get(), 1); + } + + @Test + public void terminateTriggersAutomaticOffloadForLastLedger() throws Exception { + MockLedgerOffloader offloader = new MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config); + + for (int i = 0; i < 5; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + + long lastLedgerId = ledger.getLastConfirmedEntry().getLedgerId(); + Position lastPosition = ledger.terminate(); + + assertEquals(lastPosition.getLedgerId(), lastLedgerId); + assertEventuallyTrue(() -> offloader.offloadedLedgers().equals(Set.of(lastLedgerId))); + assertTrue(ledger.isTerminated()); + assertEquals(ledger.getLedgersInfoAsList().size(), 1); + assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().isComplete()); + } + static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception { // wait up to 3 seconds