From 635bdbc0918de8d2ef8bb72d8546ee54882afd37 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 2 Jun 2026 22:17:46 +0800 Subject: [PATCH 01/10] [fix][broker]Do not call topic GC if the replication is still active --- .../pulsar/broker/ServiceConfiguration.java | 11 ++++ .../pulsar/broker/service/AbstractTopic.java | 47 ++++++++++++-- .../pulsar/broker/service/BrokerService.java | 14 ++++- .../nonpersistent/NonPersistentTopic.java | 12 +++- .../service/persistent/PersistentTopic.java | 5 +- .../OneWayReplicatorUsingGlobalZKTest.java | 61 +++++++++++++++++++ 6 files changed, 139 insertions(+), 11 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e5ecea2c44b7a..78eb9e8321b78 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -812,6 +812,17 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private Integer brokerDeleteInactiveTopicsMaxInactiveDurationSeconds = null; + @FieldContext( + category = CATEGORY_POLICIES, + dynamic = true, + doc = "Time in seconds since a replicator's producer last sent a message, after which the replicator" + + " is considered inactive. When inactive-topic deletion is enabled (via" + + " brokerDeleteInactiveTopicsEnabled, or enabled by namespace/topic policy), a replicated" + + " topic is eligible for deletion only once all its replicators have been inactive for this" + + " duration." 
+ ) + private Integer brokerReplicationInactiveThresholdSeconds = 24 * 3600; + @FieldContext( category = CATEGORY_POLICIES, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 3ad9eb438104a..889f4a650eac9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -130,6 +130,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { // Timestamp of when this topic was last seen active protected volatile long lastActive; + // Timestamp of when the latest producer was disconnected. + protected volatile Long localProducersEmptyTime; // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which // doesn't support batch-message @@ -665,16 +667,50 @@ protected Consumer getActiveConsumer(Subscription subscription) { return null; } - protected boolean hasLocalProducers() { - if (producers.isEmpty()) { - return false; + public abstract CompletableFuture closeReplProducersIfNoBacklog(); + + public abstract CompletableFuture startReplProducers(); + + public void disconnectReplicatorIfNoTrafficForLongTime() { + updateLocalProducersEmptyTime(); + + final Long cachedTime = localProducersEmptyTime; + if (cachedTime == null) { + return; + } + long threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds() * 1000; + if (System.currentTimeMillis() - cachedTime > threshold) { + closeReplProducersIfNoBacklog().whenCompleteAsync((__, ex) -> { + // Here, "!=" is used instead of the "!equals()" to avoid the problem of "inability to compare due + // to multiple changes in this attribute within a short period of time". 
+ if (cachedTime != localProducersEmptyTime) { + log.warn().log("Restart replication producers since new producers were registered concurently" + + " when closing producers."); + startReplProducers().whenComplete((ignore, ex2) -> { + log.error().exception(ex2).log("Failed to start replication producers after registered" + + " new producers"); + }); + } + }); + } + } + + protected void updateLocalProducersEmptyTime() { + // The timestamp has been set. + if (localProducersEmptyTime != null) { + return; } + // Only contains remote producer | producers is empty: update the time. for (Producer producer : producers.values()) { if (!producer.isRemote()) { - return true; + return; } } - return false; + localProducersEmptyTime = System.currentTimeMillis(); + } + + protected boolean hasProducersActive() { + return !producers.isEmpty(); } @Override @@ -1014,6 +1050,7 @@ protected CompletableFuture internalAddProducer(Producer producer) { if (existProducer != null) { return tryOverwriteOldProducer(existProducer, producer); } else if (!producer.isRemote()) { + localProducersEmptyTime = null; USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this); } return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d180048b1b316..0872a0a4c917e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -741,6 +741,10 @@ protected void startInactivityMonitor() { int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds(); inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkGC(), interval, interval, TimeUnit.SECONDS); + if (pulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds() > 0) { + inactivityMonitor.scheduleAtFixedRateNonConcurrently(() 
-> checkInactiveReplication(), interval, + interval, TimeUnit.SECONDS); + } } // Deduplication info checker @@ -1531,7 +1535,7 @@ private CompletableFuture> createNonPersistentTopic(String topic .attr("topic", topic) .exceptionMessage(ex.getCause()) .log("Replication check failed. Removing topic from topics list"); - nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { + nonPersistentTopic.closeReplProducersIfNoBacklog().whenComplete((v, exception) -> { topicFuture.completeExceptionally(ex); }); return null; @@ -2411,6 +2415,14 @@ public void checkGC() { forEachTopic(Topic::checkGC); } + public void checkInactiveReplication() { + forEachTopic(topic -> { + if (topic instanceof AbstractTopic abstractTopic) { + abstractTopic.disconnectReplicatorIfNoTrafficForLongTime(); + } + }); + } + public void checkClusterMigration() { forEachTopic(Topic::checkClusterMigration); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 8043199897a89..24ff7ac044b68 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -598,12 +598,18 @@ public CompletableFuture close( return closeFuture; } - public CompletableFuture stopReplProducers() { + public CompletableFuture closeReplProducersIfNoBacklog() { List> closeFutures = new ArrayList<>(); replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate())); return FutureUtil.waitForAll(closeFutures); } + @Override + public CompletableFuture startReplProducers() { + replicators.forEach((region, replicator) -> replicator.startProducer()); + return CompletableFuture.completedFuture(null); + } + @Override public CompletableFuture checkReplication() { TopicName name = 
TopicName.get(topic); @@ -1025,7 +1031,7 @@ public CompletableFuture getInternalStats(boolean public boolean isActive() { // No local consumers and no local producers - return !subscriptions.isEmpty() || hasLocalProducers(); + return !subscriptions.isEmpty() || hasProducersActive(); } @Override @@ -1088,7 +1094,7 @@ public void checkGC() { .attr("maxInactiveDurationInSec", maxInactiveDurationInSec) .log("Topic inactive for seconds, closing repl producers."); - stopReplProducers().thenCompose(v -> delete(true, false)) + closeReplProducersIfNoBacklog().thenCompose(v -> delete(true, false)) .thenCompose(__ -> tryToDeletePartitionedMetadata()) .thenRun(() -> log.info("Topic deleted successfully due to inactivity")) .exceptionally(e -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c9bcad341fcd7..0601d396f6b6b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -888,6 +888,7 @@ private boolean hasRemoteProducers() { return false; } + @Override public CompletableFuture startReplProducers() { // read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close return brokerService.pulsar().getPulsarResources().getNamespaceResources() @@ -921,7 +922,7 @@ public CompletableFuture stopReplProducers() { return FutureUtil.waitForAll(closeFutures); } - private synchronized CompletableFuture closeReplProducersIfNoBacklog() { + public synchronized CompletableFuture closeReplProducersIfNoBacklog() { List> closeFutures = new ArrayList<>(); replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect())); shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect())); @@ -3355,7 
+3356,7 @@ public boolean isActive(InactiveTopicDeleteMode deleteMode) { break; } // no local producers - return hasLocalProducers(); + return hasProducersActive(); } private boolean hasBacklogs(boolean getPreciseBacklog) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 6bbc09d62ba36..e7dff52202ad4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -54,6 +54,8 @@ import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; @@ -823,6 +825,65 @@ public void testSystemTopicCreationWithDifferentTopicCreationRule(int localSyste admin2.topics().delete(tp, false); } + @Test + public void testTopicGCDoesNotDisconnectReplicatorWhenRemoteProducerIsActive() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topic); + Producer producer1 = client1.newProducer(Schema.STRING).topic(topic).create(); + + try { + producer1.send("msg-1"); + waitReplicatorStarted(topic, pulsar2); + PersistentTopic persistentTopic2 = (PersistentTopic) broker2 + .getTopic(topic, false).join().get(); + + // Set inactive policies. 
+ InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(); + inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); + inactiveTopicPolicies.setMaxInactiveDurationSeconds(60); + inactiveTopicPolicies.setDeleteWhileInactive(true); + admin2.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies); + + // Ensure policies were set successfully. + Awaitility.await().untilAsserted(() -> { + assertFalse(persistentTopic2.getProducers().values().stream() + .anyMatch(producer -> !producer.isRemote())); + assertTrue(persistentTopic2.getSubscriptions().isEmpty()); + assertTrue(persistentTopic2.getInactiveTopicPolicies().isDeleteWhileInactive()); + assertEquals(persistentTopic2.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(), 60); + + Replicator replicator = persistentTopic2.getReplicators().get(cluster1); + assertNotNull(replicator); + assertTrue(replicator.isConnected()); + assertEquals(replicator.getNumberOfEntriesInBacklog(), 0); + }); + + // Trigger GC. + persistentTopic2.checkGC(); + + // Verify: the replication is not disconnected due to Topic GC. + Awaitility.await().during(Duration.ofSeconds(120)).atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + Replicator replicator = persistentTopic2.getReplicators().get(cluster1); + assertNotNull(replicator); + assertTrue(replicator.isConnected()); + }); + // Verify: the replication still works. 
+ producer1.send("msg-2"); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin2.topics().getStats(topic).getReplication().get(cluster1).getReplicationBacklog(), 0); + }); + + } finally { + producer1.close(); + admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1)); + admin2.topics().setReplicationClusters(topic, Arrays.asList(cluster2)); + waitReplicatorStopped(topic, pulsar1, pulsar2, false); + waitReplicatorStopped(topic, pulsar2, pulsar1, false); + admin1.topics().delete(topic, false); + admin2.topics().delete(topic, false); + } + } + @Test public void testUpdateNamespacePolicies() throws Exception { // Create a namespace and allow both clusters to access. From 0c2ce19fa12f991c3daea665f0969c821def311c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 2 Jun 2026 23:28:21 +0800 Subject: [PATCH 02/10] improve tests --- .../pulsar/broker/ServiceConfiguration.java | 15 +++-- .../pulsar/broker/service/AbstractTopic.java | 14 +++-- .../broker/service/OneWayReplicatorTest.java | 57 +++++++++++++++++++ .../OneWayReplicatorUsingGlobalZKTest.java | 25 +++++--- 4 files changed, 92 insertions(+), 19 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 78eb9e8321b78..b63db3dcba6b1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -815,11 +815,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( category = CATEGORY_POLICIES, dynamic = true, - doc = "Time in seconds since a replicator's producer last sent a message, after which the replicator" - + " is considered inactive. 
When inactive-topic deletion is enabled (via" - + " brokerDeleteInactiveTopicsEnabled, or enabled by namespace/topic policy), a replicated" - + " topic is eligible for deletion only once all its replicators have been inactive for this" - + " duration." + doc = "Time in seconds that a topic may have no local producers before the broker considers its outbound" + + " replication producers idle. The timer starts when the inactive-replication check first observes" + + " that the topic has no local producers; remote producers created by replication from another" + + " cluster do not reset this timer. When the threshold is exceeded and the replication backlog is" + + " clear, the broker disconnects the topic's replication producers to release idle replication" + + " resources. A connected remote producer still makes the topic active for inactive-topic GC, so" + + " the topic is not deleted only because local producers are absent; deletion is still controlled" + + " by brokerDeleteInactiveTopicsEnabled and the namespace/topic inactive-topic policies. The check" + + " runs with the inactive-topic monitor, whose interval is" + + " brokerDeleteInactiveTopicsFrequencySeconds. The default is 86400 seconds (24 hours)." 
) private Integer brokerReplicationInactiveThresholdSeconds = 24 * 3600; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 889f4a650eac9..d73cb11ccaaaf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -678,17 +678,21 @@ public void disconnectReplicatorIfNoTrafficForLongTime() { if (cachedTime == null) { return; } - long threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds() * 1000; - if (System.currentTimeMillis() - cachedTime > threshold) { + int threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds(); + if (System.currentTimeMillis() - cachedTime > threshold * 1000L) { + log.info().attr("brokerReplicationInactiveThresholdSeconds", threshold) + .log("Disconnecting replication producers since no producer is active for a long time."); closeReplProducersIfNoBacklog().whenCompleteAsync((__, ex) -> { // Here, "!=" is used instead of the "!equals()" to avoid the problem of "inability to compare due // to multiple changes in this attribute within a short period of time". 
if (cachedTime != localProducersEmptyTime) { - log.warn().log("Restart replication producers since new producers were registered concurently" + log.warn().log("Restart replication producers since new producers were registered concurrently" + " when closing producers."); startReplProducers().whenComplete((ignore, ex2) -> { - log.error().exception(ex2).log("Failed to start replication producers after registered" - + " new producers"); + if (ex2 != null) { + log.error().exception(ex2).log("Failed to start replication producers after registered" + + " new producers"); + } }); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index c4f5bdeab74e7..7158f1b8f4721 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -116,6 +116,8 @@ import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -235,6 +237,61 @@ public void testDeleteTopicWhenReplicating() throws Exception { }); } + @Test(timeOut = 120 * 1000) + public void testTopicGCDoesNotDisconnectReplicatorWhenRemoteProducerIsActive() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + // Let inactive replicator check faster. 
+ int replicationInactiveThresholdSeconds = pulsar1.getConfig().getBrokerReplicationInactiveThresholdSeconds(); + pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(10); + // create topic. + admin1.topics().createNonPartitionedTopic(topic); + client1.newProducer(Schema.STRING).topic(topic).create().close(); + waitReplicatorStarted(topic); + PersistentTopic persistentTopic1 = (PersistentTopic) broker1.getTopic(topic, false).join().get(); + + // Let topic GC slower. + InactiveTopicPolicies inactiveTopicPolicies = + new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 3600, true); + admin1.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies); + Awaitility.await().untilAsserted(() -> { + assertFalse(persistentTopic1.getProducers().values().stream() + .anyMatch(producer -> !producer.isRemote())); + assertTrue(persistentTopic1.getSubscriptions().isEmpty()); + assertTrue(persistentTopic1.getInactiveTopicPolicies().isDeleteWhileInactive()); + assertEquals(persistentTopic1.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(), 3600); + }); + + // Trigger an event: inactive replicator. + // Verify: the producer was closed. + persistentTopic1.disconnectReplicatorIfNoTrafficForLongTime(); + Thread.sleep(1000 * 12); + persistentTopic1.disconnectReplicatorIfNoTrafficForLongTime(); + Awaitility.await().untilAsserted(() -> { + Replicator replicator = persistentTopic1.getReplicators().get(cluster2); + assertNotNull(replicator); + assertFalse(replicator.isConnected()); + }); + + // Trigger an event: new producer registered. + // Verify: the replication is started again. + Producer producer1 = client1.newProducer(Schema.STRING).topic(topic).create(); + Awaitility.await().untilAsserted(() -> { + Replicator replicator = persistentTopic1.getReplicators().get(cluster2); + assertNotNull(replicator); + assertTrue(replicator.isConnected()); + }); + + // cleanup. 
+ pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(replicationInactiveThresholdSeconds); + if (producer1 != null) { + producer1.close(); + } + cleanupTopics(() -> { + admin1.topics().delete(topic); + admin2.topics().delete(topic); + }); + } + @Test(timeOut = 45 * 1000) public void testReplicatorProducerStatInTopic() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index e7dff52202ad4..dae2ea6caa5fa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -827,20 +827,23 @@ public void testSystemTopicCreationWithDifferentTopicCreationRule(int localSyste @Test public void testTopicGCDoesNotDisconnectReplicatorWhenRemoteProducerIsActive() throws Exception { + int replicationInactiveThresholdSeconds = pulsar1.getConfig().getBrokerReplicationInactiveThresholdSeconds(); + pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(3600); final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); admin1.topics().createNonPartitionedTopic(topic); Producer producer1 = client1.newProducer(Schema.STRING).topic(topic).create(); try { producer1.send("msg-1"); + waitReplicatorStarted(topic, pulsar1); waitReplicatorStarted(topic, pulsar2); - PersistentTopic persistentTopic2 = (PersistentTopic) broker2 - .getTopic(topic, false).join().get(); + PersistentTopic persistentTopic2 = (PersistentTopic) broker2.getTopic(topic, false) + .join().get(); // Set inactive policies. 
InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(); inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); - inactiveTopicPolicies.setMaxInactiveDurationSeconds(60); + inactiveTopicPolicies.setMaxInactiveDurationSeconds(10); inactiveTopicPolicies.setDeleteWhileInactive(true); admin2.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies); @@ -850,7 +853,7 @@ public void testTopicGCDoesNotDisconnectReplicatorWhenRemoteProducerIsActive() t .anyMatch(producer -> !producer.isRemote())); assertTrue(persistentTopic2.getSubscriptions().isEmpty()); assertTrue(persistentTopic2.getInactiveTopicPolicies().isDeleteWhileInactive()); - assertEquals(persistentTopic2.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(), 60); + assertEquals(persistentTopic2.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(), 10); Replicator replicator = persistentTopic2.getReplicators().get(cluster1); assertNotNull(replicator); @@ -859,14 +862,17 @@ public void testTopicGCDoesNotDisconnectReplicatorWhenRemoteProducerIsActive() t }); // Trigger GC. + persistentTopic2.disconnectReplicatorIfNoTrafficForLongTime(); + persistentTopic2.checkGC(); + Thread.sleep(15 * 1000); + persistentTopic2.disconnectReplicatorIfNoTrafficForLongTime(); persistentTopic2.checkGC(); // Verify: the replication is not disconnected due to Topic GC. - Awaitility.await().during(Duration.ofSeconds(120)).atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - Replicator replicator = persistentTopic2.getReplicators().get(cluster1); - assertNotNull(replicator); - assertTrue(replicator.isConnected()); - }); + Replicator replicator = persistentTopic2.getReplicators().get(cluster1); + assertNotNull(replicator); + assertTrue(replicator.isConnected()); + // Verify: the replication still works. 
producer1.send("msg-2"); Awaitility.await().untilAsserted(() -> { @@ -874,6 +880,7 @@ public void testTopicGCDoesNotDisconnectReplicatorWhenRemoteProducerIsActive() t }); } finally { + pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(replicationInactiveThresholdSeconds); producer1.close(); admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1)); admin2.topics().setReplicationClusters(topic, Arrays.asList(cluster2)); From e4444d9b073b25056b1406fc712f6c9e9c3e55c3 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 2 Jun 2026 23:47:32 +0800 Subject: [PATCH 03/10] rename test --- .../org/apache/pulsar/broker/service/OneWayReplicatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 7158f1b8f4721..7f7f6a66a0dca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -238,7 +238,7 @@ public void testDeleteTopicWhenReplicating() throws Exception { } @Test(timeOut = 120 * 1000) - public void testTopicGCDoesNotDisconnectReplicatorWhenRemoteProducerIsActive() throws Exception { + public void testDisconnectAndReconnectInactiveReplicator() throws Exception { final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); // Let inactive replicator check faster. 
int replicationInactiveThresholdSeconds = pulsar1.getConfig().getBrokerReplicationInactiveThresholdSeconds(); From ee4ff83e7dbf322fac06d865315da64e441211e4 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 3 Jun 2026 16:53:13 +0800 Subject: [PATCH 04/10] address comment --- .../pulsar/broker/ServiceConfiguration.java | 2 +- .../pulsar/broker/service/AbstractTopic.java | 9 +++- .../pulsar/broker/service/BrokerService.java | 2 +- .../nonpersistent/NonPersistentTopic.java | 9 +++- .../broker/service/OneWayReplicatorTest.java | 53 +++++++++++++++++++ .../service/OneWayReplicatorTestBase.java | 11 +++- ...yReplicatorUsingGlobalPartitionedTest.java | 11 ++++ .../OneWayReplicatorUsingGlobalZKTest.java | 10 ++++ 8 files changed, 101 insertions(+), 6 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index b63db3dcba6b1..1707b6301845c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -826,7 +826,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece + " runs with the inactive-topic monitor, whose interval is" + " brokerDeleteInactiveTopicsFrequencySeconds. The default is 86400 seconds (24 hours)." 
) - private Integer brokerReplicationInactiveThresholdSeconds = 24 * 3600; + private int brokerReplicationInactiveThresholdSeconds = 24 * 3600; @FieldContext( category = CATEGORY_POLICIES, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index d73cb11ccaaaf..8cf9eea6a21d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -675,10 +675,16 @@ public void disconnectReplicatorIfNoTrafficForLongTime() { updateLocalProducersEmptyTime(); final Long cachedTime = localProducersEmptyTime; + // Still active. if (cachedTime == null) { return; } + // Disabled the feature. int threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds(); + if (threshold <= 0) { + return; + } + // Check and close replication producers. 
if (System.currentTimeMillis() - cachedTime > threshold * 1000L) { log.info().attr("brokerReplicationInactiveThresholdSeconds", threshold) .log("Disconnecting replication producers since no producer is active for a long time."); @@ -850,7 +856,8 @@ public CompletableFuture> addProducer(Producer producer, log.warn("Attempting to add producer to a terminated topic"); throw new TopicTerminatedException("Topic was already terminated"); } - return internalAddProducer(producer).thenApply(ignore -> { + return internalAddProducer(producer).thenCompose(__ -> this.startReplProducers()) + .thenApply(ignore -> { USAGE_COUNT_UPDATER.incrementAndGet(this); log.debug() .attr("producerName", producer.getProducerName()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 0872a0a4c917e..9f0582c430069 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1535,7 +1535,7 @@ private CompletableFuture> createNonPersistentTopic(String topic .attr("topic", topic) .exceptionMessage(ex.getCause()) .log("Replication check failed. 
Removing topic from topics list"); - nonPersistentTopic.closeReplProducersIfNoBacklog().whenComplete((v, exception) -> { + nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { topicFuture.completeExceptionally(ex); }); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 24ff7ac044b68..1f6bd198f619a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -598,12 +598,19 @@ public CompletableFuture close( return closeFuture; } - public CompletableFuture closeReplProducersIfNoBacklog() { + public CompletableFuture stopReplProducers() { List> closeFutures = new ArrayList<>(); replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate())); return FutureUtil.waitForAll(closeFutures); } + @Override + public CompletableFuture closeReplProducersIfNoBacklog() { + List> closeFutures = new ArrayList<>(); + replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect())); + return FutureUtil.waitForAll(closeFutures); + } + @Override public CompletableFuture startReplProducers() { replicators.forEach((region, replicator) -> replicator.startProducer()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 7f7f6a66a0dca..7ca36cce3f6b8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -292,6 +292,59 @@ public void testDisconnectAndReconnectInactiveReplicator() throws Exception { }); } + 
@Test(timeOut = 120 * 1000) + public void testDisconnectAndReconnectInactiveReplicatorNonPersistent() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("non-persistent://" + replicatedNamespace + "/tp_"); + // Lower the inactivity threshold so that the inactive-replicator check triggers quickly. + int replicationInactiveThresholdSeconds = pulsar1.getConfig().getBrokerReplicationInactiveThresholdSeconds(); + pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(10); + // Create the topic. + admin1.topics().createNonPartitionedTopic(topic); + Consumer consumer1 = client1.newConsumer().topic(topic).subscriptionName("s1").subscribe(); + waitReplicatorStarted(topic); + Consumer consumer2 = client2.newConsumer().topic(topic).subscriptionName("s1").subscribe(); + + NonPersistentTopic nonPersistentTopic1 = (NonPersistentTopic) broker1.getTopic(topic, false).join().get(); + + // Slow down topic GC by using a long (3600s) max inactive duration. + InactiveTopicPolicies inactiveTopicPolicies = + new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, 3600, true); + admin1.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies); + Awaitility.await().untilAsserted(() -> { + assertFalse(nonPersistentTopic1.getProducers().values().stream() + .anyMatch(producer -> !producer.isRemote())); + //assertTrue(nonPersistentTopic1.getSubscriptions().isEmpty()); + assertTrue(nonPersistentTopic1.getInactiveTopicPolicies().isDeleteWhileInactive()); + assertEquals(nonPersistentTopic1.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(), 3600); + }); + + // Trigger an event: inactive replicator. + // Verify: the producer was closed. 
+ nonPersistentTopic1.disconnectReplicatorIfNoTrafficForLongTime(); + Thread.sleep(1000 * 12); + nonPersistentTopic1.disconnectReplicatorIfNoTrafficForLongTime(); + Awaitility.await().untilAsserted(() -> { + Replicator replicator = nonPersistentTopic1.getReplicators().get(cluster2); + assertNotNull(replicator); + assertFalse(replicator.isConnected()); + }); + + // Trigger an event: new producer registered. + // Verify: the replication is started again. + Producer producer1 = client1.newProducer(Schema.STRING).topic(topic).create(); + Awaitility.await().untilAsserted(() -> { + Replicator replicator = nonPersistentTopic1.getReplicators().get(cluster2); + assertNotNull(replicator); + assertTrue(replicator.isConnected()); + }); + + // cleanup. + pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(replicationInactiveThresholdSeconds); + producer1.close(); + consumer1.close(); + consumer2.close(); + } + @Test(timeOut = 45 * 1000) public void testReplicatorProducerStatInTopic() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 0aec5358be6f8..cc0f85ce301da 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -44,6 +44,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import 
org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -55,6 +56,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; @@ -408,8 +410,13 @@ protected void waitReplicatorStarted(String topicName, PulsarService remoteClust Awaitility.await().untilAsserted(() -> { Optional topicOptional2 = remoteCluster.getBrokerService().getTopic(topicName, false).get(); assertTrue(topicOptional2.isPresent()); - PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get(); - assertFalse(persistentTopic2.getProducers().isEmpty()); + if (TopicName.get(topicName).getDomain().equals(TopicDomain.persistent)) { + PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get(); + assertFalse(persistentTopic2.getProducers().isEmpty()); + } else { + NonPersistentTopic nonPersistentTopic2 = (NonPersistentTopic) topicOptional2.get(); + assertFalse(nonPersistentTopic2.getProducers().isEmpty()); + } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java index 89fc126e49c90..c568425abce2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java @@ -85,6 +85,17 @@ public void testDeleteTopicWhenReplicating() throws Exception { super.testDeleteTopicWhenReplicating(); } + @Test(enabled = false) + public void testDisconnectAndReconnectInactiveReplicator() throws Exception { + 
super.testDisconnectAndReconnectInactiveReplicator(); + } + + @Test(enabled = false) + public void testDisconnectAndReconnectInactiveReplicatorNonPersistent() throws Exception { + super.testDisconnectAndReconnectInactiveReplicator(); + } + + @Override @Test(enabled = false) public void testReplicatorProducerStatInTopic() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index dae2ea6caa5fa..d2bf2c728bd27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -825,6 +825,16 @@ public void testSystemTopicCreationWithDifferentTopicCreationRule(int localSyste admin2.topics().delete(tp, false); } + @Test(enabled = false) + public void testDisconnectAndReconnectInactiveReplicator() throws Exception { + super.testDisconnectAndReconnectInactiveReplicator(); + } + + @Test(enabled = false) + public void testDisconnectAndReconnectInactiveReplicatorNonPersistent() throws Exception { + super.testDisconnectAndReconnectInactiveReplicator(); + } + @Test public void testTopicGCDoesNotDisconnectReplicatorWhenRemoteProducerIsActive() throws Exception { int replicationInactiveThresholdSeconds = pulsar1.getConfig().getBrokerReplicationInactiveThresholdSeconds(); From 834af22c848f3de1985c766d7cebbbd714615f3b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 4 Jun 2026 14:25:42 +0800 Subject: [PATCH 05/10] improve code --- .../org/apache/pulsar/broker/service/AbstractTopic.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 
8cf9eea6a21d9..40695d855b95d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -856,8 +856,12 @@ public CompletableFuture> addProducer(Producer producer, log.warn("Attempting to add producer to a terminated topic"); throw new TopicTerminatedException("Topic was already terminated"); } - return internalAddProducer(producer).thenCompose(__ -> this.startReplProducers()) - .thenApply(ignore -> { + CompletableFuture internalAppProducerFuture = internalAddProducer(producer); + internalAppProducerFuture.thenApply(__ -> this.startReplProducers()).exceptionally(ex -> { + log.error("Failed to start replication producers."); + return null; + }); + return internalAppProducerFuture.thenApply(ignore -> { USAGE_COUNT_UPDATER.incrementAndGet(this); log.debug() .attr("producerName", producer.getProducerName()) From 16b5b5bc9db3f8db291d61c451db3279106d75ca Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Jun 2026 17:45:07 +0800 Subject: [PATCH 06/10] change solution: replicator disconnect does not rely on topic anymore --- .../broker/service/AbstractReplicator.java | 9 +- .../pulsar/broker/service/AbstractTopic.java | 74 ++--- .../pulsar/broker/service/BrokerService.java | 2 +- .../pulsar/broker/service/Replicator.java | 4 - .../NonPersistentReplicator.java | 6 + .../nonpersistent/NonPersistentTopic.java | 17 +- .../persistent/GeoPersistentReplicator.java | 9 +- .../persistent/PersistentReplicator.java | 85 +++-- .../service/persistent/PersistentTopic.java | 85 +---- .../ReplicatedSubscriptionsController.java | 3 - .../service/persistent/ShadowReplicator.java | 9 +- .../broker/service/OneWayReplicatorTest.java | 296 ++++++++++++------ ...yReplicatorUsingGlobalPartitionedTest.java | 15 +- .../OneWayReplicatorUsingGlobalZKTest.java | 18 +- .../broker/service/PersistentTopicTest.java | 25 +- .../persistent/ShadowReplicatorTest.java | 2 +- 16 files 
changed, 346 insertions(+), 313 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 5a5ec2df1fc4a..2342d2a78f623 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -86,6 +86,8 @@ public abstract class AbstractReplicator implements Replicator { private static final AtomicReferenceFieldUpdater ATTRIBUTES_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, Attributes.class, "attributes"); + protected volatile long latestPublishTime = System.currentTimeMillis(); + public enum State { /** * This enum has two mean meanings: @@ -184,7 +186,7 @@ protected CompletableFuture prepareCreateProducer() { return CompletableFuture.completedFuture(null); } - public void startProducer() { + protected void startProducer() { // Guarantee only one task call "producerBuilder.createAsync()". Pair setStartingRes = compareSetAndGetState(State.Disconnected, State.Starting); if (!setStartingRes.getLeft()) { @@ -313,8 +315,7 @@ protected CompletableFuture isLocalTopicActive() { /** * This method only be used by {@link PersistentTopic#checkGC} now. 
*/ - @Override - public CompletableFuture disconnect() { + protected CompletableFuture disconnect() { long backlog = getNumberOfEntriesInBacklog(); if (backlog > 0) { CompletableFuture disconnectFuture = new CompletableFuture<>(); @@ -389,8 +390,6 @@ protected CompletableFuture closeProducerAsync(boolean closeTheStartingPro Pair setDisconnectedRes = compareSetAndGetState(State.Disconnecting, State.Disconnected); if (setDisconnectedRes.getLeft()) { this.producer = null; - // deactivate further read - disableReplicatorRead(); return; } if (setDisconnectedRes.getRight() == State.Terminating diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 40695d855b95d..55ea9bcc0b761 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -67,6 +67,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; +import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; @@ -130,8 +131,6 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { // Timestamp of when this topic was last seen active protected volatile long lastActive; - // Timestamp of when the latest producer was disconnected. 
- protected volatile Long localProducersEmptyTime; // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which // doesn't support batch-message @@ -667,60 +666,37 @@ protected Consumer getActiveConsumer(Subscription subscription) { return null; } - public abstract CompletableFuture closeReplProducersIfNoBacklog(); - - public abstract CompletableFuture startReplProducers(); - - public void disconnectReplicatorIfNoTrafficForLongTime() { - updateLocalProducersEmptyTime(); + protected boolean hasProducersActive() { + return !producers.isEmpty(); + } - final Long cachedTime = localProducersEmptyTime; - // Still active. - if (cachedTime == null) { - return; - } - // Disabled the feature. - int threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds(); - if (threshold <= 0) { - return; - } - // Check and close replication producers. - if (System.currentTimeMillis() - cachedTime > threshold * 1000L) { - log.info().attr("brokerReplicationInactiveThresholdSeconds", threshold) - .log("Disconnecting replication producers since no producer is active for a long time."); - closeReplProducersIfNoBacklog().whenCompleteAsync((__, ex) -> { - // Here, "!=" is used instead of the "!equals()" to avoid the problem of "inability to compare due - // to multiple changes in this attribute within a short period of time". 
- if (cachedTime != localProducersEmptyTime) { - log.warn().log("Restart replication producers since new producers were registered concurrently" - + " when closing producers."); - startReplProducers().whenComplete((ignore, ex2) -> { - if (ex2 != null) { - log.error().exception(ex2).log("Failed to start replication producers after registered" - + " new producers"); - } - }); - } - }); + protected boolean hasActiveReplicators() { + for (Replicator replicator : getReplicators().values()) { + if (replicator.isConnected()) { + return true; + } } + return false; } - protected void updateLocalProducersEmptyTime() { - // The timestamp has been set. - if (localProducersEmptyTime != null) { - return; + protected boolean hasLocalProducers() { + if (producers.isEmpty()) { + return false; } - // Only contains remote producer | producers is empty: update the time. for (Producer producer : producers.values()) { if (!producer.isRemote()) { - return; + return true; } } - localProducersEmptyTime = System.currentTimeMillis(); + return false; } - protected boolean hasProducersActive() { - return !producers.isEmpty(); + public void disconnectReplicatorsIfNoTrafficAndBacklog() { + for (Replicator replicator : getReplicators().values()) { + if (replicator instanceof PersistentReplicator persistentReplicator) { + persistentReplicator.disconnectIfNoTrafficAndBacklog(); + } + } } @Override @@ -856,12 +832,7 @@ public CompletableFuture> addProducer(Producer producer, log.warn("Attempting to add producer to a terminated topic"); throw new TopicTerminatedException("Topic was already terminated"); } - CompletableFuture internalAppProducerFuture = internalAddProducer(producer); - internalAppProducerFuture.thenApply(__ -> this.startReplProducers()).exceptionally(ex -> { - log.error("Failed to start replication producers."); - return null; - }); - return internalAppProducerFuture.thenApply(ignore -> { + return internalAddProducer(producer).thenApply(ignore -> { 
USAGE_COUNT_UPDATER.incrementAndGet(this); log.debug() .attr("producerName", producer.getProducerName()) @@ -1065,7 +1036,6 @@ protected CompletableFuture internalAddProducer(Producer producer) { if (existProducer != null) { return tryOverwriteOldProducer(existProducer, producer); } else if (!producer.isRemote()) { - localProducersEmptyTime = null; USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this); } return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9f0582c430069..60b2d199336e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2418,7 +2418,7 @@ public void checkGC() { public void checkInactiveReplication() { forEachTopic(topic -> { if (topic instanceof AbstractTopic abstractTopic) { - abstractTopic.disconnectReplicatorIfNoTrafficForLongTime(); + abstractTopic.disconnectReplicatorsIfNoTrafficAndBacklog(); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 86e2b6e74de89..1f781f0e98dcf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -25,16 +25,12 @@ public interface Replicator { - void startProducer(); - Topic getLocalTopic(); ReplicatorStatsImpl computeStats(); CompletableFuture terminate(); - CompletableFuture disconnect(); - void updateRates(); String getRemoteCluster(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java 
index 5be04738b67d3..3160d3e178cb8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -73,6 +73,11 @@ protected String getProducerName() { return getReplicatorName(replicatorPrefix, localCluster) + REPL_PRODUCER_NAME_DELIMITER + remoteCluster; } + @Override + public void startProducer() { + super.startProducer(); + } + @Override protected void setProducerAndTriggerReadEntries(Producer producer) { this.producer = (ProducerImpl) producer; @@ -89,6 +94,7 @@ protected void setProducerAndTriggerReadEntries(Producer producer) { @SuppressWarnings("unchecked") public void sendMessage(Entry entry) { + latestPublishTime = System.currentTimeMillis(); if ((STATE_UPDATER.get(this) == State.Started) && isWritable()) { int length = entry.getLength(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 1f6bd198f619a..8043199897a89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -604,19 +604,6 @@ public CompletableFuture stopReplProducers() { return FutureUtil.waitForAll(closeFutures); } - @Override - public CompletableFuture closeReplProducersIfNoBacklog() { - List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect())); - return FutureUtil.waitForAll(closeFutures); - } - - @Override - public CompletableFuture startReplProducers() { - replicators.forEach((region, replicator) -> replicator.startProducer()); - return CompletableFuture.completedFuture(null); - } - @Override public CompletableFuture 
checkReplication() { TopicName name = TopicName.get(topic); @@ -1038,7 +1025,7 @@ public CompletableFuture getInternalStats(boolean public boolean isActive() { // No local consumers and no local producers - return !subscriptions.isEmpty() || hasProducersActive(); + return !subscriptions.isEmpty() || hasLocalProducers(); } @Override @@ -1101,7 +1088,7 @@ public void checkGC() { .attr("maxInactiveDurationInSec", maxInactiveDurationInSec) .log("Topic inactive for seconds, closing repl producers."); - closeReplProducersIfNoBacklog().thenCompose(v -> delete(true, false)) + stopReplProducers().thenCompose(v -> delete(true, false)) .thenCompose(__ -> tryToDeletePartitionedMetadata()) .thenRun(() -> log.info("Topic deleted successfully due to inactivity")) .exceptionally(e -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index c6534b346c25b..fb49af07c0e04 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -146,7 +146,7 @@ private CompletableFuture createRemoteTopicIfDoesNotExist(String partition @Override @SuppressWarnings("unchecked") - protected boolean replicateEntries(List entries, final InFlightTask inFlightTask) { + protected boolean doReplicateEntries(List entries, final InFlightTask inFlightTask) { boolean atLeastOneMessageSentForReplication = false; boolean isEnableReplicatedSubscriptions = brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions(); @@ -154,8 +154,7 @@ protected boolean replicateEntries(List entries, final InFlightTask inFli try { // This flag is set to true when we skip at least one local message, // in order to skip remaining local messages. 
- boolean isLocalMessageSkippedOnce = false; - boolean skipRemainingMessages = inFlightTask.isSkipReadResultDueToCursorRewind(); + boolean skipRemainingMessages = false; for (int i = 0; i < entries.size(); i++) { Entry entry = entries.get(i); // Skip the messages since the replicator need to fetch the schema info to replicate the schema to the @@ -239,13 +238,13 @@ protected boolean replicateEntries(List entries, final InFlightTask inFli continue; } - if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { + if (STATE_UPDATER.get(this) != State.Started || inFlightTask.isSkipReadResultDueToCursorRewind()) { // The producer is not ready yet after having stopped/restarted. Drop the message because it will // recover when the producer is ready log.debug() .attr("position", entry.getPosition()) .log("Dropping read message because producer is not ready"); - isLocalMessageSkippedOnce = true; + skipRemainingMessages = true; inFlightTask.incCompletedEntries(); entry.release(); msg.recycle(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 8433d9b9c7ba2..1fc207d9697be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Disconnected; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Disconnecting; import static org.apache.pulsar.broker.service.AbstractReplicator.State.Started; import static org.apache.pulsar.broker.service.AbstractReplicator.State.Starting; import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminated; @@ -52,6 +54,7 @@ 
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; @@ -151,13 +154,6 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man @Override protected void setProducerAndTriggerReadEntries(Producer producer) { - // Repeat until there are no read operations in progress - if (STATE_UPDATER.get(this) == State.Starting && hasPendingRead() && !cursor.cancelPendingReadRequest()) { - brokerService.getPulsar().getExecutor() - .schedule(() -> setProducerAndTriggerReadEntries(producer), 10, TimeUnit.MILLISECONDS); - return; - } - /** * 1. Try change state to {@link Started}. * 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value @@ -179,8 +175,6 @@ protected void setProducerAndTriggerReadEntries(Producer producer) { // activate cursor: so, entries can be cached. this.cursor.setActive(); - // Rewind the cursor to be sure to read again all non-acked messages sent while restarting - cursor.rewind(); // read entries readMoreEntries(); } else { @@ -283,6 +277,30 @@ private AvailablePermits getRateLimiterAvailablePermits(int availablePermits) { return new AvailablePermits((int) availablePermitsOnMsg, availablePermitsOnByte); } + public void disconnectIfNoTrafficAndBacklog() { + // The feature is disabled. + int threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds(); + if (threshold <= 0) { + return; + } + // Still has backlog. + long backlog = getNumberOfEntriesInBacklog(); + if (backlog > 0) { + return; + } + // Not in the Started state (already disconnected, or currently starting/disconnecting/terminating). 
+ if (state != Started) { + return; + } + + // Disconnect if no backlog and no traffic for a long time. + if (System.currentTimeMillis() - latestPublishTime > threshold * 1000L) { + log.info().attr("brokerReplicationInactiveThresholdSeconds", threshold) + .log("Disconnecting replication producers since no producer is active for a long time."); + disconnect(); + } + } + protected void readMoreEntries() { if (state.equals(Terminated) || state.equals(Terminating)) { return; @@ -360,12 +378,11 @@ public void readEntriesComplete(List entries, Object ctx) { readFailureBackoff.reduceToHalf(); - boolean atLeastOneMessageSentForReplication = replicateEntries(entries, inFlightTask); + replicateEntries(entries, inFlightTask); - if (atLeastOneMessageSentForReplication && !isWritable()) { + if (!isWritable()) { // Don't read any more entries until the current pending entries are persisted log.debug() - .attr("atLeastOneMessageSentForReplication", atLeastOneMessageSentForReplication) .attr("isWritable", isWritable()) .log("Pausing replication traffic"); } else { @@ -373,7 +390,44 @@ public void readEntriesComplete(List entries, Object ctx) { } } - protected abstract boolean replicateEntries(List entries, InFlightTask inFlightTask); + protected void replicateEntries(List entries, InFlightTask inFlightTask) { + latestPublishTime = System.currentTimeMillis(); + // Release memory if terminated. + if (state == State.Terminated || state == State.Terminating + || inFlightTask.isSkipReadResultDueToCursorRewind()) { + for (Entry entry : entries) { + inFlightTask.incCompletedEntries(); + entry.release(); + } + return; + } + + // Retry to replicate messages if it is not started. + ManagedLedgerImpl ml = (ManagedLedgerImpl) cursor.getManagedLedger(); + Runnable retryReplicateEntries = () -> { + ml.getScheduledExecutor().schedule(() -> { + ml.getExecutor().execute(() -> { + replicateEntries(entries, inFlightTask); + }); + }, 100, TimeUnit.MILLISECONDS); + }; + + // Retry. 
+ if (state == Disconnecting || state == Starting) { + retryReplicateEntries.run(); + return; + } + // Start producer and retry. + if (state == Disconnected) { + startProducer(); + retryReplicateEntries.run(); + return; + } + // Do replicate. + doReplicateEntries(entries, inFlightTask); + } + + protected abstract boolean doReplicateEntries(List entries, InFlightTask inFlightTask); protected CompletableFuture getSchemaInfo(MessageImpl msg) throws ExecutionException { if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) { @@ -932,15 +986,10 @@ protected CompletableFuture beforeDisconnect() { .TopicBusyException("Cannot close a replicator with backlog")); } } - beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Disconnecting); return CompletableFuture.completedFuture(null); } } - protected void afterDisconnected() { - doRewindCursor(false); - } - protected void beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding reason) { synchronized (inFlightTasks) { boolean hasCanceledPendingRead = cursor.cancelPendingReadRequest(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0601d396f6b6b..1cec7d5e59a95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -836,9 +836,7 @@ public CompletableFuture> addProducer(Producer producer, CompletableFuture producerQueuedFuture) { return super.addProducer(producer, producerQueuedFuture).thenCompose(topicEpoch -> { messageDeduplication.producerAdded(producer.getProducerName()); - - // Start replication producers if not already - return startReplProducers().thenApply(__ -> topicEpoch); + return CompletableFuture.completedFuture(topicEpoch); }); } @@ -888,47 +886,6 @@ private boolean 
hasRemoteProducers() { return false; } - @Override - public CompletableFuture startReplProducers() { - // read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close - return brokerService.pulsar().getPulsarResources().getNamespaceResources() - .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) - .thenAcceptAsync(optPolicies -> { - if (optPolicies.isPresent()) { - if (optPolicies.get().replication_clusters != null) { - Set configuredClusters = Sets.newTreeSet(optPolicies.get().replication_clusters); - replicators.forEach((region, replicator) -> { - if (configuredClusters.contains(region)) { - replicator.startProducer(); - } - }); - } - } else { - replicators.forEach((region, replicator) -> replicator.startProducer()); - } - }, getOrderedExecutor()).exceptionally(ex -> { - log.debug() - .exceptionMessage(ex) - .log("Error getting policies while starting repl-producers"); - replicators.forEach((region, replicator) -> replicator.startProducer()); - return null; - }); - } - - public CompletableFuture stopReplProducers() { - List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate())); - shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.terminate())); - return FutureUtil.waitForAll(closeFutures); - } - - public synchronized CompletableFuture closeReplProducersIfNoBacklog() { - List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect())); - return FutureUtil.waitForAll(closeFutures); - } - @Override protected void handleProducerRemoved(Producer producer) { super.handleProducerRemoved(producer); @@ -3356,7 +3313,7 @@ public boolean isActive(InactiveTopicDeleteMode deleteMode) { break; } // no local producers - return hasProducersActive(); + return hasProducersActive() 
|| hasActiveReplicators(); } private boolean hasBacklogs(boolean getPreciseBacklog) { @@ -3577,42 +3534,8 @@ public void checkGC() { // Topic activity is still within the retention period return; } else { - CompletableFuture replCloseFuture = new CompletableFuture<>(); - - // Close repl producers first. - // Once all repl producers are closed, we can delete the topic, - // provided no remote producers connected to the broker. - log.debug() - .attr("maxInactiveDurationInSec", maxInactiveDurationInSec) - .log("Topic inactive for seconds, closing repl producers."); - /** - * There is a race condition that may cause a NPE: - * - task 1: a callback of "replicator.cursor.asyncRead" will trigger a replication. - * - task 2: "closeReplProducersIfNoBacklog" called by current thread will make the variable - * "replicator.producer" to a null value. - * Race condition: task 1 will get a NPE when it tries to send messages using the variable - * "replicator.producer", because task 2 will set this variable to "null". - * TODO Create a separate PR to fix it. - */ - closeReplProducersIfNoBacklog().thenRun(() -> { - if (hasRemoteProducers()) { - log.debug("Topic has connected remote producers, not a candidate for GC"); - replCloseFuture - .completeExceptionally(new TopicBusyException("Topic has connected remote producers")); - } else { - log.info() - .attr("maxInactiveDurationInSec", maxInactiveDurationInSec) - .log("Topic inactive, closed repl producers"); - replCloseFuture.complete(null); - } - }).exceptionally(e -> { - log.debug("Topic has replication backlog. 
Not a candidate for GC"); - replCloseFuture.completeExceptionally(e.getCause()); - return null; - }); - - replCloseFuture.thenCompose(v -> delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions, - deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, false)) + delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions, + deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, false) .thenCompose((res) -> tryToDeletePartitionedMetadata()) .thenRun(() -> log.info("Topic deleted successfully due to inactivity")) .exceptionally(e -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 0b4e8d1df47fb..95c306f486d95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -159,9 +159,6 @@ private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest requ + "configured for that cluster. Ignoring the request."); return; } - if (!replicator.isConnected()) { - topic.startReplProducers(); - } // Send response containing the current last written message id. 
The response // marker we're publishing locally and then replicating will have a higher diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java index 5d6f6f688cb9f..20c5303b3423c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java @@ -61,14 +61,13 @@ protected String getProducerName() { @Override @SuppressWarnings("unchecked") - protected boolean replicateEntries(List entries, InFlightTask inFlightTask) { + protected boolean doReplicateEntries(List entries, InFlightTask inFlightTask) { boolean atLeastOneMessageSentForReplication = false; try { // This flag is set to true when we skip at least one local message, // in order to skip remaining local messages. - boolean isLocalMessageSkippedOnce = false; - boolean skipRemainingMessages = inFlightTask.isSkipReadResultDueToCursorRewind(); + boolean skipRemainingMessages = false; for (int i = 0; i < entries.size(); i++) { Entry entry = entries.get(i); // Skip the messages since the replicator need to fetch the schema info to replicate the schema to the @@ -108,13 +107,13 @@ protected boolean replicateEntries(List entries, InFlightTask inFlightTas continue; } - if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { + if (STATE_UPDATER.get(this) != State.Started || inFlightTask.isSkipReadResultDueToCursorRewind()) { // The producer is not ready yet after having stopped/restarted. 
Drop the message because it will // recovered when the producer is ready log.debug() .attr("position", entry.getPosition()) .log("Dropping read message because producer is not ready"); - isLocalMessageSkippedOnce = true; + skipRemainingMessages = true; inFlightTask.incCompletedEntries(); entry.release(); msg.recycle(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 7ca36cce3f6b8..2a02d0436fe37 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -58,6 +58,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -79,6 +82,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerTest; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; @@ -116,8 +120,6 @@ import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; -import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; -import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import 
org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -131,6 +133,8 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.impl.DualMetadataStore; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.glassfish.jersey.client.JerseyClient; @@ -158,6 +162,11 @@ public void cleanup() throws Exception { super.cleanup(); } + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + } + @Test(timeOut = 45 * 1000) public void testReceiverSideReplicationStats() throws Exception { final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); @@ -237,112 +246,205 @@ public void testDeleteTopicWhenReplicating() throws Exception { }); } - @Test(timeOut = 120 * 1000) - public void testDisconnectAndReconnectInactiveReplicator() throws Exception { + @DataProvider + public Object[][] paramsDisconnectReplicator() { + // Binary way replication. + // local producers on cluster-1 registered. + // local producers on cluster-1 have traffic. + // replicator producer from cluster-2 has traffic. + // replicator producer from cluster-2 is present. + return new Object[][] { + {true, true, false, true, true}, // verify-cluster-2: no replicator terminate occurs. + {true, true, false, false, true}, // verify-cluster-2: replicator terminated and resumed. + {true, true, false, false, false}, // verify-cluster-2: replicator terminated and resumed. 
+ {true, false, false, true, true}, // verify-cluster-2: no replicator terminate occurs. + {true, false, false, false, true}, // verify-cluster-2: replicator terminated and resumed. + {true, false, false, false, false}, // verify-cluster-2: replicator terminated and resumed. + + {false, false, false, false, false}, // verify-cluster-2: replicator terminated and resumed. + {false, true, false, false, false} // verify-cluster-2: replicator terminated and resumed. + }; + } + + @Test(timeOut = 240 * 1000, dataProvider = "paramsDisconnectReplicator") + public void testDisconnectAndReconnectReplicator(boolean binaryWayRepl, + boolean hasLocalProducerRegistered, + boolean localProducerHasTraffic, + boolean hasRemoteProducerTraffic, + boolean hasRemoteProducerRegistered) throws Exception { + ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(1); + ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(1); + ScheduledFuture checkInactiveTopic = executor1.scheduleWithFixedDelay(() -> { + pulsar1.getBrokerService().checkInactiveReplication(); + }, 10, 10, TimeUnit.SECONDS); + // local cluster: let inactive replicator check faster. + int replicationInactiveThresholdSeconds1 = pulsar1.getConfig().getBrokerReplicationInactiveThresholdSeconds(); + pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(30); + // remote cluster: let inactive topic deletion never occur. + int replicationInactiveThresholdSeconds2 = pulsar2.getConfig().getBrokerReplicationInactiveThresholdSeconds(); + pulsar2.getConfig().setBrokerReplicationInactiveThresholdSeconds(3600 * 24); + // Prevent topic GC from running during the test. + int inactiveTopicsMaxInactiveDurationSeconds = pulsar1.getConfig() + .getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(); + pulsar1.getConfig().setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(3600 * 24); + + // Check params. 
+ if (hasRemoteProducerTraffic && !hasRemoteProducerRegistered) { + throw new Exception("If has traffic from remote cluster, the param \"hasRemoteProducer\" can not be false"); + } + // Check params. + if (localProducerHasTraffic && !hasLocalProducerRegistered) { + throw new Exception("If has local traffic, the param \"localProducerEmpty\" can not be true"); + } + + ScheduledFuture scheduledPublish1 = null; + ScheduledFuture scheduledPublish2 = null; final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); - // Let inactive replicator check faster. - int replicationInactiveThresholdSeconds = pulsar1.getConfig().getBrokerReplicationInactiveThresholdSeconds(); - pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(10); - // create topic. - admin1.topics().createNonPartitionedTopic(topic); - client1.newProducer(Schema.STRING).topic(topic).create().close(); - waitReplicatorStarted(topic); - PersistentTopic persistentTopic1 = (PersistentTopic) broker1.getTopic(topic, false).join().get(); - // Let topic GC slower. - InactiveTopicPolicies inactiveTopicPolicies = - new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 3600, true); - admin1.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies); - Awaitility.await().untilAsserted(() -> { - assertFalse(persistentTopic1.getProducers().values().stream() - .anyMatch(producer -> !producer.isRemote())); - assertTrue(persistentTopic1.getSubscriptions().isEmpty()); - assertTrue(persistentTopic1.getInactiveTopicPolicies().isDeleteWhileInactive()); - assertEquals(persistentTopic1.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(), 3600); - }); + // Init by params: local producers. + final Producer producer1A = client1.newProducer(Schema.STRING).topic(topic).create(); + Producer producer1B = null; + if (!hasLocalProducerRegistered) { + producer1A.close(); + } + // Init by params: local producer traffic. 
+ if (localProducerHasTraffic) { + AtomicInteger msgCount = new AtomicInteger(); + scheduledPublish1 = executor1.scheduleWithFixedDelay(() -> { + producer1A.sendAsync(msgCount.incrementAndGet() + ""); + }, 1, 1, TimeUnit.SECONDS); + } + // Init by params: binary way replication. + waitReplicatorStarted(topic, pulsar2); + if (binaryWayRepl) { + admin2.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2)); + waitReplicatorStarted(topic, pulsar1); + } + final PersistentTopic persistentTopic1 = (PersistentTopic) broker1.getTopic(topic, false).join().get(); + final PersistentTopic persistentTopic2 = (PersistentTopic) broker2.getTopic(topic, false).join().get(); + // Init by params: remote producer traffic. + final Producer producer2 = client2.newProducer(Schema.STRING).topic(topic).create(); + if (hasRemoteProducerTraffic) { + AtomicInteger msgCount = new AtomicInteger(); + scheduledPublish2 = executor2.scheduleWithFixedDelay(() -> { + producer2.sendAsync(msgCount.incrementAndGet() + ""); + }, 1, 1, TimeUnit.SECONDS); + } + // Init by params: remote producers. + if (binaryWayRepl && !hasRemoteProducerTraffic && !hasRemoteProducerRegistered) { + persistentTopic2.getReplicators().get(cluster1).terminate(); + } - // Trigger an event: inactive replicator. - // Verify: the producer was closed. - persistentTopic1.disconnectReplicatorIfNoTrafficForLongTime(); - Thread.sleep(1000 * 12); - persistentTopic1.disconnectReplicatorIfNoTrafficForLongTime(); - Awaitility.await().untilAsserted(() -> { - Replicator replicator = persistentTopic1.getReplicators().get(cluster2); - assertNotNull(replicator); - assertFalse(replicator.isConnected()); - }); + // Verify: all states match params. + Thread.sleep(3000); + // All states match: local producers. 
+ if (!hasLocalProducerRegistered) { + assertFalse(persistentTopic1.getProducers().values().stream() + .filter(p -> !p.isRemote()).findAny().isPresent()); + } else { + Optional serviceProducer1 = persistentTopic1.getProducers() + .values().stream().filter(p -> !p.isRemote()).findAny(); + assertTrue(serviceProducer1.isPresent()); +// if (localProducerHasTraffic) { +// assertTrue(System.currentTimeMillis() - serviceProducer1.get().getLatestPublishTime() < 1_500); +// } else { +// assertTrue(System.currentTimeMillis() - serviceProducer1.get().getLatestPublishTime() > 2_500); +// } + } + // All states match: remote producers. + if (binaryWayRepl) { + if (!hasRemoteProducerRegistered) { + assertFalse(persistentTopic1.getProducers().values().stream() + .filter(p -> p.isRemote()).findAny().isPresent()); + } else { + Optional serviceProducer1 = persistentTopic1.getProducers() + .values().stream().filter(p -> p.isRemote()).findAny(); + assertTrue(serviceProducer1.isPresent()); +// if (hasRemoteProducerTraffic) { +// assertTrue(System.currentTimeMillis() - serviceProducer1.get().getLatestPublishTime() < 1_500); +// } else { +// assertTrue(System.currentTimeMillis() - serviceProducer1.get().getLatestPublishTime() > 2_500); +// } + } + } - // Trigger an event: new producer registered. - // Verify: the replication is started again. - Producer producer1 = client1.newProducer(Schema.STRING).topic(topic).create(); - Awaitility.await().untilAsserted(() -> { - Replicator replicator = persistentTopic1.getReplicators().get(cluster2); - assertNotNull(replicator); - assertTrue(replicator.isConnected()); - }); + // Verify: replicator terminated or not. 
+ if (hasRemoteProducerTraffic || localProducerHasTraffic) { + long verifyStartTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - verifyStartTime < 100_000) { + assertFalse(persistentTopic1.getReplicators().isEmpty()); + PersistentReplicator persistentReplicator = + (PersistentReplicator) persistentTopic1.getReplicators().get(cluster2); + assertTrue(persistentReplicator.isConnected()); + assertEquals(persistentReplicator.getState(), AbstractReplicator.State.Started); + Thread.sleep(1000); + } + } else { + // TODO: move the "terminate" decision logic into the replicator. + // The "resume" decision logic can live in the scheduled task, or in the cursor pending read. + Thread.sleep(100_000); + assertFalse(persistentTopic1.getReplicators().isEmpty()); + PersistentReplicator persistentReplicatorA = + (PersistentReplicator) persistentTopic1.getReplicators().get(cluster2); + assertFalse(persistentReplicatorA.isConnected()); + assertEquals(persistentReplicatorA.getState(), AbstractReplicator.State.Disconnected); + + // Verify: resume. +// if (!hasRemoteProducerRegistered) { +// persistentTopic2.getReplicators().get(cluster1).startProducer(); +// Awaitility.await().untilAsserted(() -> { +// assertTrue(persistentTopic2.getReplicators().get(cluster1).isConnected()); +// }); +// } + if (hasRemoteProducerRegistered && !hasRemoteProducerTraffic) { + producer2.send("msg-remote"); + } + if (!hasLocalProducerRegistered) { + producer1B = client1.newProducer(Schema.STRING).topic(topic).create(); + producer1B.send("msg-local"); + } else { + producer1A.send("msg-local"); + } + Awaitility.await().untilAsserted(() -> { + assertFalse(persistentTopic1.getReplicators().isEmpty()); + PersistentReplicator persistentReplicatorB = + (PersistentReplicator) persistentTopic1.getReplicators().get(cluster2); + assertTrue(persistentReplicatorB.isConnected()); + assertEquals(persistentReplicatorB.getState(), AbstractReplicator.State.Started); + }); + } // cleanup. 
- pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(replicationInactiveThresholdSeconds); - if (producer1 != null) { - producer1.close(); + pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(replicationInactiveThresholdSeconds1); + pulsar2.getConfig().setBrokerReplicationInactiveThresholdSeconds(replicationInactiveThresholdSeconds2); + pulsar1.getConfig().setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds( + inactiveTopicsMaxInactiveDurationSeconds); + if (scheduledPublish1 != null) { + scheduledPublish1.cancel(true); + } + if (scheduledPublish2 != null) { + scheduledPublish2.cancel(true); + } + checkInactiveTopic.cancel(true); + if (producer1A.isConnected()) { + producer1A.close(); + } + if (producer1B != null && producer1B.isConnected()) { + producer1B.close(); + } + if (producer2.isConnected()) { + producer2.close(); + } + if (binaryWayRepl) { + admin2.topics().setReplicationClusters(topic, Arrays.asList(cluster2)); + waitReplicatorStopped(pulsar2, pulsar1, topic); } cleanupTopics(() -> { admin1.topics().delete(topic); admin2.topics().delete(topic); }); - } - - @Test(timeOut = 120 * 1000) - public void testDisconnectAndReconnectInactiveReplicatorNonPersistent() throws Exception { - final String topic = BrokerTestUtil.newUniqueName("non-persistent://" + replicatedNamespace + "/tp_"); - // Let inactive replicator check faster. - int replicationInactiveThresholdSeconds = pulsar1.getConfig().getBrokerReplicationInactiveThresholdSeconds(); - pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(10); - // create topic. - admin1.topics().createNonPartitionedTopic(topic); - Consumer consumer1 = client1.newConsumer().topic(topic).subscriptionName("s1").subscribe(); - waitReplicatorStarted(topic); - Consumer consumer2 = client2.newConsumer().topic(topic).subscriptionName("s1").subscribe(); - - NonPersistentTopic nonPersistentTopic1 = (NonPersistentTopic) broker1.getTopic(topic, false).join().get(); - - // Let topic GC slower. 
- InactiveTopicPolicies inactiveTopicPolicies = - new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, 3600, true); - admin1.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies); - Awaitility.await().untilAsserted(() -> { - assertFalse(nonPersistentTopic1.getProducers().values().stream() - .anyMatch(producer -> !producer.isRemote())); - //assertTrue(nonPersistentTopic1.getSubscriptions().isEmpty()); - assertTrue(nonPersistentTopic1.getInactiveTopicPolicies().isDeleteWhileInactive()); - assertEquals(nonPersistentTopic1.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(), 3600); - }); - - // Trigger an event: inactive replicator. - // Verify: the producer was closed. - nonPersistentTopic1.disconnectReplicatorIfNoTrafficForLongTime(); - Thread.sleep(1000 * 12); - nonPersistentTopic1.disconnectReplicatorIfNoTrafficForLongTime(); - Awaitility.await().untilAsserted(() -> { - Replicator replicator = nonPersistentTopic1.getReplicators().get(cluster2); - assertNotNull(replicator); - assertFalse(replicator.isConnected()); - }); - - // Trigger an event: new producer registered. - // Verify: the replication is started again. - Producer producer1 = client1.newProducer(Schema.STRING).topic(topic).create(); - Awaitility.await().untilAsserted(() -> { - Replicator replicator = nonPersistentTopic1.getReplicators().get(cluster2); - assertNotNull(replicator); - assertTrue(replicator.isConnected()); - }); - - // cleanup. 
- pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(replicationInactiveThresholdSeconds); - producer1.close(); - consumer1.close(); - consumer2.close(); + executor1.shutdown(); + executor2.shutdown(); } @Test(timeOut = 45 * 1000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java index c568425abce2b..eaeedfd4a07a1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java @@ -86,16 +86,15 @@ public void testDeleteTopicWhenReplicating() throws Exception { } @Test(enabled = false) - public void testDisconnectAndReconnectInactiveReplicator() throws Exception { - super.testDisconnectAndReconnectInactiveReplicator(); + public void testDisconnectAndReconnectReplicator(boolean binaryWayRepl, + boolean hasLocalProducerRegistered, + boolean localProducerHasTraffic, + boolean hasRemoteProducerTraffic, + boolean hasRemoteProducerRegistered) throws Exception { + super.testDisconnectAndReconnectReplicator(binaryWayRepl, hasLocalProducerRegistered, localProducerHasTraffic, + hasRemoteProducerTraffic, hasRemoteProducerRegistered); } - @Test(enabled = false) - public void testDisconnectAndReconnectInactiveReplicatorNonPersistent() throws Exception { - super.testDisconnectAndReconnectInactiveReplicator(); - } - - @Override @Test(enabled = false) public void testReplicatorProducerStatInTopic() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index d2bf2c728bd27..1d19f361e9b66 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -826,13 +826,13 @@ public void testSystemTopicCreationWithDifferentTopicCreationRule(int localSyste } @Test(enabled = false) - public void testDisconnectAndReconnectInactiveReplicator() throws Exception { - super.testDisconnectAndReconnectInactiveReplicator(); - } - - @Test(enabled = false) - public void testDisconnectAndReconnectInactiveReplicatorNonPersistent() throws Exception { - super.testDisconnectAndReconnectInactiveReplicator(); + public void testDisconnectAndReconnectReplicator(boolean binaryWayRepl, + boolean hasLocalProducerRegistered, + boolean localProducerHasTraffic, + boolean hasRemoteProducerTraffic, + boolean hasRemoteProducerRegistered) throws Exception { + super.testDisconnectAndReconnectReplicator(binaryWayRepl, hasLocalProducerRegistered, localProducerHasTraffic, + hasRemoteProducerTraffic, hasRemoteProducerRegistered); } @Test @@ -872,10 +872,10 @@ public void testTopicGCDoesNotDisconnectReplicatorWhenRemoteProducerIsActive() t }); // Trigger GC. - persistentTopic2.disconnectReplicatorIfNoTrafficForLongTime(); + persistentTopic2.disconnectReplicatorsIfNoTrafficAndBacklog(); persistentTopic2.checkGC(); Thread.sleep(15 * 1000); - persistentTopic2.disconnectReplicatorIfNoTrafficForLongTime(); + persistentTopic2.disconnectReplicatorsIfNoTrafficAndBacklog(); persistentTopic2.checkGC(); // Verify: the replication is not disconnected due to Topic GC. 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 3a2774bce2126..a4e9958b7b794 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1681,17 +1681,19 @@ private PulsarAdmin mockReplicationAdmin() { } /** - * NonPersistentReplicator.removeReplicator doesn't remove replicator in atomic way and does in multiple step: - * 1. disconnect replicator producer + * PersistentReplicator.removeReplicator is not atomic; it removes a replicator in multiple steps: + * 1. Turn off replication. *

- * 2. close cursor + * 2. Broker will do two things: + * 2-1. terminate replication. + * 2-2. delete the cursor named "repl.x" *

- * 3. remove from replicator-list. + * 3. remove the terminated replicator from "topic.replicators". *

* - * If we try to startReplicationProducer before step-c finish then it should not avoid restarting repl-producer. - * - * @throws Exception + * Test: + * do: try to restart replicator producer before step-2-2 finish. + * verify: the replicator producer will not be started. */ @Test @SuppressWarnings("unchecked") @@ -1749,9 +1751,14 @@ public CompletableFuture createAsync() { // step-2 now, policies doesn't have removed replication cluster so, it should not invoke "startProducer" of the // replicator // try to start replicator again - topic.startReplProducers().join(); + Awaitility.await().untilAsserted(() -> { + assertEquals(replicator.getState(), AbstractReplicator.State.Terminated); + }); + replicator.startProducer(); + Thread.sleep(10_000); // verify: replicator.startProducer is not invoked - verify(replicator, Mockito.times(1)).startProducer(); + assertEquals(replicator.getState(), AbstractReplicator.State.Terminated); + assertFalse(replicator.isConnected()); // step-3 : complete the callback to remove replicator from the list ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteCursorCallback.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java index 21ead24cdd463..76460b1918ba0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java @@ -179,7 +179,7 @@ public void testShadowReplicatorReleasesSourceEntryBuffer() throws Exception { new PersistentReplicator.InFlightTask( entry.getPosition(), entries.size(), replicator.getReplicatorId()); inFlightTask.setEntries(entries); - Assert.assertTrue(replicator.replicateEntries(entries, inFlightTask)); + replicator.replicateEntries(entries, inFlightTask); Awaitility.await().untilAsserted(() -> { 
Assert.assertTrue(inFlightTask.isDone()); From c6ef5f531f32ab756c0eb3008330874a5381fa49 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Jun 2026 18:01:44 +0800 Subject: [PATCH 07/10] improve java doc --- .../pulsar/broker/ServiceConfiguration.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1707b6301845c..c6ca4b876a8fc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -815,16 +815,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( category = CATEGORY_POLICIES, dynamic = true, - doc = "Time in seconds that a topic may have no local producers before the broker considers its outbound" - + " replication producers idle. The timer starts when the inactive-replication check first observes" - + " that the topic has no local producers; remote producers created by replication from another" - + " cluster do not reset this timer. When the threshold is exceeded and the replication backlog is" - + " clear, the broker disconnects the topic's replication producers to release idle replication" - + " resources. A connected remote producer still makes the topic active for inactive-topic GC, so" - + " the topic is not deleted only because local producers are absent; deletion is still controlled" - + " by brokerDeleteInactiveTopicsEnabled and the namespace/topic inactive-topic policies. The check" - + " runs with the inactive-topic monitor, whose interval is" - + " brokerDeleteInactiveTopicsFrequencySeconds. The default is 86400 seconds (24 hours)." 
+ doc = "Time in seconds that a persistent geo-replication replicator may stay idle before the broker" + + " disconnects its replication producer. A replicator is eligible only when it has no backlog and" + + " has not read entries for replication processing for longer than this threshold. Disconnecting" + + " only releases the idle producer; the replicator and its cursor remain available, and the" + + " producer is recreated automatically when new messages need to be replicated. Set this value to" + + " 0 or a negative value to disable idle-replicator disconnection. The check runs with the" + + " inactive-topic monitor, whose interval is brokerDeleteInactiveTopicsFrequencySeconds, and only" + + " when brokerDeleteInactiveTopicsEnabled is true. The default is 86400 seconds (24 hours)." ) private int brokerReplicationInactiveThresholdSeconds = 24 * 3600; From ef21818cfbaec5e11dfa17c63c9a7ecfb99a78bd Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Jun 2026 18:15:47 +0800 Subject: [PATCH 08/10] fix stuck issue --- .../persistent/PersistentReplicator.java | 17 ++++++++++++----- .../persistent/ShadowReplicatorTest.java | 2 +- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 1fc207d9697be..7aa9288302b37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -378,9 +378,10 @@ public void readEntriesComplete(List entries, Object ctx) { readFailureBackoff.reduceToHalf(); - replicateEntries(entries, inFlightTask); + boolean producerIsWritable = isWritable(); + replicateEntries(entries, inFlightTask, !producerIsWritable); - if (!isWritable()) { + if (!producerIsWritable) { // Don't read any 
more entries until the current pending entries are persisted log.debug() .attr("isWritable", isWritable()) @@ -390,7 +391,8 @@ public void readEntriesComplete(List entries, Object ctx) { } } - protected void replicateEntries(List entries, InFlightTask inFlightTask) { + protected void replicateEntries(List entries, InFlightTask inFlightTask, + final boolean skippedReadAfterSent) { latestPublishTime = System.currentTimeMillis(); // Release memory if terminated. if (state == State.Terminated || state == State.Terminating @@ -407,7 +409,7 @@ protected void replicateEntries(List entries, InFlightTask inFlightTask) Runnable retryReplicateEntries = () -> { ml.getScheduledExecutor().schedule(() -> { ml.getExecutor().execute(() -> { - replicateEntries(entries, inFlightTask); + replicateEntries(entries, inFlightTask, skippedReadAfterSent); }); }, 100, TimeUnit.MILLISECONDS); }; @@ -424,7 +426,12 @@ protected void replicateEntries(List entries, InFlightTask inFlightTask) return; } // Do replicate. - doReplicateEntries(entries, inFlightTask); + // If the previous "read more entries" was skipped due to the producer write buffer is full, we need to trigger + // once after replicated entries. But if "doReplicateEntries" already sent some messages, the event "read more + // entries" can be triggered by the receipt action of publishing. 
+ if (skippedReadAfterSent && !doReplicateEntries(entries, inFlightTask)) { + readMoreEntries(); + } } protected abstract boolean doReplicateEntries(List entries, InFlightTask inFlightTask); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java index 76460b1918ba0..8119a6bf93514 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java @@ -179,7 +179,7 @@ public void testShadowReplicatorReleasesSourceEntryBuffer() throws Exception { new PersistentReplicator.InFlightTask( entry.getPosition(), entries.size(), replicator.getReplicatorId()); inFlightTask.setEntries(entries); - replicator.replicateEntries(entries, inFlightTask); + replicator.replicateEntries(entries, inFlightTask, false); Awaitility.await().untilAsserted(() -> { Assert.assertTrue(inFlightTask.isDone()); From 1c9c79d793e64e280c351c95bcdc3717435a294f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Jun 2026 18:25:32 +0800 Subject: [PATCH 09/10] involves shadow replicator --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 5 +++++ .../broker/service/persistent/PersistentReplicator.java | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 55ea9bcc0b761..f984dac689de6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -697,6 +697,11 @@ public void disconnectReplicatorsIfNoTrafficAndBacklog() { persistentReplicator.disconnectIfNoTrafficAndBacklog(); } } + for (Replicator 
replicator : getShadowReplicators().values()) { + if (replicator instanceof PersistentReplicator persistentReplicator) { + persistentReplicator.disconnectIfNoTrafficAndBacklog(); + } + } } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 7aa9288302b37..c55e35ff15513 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -429,7 +429,8 @@ protected void replicateEntries(List entries, InFlightTask inFlightTask, // If the previous "read more entries" was skipped due to the producer write buffer is full, we need to trigger // once after replicated entries. But if "doReplicateEntries" already sent some messages, the event "read more // entries" can be triggered by the receipt action of publishing. 
- if (skippedReadAfterSent && !doReplicateEntries(entries, inFlightTask)) { + boolean atLeastOneMessageSentForReplication = doReplicateEntries(entries, inFlightTask); + if (skippedReadAfterSent && !atLeastOneMessageSentForReplication) { readMoreEntries(); } } From 8d7a46aabf51c2fd63a0242725f6e8d59a707043 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Jun 2026 18:26:51 +0800 Subject: [PATCH 10/10] Add tests for replicateEntries read-resume handling --- .../PersistentReplicatorInflightTaskTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java index 478437ffde015..e56d240087992 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java @@ -18,9 +18,15 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import java.util.ArrayList; @@ -167,6 +173,36 @@ public void testReadEntriesFailedCompletesInFlightTaskAfterReplicatorTerminated( } } + @Test + public void testReplicateEntriesProcessesBatchWhenReadWasNotSkipped() throws Exception { + PersistentReplicator replicator = spy(getReplicator(topicName)); + List entries = Collections.emptyList(); + InFlightTask task = new 
InFlightTask(PositionFactory.create(1, 1), 1, replicator.getReplicatorId()); + task.setEntries(entries); + doReturn(false).when(replicator).doReplicateEntries(any(), any()); + doNothing().when(replicator).readMoreEntries(); + + replicator.replicateEntries(entries, task, false); + + verify(replicator, times(1)).doReplicateEntries(entries, task); + verify(replicator, never()).readMoreEntries(); + } + + @Test + public void testReplicateEntriesResumesReadWhenNoMessagesWereSentAfterSkippedRead() throws Exception { + PersistentReplicator replicator = spy(getReplicator(topicName)); + List entries = Collections.emptyList(); + InFlightTask task = new InFlightTask(PositionFactory.create(1, 1), 1, replicator.getReplicatorId()); + task.setEntries(entries); + doReturn(false).when(replicator).doReplicateEntries(any(), any()); + doNothing().when(replicator).readMoreEntries(); + + replicator.replicateEntries(entries, task, true); + + verify(replicator, times(1)).doReplicateEntries(entries, task); + verify(replicator, times(1)).readMoreEntries(); + } + @Test public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception { log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");