-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][broker] Ignore and remove the replicator cursor when the remote cluster is absent #19972
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -1696,14 +1696,32 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { | |||
| return future; | ||||
| } | ||||
|
|
||||
| private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster) { | ||||
| return brokerService.getPulsar().getPulsarResources().getNamespaceResources() | ||||
| .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) | ||||
| .thenApply(optPolicies -> optPolicies.map(policies -> policies.replication_clusters) | ||||
| .orElse(Collections.emptySet()).contains(remoteCluster) | ||||
| || topicPolicies.getReplicationClusters().get().contains(remoteCluster)); | ||||
| } | ||||
|
|
||||
| protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor, | ||||
| String localCluster) { | ||||
| return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService) | ||||
| .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources() | ||||
| .getClusterAsync(remoteCluster) | ||||
| .thenApply(clusterData -> | ||||
| brokerService.getReplicationClient(remoteCluster, clusterData))) | ||||
| .thenCompose(__ -> checkReplicationCluster(remoteCluster)) | ||||
| .thenCompose(clusterExists -> { | ||||
| if (!clusterExists) { | ||||
| log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster); | ||||
| return removeReplicator(remoteCluster).thenApply(__ -> null); | ||||
|
Comment on lines
+1710
to
+1714
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't we clean up when the Policy is removed?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry I missed this comment when I merged. And yes, we should do that as well. WDYT? @poorbarcode
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the policy is removed, the cursor will not be clean If this topic is not online. see pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java Line 1589 in d1fc732
|
||||
| } | ||||
| return brokerService.pulsar().getPulsarResources().getClusterResources() | ||||
| .getClusterAsync(remoteCluster) | ||||
| .thenApply(clusterData -> | ||||
| brokerService.getReplicationClient(remoteCluster, clusterData)); | ||||
| }) | ||||
| .thenAccept(replicationClient -> { | ||||
| if (replicationClient == null) { | ||||
| return; | ||||
| } | ||||
| Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { | ||||
| try { | ||||
| return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster, | ||||
|
|
@@ -1727,8 +1745,8 @@ CompletableFuture<Void> removeReplicator(String remoteCluster) { | |||
|
|
||||
| String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); | ||||
|
|
||||
| replicators.get(remoteCluster).disconnect().thenRun(() -> { | ||||
|
|
||||
| Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect) | ||||
| .orElse(CompletableFuture.completedFuture(null)).thenRun(() -> { | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think not. See public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}so |
||||
| ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { | ||||
| @Override | ||||
| public void deleteCursorComplete(Object ctx) { | ||||
|
|
||||
Uh oh!
There was an error while loading. Please reload this page.