Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,152 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private int scalableTopicConsumerSessionGracePeriodSeconds = 60;

/**** --- Scalable topic auto split/merge (PIP-483). --- ****/

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Cluster-wide default for scalable-topic auto split/merge. When true, the controller "
+ "leader automatically splits hot segments and merges cold ones, within the caps "
+ "below. Can be overridden per-namespace and per-topic."
)
private boolean scalableTopicAutoScaleEnabled = true;

@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
doc = "Cadence (seconds) of the controller's periodic traffic-driven auto split/merge "
+ "evaluation. Consumer-count changes are handled event-driven and are not affected "
+ "by this interval. Read when a controller wins leadership; not dynamic."
)
private int scalableTopicAutoScaleIntervalSeconds = 60;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Hard ceiling on the number of active segments a scalable topic can be auto-scaled to. "
+ "Splits stop firing once this is reached."
)
private int scalableTopicMaxSegments = 64;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Hard floor on the number of active segments. Merges stop firing once this is reached."
)
private int scalableTopicMinSegments = 1;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Max number of merges allowed in a segment's lineage. Once a segment reaches this depth "
+ "it stops being a merge candidate (load-driven splits are still allowed), bounding "
+ "split/merge flip-flopping."
)
private int scalableTopicMaxDagDepth = 10;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Minimum time (seconds) between automatic splits on a topic. Deliberately short — it "
+ "only coalesces a burst of near-simultaneous triggers (e.g. a consumer group "
+ "connecting at once)."
)
private int scalableTopicSplitCooldownSeconds = 60;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Minimum time (seconds) between automatic merges on a topic."
)
private int scalableTopicMergeCooldownSeconds = 300;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "How long (seconds) a segment must continuously stay below every merge threshold before "
+ "it becomes merge-eligible."
)
private int scalableTopicMergeWindowSeconds = 300;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Inbound messages/second above which a segment is split."
)
private double scalableTopicSplitMsgRateInThreshold = 10_000;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Inbound bytes/second above which a segment is split."
)
private long scalableTopicSplitBytesRateInThreshold = 50_000_000L;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Outbound (dispatched) messages/second above which a segment is split."
)
private double scalableTopicSplitMsgRateOutThreshold = 50_000;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Outbound bytes/second above which a segment is split."
)
private long scalableTopicSplitBytesRateOutThreshold = 250_000_000L;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Inbound messages/second below which a segment counts as cold for merging."
)
private double scalableTopicMergeMsgRateInThreshold = 1_000;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Inbound bytes/second below which a segment counts as cold for merging."
)
private long scalableTopicMergeBytesRateInThreshold = 5_000_000L;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Outbound messages/second below which a segment counts as cold for merging."
)
private double scalableTopicMergeMsgRateOutThreshold = 5_000;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Outbound bytes/second below which a segment counts as cold for merging."
)
private long scalableTopicMergeBytesRateOutThreshold = 25_000_000L;

@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
doc = "Interval (seconds) at which the segment-owning broker samples its segment topics to "
+ "report load for auto split/merge. Read at broker start; not dynamic."
)
private int scalableTopicLoadReportIntervalSeconds = 10;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Minimum relative change in any segment rate (e.g. 0.25 = 25%) since the last write that "
+ "triggers a new load record. Keeps metadata write volume bounded; a steady-state "
+ "segment writes once and goes quiet.\n"
+ "Note: the band is anchored at the last written value, not at the split/merge "
+ "thresholds. A rate that settles within the band of the last record is never "
+ "re-reported, so a segment can sustain up to this factor beyond a split/merge "
+ "threshold without triggering — the cost of bounded write volume. Lower the "
+ "threshold for tighter tracking at the price of more metadata writes."
)
private double scalableTopicLoadReportRateChangeThreshold = 0.25;

@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
import lombok.CustomLog;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.scalable.SegmentLoadStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
Expand Down Expand Up @@ -64,6 +66,8 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>
private static final String SCALABLE_TOPIC_PATH = "/topics";
private static final String SUBSCRIPTIONS_SEGMENT = "subscriptions";
private static final String CONSUMERS_SEGMENT = "consumers";
private static final String SEGMENTS_SEGMENT = "segments";
private static final String LOAD_SEGMENT = "load";

/**
* Use the topic's {@code properties} map verbatim as the secondary-index entries.
Expand All @@ -77,6 +81,7 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>

private final MetadataCache<SubscriptionMetadata> subscriptionCache;
private final MetadataCache<ConsumerRegistration> consumerRegistrationCache;
private final MetadataCache<SegmentLoadStats> segmentLoadCache;

/**
* Per-path listeners for scalable-topic metadata events. Each listener watches a
Expand Down Expand Up @@ -107,6 +112,7 @@ public ScalableTopicResources(MetadataStore store, int operationTimeoutSec) {
super(store, ScalableTopicMetadata.class, operationTimeoutSec);
this.subscriptionCache = store.getMetadataCache(SubscriptionMetadata.class);
this.consumerRegistrationCache = store.getMetadataCache(ConsumerRegistration.class);
this.segmentLoadCache = store.getMetadataCache(SegmentLoadStats.class);
// Single shared metadata-store listener fans out to both per-path and
// per-namespace subscribers. Per-subscriber lifecycle goes through the
// register / deregister methods below.
Expand Down Expand Up @@ -260,7 +266,10 @@ public CompletableFuture<Void> updateScalableTopicAsync(TopicName tn,
}

/**
 * Delete a scalable topic's metadata record together with everything stored beneath it.
 *
 * <p>The delete is recursive because the topic record has children — the controller leader
 * lock, the subscriptions (and their consumer registrations), and the per-segment load
 * records — all of which must go with the topic.
 *
 * @param tn the scalable topic whose metadata subtree is removed
 * @return the future returned by the metadata store's recursive delete of the topic path
 */
public CompletableFuture<Void> deleteScalableTopicAsync(TopicName tn) {
    return getStore().deleteRecursive(topicPath(tn));
}

public CompletableFuture<Boolean> scalableTopicExistsAsync(TopicName tn) {
Expand Down Expand Up @@ -432,6 +441,53 @@ public CompletableFuture<List<String>> listConsumersAsync(TopicName tn, String s
/**
* Get the metadata store path for the controller leader lock.
*/
// --- Segment load records (PIP-483 auto split/merge) ---

/**
 * Create or overwrite the load record of a single segment.
 *
 * <p>The caller is the broker owning the segment's {@code segment://} topic; whether a
 * change in rates is material enough to report is decided upstream, in
 * {@code SegmentLoadReporter}.
 *
 * <p>A value equal to the one already stored is deliberately skipped. The controller reads
 * the record's {@code Stat} modification time as the segment's "cold since" instant for the
 * merge window, so rewriting an unchanged value — for instance the first report after
 * ownership moved to a broker whose last-written cache is still empty — would reset that
 * window for no reason and starve merges when rebalancing is frequent.
 *
 * @param tn        the parent scalable topic
 * @param segmentId the id of the segment the stats belong to
 * @param stats     the load sample to persist
 * @return a future completing with {@code null} once the record is written, or immediately
 *         after the read when the stored value already equals {@code stats}
 */
public CompletableFuture<Void> reportSegmentLoadAsync(TopicName tn, long segmentId,
        SegmentLoadStats stats) {
    final String recordPath = segmentLoadPath(tn, segmentId);
    return segmentLoadCache.get(recordPath).thenCompose(stored -> {
        // Only touch the store when there is no record yet or its content differs.
        final boolean alreadyUpToDate =
                stored.filter(previous -> previous.equals(stats)).isPresent();
        if (!alreadyUpToDate) {
            return segmentLoadCache.readModifyUpdateOrCreate(recordPath, ignored -> stats)
                    .thenApply(written -> null);
        }
        return CompletableFuture.completedFuture(null);
    });
}

/**
 * Fetch a segment's load record along with the metadata {@link Stat} of that record.
 *
 * <p>The stat matters to the controller's auto-scaling evaluator: it reads
 * {@code stat.getModificationTimestamp()} to work out how long the segment has been at its
 * current load, which backs the "cold for at least mergeWindow" check.
 *
 * @param tn        the parent scalable topic
 * @param segmentId the id of the segment to look up
 * @return the value and its stat, or empty if no record has been written yet
 */
public CompletableFuture<Optional<CacheGetResult<SegmentLoadStats>>> getSegmentLoadAsync(
        TopicName tn, long segmentId) {
    final String recordPath = segmentLoadPath(tn, segmentId);
    return segmentLoadCache.getWithStats(recordPath);
}

/**
 * Remove a segment's load record. Best-effort: a record that does not exist is tolerated
 * rather than reported as a failure.
 *
 * @param tn        the parent scalable topic
 * @param segmentId the id of the segment whose load record is removed
 * @return a future for the delete, with the missing-record case handled by
 *         {@code ignoreMissing()}
 */
public CompletableFuture<Void> deleteSegmentLoadAsync(TopicName tn, long segmentId) {
    final String recordPath = segmentLoadPath(tn, segmentId);
    final CompletableFuture<Void> removal = segmentLoadCache.delete(recordPath);
    return removal.exceptionally(ignoreMissing());
}

/**
 * Build the metadata store path of a segment's load record by joining the topic path, the
 * {@code segments} component, the decimal segment id and the {@code load} component.
 *
 * @param tn        the parent scalable topic
 * @param segmentId the id of the segment
 * @return the path under which the segment's load record is stored
 */
public String segmentLoadPath(TopicName tn, long segmentId) {
    final String topicRoot = topicPath(tn);
    final String segmentComponent = String.valueOf(segmentId);
    return joinPath(topicRoot, SEGMENTS_SEGMENT, segmentComponent, LOAD_SEGMENT);
}

/**
 * Get the metadata store path for the controller leader lock of a scalable topic: the
 * topic path joined with the {@code controller} component.
 *
 * @param tn the scalable topic
 * @return the path of the topic's controller leader lock
 */
public String controllerLockPath(TopicName tn) {
    final String topicRoot = topicPath(tn);
    return joinPath(topicRoot, "controller");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.broker.service.scalable.SegmentLoadReporter;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -185,6 +186,8 @@
import org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.scalable.SegmentLoadStats;
import org.apache.pulsar.common.scalable.SegmentTopicName;
import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
Expand Down Expand Up @@ -285,6 +288,9 @@ public class BrokerService implements Closeable {
private final SingleThreadNonConcurrentFixedRateScheduler compactionMonitor;
private final SingleThreadNonConcurrentFixedRateScheduler consumedLedgersMonitor;
private SingleThreadNonConcurrentFixedRateScheduler deduplicationSnapshotMonitor;
/** PIP-483: periodic sweep that writes per-segment load records for auto split/merge. */
private SingleThreadNonConcurrentFixedRateScheduler segmentLoadReporterMonitor;
private SegmentLoadReporter segmentLoadReporter;
protected final PublishRateLimiter brokerPublishRateLimiter;
private final DispatchRateLimiterFactory dispatchRateLimiterFactory;
protected volatile DispatchRateLimiter brokerDispatchRateLimiter = null;
Expand Down Expand Up @@ -682,9 +688,69 @@ public void start() throws Exception {
this.updateBrokerDispatchThrottlingMaxRate();
this.startCheckReplicationPolicies();
this.startDeduplicationSnapshotMonitor();
this.startSegmentLoadReporter();
this.startClearInvalidateTopicNameCacheTask();
}

/**
 * Set up the periodic per-segment load reporter (PIP-483).
 *
 * <p>Every tick walks the topics of this broker and hands each one to
 * {@code reportSegmentLoad}, which samples {@code segment://} topics and writes a
 * {@link SegmentLoadStats} record to the metadata store — though only when a rate moved
 * materially since the previous write (see {@link SegmentLoadReporter}). Those records are
 * what the controller leader consumes to drive auto split/merge.
 *
 * <p>Nothing is started when scalable topics are disabled, when the scalable-topic
 * resources are unavailable, or when the configured report interval is not positive. The
 * interval is read once here; the rate-change threshold is re-read from the configuration
 * each time the reporter asks for it.
 */
protected void startSegmentLoadReporter() {
    final ServiceConfiguration brokerConf = pulsar().getConfiguration();
    if (!brokerConf.isScalableTopicsEnabled()) {
        return;
    }

    var scalableTopicResources = pulsar().getPulsarResources().getScalableTopicResources();
    if (scalableTopicResources == null) {
        return;
    }

    final int periodSeconds = brokerConf.getScalableTopicLoadReportIntervalSeconds();
    if (periodSeconds <= 0) {
        return;
    }

    this.segmentLoadReporter = new SegmentLoadReporter(scalableTopicResources,
            () -> pulsar().getConfiguration().getScalableTopicLoadReportRateChangeThreshold());
    this.segmentLoadReporterMonitor =
            new SingleThreadNonConcurrentFixedRateScheduler("scalable-segment-load-reporter");

    // The first sweep runs one full period after start, then repeats at the same period.
    final Runnable sweep = () -> forEachTopic(this::reportSegmentLoad);
    segmentLoadReporterMonitor.scheduleAtFixedRateNonConcurrently(
            sweep, periodSeconds, periodSeconds, TimeUnit.SECONDS);
}

/**
 * Test hook: performs a single load-report sweep over the broker's topics, i.e. the same
 * work the scheduled reporter task does on each tick, without waiting for the scheduler.
 */
@VisibleForTesting
public void runSegmentLoadReportOnceForTest() {
    forEachTopic(topic -> reportSegmentLoad(topic));
}

/**
 * Sample one topic and pass its load to the reporter.
 *
 * <p>Does nothing when the reporter has not been started or when the topic is not in the
 * {@code segment} domain. For a segment topic, the parent topic and segment id are derived
 * from the topic name, the four in/out message and throughput rates are taken from the
 * topic stats, and the resulting {@link SegmentLoadStats} is handed to
 * {@code reportIfChanged}. Failures, whether thrown while sampling or surfaced by the
 * returned future, are logged at debug level and otherwise ignored.
 *
 * @param topic a topic currently known to this broker
 */
private void reportSegmentLoad(Topic topic) {
    final SegmentLoadReporter activeReporter = this.segmentLoadReporter;
    if (activeReporter == null) {
        return;
    }

    final TopicName segmentTopic = TopicName.get(topic.getName());
    if (TopicDomain.segment != segmentTopic.getDomain()) {
        return;
    }

    try {
        final TopicName parentTopic = SegmentTopicName.getParentTopicName(segmentTopic);
        final long id = SegmentTopicName.getSegmentId(segmentTopic);

        var sampled = topic.getStats(false, false, false);
        final SegmentLoadStats sample = new SegmentLoadStats(
                sampled.msgRateIn,
                sampled.msgThroughputIn,
                sampled.msgRateOut,
                sampled.msgThroughputOut);

        activeReporter.reportIfChanged(parentTopic, id, sample).exceptionally(failure -> {
            log.debug().attr("segment", segmentTopic).exceptionMessage(failure)
                    .log("Failed to report segment load");
            return null;
        });
    } catch (Exception samplingFailure) {
        log.debug().attr("segment", segmentTopic).exceptionMessage(samplingFailure)
                .log("Failed to sample segment load");
    }
}

protected void startClearInvalidateTopicNameCacheTask() {
final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
inactivityMonitor.scheduleAtFixedRateNonConcurrently(
Expand Down Expand Up @@ -966,7 +1032,8 @@ public CompletableFuture<Void> closeAsync() {
consumedLedgersMonitor,
backlogQuotaChecker,
topicOrderedExecutor,
deduplicationSnapshotMonitor)
deduplicationSnapshotMonitor,
segmentLoadReporterMonitor)
.handle());

CompletableFuture<Void> combined =
Expand Down Expand Up @@ -2696,9 +2763,34 @@ private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle,
if (compactor != null) {
compactor.getStats().removeTopic(topic);
}
forgetSegmentLoad(topic);
topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.SUCCESS);
}

/**
 * Evict the load reporter's last-written cache entry for a segment topic that this broker
 * has stopped owning (unload / delete).
 *
 * <p>Skipping this would let the cache grow without bound as segments come and go, and a
 * stale entry could cause the first sample after a later re-acquire to be wrongly
 * suppressed as immaterial.
 *
 * <p>No-op when the reporter was never started or the name is not in the {@code segment}
 * domain. Any exception raised while resolving the segment or evicting the entry is logged
 * at debug level and swallowed.
 *
 * @param topic the full name of the topic being removed from this broker
 */
private void forgetSegmentLoad(String topic) {
    final SegmentLoadReporter activeReporter = this.segmentLoadReporter;
    if (activeReporter == null) {
        return;
    }

    final TopicName segmentTopic = TopicName.get(topic);
    if (TopicDomain.segment != segmentTopic.getDomain()) {
        return;
    }

    try {
        final TopicName parentTopic = SegmentTopicName.getParentTopicName(segmentTopic);
        final long id = SegmentTopicName.getSegmentId(segmentTopic);
        activeReporter.forget(parentTopic, id);
    } catch (Exception evictionFailure) {
        log.debug().attr("segment", segmentTopic).exceptionMessage(evictionFailure)
                .log("Failed to forget segment load cache entry");
    }
}

public long getNumberOfNamespaceBundles() {
this.numberOfNamespaceBundles = 0;
this.multiLayerTopicsMap.forEach((namespaceName, bundles) -> {
Expand Down
Loading
Loading