diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 18a662c4b7a38..2735a3bf6363a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1706,37 +1706,78 @@ private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster)
 
+    /**
+     * Creates the {@code GeoPersistentReplicator} for {@code remoteCluster} unless one is already registered.
+     *
+     * <p>When the startup chain fails and {@code remoteCluster} no longer exists, the replicator is removed
+     * instead and the returned future reflects the outcome of that removal. When the cluster still exists,
+     * the startup failure is propagated through the returned future.
+     *
+     * @param remoteCluster name of the cluster to replicate to
+     * @param cursor cursor passed to the new {@code GeoPersistentReplicator}
+     * @param localCluster local cluster name passed to the new {@code GeoPersistentReplicator}
+     * @return a future completed when startup, or the cleanup that follows a failed startup, has finished
+     */
     protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
             String localCluster) {
-        return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
-                .thenCompose(__ -> checkReplicationCluster(remoteCluster))
-                .thenCompose(clusterExists -> {
-                    if (!clusterExists) {
-                        log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster);
-                        return removeReplicator(remoteCluster).thenApply(__ -> null);
-                    }
-                    return brokerService.pulsar().getPulsarResources().getClusterResources()
-                            .getClusterAsync(remoteCluster)
-                            .thenApply(clusterData ->
-                                    brokerService.getReplicationClient(remoteCluster, clusterData));
-                })
-                .thenAccept(replicationClient -> {
-                    if (replicationClient == null) {
-                        return;
+        // Completed by hand so that the cleanup path below can decide the final outcome.
+        CompletableFuture<Void> replicationStartFuture = new CompletableFuture<>();
+        AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
+            .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
+                    .getClusterAsync(remoteCluster)
+                    .thenApply(clusterData ->
+                            brokerService.getReplicationClient(remoteCluster, clusterData)))
+            .thenAccept(replicationClient -> {
+                Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
+                    try {
+                        return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
+                                remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+                    } catch (PulsarServerException e) {
+                        log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
                     }
-                    Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
-                        try {
-                            return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
-                                    remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
-                        } catch (PulsarServerException e) {
-                            log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+                    return null;
+                });
+                // clean up replicator if startup is failed
+                if (replicator == null) {
+                    replicators.removeNullValue(remoteCluster);
+                }
+            }).whenComplete((ignore, ex) -> {
+                if (ex == null) {
+                    replicationStartFuture.complete(null);
+                } else {
+                    checkReplicationCluster(remoteCluster).thenCompose(clusterExists -> {
+                        if (!clusterExists) {
+                            log.warn("[{}] Start remove the replicator because the cluster '{}' does not exist",
+                                    topic, remoteCluster);
+                            return removeReplicator(remoteCluster).whenComplete((ignore2, removeCursorEx) -> {
+                                if (removeCursorEx != null) {
+                                    log.error("[{}] Remove the cursor of replicator[{}] is failed, please reload topic."
+                                            , topic, remoteCluster, removeCursorEx);
+                                    // Fail this instance: CompletableFuture.failedFuture is a static factory
+                                    // and would leave replicationStartFuture pending forever.
+                                    replicationStartFuture.completeExceptionally(removeCursorEx);
+                                } else {
+                                    log.warn("[{}] Remove the cursor of replicator[{}] successfully",
+                                            topic, remoteCluster);
+                                    replicationStartFuture.complete(null);
+                                }
+                            });
+                        } else {
+                            // Start replication is failed.
+                            log.error("[{}] The replicator startup failed {}", topic, remoteCluster, ex);
+                            replicationStartFuture.completeExceptionally(ex);
+                            return CompletableFuture.completedFuture(null);
                         }
-                        return null;
-                    });
-
-                    // clean up replicator if startup is failed
-                    if (replicator == null) {
-                        replicators.removeNullValue(remoteCluster);
-                    }
-                });
+                    }).exceptionally(cleanupEx -> {
+                        // The cluster check or the removal failed. Report the startup failure unless the
+                        // future was already completed above, so that the caller never waits forever.
+                        if (replicationStartFuture.completeExceptionally(ex)) {
+                            log.error("[{}] Cleanup after failed startup of replicator {} failed",
+                                    topic, remoteCluster, cleanupEx);
+                        }
+                        return null;
+                    });
+                }
+            });
+        return replicationStartFuture;
     }
 
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index c63be7aad01cd..7b729bf618650 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -39,6 +39,7 @@
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -558,7 +559,10 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
         admin.tenants().updateTenant("prop", tenantInfo);
 
         if (topicLevelPolicy) {
-            admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
+            // setReplicationClusters may fail, so do retry.
+            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+                admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
+            });
         } else {
             admin.namespaces().setNamespaceReplicationClustersAsync(
                     namespace, Collections.singleton(remoteCluster)).get();
@@ -584,8 +588,13 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
         admin.clusters().deleteCluster(remoteCluster);
         // Now the cluster and its related policy has been removed but the replicator cursor still exists
 
-        topic.initialize().get(3, TimeUnit.SECONDS);
-        Awaitility.await().atMost(3, TimeUnit.SECONDS)
-                .until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
+        // Verify:
+        // 1. Topic can load success. If the topic loading by client is failed, it will retry,
+        // so we can do retry "initialize topic".
+        // 2. The repl cursor will be deleted.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+            topic.initialize().get(3, TimeUnit.SECONDS);
+            assertFalse(topic.getManagedLedger().getCursors().iterator().hasNext());
+        });
     }
 }