From 83ee0f565e345628b91909094b66c3eb7c85c8aa Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Tue, 9 Jun 2026 21:05:51 +0800 Subject: [PATCH] [fix][broker] Apply dedup disable after recovery --- .../persistent/MessageDeduplication.java | 5 +- .../BrokerMessageDeduplicationTest.java | 84 ++++++++++++++++++- 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index a980556f49baa..ce7e74332292a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -161,6 +161,9 @@ public CompletableFuture checkStatus() { boolean shouldBeEnabled = topic.isDeduplicationEnabled(); synchronized (this) { if (status == Status.Recovering) { + if (!shouldBeEnabled) { + return statusChangeFuture.handle((__, e) -> null).thenCompose(__ -> checkStatus()); + } return statusChangeFuture; } if (status == Status.Removing) { @@ -191,7 +194,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { }, null); } - if (status == Status.Enabled && !shouldBeEnabled) { + if ((status == Status.Enabled || status == Status.Failed) && !shouldBeEnabled) { // Disabled deduping CompletableFuture future = new CompletableFuture<>(); status = Status.Removing; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java index f36caf87e6c74..bc8af54570d16 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java @@ -56,6 +56,7 @@ public class BrokerMessageDeduplicationTest { private ManagedLedger managedLedger; + private PersistentTopic topic; private MessageDeduplication deduplication; private ScheduledExecutorService executor; @@ -68,9 +69,9 @@ public void setUp() { executor = Executors.newScheduledThreadPool(1, new ExecutorProvider.ExtendedThreadFactory("pulsar")); doReturn(executor).when(pulsarService).getExecutor(); managedLedger = mock(ManagedLedger.class); - final var mockTopic = mock(PersistentTopic.class); - doReturn(true).when(mockTopic).isDeduplicationEnabled(); - deduplication = spy(new MessageDeduplication(pulsarService, mockTopic, managedLedger)); + topic = mock(PersistentTopic.class); + doReturn(true).when(topic).isDeduplicationEnabled(); + deduplication = spy(new MessageDeduplication(pulsarService, topic, managedLedger)); doReturn(true).when(deduplication).isEnabled(); } @@ -163,6 +164,83 @@ public void checkStatusDoesNotStartMultipleRecoveries() throws Exception { verify(managedLedger, times(1)).asyncOpenCursor(any(), any(), any()); } + @Test + public void checkStatusDisablesAfterRecoveryCompletesWhenPolicyChanges() throws Exception { + final var cursor = mock(ManagedCursor.class); + final var shouldEnableDeduplication = new AtomicReference<>(true); + final var openCursorCallback = new AtomicReference(); + doAnswer(invocation -> shouldEnableDeduplication.get()).when(topic).isDeduplicationEnabled(); + doAnswer(invocation -> { + openCursorCallback.set(invocation.getArgument(1)); + return null; + }).when(managedLedger).asyncOpenCursor(any(), any(), any()); + doAnswer(invocation -> { + ((AsyncCallbacks.DeleteCursorCallback) invocation.getArgument(1)).deleteCursorComplete(null); + return null; + }).when(managedLedger).asyncDeleteCursor(any(), any(), any()); + doReturn(Map.of("from-snapshot", 10L)).when(cursor).getProperties(); + doReturn(false).when(cursor).hasMoreEntries(); + + final var enableFuture = deduplication.checkStatus(); + assertFalse(enableFuture.isDone()); + assertEquals(String.valueOf(deduplication.getStatus()), "Recovering"); + + shouldEnableDeduplication.set(false); + final var disableFuture = deduplication.checkStatus(); + assertFalse(disableFuture.isDone()); + verify(managedLedger, times(1)).asyncOpenCursor(any(), any(), any()); + + openCursorCallback.get().openCursorComplete(cursor, null); + enableFuture.get(3, TimeUnit.SECONDS); + disableFuture.get(3, TimeUnit.SECONDS); + + assertEquals(String.valueOf(deduplication.getStatus()), "Disabled"); + assertEquals(deduplication.getLastPublishedSequenceId("from-snapshot"), -1L); + verify(managedLedger, times(1)).asyncDeleteCursor(any(), any(), any()); + } + + @Test + public void checkStatusDisablesAfterRecoveryFailsWhenPolicyChanges() throws Exception { + final var cursor = mock(ManagedCursor.class); + final var shouldEnableDeduplication = new AtomicReference<>(true); + final var openCursorCallback = new AtomicReference(); + doAnswer(invocation -> shouldEnableDeduplication.get()).when(topic).isDeduplicationEnabled(); + doAnswer(invocation -> { + openCursorCallback.set(invocation.getArgument(1)); + return null; + }).when(managedLedger).asyncOpenCursor(any(), any(), any()); + doAnswer(invocation -> { + ((AsyncCallbacks.DeleteCursorCallback) invocation.getArgument(1)).deleteCursorComplete(null); + return null; + }).when(managedLedger).asyncDeleteCursor(any(), any(), any()); + doReturn(Map.of()).when(cursor).getProperties(); + doReturn(true).when(cursor).hasMoreEntries(); + doAnswer(invocation -> { + throw new RuntimeException("asyncReadEntries failed"); + }).when(cursor).asyncReadEntries(anyInt(), anyLong(), any(), any(), any()); + + final var enableFuture = deduplication.checkStatus(); + assertFalse(enableFuture.isDone()); + assertEquals(String.valueOf(deduplication.getStatus()), "Recovering"); + + shouldEnableDeduplication.set(false); + final var disableFuture = deduplication.checkStatus(); + assertFalse(disableFuture.isDone()); + + openCursorCallback.get().openCursorComplete(cursor, null); + try { + enableFuture.get(3, TimeUnit.SECONDS); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof RuntimeException); + assertTrue(e.getMessage().contains("asyncReadEntries failed")); + } + disableFuture.get(3, TimeUnit.SECONDS); + + assertEquals(String.valueOf(deduplication.getStatus()), "Disabled"); + verify(managedLedger, times(1)).asyncDeleteCursor(any(), any(), any()); + } + @Test public void checkStatusRetriesAfterFailedEnable() throws Exception { final var cursor = mock(ManagedCursor.class);