diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index fd2592cb58405..d4c13d8cb0200 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -22,6 +22,7 @@ import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCountUtil; import java.io.Closeable; +import java.time.Clock; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -61,6 +62,7 @@ public class PulsarStats implements Closeable { private final boolean exposePublisherStats; private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock(); + private final Clock clock; @Getter private long updatedAt; @@ -81,7 +83,7 @@ public PulsarStats(PulsarService pulsar) { this.exposePublisherStats = pulsar.getConfiguration().isExposePublisherStats(); this.updatedAt = 0; - + this.clock = pulsar.getClock(); } @Override @@ -230,7 +232,7 @@ public synchronized void updateStats(Map> } finally { bufferLock.writeLock().unlock(); } - updatedAt = System.currentTimeMillis(); + updatedAt = this.clock.millis(); } public synchronized NamespaceBundleStats invalidBundleStats(String bundleName) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 2db01ccc94624..42fd9c3903b2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import io.github.merlimat.slog.Logger; import io.netty.buffer.ByteBuf; +import java.time.Clock; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -60,6 +61,7 @@ public class MessageDeduplication { private final PersistentTopic topic; private final ManagedLedger managedLedger; private final ManagedLedgerReplayTask replayTask; + private final Clock clock; private ManagedCursor managedCursor; private static final String IS_LAST_CHUNK = "isLastChunk"; @@ -144,6 +146,7 @@ public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, Managed this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers(); this.snapshotCounter = 0; this.replicatorPrefix = pulsar.getConfiguration().getReplicatorPrefix(); + this.clock = pulsar.getClock(); this.replayTask = new ManagedLedgerReplayTask("MessageDeduplication", pulsar.getExecutor(), 100); this.log = LOG.with().attr("topic", topic.getName()).build(); } @@ -601,7 +604,7 @@ private CompletableFuture takeSnapshot(Position position) { log.debug() .attr("position", position) .log("Stored new deduplication snapshot at"); - lastSnapshotTimestamp = System.currentTimeMillis(); + lastSnapshotTimestamp = this.clock.millis(); snapshotTaking.set(false); }); future.exceptionally(e -> { @@ -636,14 +639,14 @@ public void producerRemoved(String producerName) { } // Producer is no-longer active - inactiveProducers.put(producerName, System.currentTimeMillis()); + inactiveProducers.put(producerName, this.clock.millis()); } /** * Remove from hash maps all the producers that were inactive for more than the configured amount of time. */ public synchronized void purgeInactiveProducers() { - long minimumActiveTimestamp = System.currentTimeMillis() - TimeUnit.MINUTES + long minimumActiveTimestamp = this.clock.millis() - TimeUnit.MINUTES .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes()); // if not enabled just clear all inactive producer record. @@ -687,7 +690,7 @@ public void takeSnapshot() { } Integer interval = topic.getHierarchyTopicPolicies().getDeduplicationSnapshotIntervalSeconds().get(); - long currentTimeStamp = System.currentTimeMillis(); + long currentTimeStamp = this.clock.millis(); if (interval == null || interval <= 0 || currentTimeStamp - lastSnapshotTimestamp < TimeUnit.SECONDS.toMillis(interval)) { return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java index b1a7dc2a54133..ffe827646a897 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java @@ -25,6 +25,7 @@ import io.opentelemetry.api.metrics.LongCounter; import io.prometheus.client.Counter; import io.prometheus.client.Summary; +import java.time.Clock; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; @@ -106,6 +107,7 @@ enum CompatibilityCheckResponse { private static final Summary putOpsLatency = buildSummary("pulsar_schema_put_ops_latency", "-"); private final Map namespaceAccess = new ConcurrentHashMap<>(); + private final Clock clock; private final ScheduledFuture future; public SchemaRegistryStats(PulsarService pulsarService) { @@ -120,6 +122,7 @@ public SchemaRegistryStats(PulsarService pulsarService) { .setDescription("The number of Schema Registry compatibility check operations performed by the broker.") .setUnit("{operation}") .build(); + this.clock = pulsarService.getClock(); } private static Summary buildSummary(String name, String help) { @@ -212,7 +215,7 @@ private String getNamespace(String schemaId) { namespace = "unknown"; } - this.namespaceAccess.put(namespace, System.currentTimeMillis()); + this.namespaceAccess.put(namespace, this.clock.millis()); return namespace; } @@ -236,7 +239,7 @@ public synchronized void close() throws Exception { @Override public void run() { - long now = System.currentTimeMillis(); + long now = this.clock.millis(); long interval = TimeUnit.MINUTES.toMillis(5); this.namespaceAccess.entrySet().removeIf(entry -> {