Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ public CompletableFuture<Void> checkStatus() {
boolean shouldBeEnabled = topic.isDeduplicationEnabled();
synchronized (this) {
if (status == Status.Recovering) {
if (!shouldBeEnabled) {
return statusChangeFuture.handle((__, e) -> null).thenCompose(__ -> checkStatus());
}
return statusChangeFuture;
}
if (status == Status.Removing) {
Expand Down Expand Up @@ -191,7 +194,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
}, null);
}

if (status == Status.Enabled && !shouldBeEnabled) {
if ((status == Status.Enabled || status == Status.Failed) && !shouldBeEnabled) {
// Disabled deduping
CompletableFuture<Void> future = new CompletableFuture<>();
status = Status.Removing;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
public class BrokerMessageDeduplicationTest {

private ManagedLedger managedLedger;
private PersistentTopic topic;
private MessageDeduplication deduplication;
private ScheduledExecutorService executor;

Expand All @@ -68,9 +69,9 @@ public void setUp() {
executor = Executors.newScheduledThreadPool(1, new ExecutorProvider.ExtendedThreadFactory("pulsar"));
doReturn(executor).when(pulsarService).getExecutor();
managedLedger = mock(ManagedLedger.class);
final var mockTopic = mock(PersistentTopic.class);
doReturn(true).when(mockTopic).isDeduplicationEnabled();
deduplication = spy(new MessageDeduplication(pulsarService, mockTopic, managedLedger));
topic = mock(PersistentTopic.class);
doReturn(true).when(topic).isDeduplicationEnabled();
deduplication = spy(new MessageDeduplication(pulsarService, topic, managedLedger));
doReturn(true).when(deduplication).isEnabled();
}

Expand Down Expand Up @@ -163,6 +164,83 @@ public void checkStatusDoesNotStartMultipleRecoveries() throws Exception {
verify(managedLedger, times(1)).asyncOpenCursor(any(), any(), any());
}

@Test
public void checkStatusDisablesAfterRecoveryCompletesWhenPolicyChanges() throws Exception {
final var cursor = mock(ManagedCursor.class);
final var shouldEnableDeduplication = new AtomicReference<>(true);
final var openCursorCallback = new AtomicReference<AsyncCallbacks.OpenCursorCallback>();
doAnswer(invocation -> shouldEnableDeduplication.get()).when(topic).isDeduplicationEnabled();
doAnswer(invocation -> {
openCursorCallback.set(invocation.getArgument(1));
return null;
}).when(managedLedger).asyncOpenCursor(any(), any(), any());
doAnswer(invocation -> {
((AsyncCallbacks.DeleteCursorCallback) invocation.getArgument(1)).deleteCursorComplete(null);
return null;
}).when(managedLedger).asyncDeleteCursor(any(), any(), any());
doReturn(Map.of("from-snapshot", 10L)).when(cursor).getProperties();
doReturn(false).when(cursor).hasMoreEntries();

final var enableFuture = deduplication.checkStatus();
assertFalse(enableFuture.isDone());
assertEquals(String.valueOf(deduplication.getStatus()), "Recovering");

shouldEnableDeduplication.set(false);
final var disableFuture = deduplication.checkStatus();
assertFalse(disableFuture.isDone());
verify(managedLedger, times(1)).asyncOpenCursor(any(), any(), any());

openCursorCallback.get().openCursorComplete(cursor, null);
enableFuture.get(3, TimeUnit.SECONDS);
disableFuture.get(3, TimeUnit.SECONDS);

assertEquals(String.valueOf(deduplication.getStatus()), "Disabled");
assertEquals(deduplication.getLastPublishedSequenceId("from-snapshot"), -1L);
verify(managedLedger, times(1)).asyncDeleteCursor(any(), any(), any());
}

@Test
public void checkStatusDisablesAfterRecoveryFailsWhenPolicyChanges() throws Exception {
final var cursor = mock(ManagedCursor.class);
final var shouldEnableDeduplication = new AtomicReference<>(true);
final var openCursorCallback = new AtomicReference<AsyncCallbacks.OpenCursorCallback>();
doAnswer(invocation -> shouldEnableDeduplication.get()).when(topic).isDeduplicationEnabled();
doAnswer(invocation -> {
openCursorCallback.set(invocation.getArgument(1));
return null;
}).when(managedLedger).asyncOpenCursor(any(), any(), any());
doAnswer(invocation -> {
((AsyncCallbacks.DeleteCursorCallback) invocation.getArgument(1)).deleteCursorComplete(null);
return null;
}).when(managedLedger).asyncDeleteCursor(any(), any(), any());
doReturn(Map.of()).when(cursor).getProperties();
doReturn(true).when(cursor).hasMoreEntries();
doAnswer(invocation -> {
throw new RuntimeException("asyncReadEntries failed");
}).when(cursor).asyncReadEntries(anyInt(), anyLong(), any(), any(), any());

final var enableFuture = deduplication.checkStatus();
assertFalse(enableFuture.isDone());
assertEquals(String.valueOf(deduplication.getStatus()), "Recovering");

shouldEnableDeduplication.set(false);
final var disableFuture = deduplication.checkStatus();
assertFalse(disableFuture.isDone());

openCursorCallback.get().openCursorComplete(cursor, null);
try {
enableFuture.get(3, TimeUnit.SECONDS);
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof RuntimeException);
assertTrue(e.getMessage().contains("asyncReadEntries failed"));
}
disableFuture.get(3, TimeUnit.SECONDS);

assertEquals(String.valueOf(deduplication.getStatus()), "Disabled");
verify(managedLedger, times(1)).asyncDeleteCursor(any(), any(), any());
}

@Test
public void checkStatusRetriesAfterFailedEnable() throws Exception {
final var cursor = mock(ManagedCursor.class);
Expand Down
Loading