Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.io.Closeable;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class PulsarStats implements Closeable {
private final boolean exposePublisherStats;

private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock();
private final Clock clock;

@Getter
private long updatedAt;
Expand All @@ -81,7 +83,7 @@ public PulsarStats(PulsarService pulsar) {

this.exposePublisherStats = pulsar.getConfiguration().isExposePublisherStats();
this.updatedAt = 0;

this.clock = pulsar.getClock();
}

@Override
Expand Down Expand Up @@ -230,7 +232,7 @@ public synchronized void updateStats(Map<String, Map<String, Map<String, Topic>>
} finally {
bufferLock.writeLock().unlock();
}
updatedAt = System.currentTimeMillis();
updatedAt = this.clock.millis();
}

public synchronized NamespaceBundleStats invalidBundleStats(String bundleName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.annotations.VisibleForTesting;
import io.github.merlimat.slog.Logger;
import io.netty.buffer.ByteBuf;
import java.time.Clock;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class MessageDeduplication {
private final PersistentTopic topic;
private final ManagedLedger managedLedger;
private final ManagedLedgerReplayTask replayTask;
private final Clock clock;
private ManagedCursor managedCursor;

private static final String IS_LAST_CHUNK = "isLastChunk";
Expand Down Expand Up @@ -144,6 +146,7 @@ public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, Managed
this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers();
this.snapshotCounter = 0;
this.replicatorPrefix = pulsar.getConfiguration().getReplicatorPrefix();
this.clock = pulsar.getClock();
this.replayTask = new ManagedLedgerReplayTask("MessageDeduplication", pulsar.getExecutor(), 100);
this.log = LOG.with().attr("topic", topic.getName()).build();
}
Expand Down Expand Up @@ -601,7 +604,7 @@ private CompletableFuture<Void> takeSnapshot(Position position) {
log.debug()
.attr("position", position)
.log("Stored new deduplication snapshot at");
lastSnapshotTimestamp = System.currentTimeMillis();
lastSnapshotTimestamp = this.clock.millis();
snapshotTaking.set(false);
});
future.exceptionally(e -> {
Expand Down Expand Up @@ -636,14 +639,14 @@ public void producerRemoved(String producerName) {
}

// Producer is no-longer active
inactiveProducers.put(producerName, System.currentTimeMillis());
inactiveProducers.put(producerName, this.clock.millis());
}

/**
* Remove from hash maps all the producers that were inactive for more than the configured amount of time.
*/
public synchronized void purgeInactiveProducers() {
long minimumActiveTimestamp = System.currentTimeMillis() - TimeUnit.MINUTES
long minimumActiveTimestamp = this.clock.millis() - TimeUnit.MINUTES
.toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());

// if not enabled just clear all inactive producer record.
Expand Down Expand Up @@ -687,7 +690,7 @@ public void takeSnapshot() {
}

Integer interval = topic.getHierarchyTopicPolicies().getDeduplicationSnapshotIntervalSeconds().get();
long currentTimeStamp = System.currentTimeMillis();
long currentTimeStamp = this.clock.millis();
if (interval == null || interval <= 0
|| currentTimeStamp - lastSnapshotTimestamp < TimeUnit.SECONDS.toMillis(interval)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.opentelemetry.api.metrics.LongCounter;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import java.time.Clock;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -106,6 +107,7 @@ enum CompatibilityCheckResponse {
private static final Summary putOpsLatency = buildSummary("pulsar_schema_put_ops_latency", "-");

private final Map<String, Long> namespaceAccess = new ConcurrentHashMap<>();
private final Clock clock;
private final ScheduledFuture<?> future;

public SchemaRegistryStats(PulsarService pulsarService) {
Expand All @@ -120,6 +122,7 @@ public SchemaRegistryStats(PulsarService pulsarService) {
.setDescription("The number of Schema Registry compatibility check operations performed by the broker.")
.setUnit("{operation}")
.build();
this.clock = pulsarService.getClock();
}

private static Summary buildSummary(String name, String help) {
Expand Down Expand Up @@ -212,7 +215,7 @@ private String getNamespace(String schemaId) {
namespace = "unknown";
}

this.namespaceAccess.put(namespace, System.currentTimeMillis());
this.namespaceAccess.put(namespace, this.clock.millis());
return namespace;
}

Expand All @@ -236,7 +239,7 @@ public synchronized void close() throws Exception {

@Override
public void run() {
long now = System.currentTimeMillis();
long now = this.clock.millis();
long interval = TimeUnit.MINUTES.toMillis(5);

this.namespaceAccess.entrySet().removeIf(entry -> {
Expand Down