diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f10b2cbdc322e..ea1a5790ab7ca 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1356,6 +1356,152 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private int scalableTopicConsumerSessionGracePeriodSeconds = 60; + /**** --- Scalable topic auto split/merge (PIP-483). --- ****/ + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Cluster-wide default for scalable-topic auto split/merge. When true, the controller " + + "leader automatically splits hot segments and merges cold ones, within the caps " + + "below. Can be overridden per-namespace and per-topic." + ) + private boolean scalableTopicAutoScaleEnabled = true; + + @FieldContext( + dynamic = false, + category = CATEGORY_POLICIES, + doc = "Cadence (seconds) of the controller's periodic traffic-driven auto split/merge " + + "evaluation. Consumer-count changes are handled event-driven and are not affected " + + "by this interval. Read when a controller wins leadership; not dynamic." + ) + private int scalableTopicAutoScaleIntervalSeconds = 60; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Hard ceiling on the number of active segments a scalable topic can be auto-scaled to. " + + "Splits stop firing once this is reached." + ) + private int scalableTopicMaxSegments = 64; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Hard floor on the number of active segments. Merges stop firing once this is reached." + ) + private int scalableTopicMinSegments = 1; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Max number of merges allowed in a segment's lineage. Once a segment reaches this depth " + + "it stops being a merge candidate (load-driven splits are still allowed), bounding " + + "split/merge flip-flopping." + ) + private int scalableTopicMaxDagDepth = 10; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Minimum time (seconds) between automatic splits on a topic. Deliberately short — it " + + "only coalesces a burst of near-simultaneous triggers (e.g. a consumer group " + + "connecting at once)." + ) + private int scalableTopicSplitCooldownSeconds = 60; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Minimum time (seconds) between automatic merges on a topic." + ) + private int scalableTopicMergeCooldownSeconds = 300; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "How long (seconds) a segment must continuously stay below every merge threshold before " + + "it becomes merge-eligible." + ) + private int scalableTopicMergeWindowSeconds = 300; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Inbound messages/second above which a segment is split." + ) + private double scalableTopicSplitMsgRateInThreshold = 10_000; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Inbound bytes/second above which a segment is split." + ) + private long scalableTopicSplitBytesRateInThreshold = 50_000_000L; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Outbound (dispatched) messages/second above which a segment is split." + ) + private double scalableTopicSplitMsgRateOutThreshold = 50_000; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Outbound bytes/second above which a segment is split." + ) + private long scalableTopicSplitBytesRateOutThreshold = 250_000_000L; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Inbound messages/second below which a segment counts as cold for merging." + ) + private double scalableTopicMergeMsgRateInThreshold = 1_000; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Inbound bytes/second below which a segment counts as cold for merging." + ) + private long scalableTopicMergeBytesRateInThreshold = 5_000_000L; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Outbound messages/second below which a segment counts as cold for merging." + ) + private double scalableTopicMergeMsgRateOutThreshold = 5_000; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Outbound bytes/second below which a segment counts as cold for merging." + ) + private long scalableTopicMergeBytesRateOutThreshold = 25_000_000L; + + @FieldContext( + dynamic = false, + category = CATEGORY_POLICIES, + doc = "Interval (seconds) at which the segment-owning broker samples its segment topics to " + + "report load for auto split/merge. Read at broker start; not dynamic." + ) + private int scalableTopicLoadReportIntervalSeconds = 10; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Minimum relative change in any segment rate (e.g. 0.25 = 25%) since the last write that " + + "triggers a new load record. Keeps metadata write volume bounded; a steady-state " + + "segment writes once and goes quiet.\n" + + "Note: the band is anchored at the last written value, not at the split/merge " + + "thresholds. A rate that settles within the band of the last record is never " + + "re-reported, so a segment can sustain up to this factor beyond a split/merge " + + "threshold without triggering — the cost of bounded write volume. Lower the " + + "threshold for tighter tracking at the price of more metadata writes." + ) + private double scalableTopicLoadReportRateChangeThreshold = 0.25; + @FieldContext( dynamic = false, category = CATEGORY_POLICIES, diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java index 5647feef52ab8..8d5999c061b7b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java @@ -32,9 +32,11 @@ import lombok.CustomLog; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.scalable.SegmentLoadStats; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.metadata.api.CacheGetResult; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; @@ -64,6 +66,8 @@ public class ScalableTopicResources extends BaseResources private static final String SCALABLE_TOPIC_PATH = "/topics"; private static final String SUBSCRIPTIONS_SEGMENT = "subscriptions"; private static final String CONSUMERS_SEGMENT = "consumers"; + private static final String SEGMENTS_SEGMENT = "segments"; + private static final String LOAD_SEGMENT = "load"; /** * Use the topic's {@code properties} map verbatim as the secondary-index entries. @@ -77,6 +81,7 @@ public class ScalableTopicResources extends BaseResources private final MetadataCache subscriptionCache; private final MetadataCache consumerRegistrationCache; + private final MetadataCache segmentLoadCache; /** * Per-path listeners for scalable-topic metadata events. Each listener watches a @@ -107,6 +112,7 @@ public ScalableTopicResources(MetadataStore store, int operationTimeoutSec) { super(store, ScalableTopicMetadata.class, operationTimeoutSec); this.subscriptionCache = store.getMetadataCache(SubscriptionMetadata.class); this.consumerRegistrationCache = store.getMetadataCache(ConsumerRegistration.class); + this.segmentLoadCache = store.getMetadataCache(SegmentLoadStats.class); // Single shared metadata-store listener fans out to both per-path and // per-namespace subscribers. Per-subscriber lifecycle goes through the // register / deregister methods below. @@ -260,7 +266,10 @@ public CompletableFuture updateScalableTopicAsync(TopicName tn, } public CompletableFuture deleteScalableTopicAsync(TopicName tn) { - return deleteAsync(topicPath(tn)); + // Recursive: the topic record has children — the controller leader lock, the + // subscriptions (and their consumer registrations), and the per-segment load + // records — all of which must go with the topic. + return getStore().deleteRecursive(topicPath(tn)); } public CompletableFuture scalableTopicExistsAsync(TopicName tn) { @@ -432,6 +441,53 @@ public CompletableFuture> listConsumersAsync(TopicName tn, String s /** * Get the metadata store path for the controller leader lock. */ + // --- Segment load records (PIP-483 auto split/merge) --- + + /** + * Upsert a segment's load record. Written by the broker that owns the segment's + * {@code segment://} topic, only when the rates have changed materially since the last + * write (the materiality decision lives in {@code SegmentLoadReporter}). + * + *

An identical value is NOT rewritten: the record's {@code Stat} modification time is + * what the controller uses as "cold since" for the merge window, so a no-op rewrite — + * e.g. the first report after segment ownership moved to a broker with an empty + * last-written cache — would spuriously reset the window and starve merges under + * frequent rebalancing. + */ + public CompletableFuture reportSegmentLoadAsync(TopicName tn, long segmentId, + SegmentLoadStats stats) { + String path = segmentLoadPath(tn, segmentId); + return segmentLoadCache.get(path).thenCompose(existing -> { + if (existing.isPresent() && existing.get().equals(stats)) { + return CompletableFuture.completedFuture(null); + } + return segmentLoadCache.readModifyUpdateOrCreate(path, __ -> stats) + .thenApply(__ -> null); + }); + } + + /** + * Read a segment's load record together with its metadata {@link Stat} — the controller's + * auto-scaling evaluator uses {@code stat.getModificationTimestamp()} to tell how long the + * segment has held its current load (the "cold for at least mergeWindow" check). + * + * @return the value and its stat, or empty if no record has been written yet + */ + public CompletableFuture>> getSegmentLoadAsync( + TopicName tn, long segmentId) { + return segmentLoadCache.getWithStats(segmentLoadPath(tn, segmentId)); + } + + /** Delete a segment's load record (best-effort; tolerates a missing record). */ + public CompletableFuture deleteSegmentLoadAsync(TopicName tn, long segmentId) { + return segmentLoadCache.delete(segmentLoadPath(tn, segmentId)) + .exceptionally(ignoreMissing()); + } + + public String segmentLoadPath(TopicName tn, long segmentId) { + return joinPath(topicPath(tn), SEGMENTS_SEGMENT, Long.toString(segmentId), LOAD_SEGMENT); + } + public String controllerLockPath(TopicName tn) { return joinPath(topicPath(tn), "controller"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d180048b1b316..c99ecdf78a6ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -137,6 +137,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; +import org.apache.pulsar.broker.service.scalable.SegmentLoadReporter; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; @@ -185,6 +186,8 @@ import org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.apache.pulsar.common.scalable.SegmentLoadStats; +import org.apache.pulsar.common.scalable.SegmentTopicName; import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FieldParser; @@ -285,6 +288,9 @@ public class BrokerService implements Closeable { private final SingleThreadNonConcurrentFixedRateScheduler compactionMonitor; private final SingleThreadNonConcurrentFixedRateScheduler consumedLedgersMonitor; private SingleThreadNonConcurrentFixedRateScheduler deduplicationSnapshotMonitor; + /** PIP-483: periodic sweep that writes per-segment load records for auto split/merge. */ + private SingleThreadNonConcurrentFixedRateScheduler segmentLoadReporterMonitor; + private SegmentLoadReporter segmentLoadReporter; protected final PublishRateLimiter brokerPublishRateLimiter; private final DispatchRateLimiterFactory dispatchRateLimiterFactory; protected volatile DispatchRateLimiter brokerDispatchRateLimiter = null; @@ -682,9 +688,69 @@ public void start() throws Exception { this.updateBrokerDispatchThrottlingMaxRate(); this.startCheckReplicationPolicies(); this.startDeduplicationSnapshotMonitor(); + this.startSegmentLoadReporter(); this.startClearInvalidateTopicNameCacheTask(); } + /** + * Start the periodic per-segment load reporter (PIP-483). On each tick this broker sweeps + * the {@code segment://} topics it currently hosts, computes their ingest/dispatch rates, + * and writes a {@link SegmentLoadStats} record to the metadata store — but only when a rate + * changed materially since the last write (see {@link SegmentLoadReporter}). The controller + * leader reads these records to drive auto split/merge. + */ + protected void startSegmentLoadReporter() { + ServiceConfiguration conf = pulsar().getConfiguration(); + if (!conf.isScalableTopicsEnabled()) { + return; + } + var resources = pulsar().getPulsarResources().getScalableTopicResources(); + if (resources == null) { + return; + } + int interval = conf.getScalableTopicLoadReportIntervalSeconds(); + if (interval <= 0) { + return; + } + this.segmentLoadReporter = new SegmentLoadReporter(resources, + () -> pulsar().getConfiguration().getScalableTopicLoadReportRateChangeThreshold()); + this.segmentLoadReporterMonitor = + new SingleThreadNonConcurrentFixedRateScheduler("scalable-segment-load-reporter"); + segmentLoadReporterMonitor.scheduleAtFixedRateNonConcurrently( + () -> forEachTopic(this::reportSegmentLoad), interval, interval, TimeUnit.SECONDS); + } + + @VisibleForTesting + public void runSegmentLoadReportOnceForTest() { + forEachTopic(this::reportSegmentLoad); + } + + private void reportSegmentLoad(Topic topic) { + SegmentLoadReporter reporter = this.segmentLoadReporter; + if (reporter == null) { + return; + } + TopicName topicName = TopicName.get(topic.getName()); + if (topicName.getDomain() != TopicDomain.segment) { + return; + } + try { + TopicName parent = SegmentTopicName.getParentTopicName(topicName); + long segmentId = SegmentTopicName.getSegmentId(topicName); + var stats = topic.getStats(false, false, false); + SegmentLoadStats load = new SegmentLoadStats( + stats.msgRateIn, stats.msgThroughputIn, stats.msgRateOut, stats.msgThroughputOut); + reporter.reportIfChanged(parent, segmentId, load).exceptionally(ex -> { + log.debug().attr("segment", topicName).exceptionMessage(ex) + .log("Failed to report segment load"); + return null; + }); + } catch (Exception e) { + log.debug().attr("segment", topicName).exceptionMessage(e) + .log("Failed to sample segment load"); + } + } + protected void startClearInvalidateTopicNameCacheTask() { final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache(); inactivityMonitor.scheduleAtFixedRateNonConcurrently( @@ -966,7 +1032,8 @@ public CompletableFuture closeAsync() { consumedLedgersMonitor, backlogQuotaChecker, topicOrderedExecutor, - deduplicationSnapshotMonitor) + deduplicationSnapshotMonitor, + segmentLoadReporterMonitor) .handle()); CompletableFuture combined = @@ -2696,9 +2763,34 @@ private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle, if (compactor != null) { compactor.getStats().removeTopic(topic); } + forgetSegmentLoad(topic); topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.SUCCESS); } + /** + * Drop the load reporter's last-written cache entry for a segment topic this broker no + * longer owns (unload / delete). Without this the cache grows unboundedly with segment + * churn, and on a later re-acquire the first sample could be wrongly suppressed as + * immaterial. + */ + private void forgetSegmentLoad(String topic) { + SegmentLoadReporter reporter = this.segmentLoadReporter; + if (reporter == null) { + return; + } + TopicName topicName = TopicName.get(topic); + if (topicName.getDomain() != TopicDomain.segment) { + return; + } + try { + reporter.forget(SegmentTopicName.getParentTopicName(topicName), + SegmentTopicName.getSegmentId(topicName)); + } catch (Exception e) { + log.debug().attr("segment", topicName).exceptionMessage(e) + .log("Failed to forget segment load cache entry"); + } + } + public long getNumberOfNamespaceBundles() { this.numberOfNamespaceBundles = 0; this.multiLayerTopicsMap.forEach((namespaceName, bundles) -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java new file mode 100644 index 0000000000000..b739b102a76b9 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import java.time.Duration; +import lombok.Builder; +import org.apache.pulsar.broker.ServiceConfiguration; + +/** + * Fully-resolved auto split/merge policy for a single scalable topic (PIP-483). + * + *

This is the flattened result of merging broker config defaults with any namespace and + * topic overrides. The {@link AutoScalePolicyEvaluator} reads it directly — it never sees + * the partial override objects or the broker config. + * + *

All thresholds are absolute (msg/s and bytes/s). Split thresholds must sit strictly + * above the corresponding merge thresholds: the dead-band between them is the hysteresis + * that prevents a just-merged segment from immediately re-qualifying for a split. + * + * @param enabled whether auto split/merge is active for this topic; when false the + * evaluator always returns {@code NoAction} + * @param maxSegments hard ceiling on active segments; splits stop once reached + * @param minSegments hard floor on active segments; merges stop once reached + * @param maxDagDepth max merges allowed in a segment's lineage; a pair is merge-eligible + * only while neither side has reached this depth (splits are unaffected) + * @param splitCooldown minimum time between automatic splits on the topic; short, only to + * coalesce a burst of near-simultaneous triggers + * @param mergeCooldown minimum time between automatic merges on the topic + * @param mergeWindow how long a segment must continuously stay below every merge threshold + * before it becomes merge-eligible (measured from the load record's + * metadata-store last-modified time) + * @param splitMsgRateIn inbound msg/s above which a segment is split + * @param splitBytesRateIn inbound bytes/s above which a segment is split + * @param splitMsgRateOut outbound (dispatched) msg/s above which a segment is split + * @param splitBytesRateOut outbound bytes/s above which a segment is split + * @param mergeMsgRateIn inbound msg/s below which a segment counts as cold for merging + * @param mergeBytesRateIn inbound bytes/s below which a segment counts as cold for merging + * @param mergeMsgRateOut outbound msg/s below which a segment counts as cold for merging + * @param mergeBytesRateOut outbound bytes/s below which a segment counts as cold for merging + */ +@Builder(toBuilder = true) +public record AutoScaleConfig( + boolean enabled, + int maxSegments, + int minSegments, + int maxDagDepth, + Duration splitCooldown, + Duration mergeCooldown, + Duration mergeWindow, + double splitMsgRateIn, + double splitBytesRateIn, + double splitMsgRateOut, + double splitBytesRateOut, + double mergeMsgRateIn, + double mergeBytesRateIn, + double mergeMsgRateOut, + double mergeBytesRateOut +) { + + /** + * Build the cluster-wide default policy from broker configuration. Per-namespace and + * per-topic overrides (when added) are layered on top of this via {@code toBuilder()}. + * + * @param conf the broker service configuration + * @return the resolved policy reflecting the {@code scalableTopic*} settings + */ + public static AutoScaleConfig fromBrokerConfig(ServiceConfiguration conf) { + return AutoScaleConfig.builder() + .enabled(conf.isScalableTopicAutoScaleEnabled()) + .maxSegments(conf.getScalableTopicMaxSegments()) + .minSegments(conf.getScalableTopicMinSegments()) + .maxDagDepth(conf.getScalableTopicMaxDagDepth()) + .splitCooldown(Duration.ofSeconds(conf.getScalableTopicSplitCooldownSeconds())) + .mergeCooldown(Duration.ofSeconds(conf.getScalableTopicMergeCooldownSeconds())) + .mergeWindow(Duration.ofSeconds(conf.getScalableTopicMergeWindowSeconds())) + .splitMsgRateIn(conf.getScalableTopicSplitMsgRateInThreshold()) + .splitBytesRateIn(conf.getScalableTopicSplitBytesRateInThreshold()) + .splitMsgRateOut(conf.getScalableTopicSplitMsgRateOutThreshold()) + .splitBytesRateOut(conf.getScalableTopicSplitBytesRateOutThreshold()) + .mergeMsgRateIn(conf.getScalableTopicMergeMsgRateInThreshold()) + .mergeBytesRateIn(conf.getScalableTopicMergeBytesRateInThreshold()) + .mergeMsgRateOut(conf.getScalableTopicMergeMsgRateOutThreshold()) + .mergeBytesRateOut(conf.getScalableTopicMergeBytesRateOutThreshold()) + .build() + .validated(); + } + + /** + * Validate the invariants the evaluator depends on; returns {@code this} for chaining. + * + *

In particular every split threshold must be strictly positive — the evaluator + * scores overload as {@code rate / splitThreshold}, and a zero threshold would make any + * positive rate score {@code Infinity} (permanent split pressure) while a zero rate + * scores {@code NaN} (silently ignored). Catching misconfiguration here surfaces a + * clear error at the policy-resolution layer instead. + * + * @throws IllegalArgumentException if any invariant is violated + */ + public AutoScaleConfig validated() { + check(minSegments >= 1, "minSegments must be >= 1"); + check(maxSegments >= minSegments, "maxSegments must be >= minSegments"); + check(maxDagDepth >= 0, "maxDagDepth must be >= 0"); + check(!splitCooldown.isNegative(), "splitCooldown must not be negative"); + check(!mergeCooldown.isNegative(), "mergeCooldown must not be negative"); + check(!mergeWindow.isNegative(), "mergeWindow must not be negative"); + check(splitMsgRateIn > 0, "splitMsgRateInThreshold must be > 0"); + check(splitBytesRateIn > 0, "splitBytesRateInThreshold must be > 0"); + check(splitMsgRateOut > 0, "splitMsgRateOutThreshold must be > 0"); + check(splitBytesRateOut > 0, "splitBytesRateOutThreshold must be > 0"); + check(mergeMsgRateIn >= 0, "mergeMsgRateInThreshold must be >= 0"); + check(mergeBytesRateIn >= 0, "mergeBytesRateInThreshold must be >= 0"); + check(mergeMsgRateOut >= 0, "mergeMsgRateOutThreshold must be >= 0"); + check(mergeBytesRateOut >= 0, "mergeBytesRateOutThreshold must be >= 0"); + check(splitMsgRateIn > mergeMsgRateIn, + "splitMsgRateInThreshold must be > mergeMsgRateInThreshold (hysteresis)"); + check(splitBytesRateIn > mergeBytesRateIn, + "splitBytesRateInThreshold must be > mergeBytesRateInThreshold (hysteresis)"); + check(splitMsgRateOut > mergeMsgRateOut, + "splitMsgRateOutThreshold must be > mergeMsgRateOutThreshold (hysteresis)"); + check(splitBytesRateOut > mergeBytesRateOut, + "splitBytesRateOutThreshold must be > mergeBytesRateOutThreshold (hysteresis)"); + return this; + } + + private static void check(boolean condition, String message) { + if (!condition) { + throw new IllegalArgumentException("Invalid auto split/merge configuration: " + message); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleDecision.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleDecision.java new file mode 100644 index 0000000000000..e8a1713e1f060 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleDecision.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +/** + * The outcome of one {@link AutoScalePolicyEvaluator} evaluation (PIP-483): split one + * segment, merge two adjacent segments, or do nothing. Each non-{@link NoAction} variant + * carries a short {@code reason} string used for logging and metrics. + */ +public sealed interface AutoScaleDecision + permits AutoScaleDecision.Split, AutoScaleDecision.Merge, AutoScaleDecision.NoAction { + + /** Split {@code segmentId} at its midpoint. */ + record Split(long segmentId, String reason) implements AutoScaleDecision { + } + + /** Merge the two adjacent active segments {@code segmentId1} and {@code segmentId2}. */ + record Merge(long segmentId1, long segmentId2, String reason) implements AutoScaleDecision { + } + + /** No action this evaluation. */ + record NoAction() implements AutoScaleDecision { + } + + NoAction NONE = new NoAction(); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluator.java new file mode 100644 index 0000000000000..bcb48ffd6a938 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluator.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.pulsar.common.scalable.SegmentInfo; +import org.apache.pulsar.common.scalable.SegmentLoadStats; + +/** + * Pure, side-effect-free decision function for scalable-topic auto split/merge (PIP-483). + * + *

Given a snapshot of the current layout, per-segment load samples, per-subscription + * stream/checkpoint consumer counts, the resolved policy, and the current time, it returns + * exactly one {@link AutoScaleDecision}. It performs no I/O and holds no state — the caller + * (the controller leader) collects the inputs and dispatches the result. + * + *

It runs two passes and emits at most one action: + *

    + *
  1. Split (fast, lightly coalesced by {@code splitCooldown}): consumer-count + * scale-up first, then traffic-driven scale-up.
  2. + *
  3. Merge (lazy, gated by {@code mergeCooldown} + {@code mergeWindow} + + * {@code maxDagDepth}): only if no split fired.
  4. + *
+ */ +public final class AutoScalePolicyEvaluator { + + private AutoScalePolicyEvaluator() { + } + + /** + * Decide whether to split, merge, or do nothing. + * + * @param layout the current segment layout + * @param loadBySegment per active-segment load sample; a missing entry is treated + * as zero load with no age (never merge-eligible) + * @param streamConsumerCount per-subscription count of STREAM/CHECKPOINT (controller-managed) + * consumers; QUEUE subscriptions are excluded by the caller + * @param config the resolved policy + * @param nowMs current wall-clock time, epoch millis + * @param lastSplitAtMs epoch millis of the last split on this topic (manual or auto), + * or {@code Long.MIN_VALUE} if none + * @param lastMergeAtMs epoch millis of the last merge on this topic (manual or auto), + * or {@code Long.MIN_VALUE} if none + * @return the decision + */ + public static AutoScaleDecision decide( + SegmentLayout layout, + Map loadBySegment, + Map streamConsumerCount, + AutoScaleConfig config, + long nowMs, + long lastSplitAtMs, + long lastMergeAtMs) { + + if (!config.enabled()) { + return AutoScaleDecision.NONE; + } + + List active = new ArrayList<>(layout.getActiveSegments().values()); + + AutoScaleDecision split = trySplit(active, loadBySegment, streamConsumerCount, + config, nowMs, lastSplitAtMs); + if (!(split instanceof AutoScaleDecision.NoAction)) { + return split; + } + + return tryMerge(active, layout, loadBySegment, config, nowMs, lastMergeAtMs); + } + + // --- Split pass --- + + private static AutoScaleDecision trySplit( + List active, + Map loadBySegment, + Map streamConsumerCount, + AutoScaleConfig config, + long nowMs, + long lastSplitAtMs) { + + if (active.size() >= config.maxSegments()) { + return AutoScaleDecision.NONE; + } + if (withinCooldown(nowMs, lastSplitAtMs, config.splitCooldown().toMillis())) { + return AutoScaleDecision.NONE; + } + + // (a) Consumer-driven: per-subscription max. If any managed subscription has more + // consumers than there are active segments, add a segment so the 1:1 assignment can + // give the extra consumer its own segment. Split the busiest segment by msgRateIn so + // the new pair lands where it relieves the most ingest. + int requiredConsumers = streamConsumerCount.values().stream() + .mapToInt(Integer::intValue).max().orElse(0); + if (requiredConsumers > active.size()) { + SegmentInfo target = busiestByMsgRateIn(active, loadBySegment); + if (target != null) { + return new AutoScaleDecision.Split(target.segmentId(), "consumer-count"); + } + } + + // (b) Load-driven: split the segment with the highest overload score among those over + // at least one split threshold. + SegmentInfo hottest = null; + double hottestScore = 1.0; // strictly over threshold means a per-metric ratio > 1.0 + String hottestReason = null; + for (SegmentInfo segment : active) { + SegmentLoadStats stats = statsOf(segment.segmentId(), loadBySegment); + double score = 0.0; + String reason = null; + double[] ratios = { + stats.msgRateIn() / config.splitMsgRateIn(), + stats.bytesRateIn() / config.splitBytesRateIn(), + stats.msgRateOut() / config.splitMsgRateOut(), + stats.bytesRateOut() / config.splitBytesRateOut(), + }; + String[] reasons = {"msgRateIn", "bytesRateIn", "msgRateOut", "bytesRateOut"}; + for (int i = 0; i < ratios.length; i++) { + if (ratios[i] > score) { + score = ratios[i]; + reason = reasons[i]; + } + } + if (score > 1.0 && score > hottestScore) { + hottestScore = score; + hottest = segment; + hottestReason = reason; + } + } + if (hottest != null) { + return new AutoScaleDecision.Split(hottest.segmentId(), hottestReason); + } + + return AutoScaleDecision.NONE; + } + + // --- Merge pass --- + + private static AutoScaleDecision tryMerge( + List active, + SegmentLayout layout, + Map loadBySegment, + AutoScaleConfig config, + long nowMs, + long lastMergeAtMs) { + + if (active.size() <= config.minSegments()) { + return AutoScaleDecision.NONE; + } + if (withinCooldown(nowMs, lastMergeAtMs, config.mergeCooldown().toMillis())) { + return AutoScaleDecision.NONE; + } + + long mergeWindowMs = config.mergeWindow().toMillis(); + + AutoScaleDecision.Merge coldest = null; + double coldestCombined = Double.MAX_VALUE; + for (int i = 0; i < active.size(); i++) { + for (int j = i + 1; j < active.size(); j++) { + SegmentInfo a = active.get(i); + SegmentInfo b = active.get(j); + if (!a.hashRange().isAdjacentTo(b.hashRange())) { + continue; + } + if (layout.mergeDepth(a.segmentId()) >= config.maxDagDepth() + || layout.mergeDepth(b.segmentId()) >= config.maxDagDepth()) { + continue; + } + if (!coldEnough(a.segmentId(), loadBySegment, config, nowMs, mergeWindowMs) + || !coldEnough(b.segmentId(), loadBySegment, config, nowMs, mergeWindowMs)) { + continue; + } + double combined = combinedRate(a.segmentId(), loadBySegment) + + combinedRate(b.segmentId(), loadBySegment); + if (combined < coldestCombined) { + coldestCombined = combined; + coldest = new AutoScaleDecision.Merge(a.segmentId(), b.segmentId(), "cold"); + } + } + } + return coldest != null ? coldest : AutoScaleDecision.NONE; + } + + /** + * A segment is cold enough to merge only if it has a load record that has stayed below + * every merge threshold for at least {@code mergeWindowMs}. A missing record means we + * have no evidence the segment is durably cold, so it is never merge-eligible. + * + *

Note that {@code nowMs} is the controller broker's clock while the sample's + * {@code modifiedAtMs} is the metadata store's server-side timestamp; clock skew between + * the two shifts the effective window. Acceptable for a lazy-merge heuristic — skew is + * normally seconds against a multi-minute window. + */ + private static boolean coldEnough(long segmentId, Map loadBySegment, + AutoScaleConfig config, long nowMs, long mergeWindowMs) { + SegmentLoadSample sample = loadBySegment.get(segmentId); + if (sample == null) { + return false; + } + if (nowMs - sample.modifiedAtMs() < mergeWindowMs) { + return false; + } + SegmentLoadStats stats = sample.stats(); + return stats.msgRateIn() < config.mergeMsgRateIn() + && stats.bytesRateIn() < config.mergeBytesRateIn() + && stats.msgRateOut() < config.mergeMsgRateOut() + && stats.bytesRateOut() < config.mergeBytesRateOut(); + } + + // --- Helpers --- + + private static boolean withinCooldown(long nowMs, long lastAtMs, long cooldownMs) { + return lastAtMs != Long.MIN_VALUE && nowMs - lastAtMs < cooldownMs; + } + + private static SegmentLoadStats statsOf(long segmentId, Map load) { + SegmentLoadSample sample = load.get(segmentId); + return sample != null ? sample.stats() : SegmentLoadStats.ZERO; + } + + private static double combinedRate(long segmentId, Map load) { + SegmentLoadStats s = statsOf(segmentId, load); + return s.msgRateIn() + s.bytesRateIn() + s.msgRateOut() + s.bytesRateOut(); + } + + private static SegmentInfo busiestByMsgRateIn(List active, + Map load) { + SegmentInfo best = null; + double bestRate = -1.0; + for (SegmentInfo segment : active) { + double rate = statsOf(segment.segmentId(), load).msgRateIn(); + // Tie-break deterministically on segment id so the choice is stable across ticks. + if (rate > bestRate || (rate == bestRate && best != null + && segment.segmentId() < best.segmentId())) { + bestRate = rate; + best = segment; + } + } + return best; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java index c5501db56e2b4..0b8c70d66b19b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.scalable; +import com.google.common.annotations.VisibleForTesting; import io.github.merlimat.slog.Logger; import java.time.Clock; import java.time.Duration; @@ -31,8 +32,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.Getter; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resources.ScalableTopicMetadata; import org.apache.pulsar.broker.resources.ScalableTopicResources; import org.apache.pulsar.broker.service.BrokerService; @@ -84,6 +87,27 @@ public class ScalableTopicController { /** Sealed-segment GC scheduled task. Non-null only while this broker is leader. */ private volatile ScheduledFuture gcTask; + /** Periodic auto split/merge evaluation task (PIP-483). Non-null only while leader. */ + private volatile ScheduledFuture autoScaleTask; + + /** + * Serializes auto split/merge: an evaluation acquires this before deciding and holds it + * for the whole split/merge it dispatches, so concurrent ticks / consumer-change triggers + * never launch overlapping auto operations. + */ + private final AtomicBoolean autoScaleInFlight = new AtomicBoolean(false); + + /** + * Set when a trigger arrives while an evaluation is in flight; the in-flight run + * re-evaluates once on completion so coalesced triggers are not lost until the next tick. + */ + private final AtomicBoolean autoScaleReEvaluate = new AtomicBoolean(false); + + /** Epoch millis of the last split on this topic (manual or auto); MIN_VALUE if none. */ + private volatile long lastSplitAtMs = Long.MIN_VALUE; + /** Epoch millis of the last merge on this topic (manual or auto); MIN_VALUE if none. */ + private volatile long lastMergeAtMs = Long.MIN_VALUE; + @Getter private volatile LeaderElectionState leaderState = LeaderElectionState.NoLeader; @@ -126,10 +150,11 @@ public class ScalableTopicController { private void onLeaderStateChange(LeaderElectionState state) { log.info().attr("state", state).log("Leader state change for scalable topic"); if (state != LeaderElectionState.Leading) { - // Stepped down (or never was leader). Stop the GC tick so the deposed leader - // doesn't race the new one on layout writes / backing-topic deletes. The new - // leader's initialize() will reschedule. + // Stepped down (or never was leader). Stop the GC and auto-scale ticks so the + // deposed leader doesn't race the new one on layout writes / backing-topic + // deletes. The new leader's initialize() will reschedule. cancelGcTask(); + cancelAutoScaleTask(); } if (state == LeaderElectionState.NoLeader && !closed) { initialize().exceptionally(ex -> { @@ -160,7 +185,9 @@ public CompletableFuture initialize() { }) .thenCompose(__ -> { if (isLeader()) { + seedAutoScaleCooldownsFromLayout(); scheduleGcTask(); + scheduleAutoScaleTask(); return ensureActiveSegmentsExist() .thenCompose(___ -> restoreSessionsFromStore()); } @@ -168,6 +195,29 @@ public CompletableFuture initialize() { }); } + /** + * Recover the auto split/merge cooldown clocks after winning leadership. The timestamps + * are in-memory only, but the layout itself records when each segment was created — a + * split's children have exactly one parent, a merge's child has two — so the most recent + * creation time of each class is exactly when the last split / merge happened. Without + * this, every leader failover would reset both cooldowns and e.g. allow an auto merge + * seconds after one just ran on the previous leader. + */ + private void seedAutoScaleCooldownsFromLayout() { + long split = Long.MIN_VALUE; + long merge = Long.MIN_VALUE; + for (SegmentInfo segment : currentLayout.getAllSegments().values()) { + int parents = segment.parentIds().size(); + if (parents == 1) { + split = Math.max(split, segment.createdAtMs()); + } else if (parents >= 2) { + merge = Math.max(merge, segment.createdAtMs()); + } + } + lastSplitAtMs = split; + lastMergeAtMs = merge; + } + /** * Recovery path for active segments whose backing topics are missing — e.g., * a {@code createScalableTopic} call that committed metadata but failed to @@ -237,6 +287,186 @@ private void runGcTickSafely() { } } + // --- Auto split/merge (PIP-483) --- + + /** + * Schedule the periodic traffic-driven auto split/merge evaluation. Only fires on the + * controller leader; idempotent. Cancelled on close / leader-loss. Consumer-count + * changes are handled event-driven (see {@link #onConsumerCountChanged()}), not by this + * tick. + * + *

The tick is scheduled even when auto-scaling is currently disabled: the enabled + * flag is dynamic and re-checked on every evaluation, so flipping it on takes effect at + * the next tick rather than waiting for a leadership cycle. A disabled tick is a cheap + * no-op. + */ + private synchronized void scheduleAutoScaleTask() { + if (closed || autoScaleTask != null) { + return; + } + ServiceConfiguration config = brokerConfig(); + if (config == null) { + return; + } + long intervalMs = Duration.ofSeconds( + config.getScalableTopicAutoScaleIntervalSeconds()).toMillis(); + if (intervalMs <= 0) { + return; + } + autoScaleTask = scheduler().scheduleAtFixedRate( + () -> runAutoScaleSafely("tick"), intervalMs, intervalMs, TimeUnit.MILLISECONDS); + } + + private synchronized void cancelAutoScaleTask() { + if (autoScaleTask != null) { + autoScaleTask.cancel(false); + autoScaleTask = null; + } + } + + /** + * Event-driven trigger: a stream/checkpoint consumer registered or unregistered, which + * may change the per-subscription consumer count. Evaluates the consumer-count split rule + * within seconds rather than waiting for the periodic tick. + */ + private void onConsumerCountChanged() { + runAutoScaleSafely("consumer-change"); + } + + private void runAutoScaleSafely(String trigger) { + if (!isLeader() || closed) { + return; + } + try { + evaluateAndAct(trigger).exceptionally(ex -> { + log.warn().attr("trigger", trigger).exceptionMessage(ex) + .log("Auto split/merge evaluation failed"); + return null; + }); + } catch (Throwable t) { + log.warn().attr("trigger", trigger).exception(t) + .log("Auto split/merge evaluation threw"); + } + } + + /** + * Collect the current inputs, run the pure {@link AutoScalePolicyEvaluator}, and dispatch + * the resulting action. At most one auto operation runs at a time: {@link #autoScaleInFlight} + * is held from before the decision through the end of the dispatched split/merge. + */ + private CompletableFuture evaluateAndAct(String trigger) { + ServiceConfiguration brokerConfig = brokerConfig(); + if (brokerConfig == null) { + return CompletableFuture.completedFuture(null); + } + AutoScaleConfig config = AutoScaleConfig.fromBrokerConfig(brokerConfig); + if (!config.enabled()) { + return CompletableFuture.completedFuture(null); + } + if (!autoScaleInFlight.compareAndSet(false, true)) { + // Another evaluation or auto operation is already running. Don't drop the + // trigger: mark it pending so the in-flight run re-evaluates on completion — + // e.g. a consumer registering mid-evaluation would otherwise not be considered + // until the next periodic tick. + autoScaleReEvaluate.set(true); + return CompletableFuture.completedFuture(null); + } + try { + return collectConsumerCounts() + .thenCombine(collectLoadSamples(), (consumers, load) -> + AutoScalePolicyEvaluator.decide(currentLayout, load, consumers, config, + clock.millis(), lastSplitAtMs, lastMergeAtMs)) + .thenCompose(decision -> dispatch(decision, config, trigger)) + .whenComplete((__, ex) -> { + autoScaleInFlight.set(false); + if (autoScaleReEvaluate.getAndSet(false)) { + // Re-run off the completion thread for the trigger(s) coalesced + // while this evaluation was in flight. + scheduler().execute(() -> runAutoScaleSafely("coalesced")); + } + }); + } catch (Throwable t) { + // A synchronous throw between the CAS and the future chain would otherwise leave + // the in-flight flag set forever, silently disabling auto-scaling on this topic. + autoScaleInFlight.set(false); + throw t; + } + } + + private CompletableFuture dispatch(AutoScaleDecision decision, AutoScaleConfig config, + String trigger) { + if (decision instanceof AutoScaleDecision.Split split) { + log.info().attr("segmentId", split.segmentId()).attr("reason", split.reason()) + .attr("trigger", trigger).log("Auto split"); + return splitSegment(split.segmentId()) + .thenApply(__ -> { + scheduleFollowUpEvaluation(config); + return null; + }); + } + if (decision instanceof AutoScaleDecision.Merge merge) { + log.info().attr("segmentId1", merge.segmentId1()).attr("segmentId2", merge.segmentId2()) + .attr("reason", merge.reason()).attr("trigger", trigger).log("Auto merge"); + return mergeSegments(merge.segmentId1(), merge.segmentId2()).thenApply(__ -> null); + } + return CompletableFuture.completedFuture(null); + } + + /** + * After a successful auto split, schedule one follow-up evaluation right after the split + * cooldown expires. A burst of consumers joining at once needs one split per cooldown to + * converge (e.g. 1 segment → N); without this it converges one split per periodic tick + * instead, which is slower whenever the cooldown is shorter than the tick. The chain + * stops naturally at the first evaluation that decides {@code NoAction}. + */ + private void scheduleFollowUpEvaluation(AutoScaleConfig config) { + if (closed || !isLeader()) { + return; + } + long delayMs = config.splitCooldown().toMillis() + 1; + scheduler().schedule(() -> runAutoScaleSafely("post-split"), + delayMs, TimeUnit.MILLISECONDS); + } + + /** + * Per-subscription consumer counts for the controller-managed (STREAM/CHECKPOINT) + * subscriptions. QUEUE subscriptions bypass the controller and have no coordinator here, + * so they are naturally excluded — exactly the set the consumer-count split rule wants. + */ + private CompletableFuture> collectConsumerCounts() { + Map counts = new LinkedHashMap<>(); + subscriptions.forEach((name, coordinator) -> + counts.put(name, coordinator.getConsumers().size())); + return CompletableFuture.completedFuture(counts); + } + + /** Read the load record (value + Stat modified time) for every active segment. */ + private CompletableFuture> collectLoadSamples() { + Map samples = new ConcurrentHashMap<>(); + List> futures = new ArrayList<>(); + for (Long segmentId : currentLayout.getActiveSegments().keySet()) { + futures.add(resources.getSegmentLoadAsync(topicName, segmentId) + .thenAccept(opt -> opt.ifPresent(result -> samples.put(segmentId, + new SegmentLoadSample(result.getValue(), + result.getStat().getModificationTimestamp()))))); + } + return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) + .thenApply(__ -> samples); + } + + private ServiceConfiguration brokerConfig() { + return brokerService.getPulsar().getConfig(); + } + + /** + * Run one auto split/merge evaluation synchronously-awaitable, for tests. Production code + * triggers evaluation via the periodic tick and consumer-change events. + */ + @VisibleForTesting + CompletableFuture evaluateAutoScaleForTest() { + return evaluateAndAct("test"); + } + /** * Load persisted subscriptions and consumer registrations from the metadata store and * install them into per-subscription {@link SubscriptionCoordinator} instances. Called @@ -432,6 +662,9 @@ public CompletableFuture splitSegment(long segmentId) { .thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true)) .thenCompose(optMd -> { currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow()); + // Start the auto-split cooldown only now that the split actually happened + // (covers manual and auto splits; a failed attempt doesn't burn the cooldown). + lastSplitAtMs = nowMs; // Step 5: Notify subscriptions of layout change (triggers consumer reassignment) return notifySubscriptions(currentLayout); @@ -478,6 +711,9 @@ public CompletableFuture mergeSegments(long segmentId1, long segm .thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true)) .thenCompose(optMd -> { currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow()); + // Start the auto-merge cooldown only now that the merge actually happened + // (covers manual and auto merges; a failed attempt doesn't burn the cooldown). + lastMergeAtMs = nowMs; return notifySubscriptions(currentLayout); }).thenApply(__ -> currentLayout); } @@ -516,6 +752,9 @@ public CompletableFuture registerConsumer(String subscriptio } return coordinator.registerConsumer(consumerName, consumerId, cnx) .thenApply(assignments -> { + // A new consumer may now outnumber the segments — evaluate the + // consumer-count split rule promptly rather than waiting for the tick. + onConsumerCountChanged(); // Look up by name since the key may have been an existing session return assignments.entrySet().stream() .filter(e -> consumerName.equals(e.getKey().getConsumerName())) @@ -961,7 +1200,11 @@ private CompletableFuture pruneAllAsync(List drained) { }) .thenCompose(__ -> { CompletableFuture[] deletes = drained.stream() - .map(this::deleteSegmentBackingTopic) + .map(s -> deleteSegmentBackingTopic(s) + // The segment is gone from the layout — drop its load record + // too, or the .../segments/{id}/load entry leaks forever. + .thenCompose(___ -> + resources.deleteSegmentLoadAsync(topicName, s.segmentId()))) .toArray(CompletableFuture[]::new); return CompletableFuture.allOf(deletes); }) @@ -1094,6 +1337,7 @@ int sealedSegmentCount() { public CompletableFuture close() { closed = true; cancelGcTask(); + cancelAutoScaleTask(); // Stop each coordinator's drain poller before clearing — otherwise the scheduler // task keeps running after the controller goes away. subscriptions.values().forEach(SubscriptionCoordinator::close); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java index aed7dd3f8e1cb..a3f1f0c5ef101 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java @@ -18,11 +18,15 @@ */ package org.apache.pulsar.broker.service.scalable; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import lombok.Getter; import org.apache.pulsar.broker.resources.ScalableTopicMetadata; @@ -112,6 +116,43 @@ public List getParents(long segmentId) { .collect(Collectors.toList()); } + /** + * Number of merge operations in a segment's ancestry, including the segment itself. + * + *

A merge is the only operation that produces a segment with more than one parent + * (a split produces children with exactly one parent), so the merge depth is the count + * of segments in this segment's ancestor chain — itself included — that have + * {@code parentIds.size() >= 2}. + * + *

Used by auto split/merge (PIP-483) to cap split↔merge churn: a pair is only + * merge-eligible while neither side's merge depth has reached the configured maximum, + * which bounds the merge depth of the resulting child. + * + * @param segmentId the segment to measure + * @return the number of merges in this segment's lineage (0 for a never-merged segment) + */ + public int mergeDepth(long segmentId) { + int depth = 0; + Deque toVisit = new ArrayDeque<>(); + Set visited = new HashSet<>(); + toVisit.add(segmentId); + while (!toVisit.isEmpty()) { + long id = toVisit.poll(); + if (!visited.add(id)) { + continue; + } + SegmentInfo segment = allSegments.get(id); + if (segment == null) { + continue; + } + if (segment.parentIds().size() >= 2) { + depth++; + } + toVisit.addAll(segment.parentIds()); + } + return depth; + } + /** * Produce a new layout by splitting a segment at its midpoint. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporter.java new file mode 100644 index 0000000000000..56001951c4a6d --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.DoubleSupplier; +import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.scalable.SegmentLoadStats; + +/** + * Writes per-segment {@link SegmentLoadStats} to the metadata store on behalf of the broker + * that owns a segment's {@code segment://} topic (PIP-483). + * + *

To keep metadata write volume bounded, a sample is persisted only when it differs + * materially from the last value this broker wrote for that segment — i.e. some rate moved + * by more than the configured relative threshold (default 25%), or crossed to/from zero. A + * steady-state segment therefore writes once and then stays quiet; the controller's + * windowing relies on the stored record's {@code Stat} modification time staying put while + * load is unchanged. + * + *

Known blind spot — the materiality band is anchored at the last written + * value, not at the policy thresholds. A true rate that settles inside the band never + * produces a new record, so a segment can sustain up to {@code threshold} beyond a split or + * merge threshold indefinitely without the controller seeing it. This is path-dependent + * (whether a given load triggers depends on what was last written) but one-directional: it + * can only delay an action, never cause a spurious one, and its magnitude is bounded by the + * configured threshold. Accepted as the cost of bounded write volume; operators wanting + * tighter tracking lower {@code scalableTopicLoadReportRateChangeThreshold}. + * + *

This class owns only the materiality decision and the last-written cache. Sampling the + * live {@code TopicStats} and scheduling the periodic sweep are wired in by the broker. + */ +public class SegmentLoadReporter { + + private final ScalableTopicResources resources; + /** Re-read on every sample so the broker config knob is honored dynamically. */ + private final DoubleSupplier rateChangeThreshold; + + /** Last value written per load-record path, so we can skip immaterial updates. */ + private final ConcurrentHashMap lastWritten = new ConcurrentHashMap<>(); + + public SegmentLoadReporter(ScalableTopicResources resources, DoubleSupplier rateChangeThreshold) { + this.resources = resources; + this.rateChangeThreshold = rateChangeThreshold; + } + + public SegmentLoadReporter(ScalableTopicResources resources, double rateChangeThreshold) { + this(resources, () -> rateChangeThreshold); + } + + /** + * Report a segment's current load, writing to the store only if it changed materially + * since the last write (or has never been written). + * + *

On a local cache miss (broker restart, or segment ownership just moved here) the + * baseline is seeded from the record already in the store, and the materiality gate is + * applied against that. Without this, the first sample after every ownership move would + * write unconditionally and reset the record's modification time — which the controller + * uses as "cold since" for the merge window — starving merges under frequent rebalancing. + * + * @return a future completing with {@code true} if a write happened, {@code false} if the + * sample was immaterial and skipped + */ + public CompletableFuture reportIfChanged(TopicName topic, long segmentId, + SegmentLoadStats current) { + String path = resources.segmentLoadPath(topic, segmentId); + double threshold = rateChangeThreshold.getAsDouble(); + SegmentLoadStats last = lastWritten.get(path); + if (last == null) { + return resources.getSegmentLoadAsync(topic, segmentId).thenCompose(stored -> { + stored.ifPresent(result -> lastWritten.putIfAbsent(path, result.getValue())); + SegmentLoadStats baseline = lastWritten.get(path); + if (baseline != null && !isMaterialChange(baseline, current, threshold)) { + return CompletableFuture.completedFuture(false); + } + return write(topic, segmentId, path, current); + }); + } + if (!isMaterialChange(last, current, threshold)) { + return CompletableFuture.completedFuture(false); + } + return write(topic, segmentId, path, current); + } + + private CompletableFuture write(TopicName topic, long segmentId, String path, + SegmentLoadStats current) { + return resources.reportSegmentLoadAsync(topic, segmentId, current) + .thenApply(__ -> { + lastWritten.put(path, current); + return true; + }); + } + + /** + * Drop the cached last-written value for a segment — call when this broker stops owning + * the segment topic (unload, seal, or delete) so the cache doesn't grow unboundedly with + * segment churn. A later re-acquire re-seeds the baseline from the stored record. + */ + public void forget(TopicName topic, long segmentId) { + lastWritten.remove(resources.segmentLoadPath(topic, segmentId)); + } + + /** + * True if any of the four rates changed by more than {@code threshold} (relative), or + * crossed to/from zero. + */ + static boolean isMaterialChange(SegmentLoadStats last, SegmentLoadStats current, + double threshold) { + return changed(last.msgRateIn(), current.msgRateIn(), threshold) + || changed(last.bytesRateIn(), current.bytesRateIn(), threshold) + || changed(last.msgRateOut(), current.msgRateOut(), threshold) + || changed(last.bytesRateOut(), current.bytesRateOut(), threshold); + } + + private static boolean changed(double last, double current, double threshold) { + if (last == 0.0) { + // Any move off zero (a segment going from idle to active) is always material; + // staying at zero is not. + return current != 0.0; + } + return Math.abs(current - last) / last > threshold; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadSample.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadSample.java new file mode 100644 index 0000000000000..4fb21d0c95462 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadSample.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import org.apache.pulsar.common.scalable.SegmentLoadStats; + +/** + * A segment's load record as the controller sees it: the persisted {@link SegmentLoadStats} + * plus the metadata store's last-modified timestamp for the record (PIP-483). + * + *

This is an in-memory evaluator input, never persisted — the timestamp comes from the + * metadata {@code Stat}, not from the stored value. {@code modifiedAtMs} is what the merge + * pass uses to require a segment has stayed cold for at least {@code mergeWindow}. + * + * @param stats the persisted rates + * @param modifiedAtMs metadata-store last-modified time of the load record, in epoch millis + */ +public record SegmentLoadSample(SegmentLoadStats stats, long modifiedAtMs) { +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java new file mode 100644 index 0000000000000..acc104c2773c5 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.testng.annotations.Test; + +public class AutoScaleConfigTest { + + @Test + public void testDefaultsMatchBrokerConfig() { + AutoScaleConfig c = AutoScaleConfig.fromBrokerConfig(new ServiceConfiguration()); + assertTrue(c.enabled()); + assertEquals(c.maxSegments(), 64); + assertEquals(c.minSegments(), 1); + assertEquals(c.maxDagDepth(), 10); + assertEquals(c.splitCooldown(), Duration.ofSeconds(60)); + assertEquals(c.mergeCooldown(), Duration.ofMinutes(5)); + assertEquals(c.mergeWindow(), Duration.ofMinutes(5)); + assertEquals(c.splitMsgRateIn(), 10_000.0); + assertEquals(c.splitBytesRateIn(), 50_000_000.0); + assertEquals(c.splitMsgRateOut(), 50_000.0); + assertEquals(c.splitBytesRateOut(), 250_000_000.0); + assertEquals(c.mergeMsgRateIn(), 1_000.0); + assertEquals(c.mergeBytesRateIn(), 5_000_000.0); + assertEquals(c.mergeMsgRateOut(), 5_000.0); + assertEquals(c.mergeBytesRateOut(), 25_000_000.0); + + // Split thresholds must sit strictly above the corresponding merge thresholds + // (the hysteresis dead-band). + assertTrue(c.splitMsgRateIn() > c.mergeMsgRateIn()); + assertTrue(c.splitBytesRateIn() > c.mergeBytesRateIn()); + assertTrue(c.splitMsgRateOut() > c.mergeMsgRateOut()); + assertTrue(c.splitBytesRateOut() > c.mergeBytesRateOut()); + } + + @Test + public void testValidationRejectsBadConfig() { + // Zero split threshold: the evaluator scores rate/threshold, so 0 would yield + // Infinity (or NaN for a zero rate) — must be rejected at resolution time. + ServiceConfiguration zeroSplit = new ServiceConfiguration(); + zeroSplit.setScalableTopicSplitMsgRateInThreshold(0); + assertThrows(IllegalArgumentException.class, + () -> AutoScaleConfig.fromBrokerConfig(zeroSplit)); + + // Hysteresis inversion: merge threshold at/above the split threshold. + ServiceConfiguration inverted = new ServiceConfiguration(); + inverted.setScalableTopicMergeMsgRateInThreshold( + inverted.getScalableTopicSplitMsgRateInThreshold()); + assertThrows(IllegalArgumentException.class, + () -> AutoScaleConfig.fromBrokerConfig(inverted)); + + // min/max segment inversion. + ServiceConfiguration minOverMax = new ServiceConfiguration(); + minOverMax.setScalableTopicMinSegments(10); + minOverMax.setScalableTopicMaxSegments(5); + assertThrows(IllegalArgumentException.class, + () -> AutoScaleConfig.fromBrokerConfig(minOverMax)); + + // Negative cooldown. + ServiceConfiguration negativeCooldown = new ServiceConfiguration(); + negativeCooldown.setScalableTopicSplitCooldownSeconds(-1); + assertThrows(IllegalArgumentException.class, + () -> AutoScaleConfig.fromBrokerConfig(negativeCooldown)); + } + + @Test + public void testZeroMergeThresholdsAllowedAsMergeDisable() { + // Merge thresholds of 0 are a legitimate "never merge" setting: no rate is ever + // strictly below 0, so segments never qualify as cold. Must validate cleanly. + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setScalableTopicMergeMsgRateInThreshold(0); + conf.setScalableTopicMergeBytesRateInThreshold(0L); + conf.setScalableTopicMergeMsgRateOutThreshold(0); + conf.setScalableTopicMergeBytesRateOutThreshold(0L); + AutoScaleConfig c = AutoScaleConfig.fromBrokerConfig(conf); + assertEquals(c.mergeMsgRateIn(), 0.0); + } + + @Test + public void testOverriddenBrokerConfigPropagates() { + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setScalableTopicAutoScaleEnabled(false); + conf.setScalableTopicMaxSegments(8); + conf.setScalableTopicMinSegments(2); + conf.setScalableTopicMaxDagDepth(3); + conf.setScalableTopicSplitCooldownSeconds(30); + conf.setScalableTopicMergeCooldownSeconds(120); + conf.setScalableTopicMergeWindowSeconds(90); + conf.setScalableTopicSplitMsgRateInThreshold(1234); + conf.setScalableTopicMergeBytesRateOutThreshold(99L); + + AutoScaleConfig c = AutoScaleConfig.fromBrokerConfig(conf); + assertFalse(c.enabled()); + assertEquals(c.maxSegments(), 8); + assertEquals(c.minSegments(), 2); + assertEquals(c.maxDagDepth(), 3); + assertEquals(c.splitCooldown(), Duration.ofSeconds(30)); + assertEquals(c.mergeCooldown(), Duration.ofSeconds(120)); + assertEquals(c.mergeWindow(), Duration.ofSeconds(90)); + assertEquals(c.splitMsgRateIn(), 1234.0); + assertEquals(c.mergeBytesRateOut(), 99.0); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluatorTest.java new file mode 100644 index 0000000000000..b85845de6477e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluatorTest.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pulsar.common.scalable.SegmentInfo; +import org.apache.pulsar.common.scalable.SegmentLoadStats; +import org.testng.annotations.Test; + +/** + * Unit tests for the pure {@link AutoScalePolicyEvaluator} decision function (PIP-483). + * All inputs are constructed in-memory; there is no broker or metadata store. + */ +public class AutoScalePolicyEvaluatorTest { + + private static final long NOW = 1_700_000_000_000L; + private static final long NO_PRIOR = Long.MIN_VALUE; + + // Split thresholds well above merge thresholds — the dead-band is the hysteresis. + private static final double SPLIT_MSG_IN = 10_000; + private static final double SPLIT_BYTES_IN = 50_000_000; + private static final double SPLIT_MSG_OUT = 50_000; + private static final double SPLIT_BYTES_OUT = 250_000_000; + private static final double MERGE_MSG_IN = 1_000; + private static final double MERGE_BYTES_IN = 5_000_000; + private static final double MERGE_MSG_OUT = 5_000; + private static final double MERGE_BYTES_OUT = 25_000_000; + + private static AutoScaleConfig.AutoScaleConfigBuilder baseConfig() { + return AutoScaleConfig.builder() + .enabled(true) + .maxSegments(64) + .minSegments(1) + .maxDagDepth(10) + .splitCooldown(Duration.ofMinutes(1)) + .mergeCooldown(Duration.ofMinutes(5)) + .mergeWindow(Duration.ofMinutes(5)) + .splitMsgRateIn(SPLIT_MSG_IN) + .splitBytesRateIn(SPLIT_BYTES_IN) + .splitMsgRateOut(SPLIT_MSG_OUT) + .splitBytesRateOut(SPLIT_BYTES_OUT) + .mergeMsgRateIn(MERGE_MSG_IN) + .mergeBytesRateIn(MERGE_BYTES_IN) + .mergeMsgRateOut(MERGE_MSG_OUT) + .mergeBytesRateOut(MERGE_BYTES_OUT); + } + + private static SegmentLayout initialLayout(int segments) { + return SegmentLayout.fromMetadata( + ScalableTopicController.createInitialMetadata(segments, Map.of())); + } + + /** A load sample with the given rates, last modified {@code ageMs} ago. */ + private static SegmentLoadSample sample(double msgIn, double bytesIn, double msgOut, + double bytesOut, long ageMs) { + return new SegmentLoadSample( + new SegmentLoadStats(msgIn, bytesIn, msgOut, bytesOut), NOW - ageMs); + } + + private static SegmentLoadSample cold(long ageMs) { + return sample(0, 0, 0, 0, ageMs); + } + + private static long old() { + return Duration.ofMinutes(10).toMillis(); + } + + private static AutoScaleDecision decide(SegmentLayout layout, + Map load, + Map consumers, + AutoScaleConfig config) { + return AutoScalePolicyEvaluator.decide(layout, load, consumers, config, NOW, + NO_PRIOR, NO_PRIOR); + } + + // --- enable switch --- + + @Test + public void testDisabledReturnsNoAction() { + SegmentLayout layout = initialLayout(1); + Map load = Map.of(0L, sample(1_000_000, 0, 0, 0, old())); + AutoScaleDecision d = decide(layout, load, Map.of("s", 100), + baseConfig().enabled(false).build()); + assertTrue(d instanceof AutoScaleDecision.NoAction); + } + + // --- consumer-driven split --- + + @Test + public void testConsumerDrivenSplitTargetsBusiestSegment() { + SegmentLayout layout = initialLayout(2); + Map load = Map.of( + 0L, sample(100, 0, 0, 0, old()), + 1L, sample(200, 0, 0, 0, old())); + // One subscription with 3 consumers but only 2 segments → need a 3rd segment. + AutoScaleDecision d = decide(layout, load, Map.of("sub", 3), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.Split, d.toString()); + AutoScaleDecision.Split s = (AutoScaleDecision.Split) d; + assertEquals(s.segmentId(), 1L, "should split the busiest segment by msgRateIn"); + assertEquals(s.reason(), "consumer-count"); + } + + @Test + public void testConsumerCountUsesPerSubscriptionMaxNotSum() { + SegmentLayout layout = initialLayout(2); + // Fresh samples (age 0) so the merge pass can't fire — isolates the consumer check. + Map load = Map.of(0L, cold(0), 1L, cold(0)); + // Two subscriptions, 2 consumers each. Per-subscription max is 2, not 4 → 2 == 2 + // segments, no scale-up needed. + AutoScaleDecision d = decide(layout, load, Map.of("a", 2, "b", 2), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.NoAction, + "per-subscription max (2) does not exceed 2 segments"); + } + + @Test + public void testConsumerDrivenSplitRespectsMaxSegments() { + SegmentLayout layout = initialLayout(2); + // Fresh samples so the merge pass can't fire — isolates the split suppression. + Map load = Map.of(0L, cold(0), 1L, cold(0)); + AutoScaleDecision d = decide(layout, load, Map.of("sub", 5), + baseConfig().maxSegments(2).build()); + assertTrue(d instanceof AutoScaleDecision.NoAction, "at maxSegments, no split"); + } + + @Test + public void testConsumerDrivenSplitRespectsSplitCooldown() { + SegmentLayout layout = initialLayout(2); + Map load = Map.of(0L, cold(0), 1L, cold(0)); + long recentSplit = NOW - Duration.ofSeconds(30).toMillis(); // < 1m cooldown + AutoScaleDecision d = AutoScalePolicyEvaluator.decide(layout, load, Map.of("sub", 3), + baseConfig().build(), NOW, recentSplit, NO_PRIOR); + assertTrue(d instanceof AutoScaleDecision.NoAction, "within split cooldown, no split"); + } + + // --- load-driven split --- + + @Test + public void testLoadDrivenSplitOnMsgRateIn() { + SegmentLayout layout = initialLayout(1); + Map load = Map.of(0L, sample(SPLIT_MSG_IN + 1, 0, 0, 0, old())); + AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.Split, d.toString()); + AutoScaleDecision.Split s = (AutoScaleDecision.Split) d; + assertEquals(s.segmentId(), 0L); + assertEquals(s.reason(), "msgRateIn"); + } + + @Test + public void testLoadDrivenSplitOnBytesRateOut() { + SegmentLayout layout = initialLayout(1); + Map load = + Map.of(0L, sample(0, 0, 0, SPLIT_BYTES_OUT + 1, old())); + AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.Split, d.toString()); + assertEquals(((AutoScaleDecision.Split) d).reason(), "bytesRateOut"); + } + + @Test + public void testLoadDrivenSplitPicksMostOverloaded() { + SegmentLayout layout = initialLayout(2); + // seg0 at 1.1x, seg1 at 1.5x of the msgRateIn split threshold. + Map load = Map.of( + 0L, sample(SPLIT_MSG_IN * 1.1, 0, 0, 0, old()), + 1L, sample(SPLIT_MSG_IN * 1.5, 0, 0, 0, old())); + AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.Split, d.toString()); + assertEquals(((AutoScaleDecision.Split) d).segmentId(), 1L, + "should split the more overloaded segment"); + } + + @Test + public void testNoSplitWhenAllUnderThreshold() { + SegmentLayout layout = initialLayout(1); + // Just below every split threshold, freshly updated → not merge-eligible either. + Map load = Map.of(0L, + sample(SPLIT_MSG_IN - 1, SPLIT_BYTES_IN - 1, SPLIT_MSG_OUT - 1, + SPLIT_BYTES_OUT - 1, 0)); + AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.NoAction); + } + + // --- merge --- + + @Test + public void testMergeColdAdjacentPair() { + SegmentLayout layout = initialLayout(2); + Map load = Map.of(0L, cold(old()), 1L, cold(old())); + AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.Merge, d.toString()); + AutoScaleDecision.Merge m = (AutoScaleDecision.Merge) d; + assertTrue((m.segmentId1() == 0L && m.segmentId2() == 1L) + || (m.segmentId1() == 1L && m.segmentId2() == 0L)); + } + + @Test + public void testMergeRespectsMinSegments() { + SegmentLayout layout = initialLayout(2); + Map load = Map.of(0L, cold(old()), 1L, cold(old())); + AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().minSegments(2).build()); + assertTrue(d instanceof AutoScaleDecision.NoAction, "at minSegments, no merge"); + } + + @Test + public void testMergeRespectsMergeCooldown() { + SegmentLayout layout = initialLayout(2); + Map load = Map.of(0L, cold(old()), 1L, cold(old())); + long recentMerge = NOW - Duration.ofMinutes(1).toMillis(); // < 5m cooldown + AutoScaleDecision d = AutoScalePolicyEvaluator.decide(layout, load, Map.of(), + baseConfig().build(), NOW, NO_PRIOR, recentMerge); + assertTrue(d instanceof AutoScaleDecision.NoAction, "within merge cooldown, no merge"); + } + + @Test + public void testMergeRequiresColdForFullWindow() { + SegmentLayout layout = initialLayout(2); + // Cold values, but only updated 1 minute ago — window is 5 minutes. + long recent = Duration.ofMinutes(1).toMillis(); + Map load = Map.of(0L, cold(recent), 1L, cold(recent)); + AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.NoAction, + "segment must stay cold for the full mergeWindow"); + } + + @Test + public void testMergeRequiresLoadRecordPresent() { + SegmentLayout layout = initialLayout(2); + // No load records at all → no evidence of durable coldness → never merge. + AutoScaleDecision d = decide(layout, new HashMap<>(), Map.of(), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.NoAction); + } + + @Test + public void testHysteresisDeadBandNoSplitNoMerge() { + SegmentLayout layout = initialLayout(2); + // msgRateIn sits between the merge threshold and the split threshold for seg0: + // not hot enough to split, not cold enough to merge. + Map load = Map.of( + 0L, sample(MERGE_MSG_IN + 1, 0, 0, 0, old()), + 1L, cold(old())); + AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.NoAction, + "in the dead-band, neither split nor merge"); + } + + @Test + public void testSplitTakesPriorityOverMerge() { + SegmentLayout layout = initialLayout(2); + // seg0 hot (would split), seg0+seg1 also a cold-ish adjacent pair — split wins. + Map load = Map.of( + 0L, sample(SPLIT_MSG_IN + 1, 0, 0, 0, old()), + 1L, cold(old())); + AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.Split, "split has priority"); + } + + @Test + public void testMergeRespectsMaxDagDepth() { + // Build a layout whose two active segments each already have merge depth 1: + // split(0) → {1,2}; merge(1,2) → {3}; split(3) → {4,5}. Segments 4 and 5 each have + // one merge (node 3) in their ancestry. + SegmentLayout layout = initialLayout(1) + .splitSegment(0, NOW) + .mergeSegments(1, 2, NOW) + .splitSegment(3, NOW); + List active = layout.getActiveSegments().keySet().stream().sorted().toList(); + assertEquals(active, List.of(4L, 5L)); + assertEquals(layout.mergeDepth(4L), 1); + assertEquals(layout.mergeDepth(5L), 1); + + Map load = Map.of(4L, cold(old()), 5L, cold(old())); + + // maxDagDepth=1: both at the cap → no merge. + AutoScaleDecision blocked = decide(layout, load, Map.of(), + baseConfig().maxDagDepth(1).build()); + assertTrue(blocked instanceof AutoScaleDecision.NoAction, "merge blocked at maxDagDepth"); + + // maxDagDepth=2: under the cap → merge allowed. + AutoScaleDecision allowed = decide(layout, load, Map.of(), + baseConfig().maxDagDepth(2).build()); + assertTrue(allowed instanceof AutoScaleDecision.Merge, "merge allowed below maxDagDepth"); + } + + @Test + public void testMergePicksColdestAdjacentPair() { + SegmentLayout layout = initialLayout(4); // ranges tile [0,MAX] in 4 adjacent quarters + // seg0+seg1 combined hotter than seg2+seg3; all below merge thresholds and old. + Map load = Map.of( + 0L, sample(900, 0, 0, 0, old()), + 1L, sample(900, 0, 0, 0, old()), + 2L, sample(10, 0, 0, 0, old()), + 3L, sample(10, 0, 0, 0, old())); + AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.Merge, d.toString()); + AutoScaleDecision.Merge m = (AutoScaleDecision.Merge) d; + // The coldest adjacent pair is {2,3}. + assertTrue((m.segmentId1() == 2L && m.segmentId2() == 3L) + || (m.segmentId1() == 3L && m.segmentId2() == 2L), + "should pick the coldest adjacent pair {2,3}, got " + m); + } + + @Test + public void testMergedPairIsAlwaysAdjacent() { + SegmentLayout layout = initialLayout(4); + Map load = Map.of( + 0L, cold(old()), 1L, cold(old()), 2L, cold(old()), 3L, cold(old())); + AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().build()); + assertTrue(d instanceof AutoScaleDecision.Merge, d.toString()); + AutoScaleDecision.Merge m = (AutoScaleDecision.Merge) d; + SegmentInfo a = layout.getAllSegments().get(m.segmentId1()); + SegmentInfo b = layout.getAllSegments().get(m.segmentId2()); + assertTrue(a.hashRange().isAdjacentTo(b.hashRange()), + "merged pair must be hash-range adjacent"); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java new file mode 100644 index 0000000000000..74ad5e8a5a347 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.TransportCnx; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.ScalableTopics; +import org.apache.pulsar.client.admin.Topics; +import org.apache.pulsar.common.api.proto.ScalableConsumerType; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.scalable.SegmentLoadStats; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.coordination.CoordinationService; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl; +import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Integration tests for the controller's auto split/merge wiring (PIP-483): the periodic / + * event-driven evaluation reads load records + consumer counts, runs the evaluator, and + * dispatches to the real splitSegment / mergeSegments paths (against an in-memory metadata + * store and a mocked cross-broker admin client). The decision logic itself is unit-tested in + * {@link AutoScalePolicyEvaluatorTest}; here we verify the plumbing actually fires. + */ +public class ScalableTopicControllerAutoScaleTest { + + private static final String BROKER_ID = "broker-test"; + + private MetadataStoreExtended store; + private CoordinationService coordinationService; + private ScalableTopicResources resources; + private ScheduledExecutorService scheduler; + private BrokerService brokerService; + private PulsarService pulsar; + private ServiceConfiguration config; + private ScalableTopics scalableTopics; + private ScalableTopicController controller; + private TopicName topicName; + + @BeforeMethod + public void setUp() throws Exception { + store = new LocalMemoryMetadataStore("memory:local", MetadataStoreConfig.builder().build()); + coordinationService = new CoordinationServiceImpl(store); + resources = new ScalableTopicResources(store, 30); + scheduler = Executors.newSingleThreadScheduledExecutor(); + topicName = TopicName.get("topic://tenant/ns/my-topic"); + + // Auto-scale tuned for deterministic single-shot evaluation: no cooldowns/windows so a + // single evaluateAutoScaleForTest() call acts immediately, low-ish thresholds. + config = new ServiceConfiguration(); + config.setScalableTopicAutoScaleEnabled(true); + config.setScalableTopicMaxSegments(64); + config.setScalableTopicMinSegments(1); + config.setScalableTopicSplitCooldownSeconds(0); + config.setScalableTopicMergeCooldownSeconds(0); + config.setScalableTopicMergeWindowSeconds(0); + config.setScalableTopicSplitMsgRateInThreshold(10_000); + + brokerService = mock(BrokerService.class); + pulsar = mock(PulsarService.class); + PulsarAdmin admin = mock(PulsarAdmin.class); + Topics topics = mock(Topics.class); + scalableTopics = mock(ScalableTopics.class); + + when(brokerService.getPulsar()).thenReturn(pulsar); + when(brokerService.getTopicIfExists(anyString())) + .thenReturn(CompletableFuture.completedFuture(Optional.empty())); + when(pulsar.getBrokerId()).thenReturn(BROKER_ID); + when(pulsar.getExecutor()).thenReturn(scheduler); + when(pulsar.getConfig()).thenReturn(config); + when(pulsar.getAdminClient()).thenReturn(admin); + when(admin.topics()).thenReturn(topics); + when(admin.scalableTopics()).thenReturn(scalableTopics); + when(scalableTopics.createSegmentAsync(anyString(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(scalableTopics.terminateSegmentAsync(anyString())) + .thenReturn(CompletableFuture.completedFuture(null)); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() throws Exception { + if (controller != null) { + controller.close().join(); + } + if (coordinationService != null) { + coordinationService.close(); + } + if (store != null) { + store.close(); + } + if (scheduler != null) { + scheduler.shutdownNow(); + } + } + + private void startController(int initialSegments) throws Exception { + resources.createScalableTopicAsync(topicName, + ScalableTopicController.createInitialMetadata(initialSegments, Map.of())).get(); + controller = new ScalableTopicController(topicName, resources, brokerService, + coordinationService); + controller.initialize().get(); + } + + private int activeSegmentCount() throws Exception { + return controller.getLayout().get().getActiveSegments().size(); + } + + @Test + public void testLoadDrivenSplit() throws Exception { + startController(2); + assertEquals(activeSegmentCount(), 2); + + // Segment 0 is hot on msgRateIn → expect a split. + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, "hot segment should have been split"); + } + + @Test + public void testNoSplitWhenUnderThreshold() throws Exception { + startController(2); + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(100, 0, 0, 0)).get(); + + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 2, "no segment over threshold → no change"); + } + + @Test + public void testDisabledConfigIsNoOp() throws Exception { + config.setScalableTopicAutoScaleEnabled(false); + startController(2); + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(1_000_000, 0, 0, 0)).get(); + + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 2, "disabled → no action even when hot"); + } + + @Test + public void testColdSegmentsMerge() throws Exception { + startController(4); + // All segments cold (no load records written → treated as zero; but merge requires a + // record present, so write explicit cold records). mergeWindow=0 so they're eligible. + for (long id = 0; id < 4; id++) { + resources.reportSegmentLoadAsync(topicName, id, new SegmentLoadStats(1, 0, 0, 0)).get(); + } + + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, "a cold adjacent pair should have been merged"); + } + + @Test + public void testConsumerDrivenSplit() throws Exception { + startController(1); + assertEquals(activeSegmentCount(), 1); + + // Two consumers on one segment → need a second segment. registerConsumer fires an + // event-driven evaluation (fire-and-forget); await its effect. + controller.registerConsumer("sub", "c1", 1L, ScalableConsumerType.STREAM, + mock(TransportCnx.class)).get(); + controller.registerConsumer("sub", "c2", 2L, ScalableConsumerType.STREAM, + mock(TransportCnx.class)).get(); + + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted( + () -> assertEquals(activeSegmentCount(), 2, + "2 consumers on 1 segment should drive a split")); + } + + @Test + public void testConsumerBurstConvergesWithoutTicks() throws Exception { + // A group of consumers joining in quick succession must converge to one segment + // each purely from the event-driven evaluations + post-split follow-up chain — no + // periodic tick and no manual evaluation calls. + startController(1); + for (int i = 1; i <= 4; i++) { + controller.registerConsumer("sub", "c" + i, i, ScalableConsumerType.STREAM, + mock(TransportCnx.class)).get(); + } + Awaitility.await().atMost(Duration.ofSeconds(20)).untilAsserted( + () -> assertEquals(activeSegmentCount(), 4, + "4 consumers must drive convergence to 4 segments")); + } + + @Test + public void testSplitCooldownBlocksSecondSplit() throws Exception { + config.setScalableTopicSplitCooldownSeconds(3600); // 1h — blocks a second split + startController(2); + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, "first split happens"); + + // Still hot, but within cooldown → no second split. + resources.reportSegmentLoadAsync(topicName, 1, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, "second split blocked by cooldown"); + } + + @Test + public void testSplitCooldownSurvivesLeaderFailover() throws Exception { + config.setScalableTopicSplitCooldownSeconds(3600); + startController(2); + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, "first split happens"); + + // Leadership moves: close this controller and elect a fresh one. The new leader's + // in-memory cooldown clocks must be re-seeded from the layout (the children's + // createdAtMs records when the last split ran), not reset to "never". + controller.close().join(); + controller = new ScalableTopicController(topicName, resources, brokerService, + coordinationService); + controller.initialize().get(); + + resources.reportSegmentLoadAsync(topicName, 1, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, + "split cooldown must survive failover via layout-derived seeding"); + } + + @Test + public void testFailedSplitDoesNotBurnCooldown() throws Exception { + config.setScalableTopicSplitCooldownSeconds(3600); + startController(2); + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + + // First attempt fails at the segment-topic-creation step. The evaluation future + // surfaces the failure (the production tick wrapper logs-and-swallows it). + when(scalableTopics.createSegmentAsync(anyString(), any())) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("injected"))); + assertThrows(java.util.concurrent.ExecutionException.class, + () -> controller.evaluateAutoScaleForTest().get()); + assertEquals(activeSegmentCount(), 2, "failed split leaves the layout unchanged"); + + // The failure must not have started the cooldown: once the transient error clears, + // the next evaluation splits immediately instead of waiting out the hour. + when(scalableTopics.createSegmentAsync(anyString(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, "retry after a failed split is not cooldown-blocked"); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java index 4a0cca1eb7879..dccd8d0e30794 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java @@ -693,6 +693,10 @@ public void testGcTickPrunesDrainedSealedSegmentPastRetention() throws Exception controller.splitSegment(0).get(); assertEquals(controller.sealedSegmentCount(), sealedBefore + 1); + // Give the doomed segment a load record, as the owning broker's reporter would. + resources.reportSegmentLoadAsync(topicName, 0, + new org.apache.pulsar.common.scalable.SegmentLoadStats(1, 1, 1, 1)).get(); + // Tick at the seal time — retention not yet elapsed; nothing pruned. controller.runGcTickAsync().get(); assertTrue(controller.getLayout().get().getAllSegments().containsKey(0L), @@ -705,6 +709,9 @@ public void testGcTickPrunesDrainedSealedSegmentPastRetention() throws Exception "tick past retention must prune the sealed segment"); // Backing topic delete was issued via the segment-aware admin call. verify(scalableTopics).deleteSegmentAsync(anyString(), anyBoolean()); + // The pruned segment's load record is deleted along with it. + assertFalse(resources.getSegmentLoadAsync(topicName, 0).get().isPresent(), + "prune must delete the segment's load record"); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java index 725016f6e673a..bed418719288b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java @@ -325,9 +325,20 @@ public void testDeleteScalableTopicCleansUp() throws Exception { service.createScalableTopic(tn, 2).get(); assertTrue(resources.scalableTopicExistsAsync(tn).get()); + // Seed child records under the topic — a subscription and a per-segment load + // record — to verify the delete is recursive and takes everything with it. + resources.createSubscriptionAsync(tn, "sub-a", + org.apache.pulsar.broker.resources.SubscriptionType.STREAM).get(); + resources.reportSegmentLoadAsync(tn, 0, + new org.apache.pulsar.common.scalable.SegmentLoadStats(1, 1, 1, 1)).get(); + service.deleteScalableTopic(tn).get(); assertFalse(resources.scalableTopicExistsAsync(tn).get()); + assertFalse(resources.subscriptionExistsAsync(tn, "sub-a").get(), + "topic delete must remove subscription records"); + assertFalse(resources.getSegmentLoadAsync(tn, 0).get().isPresent(), + "topic delete must remove segment load records"); verify(scalableTopicsAdmin, org.mockito.Mockito.atLeast(2)) .deleteSegmentAsync(anyString(), anyBoolean()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java index b936820072f7b..364c9299a1124 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java @@ -291,4 +291,37 @@ public void testNextSegmentIdAdvances() { SegmentLayout split2 = split1.splitSegment(1, 0L); assertEquals(split2.getNextSegmentId(), 6); } + + @Test + public void testMergeDepthZeroForNeverMergedSegments() { + SegmentLayout layout = SegmentLayout.fromMetadata( + ScalableTopicController.createInitialMetadata(2, Map.of())); + assertEquals(layout.mergeDepth(0), 0); + assertEquals(layout.mergeDepth(1), 0); + + // Splits never increase merge depth. + SegmentLayout afterSplit = layout.splitSegment(0, 0L); + assertEquals(afterSplit.mergeDepth(2), 0); + assertEquals(afterSplit.mergeDepth(3), 0); + } + + @Test + public void testMergeDepthCountsMergesInLineage() { + // split(0) → {1,2}; merge(1,2) → {3}; split(3) → {4,5}. + SegmentLayout layout = SegmentLayout + .fromMetadata(ScalableTopicController.createInitialMetadata(1, Map.of())) + .splitSegment(0, 0L) + .mergeSegments(1, 2, 0L) + .splitSegment(3, 0L); + + // The merged node 3 contributes one merge to the ancestry of 4 and 5. + assertEquals(layout.mergeDepth(3), 1); + assertEquals(layout.mergeDepth(4), 1); + assertEquals(layout.mergeDepth(5), 1); + + // A second merge in the lineage bumps it to 2. + SegmentLayout twice = layout.mergeSegments(4, 5, 0L); + long mergedAgain = twice.getNextSegmentId() - 1; + assertEquals(twice.mergeDepth(mergedAgain), 2); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporterTest.java new file mode 100644 index 0000000000000..d35c074105e1a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporterTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.util.Optional; +import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.scalable.SegmentLoadStats; +import org.apache.pulsar.metadata.api.CacheGetResult; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class SegmentLoadReporterTest { + + private static final double THRESHOLD = 0.25; + private static final TopicName TOPIC = TopicName.get("topic://tenant/ns/my-topic"); + + private MetadataStoreExtended store; + private ScalableTopicResources resources; + private SegmentLoadReporter reporter; + + @BeforeMethod + public void setUp() throws Exception { + store = new LocalMemoryMetadataStore("memory:local", MetadataStoreConfig.builder().build()); + resources = new ScalableTopicResources(store, 30); + reporter = new SegmentLoadReporter(resources, THRESHOLD); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() throws Exception { + if (store != null) { + store.close(); + } + } + + private static SegmentLoadStats stats(double msgIn) { + return new SegmentLoadStats(msgIn, 0, 0, 0); + } + + // --- isMaterialChange (pure) --- + + @Test + public void testMaterialChangeRelativeThreshold() { + // +30% on msgRateIn exceeds the 25% threshold. + assertTrue(SegmentLoadReporter.isMaterialChange(stats(1000), stats(1300), THRESHOLD)); + // +10% does not. + assertFalse(SegmentLoadReporter.isMaterialChange(stats(1000), stats(1100), THRESHOLD)); + // -30% (drop) is symmetric and material. + assertTrue(SegmentLoadReporter.isMaterialChange(stats(1000), stats(700), THRESHOLD)); + } + + @Test + public void testMaterialChangeCrossingZero() { + assertTrue(SegmentLoadReporter.isMaterialChange(stats(0), stats(1), THRESHOLD)); + assertTrue(SegmentLoadReporter.isMaterialChange(stats(1), stats(0), THRESHOLD)); + assertFalse(SegmentLoadReporter.isMaterialChange(stats(0), stats(0), THRESHOLD)); + } + + @Test + public void testMaterialChangeAnyMetric() { + SegmentLoadStats last = new SegmentLoadStats(1000, 1000, 1000, 1000); + // Only bytesRateOut moves materially; still counts. + SegmentLoadStats current = new SegmentLoadStats(1000, 1000, 1000, 2000); + assertTrue(SegmentLoadReporter.isMaterialChange(last, current, THRESHOLD)); + } + + // --- reportIfChanged (against an in-memory store) --- + + @Test + public void testFirstReportAlwaysWrites() throws Exception { + assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(500)).get()); + Optional> got = + resources.getSegmentLoadAsync(TOPIC, 0).get(); + assertTrue(got.isPresent()); + assertEquals(got.get().getValue().msgRateIn(), 500.0); + } + + @Test + public void testImmaterialSecondReportSkipped() throws Exception { + assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get()); + long firstModified = resources.getSegmentLoadAsync(TOPIC, 0).get() + .get().getStat().getModificationTimestamp(); + + // +10% is immaterial → no write → stored value and timestamp unchanged. + assertFalse(reporter.reportIfChanged(TOPIC, 0, stats(1100)).get()); + CacheGetResult after = resources.getSegmentLoadAsync(TOPIC, 0).get().get(); + assertEquals(after.getValue().msgRateIn(), 1000.0); + assertEquals(after.getStat().getModificationTimestamp(), firstModified); + } + + @Test + public void testMaterialSecondReportWrites() throws Exception { + assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get()); + // +50% is material → write. + assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1500)).get()); + assertEquals(resources.getSegmentLoadAsync(TOPIC, 0).get().get().getValue().msgRateIn(), + 1500.0); + } + + @Test + public void testForgetReSeedsBaselineFromStore() throws Exception { + assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get()); + // Without forget, an immaterial sample is skipped. + assertFalse(reporter.reportIfChanged(TOPIC, 0, stats(1050)).get()); + // After forget (unload + re-acquire), the baseline re-seeds from the stored record: + // an immaterial sample is still skipped (so the merge window isn't reset)... + reporter.forget(TOPIC, 0); + assertFalse(reporter.reportIfChanged(TOPIC, 0, stats(1050)).get()); + // ...while a material one writes. + assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(2000)).get()); + } + + @Test + public void testNewOwnerSeedsBaselineFromStore() throws Exception { + // Old owner writes a record. + assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get()); + long modified = resources.getSegmentLoadAsync(TOPIC, 0).get() + .get().getStat().getModificationTimestamp(); + + // Ownership moves: a fresh reporter (empty lastWritten cache) samples a rate within + // the materiality band of the STORED value. It must seed its baseline from the store + // and skip the write — otherwise every rebalance would reset the record's + // modification time and starve the controller's merge window. + SegmentLoadReporter newOwner = new SegmentLoadReporter(resources, THRESHOLD); + assertFalse(newOwner.reportIfChanged(TOPIC, 0, stats(1100)).get()); + assertEquals(resources.getSegmentLoadAsync(TOPIC, 0).get() + .get().getStat().getModificationTimestamp(), modified); + + // A materially different sample still writes. + assertTrue(newOwner.reportIfChanged(TOPIC, 0, stats(2000)).get()); + } + + @Test + public void testIdenticalValueDoesNotBumpModificationTime() throws Exception { + assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get()); + long modified = resources.getSegmentLoadAsync(TOPIC, 0).get() + .get().getStat().getModificationTimestamp(); + + // Bit-identical re-report through the resources layer is a no-op write. + resources.reportSegmentLoadAsync(TOPIC, 0, stats(1000)).get(); + assertEquals(resources.getSegmentLoadAsync(TOPIC, 0).get() + .get().getStat().getModificationTimestamp(), modified); + } + + @Test + public void testDeleteSegmentLoadToleratesMissing() throws Exception { + // No record yet — delete must not fail. + resources.deleteSegmentLoadAsync(TOPIC, 7).get(); + reporter.reportIfChanged(TOPIC, 7, stats(100)).get(); + resources.deleteSegmentLoadAsync(TOPIC, 7).get(); + assertFalse(resources.getSegmentLoadAsync(TOPIC, 7).get().isPresent()); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentLoadReporterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentLoadReporterTest.java new file mode 100644 index 0000000000000..086d82fcce702 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentLoadReporterTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import lombok.Cleanup; +import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +/** + * End-to-end coverage for the broker-side segment load reporter sweep (PIP-483): after a + * scalable topic has live segment topics on the broker, the periodic sweep (driven manually + * here for determinism) must write a {@link org.apache.pulsar.common.scalable.SegmentLoadStats} + * record to the metadata store for each hosted segment, which is what the controller's auto + * split/merge reads. + */ +public class V5SegmentLoadReporterTest extends V5ClientBaseTest { + + @Test + public void testSweepWritesSegmentLoadRecords() throws Exception { + String topic = newScalableTopic(2); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.bytes()) + .topic(topic) + .create(); + // Produce across keys so both initial segments get a live segment topic on the broker. + for (int i = 0; i < 50; i++) { + producer.newMessage().key("k-" + i).value(("v-" + i).getBytes()).send(); + } + + ScalableTopicResources resources = + getPulsar().getPulsarResources().getScalableTopicResources(); + TopicName parent = TopicName.get(topic); + + // Drive the sweep directly rather than waiting for the 10s scheduled tick. + getPulsar().getBrokerService().runSegmentLoadReportOnceForTest(); + + // Both initial segments (0 and 1) should now have a load record. + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + getPulsar().getBrokerService().runSegmentLoadReportOnceForTest(); + assertTrue(resources.getSegmentLoadAsync(parent, 0).get().isPresent(), + "segment 0 load record must be written by the sweep"); + assertTrue(resources.getSegmentLoadAsync(parent, 1).get().isPresent(), + "segment 1 load record must be written by the sweep"); + }); + } + + @Test + public void testNonSegmentTopicsProduceNoLoadRecords() throws Exception { + // A plain persistent topic must not produce any scalable-segment load record. + @Cleanup + org.apache.pulsar.client.api.PulsarClient v4 = + org.apache.pulsar.client.api.PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()).build(); + String plain = "persistent://" + getNamespace() + "/plain-" + System.nanoTime(); + @Cleanup + org.apache.pulsar.client.api.Producer p = + v4.newProducer().topic(plain).create(); + p.send("x".getBytes()); + + // Sweep must not throw on non-segment topics (and obviously writes nothing for them). + getPulsar().getBrokerService().runSegmentLoadReportOnceForTest(); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentLoadStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentLoadStats.java new file mode 100644 index 0000000000000..00701568a3ed7 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentLoadStats.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.scalable; + +/** + * Per-segment load sample for scalable-topic auto split/merge (PIP-483). + * + *

Written by the broker that owns a segment's {@code segment://} topic, directly to the + * metadata store under {@code .../segments/{segmentId}/load}, and read by the controller + * leader's auto-scaling evaluator. To keep write volume bounded, the owning broker only + * rewrites this record when one of the rates changes by more than a significant threshold + * (default ±25%) since the last write — see {@code SegmentLoadReporter}. + * + *

The record carries no timestamp of its own: the metadata store exposes the record's + * last-modified time via its {@code Stat}, and the controller uses that for the "cold for + * at least mergeWindow" check. + * + * @param msgRateIn inbound messages per second (60s rolling average, from {@code TopicStats}) + * @param bytesRateIn inbound bytes per second + * @param msgRateOut outbound (dispatched) messages per second, summed across subscriptions + * @param bytesRateOut outbound bytes per second + */ +public record SegmentLoadStats( + double msgRateIn, + double bytesRateIn, + double msgRateOut, + double bytesRateOut +) { + public static final SegmentLoadStats ZERO = new SegmentLoadStats(0, 0, 0, 0); +}