From 9dd077ebbf1ed132bcaab7ee1813483d7933a285 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Apr 2023 12:43:16 +0800 Subject: [PATCH 1/8] [fix] [broker] Ignore and remove the replicator cursor when the remote cluster is absent --- .../service/persistent/PersistentTopic.java | 72 +++++++++++-------- .../persistent/PersistentTopicTest.java | 10 +-- 2 files changed, 49 insertions(+), 33 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 18a662c4b7a38..85cc76b776264 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1706,37 +1706,53 @@ private CompletableFuture checkReplicationCluster(String remoteCluster) protected CompletableFuture addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) { - return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService) - .thenCompose(__ -> checkReplicationCluster(remoteCluster)) - .thenCompose(clusterExists -> { - if (!clusterExists) { - log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster); - return removeReplicator(remoteCluster).thenApply(__ -> null); - } - return brokerService.pulsar().getPulsarResources().getClusterResources() - .getClusterAsync(remoteCluster) - .thenApply(clusterData -> - brokerService.getReplicationClient(remoteCluster, clusterData)); - }) - .thenAccept(replicationClient -> { - if (replicationClient == null) { - return; + CompletableFuture replicationStartFuture = new CompletableFuture<>(); + AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService) + .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources() + 
.getClusterAsync(remoteCluster) + .thenApply(clusterData -> + brokerService.getReplicationClient(remoteCluster, clusterData))) + .thenAccept(replicationClient -> { + Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { + try { + return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster, + remoteCluster, brokerService, (PulsarClientImpl) replicationClient); + } catch (PulsarServerException e) { + log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e); } - Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { - try { - return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster, - remoteCluster, brokerService, (PulsarClientImpl) replicationClient); - } catch (PulsarServerException e) { - log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e); + return null; + }); + // clean up replicator if startup is failed + if (replicator == null) { + replicators.removeNullValue(remoteCluster); + } + }).whenComplete((ignore, ex) -> { + if (ex == null){ + replicationStartFuture.complete(null); + } else { + checkReplicationCluster(remoteCluster).thenCompose(clusterExists -> { + if (!clusterExists) { + log.warn("[{}] Start remove the replicator because the cluster '{}' does not exist", + topic, remoteCluster); + replicationStartFuture.complete(null); + return removeReplicator(remoteCluster).whenComplete((ignore2, remoteCursorEx) -> { + if (remoteCursorEx != null) { + log.error("[{}] Remove the cursor of replicator[{}] is failed, please reload topic." + , topic, remoteCluster); + } else { + log.warn("[{}] Remove the cursor of replicator[{}] successfully", + topic, remoteCluster); + } + }); + } else { + // Start replication is failed. 
+ replicationStartFuture.completeExceptionally(ex); + return CompletableFuture.completedFuture(null); } - return null; }); - - // clean up replicator if startup is failed - if (replicator == null) { - replicators.removeNullValue(remoteCluster); - } - }); + } + }); + return replicationStartFuture; } CompletableFuture removeReplicator(String remoteCluster) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index c63be7aad01cd..80a28e8f5990f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -39,6 +39,7 @@ import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -558,10 +559,10 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) admin.tenants().updateTenant("prop", tenantInfo); if (topicLevelPolicy) { - admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster)); + admin.topics().setReplicationClusters(topicName, Arrays.asList(remoteCluster, "test")); } else { admin.namespaces().setNamespaceReplicationClustersAsync( - namespace, Collections.singleton(remoteCluster)).get(); + namespace, new HashSet<>(Arrays.asList(remoteCluster, "test"))).get(); } final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false) @@ -577,13 +578,12 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." 
+ remoteCluster)); if (topicLevelPolicy) { - admin.topics().setReplicationClusters(topicName, Collections.emptyList()); + admin.topics().setReplicationClusters(topicName, Collections.singletonList("test")); } else { - admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get(); + admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.singleton("test")).get(); } admin.clusters().deleteCluster(remoteCluster); // Now the cluster and its related policy has been removed but the replicator cursor still exists - topic.initialize().get(3, TimeUnit.SECONDS); Awaitility.await().atMost(3, TimeUnit.SECONDS) .until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext()); From 903466c9006de70f207d3155640f75b4a0677340 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Apr 2023 18:35:27 +0800 Subject: [PATCH 2/8] fix flaky test --- .../persistent/PersistentTopicTest.java | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 80a28e8f5990f..d7a7c6421525d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -559,7 +559,15 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) admin.tenants().updateTenant("prop", tenantInfo); if (topicLevelPolicy) { - admin.topics().setReplicationClusters(topicName, Arrays.asList(remoteCluster, "test")); + // setReplicationClusters may fail, so do retry. 
+ Awaitility.await().until(() -> { + try { + admin.topics().setReplicationClusters(topicName, Arrays.asList(remoteCluster, "test")); + return true; + } catch (Exception ex) { + return false; + } + }); } else { admin.namespaces().setNamespaceReplicationClustersAsync( namespace, new HashSet<>(Arrays.asList(remoteCluster, "test"))).get(); @@ -584,8 +592,15 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) } admin.clusters().deleteCluster(remoteCluster); // Now the cluster and its related policy has been removed but the replicator cursor still exists - topic.initialize().get(3, TimeUnit.SECONDS); - Awaitility.await().atMost(3, TimeUnit.SECONDS) - .until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext()); + + // Verify: + // 1. Topic can load success( use "topic.initialize()" instead of "load topic" ). + // If the topic loading by client is failed, it will retry, so we can do retry "topic.initialize()". + // 2. The repl cursor will be deleted. 
+ Awaitility.await().atMost(10, TimeUnit.SECONDS) + .until(() -> { + topic.initialize().get(3, TimeUnit.SECONDS); + return !topic.getManagedLedger().getCursors().iterator().hasNext(); + }); } } From 1ce4fb9ea9c4d2fa44f4f30933a04cee8eb0d2b6 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Apr 2023 21:07:47 +0800 Subject: [PATCH 3/8] address comments --- .../broker/service/persistent/PersistentTopic.java | 8 +++++--- .../broker/service/persistent/PersistentTopicTest.java | 9 ++------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 85cc76b776264..91e9bb3a8c8e0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1734,17 +1734,19 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma if (!clusterExists) { log.warn("[{}] Start remove the replicator because the cluster '{}' does not exist", topic, remoteCluster); - replicationStartFuture.complete(null); - return removeReplicator(remoteCluster).whenComplete((ignore2, remoteCursorEx) -> { - if (remoteCursorEx != null) { + return removeReplicator(remoteCluster).whenComplete((ignore2, removeCursorEx) -> { + if (removeCursorEx != null) { log.error("[{}] Remove the cursor of replicator[{}] is failed, please reload topic." , topic, remoteCluster); + replicationStartFuture.completeExceptionally(removeCursorEx); } else { log.warn("[{}] Remove the cursor of replicator[{}] successfully", topic, remoteCluster); + replicationStartFuture.complete(null); } }); } else { + log.info("===> {}", ex.getMessage()); // Start replication is failed.
replicationStartFuture.completeExceptionally(ex); return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index d7a7c6421525d..8b71a62e8d85b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -560,13 +560,8 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) if (topicLevelPolicy) { // setReplicationClusters may fail, so do retry. - Awaitility.await().until(() -> { - try { - admin.topics().setReplicationClusters(topicName, Arrays.asList(remoteCluster, "test")); - return true; - } catch (Exception ex) { - return false; - } + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + admin.topics().setReplicationClusters(topicName, Arrays.asList(remoteCluster, "test")); }); } else { admin.namespaces().setNamespaceReplicationClustersAsync( From 20f8a39cfb19d0873a563f22c00557595a9e14b7 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Apr 2023 21:12:56 +0800 Subject: [PATCH 4/8] address comment --- .../pulsar/broker/service/persistent/PersistentTopic.java | 1 - .../broker/service/persistent/PersistentTopicTest.java | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 91e9bb3a8c8e0..67ebee0cc2de3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1746,7 +1746,6 @@ protected CompletableFuture 
addReplicationCluster(String remoteCluster, Ma } }); } else { - log.info("===> {}", ex.getMessage()); // Start replication is failed. replicationStartFuture.completeExceptionally(ex); return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 8b71a62e8d85b..c2fdcfdd76f1d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -592,10 +592,9 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) // 1. Topic can load success( use "topic.initialize()" instead of "load topic" ). // If the topic loading by client is failed, it will retry, so we can do retry "topic.initialize()". // 2. The repl cursor will be deleted. 
- Awaitility.await().atMost(10, TimeUnit.SECONDS) - .until(() -> { + Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { topic.initialize().get(3, TimeUnit.SECONDS); - return !topic.getManagedLedger().getCursors().iterator().hasNext(); + assertFalse(topic.getManagedLedger().getCursors().iterator().hasNext()); }); } } From 5e1552260738b77a954081bf0a69ec4959555624 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Apr 2023 21:21:37 +0800 Subject: [PATCH 5/8] code format --- .../broker/service/persistent/PersistentTopicTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index c2fdcfdd76f1d..20252bfff3b45 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -593,8 +593,8 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) // If the topic loading by client is failed, it will retry, so we can do retry "topic.initialize()". // 2. The repl cursor will be deleted. 
Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { - topic.initialize().get(3, TimeUnit.SECONDS); - assertFalse(topic.getManagedLedger().getCursors().iterator().hasNext()); - }); + topic.initialize().get(3, TimeUnit.SECONDS); + assertFalse(topic.getManagedLedger().getCursors().iterator().hasNext()); + }); } } From 593e66c2bab0b5d30c08ec9aead3f3db7eab5418 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Apr 2023 22:31:40 +0800 Subject: [PATCH 6/8] add logs --- .../apache/pulsar/broker/service/persistent/PersistentTopic.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 67ebee0cc2de3..2735a3bf6363a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1747,6 +1747,7 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma }); } else { // Start replication is failed. 
+ log.error("[{}] The replicator startup failed {}", topic, remoteCluster, ex); replicationStartFuture.completeExceptionally(ex); return CompletableFuture.completedFuture(null); } From dd5aafad995dcf4385aa0fc95f140667cbc4b829 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 7 Apr 2023 00:02:02 +0800 Subject: [PATCH 7/8] revert the changes which not effected of the flaky test --- .../pulsar/broker/service/persistent/PersistentTopicTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 20252bfff3b45..f99dba5cd0e7b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -561,11 +561,11 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) if (topicLevelPolicy) { // setReplicationClusters may fail, so do retry. 
Awaitility.await().ignoreExceptions().untilAsserted(() -> { - admin.topics().setReplicationClusters(topicName, Arrays.asList(remoteCluster, "test")); + admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster)); }); } else { admin.namespaces().setNamespaceReplicationClustersAsync( - namespace, new HashSet<>(Arrays.asList(remoteCluster, "test"))).get(); + namespace, Collections.singleton(remoteCluster)).get(); } final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false) From 395f40c0572829bfc700b5e57e29951adc08ccf5 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 7 Apr 2023 00:04:48 +0800 Subject: [PATCH 8/8] revert the changes which not effected of the flaky test --- .../broker/service/persistent/PersistentTopicTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index f99dba5cd0e7b..7b729bf618650 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -581,16 +581,16 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." 
+ remoteCluster)); if (topicLevelPolicy) { - admin.topics().setReplicationClusters(topicName, Collections.singletonList("test")); + admin.topics().setReplicationClusters(topicName, Collections.emptyList()); } else { - admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.singleton("test")).get(); + admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get(); } admin.clusters().deleteCluster(remoteCluster); // Now the cluster and its related policy has been removed but the replicator cursor still exists // Verify: - // 1. Topic can load success( use "topic.initialize()" instead of "load topic" ). - // If the topic loading by client is failed, it will retry, so we can do retry "topic.initialize()". + // 1. Topic can load success. If the topic loading by client is failed, it will retry, + // so we can do retry "initialize topic". // 2. The repl cursor will be deleted. Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { topic.initialize().get(3, TimeUnit.SECONDS);