diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e5ecea2c44b7a..c6ca4b876a8fc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -812,6 +812,20 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private Integer brokerDeleteInactiveTopicsMaxInactiveDurationSeconds = null; + @FieldContext( + category = CATEGORY_POLICIES, + dynamic = true, + doc = "Time in seconds that a persistent geo-replication replicator may stay idle before the broker" + + " disconnects its replication producer. A replicator is eligible only when it has no backlog and" + + " has not read entries for replication processing for longer than this threshold. Disconnecting" + + " only releases the idle producer; the replicator and its cursor remain available, and the" + + " producer is recreated automatically when new messages need to be replicated. Set this value to" + + " 0 or a negative value to disable idle-replicator disconnection. The check runs with the" + + " inactive-topic monitor, whose interval is brokerDeleteInactiveTopicsFrequencySeconds, and only" + + " when brokerDeleteInactiveTopicsEnabled is true. The default is 86400 seconds (24 hours)." + ) + private int brokerReplicationInactiveThresholdSeconds = 24 * 3600; + @FieldContext( category = CATEGORY_POLICIES, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 5a5ec2df1fc4a..2342d2a78f623 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -86,6 +86,8 @@ public abstract class AbstractReplicator implements Replicator { private static final AtomicReferenceFieldUpdater ATTRIBUTES_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, Attributes.class, "attributes"); + protected volatile long latestPublishTime = System.currentTimeMillis(); + public enum State { /** * This enum has two mean meanings: @@ -184,7 +186,7 @@ protected CompletableFuture prepareCreateProducer() { return CompletableFuture.completedFuture(null); } - public void startProducer() { + protected void startProducer() { // Guarantee only one task call "producerBuilder.createAsync()". Pair setStartingRes = compareSetAndGetState(State.Disconnected, State.Starting); if (!setStartingRes.getLeft()) { @@ -313,8 +315,7 @@ protected CompletableFuture isLocalTopicActive() { /** * This method only be used by {@link PersistentTopic#checkGC} now. */ - @Override - public CompletableFuture disconnect() { + protected CompletableFuture disconnect() { long backlog = getNumberOfEntriesInBacklog(); if (backlog > 0) { CompletableFuture disconnectFuture = new CompletableFuture<>(); @@ -389,8 +390,6 @@ protected CompletableFuture closeProducerAsync(boolean closeTheStartingPro Pair setDisconnectedRes = compareSetAndGetState(State.Disconnecting, State.Disconnected); if (setDisconnectedRes.getLeft()) { this.producer = null; - // deactivate further read - disableReplicatorRead(); return; } if (setDisconnectedRes.getRight() == State.Terminating diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 3ad9eb438104a..f984dac689de6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -67,6 +67,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; +import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; @@ -665,6 +666,19 @@ protected Consumer getActiveConsumer(Subscription subscription) { return null; } + protected boolean hasProducersActive() { + return !producers.isEmpty(); + } + + protected boolean hasActiveReplicators() { + for (Replicator replicator : getReplicators().values()) { + if (replicator.isConnected()) { + return true; + } + } + return false; + } + protected boolean hasLocalProducers() { if (producers.isEmpty()) { return false; @@ -677,6 +691,19 @@ protected boolean hasLocalProducers() { return false; } + public void disconnectReplicatorsIfNoTrafficAndBacklog() { + for (Replicator replicator : getReplicators().values()) { + if (replicator instanceof PersistentReplicator persistentReplicator) { + persistentReplicator.disconnectIfNoTrafficAndBacklog(); + } + } + for (Replicator replicator : getShadowReplicators().values()) { + if (replicator instanceof PersistentReplicator persistentReplicator) { + persistentReplicator.disconnectIfNoTrafficAndBacklog(); + } + } + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("topic", topic).toString(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d180048b1b316..60b2d199336e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -741,6 +741,10 @@ protected void startInactivityMonitor() { int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds(); inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkGC(), interval, interval, TimeUnit.SECONDS); + if (pulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds() > 0) { + inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkInactiveReplication(), interval, + interval, TimeUnit.SECONDS); + } } // Deduplication info checker @@ -2411,6 +2415,14 @@ public void checkGC() { forEachTopic(Topic::checkGC); } + public void checkInactiveReplication() { + forEachTopic(topic -> { + if (topic instanceof AbstractTopic abstractTopic) { + abstractTopic.disconnectReplicatorsIfNoTrafficAndBacklog(); + } + }); + } + public void checkClusterMigration() { forEachTopic(Topic::checkClusterMigration); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 86e2b6e74de89..1f781f0e98dcf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -25,16 +25,12 @@ public interface Replicator { - void startProducer(); - Topic getLocalTopic(); ReplicatorStatsImpl computeStats(); CompletableFuture terminate(); - CompletableFuture disconnect(); - void updateRates(); String getRemoteCluster(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index 5be04738b67d3..3160d3e178cb8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -73,6 +73,11 @@ protected String getProducerName() { return getReplicatorName(replicatorPrefix, localCluster) + REPL_PRODUCER_NAME_DELIMITER + remoteCluster; } + @Override + public void startProducer() { + super.startProducer(); + } + @Override protected void setProducerAndTriggerReadEntries(Producer producer) { this.producer = (ProducerImpl) producer; @@ -89,6 +94,7 @@ protected void setProducerAndTriggerReadEntries(Producer producer) { @SuppressWarnings("unchecked") public void sendMessage(Entry entry) { + latestPublishTime = System.currentTimeMillis(); if ((STATE_UPDATER.get(this) == State.Started) && isWritable()) { int length = entry.getLength(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index c6534b346c25b..fb49af07c0e04 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -146,7 +146,7 @@ private CompletableFuture createRemoteTopicIfDoesNotExist(String partition @Override @SuppressWarnings("unchecked") - protected boolean replicateEntries(List entries, final InFlightTask inFlightTask) { + protected boolean doReplicateEntries(List entries, final InFlightTask inFlightTask) { boolean atLeastOneMessageSentForReplication = false; boolean isEnableReplicatedSubscriptions = brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions(); @@ -154,8 +154,7 @@ protected boolean replicateEntries(List entries, final InFlightTask inFli try { // This flag is set to true when we skip at least one local message, // in order to skip remaining local messages. - boolean isLocalMessageSkippedOnce = false; - boolean skipRemainingMessages = inFlightTask.isSkipReadResultDueToCursorRewind(); + boolean skipRemainingMessages = false; for (int i = 0; i < entries.size(); i++) { Entry entry = entries.get(i); // Skip the messages since the replicator need to fetch the schema info to replicate the schema to the @@ -239,13 +238,13 @@ protected boolean replicateEntries(List entries, final InFlightTask inFli continue; } - if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { + if (STATE_UPDATER.get(this) != State.Started || inFlightTask.isSkipReadResultDueToCursorRewind()) { // The producer is not ready yet after having stopped/restarted. Drop the message because it will // recover when the producer is ready log.debug() .attr("position", entry.getPosition()) .log("Dropping read message because producer is not ready"); - isLocalMessageSkippedOnce = true; + skipRemainingMessages = true; inFlightTask.incCompletedEntries(); entry.release(); msg.recycle(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 8433d9b9c7ba2..c55e35ff15513 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Disconnected; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Disconnecting; import static org.apache.pulsar.broker.service.AbstractReplicator.State.Started; import static org.apache.pulsar.broker.service.AbstractReplicator.State.Starting; import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminated; @@ -52,6 +54,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; @@ -151,13 +154,6 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man @Override protected void setProducerAndTriggerReadEntries(Producer producer) { - // Repeat until there are no read operations in progress - if (STATE_UPDATER.get(this) == State.Starting && hasPendingRead() && !cursor.cancelPendingReadRequest()) { - brokerService.getPulsar().getExecutor() - .schedule(() -> setProducerAndTriggerReadEntries(producer), 10, TimeUnit.MILLISECONDS); - return; - } - /** * 1. Try change state to {@link Started}. * 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value @@ -179,8 +175,6 @@ protected void setProducerAndTriggerReadEntries(Producer producer) { // activate cursor: so, entries can be cached. this.cursor.setActive(); - // Rewind the cursor to be sure to read again all non-acked messages sent while restarting - cursor.rewind(); // read entries readMoreEntries(); } else { @@ -283,6 +277,30 @@ private AvailablePermits getRateLimiterAvailablePermits(int availablePermits) { return new AvailablePermits((int) availablePermitsOnMsg, availablePermitsOnByte); } + public void disconnectIfNoTrafficAndBacklog() { + // Disabled the feature. + int threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds(); + if (threshold <= 0) { + return; + } + // Has backlog. + long backlog = getNumberOfEntriesInBacklog(); + if (backlog > 0) { + return; + } + // Already disconnected. + if (state != Started) { + return; + } + + // Disconnect if no backlog and no traffic for a long time. + if (System.currentTimeMillis() - latestPublishTime > threshold * 1000L) { + log.info().attr("brokerReplicationInactiveThresholdSeconds", threshold) + .log("Disconnecting replication producers since no producer is active for a long time."); + disconnect(); + } + } + protected void readMoreEntries() { if (state.equals(Terminated) || state.equals(Terminating)) { return; @@ -360,12 +378,12 @@ public void readEntriesComplete(List entries, Object ctx) { readFailureBackoff.reduceToHalf(); - boolean atLeastOneMessageSentForReplication = replicateEntries(entries, inFlightTask); + boolean producerIsWritable = isWritable(); + replicateEntries(entries, inFlightTask, !producerIsWritable); - if (atLeastOneMessageSentForReplication && !isWritable()) { + if (!producerIsWritable) { // Don't read any more entries until the current pending entries are persisted log.debug() - .attr("atLeastOneMessageSentForReplication", atLeastOneMessageSentForReplication) .attr("isWritable", isWritable()) .log("Pausing replication traffic"); } else { @@ -373,7 +391,51 @@ public void readEntriesComplete(List entries, Object ctx) { } } - protected abstract boolean replicateEntries(List entries, InFlightTask inFlightTask); + protected void replicateEntries(List entries, InFlightTask inFlightTask, + final boolean skippedReadAfterSent) { + latestPublishTime = System.currentTimeMillis(); + // Release memory if terminated. + if (state == State.Terminated || state == State.Terminating + || inFlightTask.isSkipReadResultDueToCursorRewind()) { + for (Entry entry : entries) { + inFlightTask.incCompletedEntries(); + entry.release(); + } + return; + } + + // Retry to replicate messages if it is not started. + ManagedLedgerImpl ml = (ManagedLedgerImpl) cursor.getManagedLedger(); + Runnable retryReplicateEntries = () -> { + ml.getScheduledExecutor().schedule(() -> { + ml.getExecutor().execute(() -> { + replicateEntries(entries, inFlightTask, skippedReadAfterSent); + }); + }, 100, TimeUnit.MILLISECONDS); + }; + + // Retry. + if (state == Disconnecting || state == Starting) { + retryReplicateEntries.run(); + return; + } + // Start producer and retry. + if (state == Disconnected) { + startProducer(); + retryReplicateEntries.run(); + return; + } + // Do replicate. + // If the previous "read more entries" was skipped due to the producer write buffer is full, we need to trigger + // once after replicated entries. But if "doReplicateEntries" already sent some messages, the event "read more + // entries" can be triggered by the receipt action of publishing. + boolean atLeastOneMessageSentForReplication = doReplicateEntries(entries, inFlightTask); + if (skippedReadAfterSent && !atLeastOneMessageSentForReplication) { + readMoreEntries(); + } + } + + protected abstract boolean doReplicateEntries(List entries, InFlightTask inFlightTask); protected CompletableFuture getSchemaInfo(MessageImpl msg) throws ExecutionException { if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) { @@ -932,15 +994,10 @@ protected CompletableFuture beforeDisconnect() { .TopicBusyException("Cannot close a replicator with backlog")); } } - beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Disconnecting); return CompletableFuture.completedFuture(null); } } - protected void afterDisconnected() { - doRewindCursor(false); - } - protected void beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding reason) { synchronized (inFlightTasks) { boolean hasCanceledPendingRead = cursor.cancelPendingReadRequest(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c9bcad341fcd7..1cec7d5e59a95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -836,9 +836,7 @@ public CompletableFuture> addProducer(Producer producer, CompletableFuture producerQueuedFuture) { return super.addProducer(producer, producerQueuedFuture).thenCompose(topicEpoch -> { messageDeduplication.producerAdded(producer.getProducerName()); - - // Start replication producers if not already - return startReplProducers().thenApply(__ -> topicEpoch); + return CompletableFuture.completedFuture(topicEpoch); }); } @@ -888,46 +886,6 @@ private boolean hasRemoteProducers() { return false; } - public CompletableFuture startReplProducers() { - // read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close - return brokerService.pulsar().getPulsarResources().getNamespaceResources() - .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) - .thenAcceptAsync(optPolicies -> { - if (optPolicies.isPresent()) { - if (optPolicies.get().replication_clusters != null) { - Set configuredClusters = Sets.newTreeSet(optPolicies.get().replication_clusters); - replicators.forEach((region, replicator) -> { - if (configuredClusters.contains(region)) { - replicator.startProducer(); - } - }); - } - } else { - replicators.forEach((region, replicator) -> replicator.startProducer()); - } - }, getOrderedExecutor()).exceptionally(ex -> { - log.debug() - .exceptionMessage(ex) - .log("Error getting policies while starting repl-producers"); - replicators.forEach((region, replicator) -> replicator.startProducer()); - return null; - }); - } - - public CompletableFuture stopReplProducers() { - List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate())); - shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.terminate())); - return FutureUtil.waitForAll(closeFutures); - } - - private synchronized CompletableFuture closeReplProducersIfNoBacklog() { - List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect())); - return FutureUtil.waitForAll(closeFutures); - } - @Override protected void handleProducerRemoved(Producer producer) { super.handleProducerRemoved(producer); @@ -3355,7 +3313,7 @@ public boolean isActive(InactiveTopicDeleteMode deleteMode) { break; } // no local producers - return hasLocalProducers(); + return hasProducersActive() || hasActiveReplicators(); } private boolean hasBacklogs(boolean getPreciseBacklog) { @@ -3576,42 +3534,8 @@ public void checkGC() { // Topic activity is still within the retention period return; } else { - CompletableFuture replCloseFuture = new CompletableFuture<>(); - - // Close repl producers first. - // Once all repl producers are closed, we can delete the topic, - // provided no remote producers connected to the broker. - log.debug() - .attr("maxInactiveDurationInSec", maxInactiveDurationInSec) - .log("Topic inactive for seconds, closing repl producers."); - /** - * There is a race condition that may cause a NPE: - * - task 1: a callback of "replicator.cursor.asyncRead" will trigger a replication. - * - task 2: "closeReplProducersIfNoBacklog" called by current thread will make the variable - * "replicator.producer" to a null value. - * Race condition: task 1 will get a NPE when it tries to send messages using the variable - * "replicator.producer", because task 2 will set this variable to "null". - * TODO Create a separate PR to fix it. - */ - closeReplProducersIfNoBacklog().thenRun(() -> { - if (hasRemoteProducers()) { - log.debug("Topic has connected remote producers, not a candidate for GC"); - replCloseFuture - .completeExceptionally(new TopicBusyException("Topic has connected remote producers")); - } else { - log.info() - .attr("maxInactiveDurationInSec", maxInactiveDurationInSec) - .log("Topic inactive, closed repl producers"); - replCloseFuture.complete(null); - } - }).exceptionally(e -> { - log.debug("Topic has replication backlog. Not a candidate for GC"); - replCloseFuture.completeExceptionally(e.getCause()); - return null; - }); - - replCloseFuture.thenCompose(v -> delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions, - deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, false)) + delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions, + deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, false) .thenCompose((res) -> tryToDeletePartitionedMetadata()) .thenRun(() -> log.info("Topic deleted successfully due to inactivity")) .exceptionally(e -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 0b4e8d1df47fb..95c306f486d95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -159,9 +159,6 @@ private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest requ + "configured for that cluster. Ignoring the request."); return; } - if (!replicator.isConnected()) { - topic.startReplProducers(); - } // Send response containing the current last written message id. The response // marker we're publishing locally and then replicating will have a higher diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java index 5d6f6f688cb9f..20c5303b3423c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java @@ -61,14 +61,13 @@ protected String getProducerName() { @Override @SuppressWarnings("unchecked") - protected boolean replicateEntries(List entries, InFlightTask inFlightTask) { + protected boolean doReplicateEntries(List entries, InFlightTask inFlightTask) { boolean atLeastOneMessageSentForReplication = false; try { // This flag is set to true when we skip at least one local message, // in order to skip remaining local messages. - boolean isLocalMessageSkippedOnce = false; - boolean skipRemainingMessages = inFlightTask.isSkipReadResultDueToCursorRewind(); + boolean skipRemainingMessages = false; for (int i = 0; i < entries.size(); i++) { Entry entry = entries.get(i); // Skip the messages since the replicator need to fetch the schema info to replicate the schema to the @@ -108,13 +107,13 @@ protected boolean replicateEntries(List entries, InFlightTask inFlightTas continue; } - if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { + if (STATE_UPDATER.get(this) != State.Started || inFlightTask.isSkipReadResultDueToCursorRewind()) { // The producer is not ready yet after having stopped/restarted. Drop the message because it will // recovered when the producer is ready log.debug() .attr("position", entry.getPosition()) .log("Dropping read message because producer is not ready"); - isLocalMessageSkippedOnce = true; + skipRemainingMessages = true; inFlightTask.incCompletedEntries(); entry.release(); msg.recycle(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index c4f5bdeab74e7..2a02d0436fe37 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -58,6 +58,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -79,6 +82,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerTest; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; @@ -129,6 +133,8 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.impl.DualMetadataStore; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.glassfish.jersey.client.JerseyClient; @@ -156,6 +162,11 @@ public void cleanup() throws Exception { super.cleanup(); } + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + } + @Test(timeOut = 45 * 1000) public void testReceiverSideReplicationStats() throws Exception { final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); @@ -235,6 +246,207 @@ public void testDeleteTopicWhenReplicating() throws Exception { }); } + @DataProvider + public Object[][] paramsDisconnectReplicator() { + // Binary way replication. + // local producers on cluster-1 registered. + // local producers on cluster-1 have traffic. + // replicator producer from cluster-2 has traffic. + // replicator producer from cluster-2 is present. + return new Object[][] { + {true, true, false, true, true}, // verify-cluster-2: no replicator terminate occurs. + {true, true, false, false, true}, // verify-cluster-2: replicator terminated and resumed. + {true, true, false, false, false}, // verify-cluster-2: replicator terminated and resumed. + {true, false, false, true, true}, // verify-cluster-2: no replicator terminate occurs. + {true, false, false, false, true}, // verify-cluster-2: replicator terminated and resumed. + {true, false, false, false, false}, // verify-cluster-2: replicator terminated and resumed. + + {false, false, false, false, false}, // verify-cluster-2: replicator terminated and resumed. + {false, true, false, false, false} // verify-cluster-2: replicator terminated and resumed. + }; + } + + @Test(timeOut = 240 * 1000, dataProvider = "paramsDisconnectReplicator") + public void testDisconnectAndReconnectReplicator(boolean binaryWayRepl, + boolean hasLocalProducerRegistered, + boolean localProducerHasTraffic, + boolean hasRemoteProducerTraffic, + boolean hasRemoteProducerRegistered) throws Exception { + ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(1); + ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(1); + ScheduledFuture checkInactiveTopic = executor1.scheduleWithFixedDelay(() -> { + pulsar1.getBrokerService().checkInactiveReplication(); + }, 10, 10, TimeUnit.SECONDS); + // local cluster: let inactive replicator check faster. + int replicationInactiveThresholdSeconds1 = pulsar1.getConfig().getBrokerReplicationInactiveThresholdSeconds(); + pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(30); + // remote cluster: let inactive topic deletion never occur. + int replicationInactiveThresholdSeconds2 = pulsar2.getConfig().getBrokerReplicationInactiveThresholdSeconds(); + pulsar2.getConfig().setBrokerReplicationInactiveThresholdSeconds(3600 * 24); + // Lat topic GC does not execute. + int inactiveTopicsMaxInactiveDurationSeconds = pulsar1.getConfig() + .getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(); + pulsar1.getConfig().setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(3600 * 24); + + // Check params. + if (hasRemoteProducerTraffic && !hasRemoteProducerRegistered) { + throw new Exception("If has traffic from remote cluster, the param \"hasRemoteProducer\" can not be false"); + } + // Check params. + if (localProducerHasTraffic && !hasLocalProducerRegistered) { + throw new Exception("If has local traffic, the param \"localProducerEmpty\" can not be true"); + } + + ScheduledFuture scheduledPublish1 = null; + ScheduledFuture scheduledPublish2 = null; + final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + + // Init by params: local producers. + final Producer producer1A = client1.newProducer(Schema.STRING).topic(topic).create(); + Producer producer1B = null; + if (!hasLocalProducerRegistered) { + producer1A.close(); + } + // Init by params: local producer traffic. + if (localProducerHasTraffic) { + AtomicInteger msgCount = new AtomicInteger(); + scheduledPublish1 = executor1.scheduleWithFixedDelay(() -> { + producer1A.sendAsync(msgCount.incrementAndGet() + ""); + }, 1, 1, TimeUnit.SECONDS); + } + // Init by params: binary way replication. + waitReplicatorStarted(topic, pulsar2); + if (binaryWayRepl) { + admin2.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2)); + waitReplicatorStarted(topic, pulsar1); + } + final PersistentTopic persistentTopic1 = (PersistentTopic) broker1.getTopic(topic, false).join().get(); + final PersistentTopic persistentTopic2 = (PersistentTopic) broker2.getTopic(topic, false).join().get(); + // Init by params: remote producer traffic. + final Producer producer2 = client2.newProducer(Schema.STRING).topic(topic).create(); + if (hasRemoteProducerTraffic) { + AtomicInteger msgCount = new AtomicInteger(); + scheduledPublish2 = executor2.scheduleWithFixedDelay(() -> { + producer2.sendAsync(msgCount.incrementAndGet() + ""); + }, 1, 1, TimeUnit.SECONDS); + } + // Init by params: remote producers. + if (binaryWayRepl && !hasRemoteProducerTraffic && !hasRemoteProducerRegistered) { + persistentTopic2.getReplicators().get(cluster1).terminate(); + } + + // Verify: all states match params. + Thread.sleep(3000); + // All states match: local producers. + if (!hasLocalProducerRegistered) { + assertFalse(persistentTopic1.getProducers().values().stream() + .filter(p -> !p.isRemote()).findAny().isPresent()); + } else { + Optional serviceProducer1 = persistentTopic1.getProducers() + .values().stream().filter(p -> !p.isRemote()).findAny(); + assertTrue(serviceProducer1.isPresent()); +// if (localProducerHasTraffic) { +// assertTrue(System.currentTimeMillis() - serviceProducer1.get().getLatestPublishTime() < 1_500); +// } else { +// assertTrue(System.currentTimeMillis() - serviceProducer1.get().getLatestPublishTime() > 2_500); +// } + } + // All states match: remote producers. + if (binaryWayRepl) { + if (!hasRemoteProducerRegistered) { + assertFalse(persistentTopic1.getProducers().values().stream() + .filter(p -> p.isRemote()).findAny().isPresent()); + } else { + Optional serviceProducer1 = persistentTopic1.getProducers() + .values().stream().filter(p -> p.isRemote()).findAny(); + assertTrue(serviceProducer1.isPresent()); +// if (hasRemoteProducerTraffic) { +// assertTrue(System.currentTimeMillis() - serviceProducer1.get().getLatestPublishTime() < 1_500); +// } else { +// assertTrue(System.currentTimeMillis() - serviceProducer1.get().getLatestPublishTime() > 2_500); +// } + } + } + + // Verify: replicator terminated or not. + if (hasRemoteProducerTraffic || localProducerHasTraffic) { + long verifyStartTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - verifyStartTime < 100_000) { + assertFalse(persistentTopic1.getReplicators().isEmpty()); + PersistentReplicator persistentReplicator = + (PersistentReplicator) persistentTopic1.getReplicators().get(cluster2); + assertTrue(persistentReplicator.isConnected()); + assertEquals(persistentReplicator.getState(), AbstractReplicator.State.Started); + Thread.sleep(1000); + } + } else { + // TODO terminate 的判断逻辑搬运到 replicator 中。 + // resume 的判断逻辑可言在 scheduled task 中,也可以在 cursor pending read 中。 + Thread.sleep(100_000); + assertFalse(persistentTopic1.getReplicators().isEmpty()); + PersistentReplicator persistentReplicatorA = + (PersistentReplicator) persistentTopic1.getReplicators().get(cluster2); + assertFalse(persistentReplicatorA.isConnected()); + assertEquals(persistentReplicatorA.getState(), AbstractReplicator.State.Disconnected); + + // Verify: resume. +// if (!hasRemoteProducerRegistered) { +// persistentTopic2.getReplicators().get(cluster1).startProducer(); +// Awaitility.await().untilAsserted(() -> { +// assertTrue(persistentTopic2.getReplicators().get(cluster1).isConnected()); +// }); +// } + if (hasRemoteProducerRegistered && !hasRemoteProducerTraffic) { + producer2.send("msg-remote"); + } + if (!hasLocalProducerRegistered) { + producer1B = client1.newProducer(Schema.STRING).topic(topic).create(); + producer1B.send("msg-local"); + } else { + producer1A.send("msg-local"); + } + Awaitility.await().untilAsserted(() -> { + assertFalse(persistentTopic1.getReplicators().isEmpty()); + PersistentReplicator persistentReplicatorB = + (PersistentReplicator) persistentTopic1.getReplicators().get(cluster2); + assertTrue(persistentReplicatorB.isConnected()); + assertEquals(persistentReplicatorB.getState(), AbstractReplicator.State.Started); + }); + } + + // cleanup. + pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(replicationInactiveThresholdSeconds1); + pulsar2.getConfig().setBrokerReplicationInactiveThresholdSeconds(replicationInactiveThresholdSeconds2); + pulsar1.getConfig().setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds( + inactiveTopicsMaxInactiveDurationSeconds); + if (scheduledPublish1 != null) { + scheduledPublish1.cancel(true); + } + if (scheduledPublish2 != null) { + scheduledPublish2.cancel(true); + } + checkInactiveTopic.cancel(true); + if (producer1A.isConnected()) { + producer1A.close(); + } + if (producer1B != null && producer1B.isConnected()) { + producer1B.close(); + } + if (producer2.isConnected()) { + producer2.close(); + } + if (binaryWayRepl) { + admin2.topics().setReplicationClusters(topic, Arrays.asList(cluster2)); + waitReplicatorStopped(pulsar2, pulsar1, topic); + } + cleanupTopics(() -> { + admin1.topics().delete(topic); + admin2.topics().delete(topic); + }); + executor1.shutdown(); + executor2.shutdown(); + } + @Test(timeOut = 45 * 1000) public void testReplicatorProducerStatInTopic() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 0aec5358be6f8..cc0f85ce301da 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -44,6 +44,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -55,6 +56,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; @@ -408,8 +410,13 @@ protected void waitReplicatorStarted(String topicName, PulsarService remoteClust Awaitility.await().untilAsserted(() -> { Optional topicOptional2 = remoteCluster.getBrokerService().getTopic(topicName, false).get(); assertTrue(topicOptional2.isPresent()); - PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get(); - assertFalse(persistentTopic2.getProducers().isEmpty()); + if (TopicName.get(topicName).getDomain().equals(TopicDomain.persistent)) { + PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get(); + assertFalse(persistentTopic2.getProducers().isEmpty()); + } else { + NonPersistentTopic nonPersistentTopic2 = (NonPersistentTopic) topicOptional2.get(); + assertFalse(nonPersistentTopic2.getProducers().isEmpty()); + } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java index 89fc126e49c90..eaeedfd4a07a1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java @@ -85,6 +85,16 @@ public void testDeleteTopicWhenReplicating() throws Exception { super.testDeleteTopicWhenReplicating(); } + @Test(enabled = false) + public void testDisconnectAndReconnectReplicator(boolean binaryWayRepl, + boolean hasLocalProducerRegistered, + boolean localProducerHasTraffic, + boolean hasRemoteProducerTraffic, + boolean hasRemoteProducerRegistered) throws Exception { + super.testDisconnectAndReconnectReplicator(binaryWayRepl, hasLocalProducerRegistered, localProducerHasTraffic, + hasRemoteProducerTraffic, hasRemoteProducerRegistered); + } + @Override @Test(enabled = false) public void testReplicatorProducerStatInTopic() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 6bbc09d62ba36..1d19f361e9b66 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -54,6 +54,8 @@ import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; @@ -823,6 +825,82 @@ public void testSystemTopicCreationWithDifferentTopicCreationRule(int localSyste admin2.topics().delete(tp, false); } + @Test(enabled = false) + public void testDisconnectAndReconnectReplicator(boolean binaryWayRepl, + boolean hasLocalProducerRegistered, + boolean localProducerHasTraffic, + boolean hasRemoteProducerTraffic, + boolean hasRemoteProducerRegistered) throws Exception { + super.testDisconnectAndReconnectReplicator(binaryWayRepl, hasLocalProducerRegistered, localProducerHasTraffic, + hasRemoteProducerTraffic, hasRemoteProducerRegistered); + } + + @Test + public void testTopicGCDoesNotDisconnectReplicatorWhenRemoteProducerIsActive() throws Exception { + int replicationInactiveThresholdSeconds = pulsar1.getConfig().getBrokerReplicationInactiveThresholdSeconds(); + pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(3600); + final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topic); + Producer producer1 = client1.newProducer(Schema.STRING).topic(topic).create(); + + try { + producer1.send("msg-1"); + waitReplicatorStarted(topic, pulsar1); + waitReplicatorStarted(topic, pulsar2); + PersistentTopic persistentTopic2 = (PersistentTopic) broker2.getTopic(topic, false) + .join().get(); + + // Set inactive policies. + InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(); + inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); + inactiveTopicPolicies.setMaxInactiveDurationSeconds(10); + inactiveTopicPolicies.setDeleteWhileInactive(true); + admin2.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies); + + // Ensure policies were set successfully. + Awaitility.await().untilAsserted(() -> { + assertFalse(persistentTopic2.getProducers().values().stream() + .anyMatch(producer -> !producer.isRemote())); + assertTrue(persistentTopic2.getSubscriptions().isEmpty()); + assertTrue(persistentTopic2.getInactiveTopicPolicies().isDeleteWhileInactive()); + assertEquals(persistentTopic2.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(), 10); + + Replicator replicator = persistentTopic2.getReplicators().get(cluster1); + assertNotNull(replicator); + assertTrue(replicator.isConnected()); + assertEquals(replicator.getNumberOfEntriesInBacklog(), 0); + }); + + // Trigger GC. + persistentTopic2.disconnectReplicatorsIfNoTrafficAndBacklog(); + persistentTopic2.checkGC(); + Thread.sleep(15 * 1000); + persistentTopic2.disconnectReplicatorsIfNoTrafficAndBacklog(); + persistentTopic2.checkGC(); + + // Verify: the replication is not disconnected due to Topic GC. + Replicator replicator = persistentTopic2.getReplicators().get(cluster1); + assertNotNull(replicator); + assertTrue(replicator.isConnected()); + + // Verify: the replication still works. + producer1.send("msg-2"); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin2.topics().getStats(topic).getReplication().get(cluster1).getReplicationBacklog(), 0); + }); + + } finally { + pulsar1.getConfig().setBrokerReplicationInactiveThresholdSeconds(replicationInactiveThresholdSeconds); + producer1.close(); + admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1)); + admin2.topics().setReplicationClusters(topic, Arrays.asList(cluster2)); + waitReplicatorStopped(topic, pulsar1, pulsar2, false); + waitReplicatorStopped(topic, pulsar2, pulsar1, false); + admin1.topics().delete(topic, false); + admin2.topics().delete(topic, false); + } + } + @Test public void testUpdateNamespacePolicies() throws Exception { // Create a namespace and allow both clusters to access. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 3a2774bce2126..a4e9958b7b794 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1681,17 +1681,19 @@ private PulsarAdmin mockReplicationAdmin() { } /** - * NonPersistentReplicator.removeReplicator doesn't remove replicator in atomic way and does in multiple step: - * 1. disconnect replicator producer + * PersistentReplicator.removeReplicator doesn't remove replicator in atomic way and does in multiple step: + * 1. Turn off replication. *

- * 2. close cursor + * 2. Broker will do two things: + * 2-1. terminate replication. + * 2-2. delete cursor that named "repl.x" *

- * 3. remove from replicator-list. + * 3. remove the terminated replicator from "topic.replicators". *

* - * If we try to startReplicationProducer before step-c finish then it should not avoid restarting repl-producer. - * - * @throws Exception + * Test: + * do: try to restart replicator producer before step-2-2 finish. + * verify: the replicator producer will not be started. */ @Test @SuppressWarnings("unchecked") @@ -1749,9 +1751,14 @@ public CompletableFuture createAsync() { // step-2 now, policies doesn't have removed replication cluster so, it should not invoke "startProducer" of the // replicator // try to start replicator again - topic.startReplProducers().join(); + Awaitility.await().untilAsserted(() -> { + assertEquals(replicator.getState(), AbstractReplicator.State.Terminated); + }); + replicator.startProducer(); + Thread.sleep(10_000); // verify: replicator.startProducer is not invoked - verify(replicator, Mockito.times(1)).startProducer(); + assertEquals(replicator.getState(), AbstractReplicator.State.Terminated); + assertFalse(replicator.isConnected()); // step-3 : complete the callback to remove replicator from the list ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteCursorCallback.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java index 478437ffde015..e56d240087992 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java @@ -18,9 +18,15 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import java.util.ArrayList; @@ -167,6 +173,36 @@ public void testReadEntriesFailedCompletesInFlightTaskAfterReplicatorTerminated( } } + @Test + public void testReplicateEntriesProcessesBatchWhenReadWasNotSkipped() throws Exception { + PersistentReplicator replicator = spy(getReplicator(topicName)); + List entries = Collections.emptyList(); + InFlightTask task = new InFlightTask(PositionFactory.create(1, 1), 1, replicator.getReplicatorId()); + task.setEntries(entries); + doReturn(false).when(replicator).doReplicateEntries(any(), any()); + doNothing().when(replicator).readMoreEntries(); + + replicator.replicateEntries(entries, task, false); + + verify(replicator, times(1)).doReplicateEntries(entries, task); + verify(replicator, never()).readMoreEntries(); + } + + @Test + public void testReplicateEntriesResumesReadWhenNoMessagesWereSentAfterSkippedRead() throws Exception { + PersistentReplicator replicator = spy(getReplicator(topicName)); + List entries = Collections.emptyList(); + InFlightTask task = new InFlightTask(PositionFactory.create(1, 1), 1, replicator.getReplicatorId()); + task.setEntries(entries); + doReturn(false).when(replicator).doReplicateEntries(any(), any()); + doNothing().when(replicator).readMoreEntries(); + + replicator.replicateEntries(entries, task, true); + + verify(replicator, times(1)).doReplicateEntries(entries, task); + verify(replicator, times(1)).readMoreEntries(); + } + @Test public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception { log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java index 21ead24cdd463..8119a6bf93514 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java @@ -179,7 +179,7 @@ public void testShadowReplicatorReleasesSourceEntryBuffer() throws Exception { new PersistentReplicator.InFlightTask( entry.getPosition(), entries.size(), replicator.getReplicatorId()); inFlightTask.setEntries(entries); - Assert.assertTrue(replicator.replicateEntries(entries, inFlightTask)); + replicator.replicateEntries(entries, inFlightTask, false); Awaitility.await().untilAsserted(() -> { Assert.assertTrue(inFlightTask.isDone());