Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,20 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private Integer brokerDeleteInactiveTopicsMaxInactiveDurationSeconds = null;

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Time in seconds that a persistent geo-replication replicator may stay idle before the broker"
+ " disconnects its replication producer. A replicator is eligible only when it has no backlog and"
+ " has not read entries for replication processing for longer than this threshold. Disconnecting"
+ " only releases the idle producer; the replicator and its cursor remain available, and the"
+ " producer is recreated automatically when new messages need to be replicated. Set this value to"
+ " 0 or a negative value to disable idle-replicator disconnection. The check runs with the"
+ " inactive-topic monitor, whose interval is brokerDeleteInactiveTopicsFrequencySeconds, and only"
+ " when brokerDeleteInactiveTopicsEnabled is true. The default is 86400 seconds (24 hours)."
)
private int brokerReplicationInactiveThresholdSeconds = 24 * 3600;

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public abstract class AbstractReplicator implements Replicator {
private static final AtomicReferenceFieldUpdater<AbstractReplicator, Attributes> ATTRIBUTES_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, Attributes.class, "attributes");

protected volatile long latestPublishTime = System.currentTimeMillis();

public enum State {
/**
* This enum has two mean meanings:
Expand Down Expand Up @@ -184,7 +186,7 @@ protected CompletableFuture<Void> prepareCreateProducer() {
return CompletableFuture.completedFuture(null);
}

public void startProducer() {
protected void startProducer() {
// Guarantee only one task call "producerBuilder.createAsync()".
Pair<Boolean, State> setStartingRes = compareSetAndGetState(State.Disconnected, State.Starting);
if (!setStartingRes.getLeft()) {
Expand Down Expand Up @@ -313,8 +315,7 @@ protected CompletableFuture<Boolean> isLocalTopicActive() {
/**
* This method only be used by {@link PersistentTopic#checkGC} now.
*/
@Override
public CompletableFuture<Void> disconnect() {
protected CompletableFuture<Void> disconnect() {
long backlog = getNumberOfEntriesInBacklog();
if (backlog > 0) {
CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -389,8 +390,6 @@ protected CompletableFuture<Void> closeProducerAsync(boolean closeTheStartingPro
Pair<Boolean, State> setDisconnectedRes = compareSetAndGetState(State.Disconnecting, State.Disconnected);
if (setDisconnectedRes.getLeft()) {
this.producer = null;
// deactivate further read
disableReplicatorRead();
return;
}
if (setDisconnectedRes.getRight() == State.Terminating
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
Expand Down Expand Up @@ -665,6 +666,19 @@ protected Consumer getActiveConsumer(Subscription subscription) {
return null;
}

protected boolean hasProducersActive() {
return !producers.isEmpty();
}

protected boolean hasActiveReplicators() {
for (Replicator replicator : getReplicators().values()) {
if (replicator.isConnected()) {
return true;
}
}
return false;
}

protected boolean hasLocalProducers() {
if (producers.isEmpty()) {
return false;
Expand All @@ -677,6 +691,19 @@ protected boolean hasLocalProducers() {
return false;
}

public void disconnectReplicatorsIfNoTrafficAndBacklog() {
for (Replicator replicator : getReplicators().values()) {
if (replicator instanceof PersistentReplicator persistentReplicator) {
persistentReplicator.disconnectIfNoTrafficAndBacklog();
}
}
for (Replicator replicator : getShadowReplicators().values()) {
if (replicator instanceof PersistentReplicator persistentReplicator) {
persistentReplicator.disconnectIfNoTrafficAndBacklog();
}
}
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("topic", topic).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,10 @@ protected void startInactivityMonitor() {
int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkGC(), interval, interval,
TimeUnit.SECONDS);
if (pulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds() > 0) {
Comment thread
poorbarcode marked this conversation as resolved.
inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkInactiveReplication(), interval,
interval, TimeUnit.SECONDS);
}
}

// Deduplication info checker
Expand Down Expand Up @@ -2411,6 +2415,14 @@ public void checkGC() {
forEachTopic(Topic::checkGC);
}

public void checkInactiveReplication() {
forEachTopic(topic -> {
if (topic instanceof AbstractTopic abstractTopic) {
abstractTopic.disconnectReplicatorsIfNoTrafficAndBacklog();
}
});
}

public void checkClusterMigration() {
forEachTopic(Topic::checkClusterMigration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,12 @@

public interface Replicator {

void startProducer();

Topic getLocalTopic();

ReplicatorStatsImpl computeStats();

CompletableFuture<Void> terminate();

CompletableFuture<Void> disconnect();

void updateRates();

String getRemoteCluster();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ protected String getProducerName() {
return getReplicatorName(replicatorPrefix, localCluster) + REPL_PRODUCER_NAME_DELIMITER + remoteCluster;
}

@Override
public void startProducer() {
super.startProducer();
}

@Override
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
this.producer = (ProducerImpl) producer;
Expand All @@ -89,6 +94,7 @@ protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {

@SuppressWarnings("unchecked")
public void sendMessage(Entry entry) {
latestPublishTime = System.currentTimeMillis();
if ((STATE_UPDATER.get(this) == State.Started) && isWritable()) {

int length = entry.getLength();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,15 @@ private CompletableFuture<Void> createRemoteTopicIfDoesNotExist(String partition

@Override
@SuppressWarnings("unchecked")
protected boolean replicateEntries(List<Entry> entries, final InFlightTask inFlightTask) {
protected boolean doReplicateEntries(List<Entry> entries, final InFlightTask inFlightTask) {
boolean atLeastOneMessageSentForReplication = false;
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();

try {
// This flag is set to true when we skip at least one local message,
// in order to skip remaining local messages.
boolean isLocalMessageSkippedOnce = false;
boolean skipRemainingMessages = inFlightTask.isSkipReadResultDueToCursorRewind();
boolean skipRemainingMessages = false;
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
// Skip the messages since the replicator need to fetch the schema info to replicate the schema to the
Expand Down Expand Up @@ -239,13 +238,13 @@ protected boolean replicateEntries(List<Entry> entries, final InFlightTask inFli
continue;
}

if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
if (STATE_UPDATER.get(this) != State.Started || inFlightTask.isSkipReadResultDueToCursorRewind()) {
// The producer is not ready yet after having stopped/restarted. Drop the message because it will
// recover when the producer is ready
log.debug()
.attr("position", entry.getPosition())
.log("Dropping read message because producer is not ready");
isLocalMessageSkippedOnce = true;
skipRemainingMessages = true;
inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.broker.service.AbstractReplicator.State.Disconnected;
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Disconnecting;
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Started;
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Starting;
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminated;
Expand Down Expand Up @@ -52,6 +54,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -151,13 +154,6 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man

@Override
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
// Repeat until there are no read operations in progress
if (STATE_UPDATER.get(this) == State.Starting && hasPendingRead() && !cursor.cancelPendingReadRequest()) {
brokerService.getPulsar().getExecutor()
.schedule(() -> setProducerAndTriggerReadEntries(producer), 10, TimeUnit.MILLISECONDS);
return;
}

/**
* 1. Try change state to {@link Started}.
* 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value
Expand All @@ -179,8 +175,6 @@ protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
// activate cursor: so, entries can be cached.
this.cursor.setActive();

// Rewind the cursor to be sure to read again all non-acked messages sent while restarting
cursor.rewind();
// read entries
readMoreEntries();
} else {
Expand Down Expand Up @@ -283,6 +277,30 @@ private AvailablePermits getRateLimiterAvailablePermits(int availablePermits) {
return new AvailablePermits((int) availablePermitsOnMsg, availablePermitsOnByte);
}

public void disconnectIfNoTrafficAndBacklog() {
// Disabled the feature.
int threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds();
if (threshold <= 0) {
return;
}
// Has backlog.
long backlog = getNumberOfEntriesInBacklog();
if (backlog > 0) {
return;
}
// Already disconnected.
if (state != Started) {
return;
}

// Disconnect if no backlog and no traffic for a long time.
if (System.currentTimeMillis() - latestPublishTime > threshold * 1000L) {
log.info().attr("brokerReplicationInactiveThresholdSeconds", threshold)
.log("Disconnecting replication producers since no producer is active for a long time.");
disconnect();
}
}

protected void readMoreEntries() {
if (state.equals(Terminated) || state.equals(Terminating)) {
return;
Expand Down Expand Up @@ -360,20 +378,64 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {

readFailureBackoff.reduceToHalf();

boolean atLeastOneMessageSentForReplication = replicateEntries(entries, inFlightTask);
boolean producerIsWritable = isWritable();
replicateEntries(entries, inFlightTask, !producerIsWritable);

if (atLeastOneMessageSentForReplication && !isWritable()) {
if (!producerIsWritable) {
// Don't read any more entries until the current pending entries are persisted
log.debug()
.attr("atLeastOneMessageSentForReplication", atLeastOneMessageSentForReplication)
.attr("isWritable", isWritable())
.log("Pausing replication traffic");
} else {
readMoreEntries();
}
}

protected abstract boolean replicateEntries(List<Entry> entries, InFlightTask inFlightTask);
protected void replicateEntries(List<Entry> entries, InFlightTask inFlightTask,
final boolean skippedReadAfterSent) {
latestPublishTime = System.currentTimeMillis();
// Release memory if terminated.
if (state == State.Terminated || state == State.Terminating
|| inFlightTask.isSkipReadResultDueToCursorRewind()) {
for (Entry entry : entries) {
inFlightTask.incCompletedEntries();
entry.release();
}
return;
}

// Retry to replicate messages if it is not started.
ManagedLedgerImpl ml = (ManagedLedgerImpl) cursor.getManagedLedger();
Runnable retryReplicateEntries = () -> {
ml.getScheduledExecutor().schedule(() -> {
ml.getExecutor().execute(() -> {
replicateEntries(entries, inFlightTask, skippedReadAfterSent);
});
}, 100, TimeUnit.MILLISECONDS);
};

// Retry.
if (state == Disconnecting || state == Starting) {
retryReplicateEntries.run();
return;
}
// Start producer and retry.
if (state == Disconnected) {
startProducer();
retryReplicateEntries.run();
return;
}
// Do replicate.
// If the previous "read more entries" was skipped due to the producer write buffer is full, we need to trigger
// once after replicated entries. But if "doReplicateEntries" already sent some messages, the event "read more
// entries" can be triggered by the receipt action of publishing.
boolean atLeastOneMessageSentForReplication = doReplicateEntries(entries, inFlightTask);
if (skippedReadAfterSent && !atLeastOneMessageSentForReplication) {
readMoreEntries();
}
}

protected abstract boolean doReplicateEntries(List<Entry> entries, InFlightTask inFlightTask);

protected CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) throws ExecutionException {
if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) {
Expand Down Expand Up @@ -932,15 +994,10 @@ protected CompletableFuture<Void> beforeDisconnect() {
.TopicBusyException("Cannot close a replicator with backlog"));
}
}
beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Disconnecting);
return CompletableFuture.completedFuture(null);
}
}

protected void afterDisconnected() {
doRewindCursor(false);
}

protected void beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding reason) {
synchronized (inFlightTasks) {
boolean hasCanceledPendingRead = cursor.cancelPendingReadRequest();
Expand Down
Loading
Loading