[improve][broker] PIP-483: scalable topic auto split/merge#25980
merlimat wants to merge 14 commits into
First implementation increment for PIP-483: the pure, I/O-free decision logic and its data types. No live broker paths are wired yet.
- SegmentLoadStats: persisted per-segment load record (4 rates).
- AutoScaleConfig: fully-resolved policy (caps, cooldowns, split/merge thresholds), per-field javadoc.
- AutoScaleDecision: sealed Split | Merge | NoAction with reason strings.
- SegmentLoadSample: evaluator input (stats + metadata-store modified time).
- AutoScalePolicyEvaluator.decide(): split pass (consumer-count then traffic, gated by splitCooldown and maxSegments) followed by merge pass (cold adjacent pair, gated by mergeCooldown, mergeWindow, minSegments and maxDagDepth).
- SegmentLayout.mergeDepth(): counts merges in a segment's lineage, used by the depth cap.
Exhaustively unit-tested in AutoScalePolicyEvaluatorTest (18 cases) plus mergeDepth coverage in SegmentLayoutTest.
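The split pass described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's evaluator: the class `SplitGateSketch`, the `splitPass` signature, and the "score of 1.0 or more triggers a split" rule are hypothetical; only the sealed Split | Merge | NoAction shape and the cap/cooldown gating come from the commit message.

```java
/** Hypothetical sketch of the decision type and the split gates; not the PR's evaluator. */
final class SplitGateSketch {
    private SplitGateSketch() {}

    /** Sealed decision with a human-readable reason on every variant. */
    sealed interface Decision permits Split, Merge, NoAction {}
    record Split(long segmentId, String reason) implements Decision {}
    record Merge(long leftId, long rightId, String reason) implements Decision {}
    record NoAction(String reason) implements Decision {}

    /**
     * Split pass for one candidate segment: the segment cap and the cooldown gate
     * the pass, then an overload score (rate / threshold) of 1.0 or more asks for
     * a split. The 1.0 cut-off is an assumption made for this sketch.
     */
    static Decision splitPass(int segmentCount, int maxSegments, long nowMs, long lastSplitAtMs,
                              long splitCooldownMs, long segmentId, double overloadScore) {
        if (segmentCount >= maxSegments) {
            return new NoAction("at maxSegments");
        }
        if (nowMs - lastSplitAtMs < splitCooldownMs) {
            return new NoAction("split cooldown active");
        }
        if (overloadScore >= 1.0) {
            return new Split(segmentId, "traffic above split threshold");
        }
        return new NoAction("below split threshold");
    }
}
```

Because the function takes the clock and the last-split time as arguments, it stays pure and can be unit-tested without a broker, which is the property the increment relies on.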
Second increment for PIP-483: the cluster-wide broker configuration and the resolver that flattens it into the policy the evaluator reads.
- ServiceConfiguration: 17 scalableTopicAutoScale*/threshold/loadReport knobs (all dynamic, CATEGORY_POLICIES) with the PIP defaults (auto-scale on, maxSegments=64, maxDagDepth=10, splitCooldown=60s, mergeCooldown/Window=300s, the four split + four merge rate thresholds, loadReportInterval=10s, rateChangeThreshold=0.25).
- AutoScaleConfig.fromBrokerConfig(): resolves the broker config into AutoScaleConfig. Namespace/topic override layering will graft onto this via toBuilder() in a later increment.
- AutoScaleConfigTest: defaults match the PIP, the split>merge hysteresis invariant holds, and overridden config propagates.
Third increment for PIP-483: the metadata-store path for per-segment load and the materiality-gated writer.
- ScalableTopicResources: SegmentLoadStats cache + reportSegmentLoadAsync (upsert), getSegmentLoadAsync (value + Stat, so the controller can read the modification timestamp for windowing), deleteSegmentLoadAsync (tolerates missing), and segmentLoadPath (.../segments/{id}/load).
- SegmentLoadReporter: writes a sample only when it changed materially since the last write — a rate moved by more than the relative threshold (default 25%) or crossed to/from zero — keeping metadata write volume bounded. forget() drops the cached last-written value when the broker stops owning a segment.
The reporter owns only the materiality decision + last-written cache; sampling live TopicStats and scheduling the sweep are wired in by the broker in a later increment.
Tested: isMaterialChange (relative threshold, zero-crossing, any-metric) plus reportIfChanged against an in-memory store (first-write, immaterial-skip keeps the Stat timestamp put, material-write, forget-forces-rewrite, delete-tolerates-missing).
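The materiality rule described above can be sketched as follows. This is an illustration only: the class name `MaterialityGate`, the array-based signature, and the strict `>` comparison against the last written value are assumptions, not the PR's SegmentLoadReporter code.

```java
/** Hypothetical sketch of the materiality gate; not the PR's SegmentLoadReporter. */
final class MaterialityGate {
    private MaterialityGate() {}

    /** True when a single rate changed enough to be worth a metadata write. */
    static boolean isMaterialChange(double lastWritten, double current, double relativeThreshold) {
        boolean wasZero = lastWritten == 0.0;
        boolean isZero = current == 0.0;
        if (wasZero || isZero) {
            // Crossing to or from zero is always material; zero to zero is not.
            return wasZero != isZero;
        }
        // Relative move measured against the last written value (assumed baseline).
        return Math.abs(current - lastWritten) / Math.abs(lastWritten) > relativeThreshold;
    }

    /** Any-metric rule: a sample is material if any one of its rates is (arrays of equal length). */
    static boolean isMaterialChange(double[] lastWritten, double[] current, double relativeThreshold) {
        for (int i = 0; i < lastWritten.length; i++) {
            if (isMaterialChange(lastWritten[i], current[i], relativeThreshold)) {
                return true;
            }
        }
        return false;
    }
}
```

With the default threshold of 0.25, a rate moving from 100 to 120 is skipped (a 20% move), while 100 to 130 is written. A rate drifting from 90 to 110 is also skipped, since the gate is anchored at the last written value rather than at any split or merge threshold.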
Fourth increment for PIP-483: the controller leader now drives auto split/merge.
- Two evaluation triggers: a periodic AutoScaleTick (cadence scalableTopicAutoScaleIntervalSeconds, default 60s) for traffic-driven decisions, and an event-driven evaluation fired the moment a stream/checkpoint consumer registers (so consumer-count scale-up reacts within seconds, no polling). Both scheduled on leadership win, cancelled on leader-loss/close, mirroring the GC tick.
- evaluateAndAct(): collects per-subscription consumer counts (managed coordinators only; QUEUE bypass subs aren't tracked here, so they're naturally excluded) and per-segment load samples (value + Stat modified time), runs the pure AutoScalePolicyEvaluator, and dispatches to the existing splitSegment / mergeSegments. An AtomicBoolean serializes evaluation+dispatch so concurrent ticks/consumer events never launch overlapping auto operations.
- splitSegment/mergeSegments set lastSplitAtMs/lastMergeAtMs at entry, so the split/merge cooldowns cover manual operations too.
Tested in ScalableTopicControllerAutoScaleTest against an in-memory store + mocked admin: load-driven split, no-split-under-threshold, disabled-config no-op, cold-pair merge, consumer-driven split (event-driven), and split-cooldown-blocks-second-split.
Fifth increment for PIP-483: the connecting piece that actually populates the load records the controller reads. BrokerService.startSegmentLoadReporter() starts a dedicated periodic sweep (scalable-segment-load-reporter, cadence scalableTopicLoadReportIntervalSeconds, default 10s) when scalable topics are enabled. Each tick iterates the segment:// topics this broker hosts via forEachTopic, derives the parent topic + segment id from the segment name, reads the topic's in/out msg and byte rates from getStats, and hands them to the SegmentLoadReporter — which writes to the metadata store only on a material change. The monitor is registered for graceful shutdown alongside the other broker monitors. Tested end-to-end in V5SegmentLoadReporterTest: producing across a 2-segment scalable topic and running the sweep writes a load record for each segment; a plain persistent topic produces none and the sweep tolerates non-segment topics.
I performed an AI-assisted (Claude Fable 5) review and these were the findings. Some might be useful to address.
lhotari left a comment
LGTM, just check the local Claude Fable 5 review findings and the comments from my actual review. Feel free to mark them resolved without waiting for a new review.
    stats.msgRateIn() / config.splitMsgRateIn(),
    stats.bytesRateIn() / config.splitBytesRateIn(),
    stats.msgRateOut() / config.splitMsgRateOut(),
    stats.bytesRateOut() / config.splitBytesRateOut(),
Does division by zero (Infinity or NaN for doubles, never an exception) behave correctly here?
    if (!config.enabled()) {
        return CompletableFuture.completedFuture(null);
    }
    if (!autoScaleInFlight.compareAndSet(false, true)) {
How does the concurrency of the auto-scaling logic work when the user performs a "manual" split or merge with the org.apache.pulsar.broker.admin.v2.ScalableTopics#splitSegment (/{tenant}/{namespace}/{topic}/split/{segmentId}) and org.apache.pulsar.broker.admin.v2.ScalableTopics#mergeSegments (/{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}) REST APIs?
Is it planned as part of PIP-483 to allow changing maxSegments, minSegments and maxDagDepth per scalable topic, instead of splitting and merging manually with the REST API? It could be useful to have a setting that controls whether the change is applied immediately or only when the auto-scaling rules next fire. I guess scaling immediately after increasing minSegments would bring separate challenges in picking the segments that are worth splitting; that could also be done based on load stats. Decreasing maxSegments would be a similar case in the other direction.
…ous throw
If anything between the successful compareAndSet and the future-chain construction threw synchronously, the whenComplete callback never existed and the flag stayed set forever, silently disabling auto-scaling on the topic until a leadership change. Wrap the chain construction so the guard is released on a synchronous throw too.
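The fix described here follows a common pattern for AtomicBoolean guards around asynchronous work. A minimal sketch, assuming a hypothetical `InFlightGuard` class and `runExclusive` method (neither name is from the PR):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical sketch of an in-flight guard that survives a synchronous throw. */
final class InFlightGuard {
    private final AtomicBoolean inFlight = new AtomicBoolean(false);

    /** Runs the async work unless a run is already in flight. */
    CompletableFuture<Void> runExclusive(Supplier<CompletableFuture<Void>> work) {
        if (!inFlight.compareAndSet(false, true)) {
            // Another run owns the guard; skip this one.
            return CompletableFuture.completedFuture(null);
        }
        try {
            // Building the chain may itself throw before whenComplete is attached.
            return work.get().whenComplete((ignored, error) -> inFlight.set(false));
        } catch (Throwable t) {
            // Release on the synchronous path too, otherwise the guard stays set forever.
            inFlight.set(false);
            return CompletableFuture.failedFuture(t);
        }
    }

    boolean isInFlight() {
        return inFlight.get();
    }
}
```

Without the catch block, an exception thrown while constructing the chain would leave `inFlight` at true with no callback registered to clear it.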
…) on unload
Two leaks from the review:
- The GC tick pruned a segment from the layout and deleted its backing topic but left the .../segments/{id}/load znode behind forever. The prune fan-out now deletes the load record alongside the backing topic (deleteSegmentLoadAsync already tolerates a missing record).
- SegmentLoadReporter.forget() had no production caller, so the per-broker lastWritten cache grew unboundedly with segment churn, and a re-acquired segment's first sample could be wrongly suppressed as immaterial. BrokerService.removeTopicFromCache now drops the cache entry when a segment topic is unloaded/deleted from this broker.
The GC controller test now seeds a load record for the doomed segment and asserts it is gone after the prune tick.
…e terminology
- deleteScalableTopicAsync now uses MetadataStore.deleteRecursive, so deleting a scalable topic removes everything under its record: the controller leader lock, subscription + consumer registrations, and the per-segment load records. This also fixes the pre-existing subscription-record leak on topic delete. Covered by testDeleteScalableTopicCleansUp, which now seeds a subscription and a load record and asserts both are gone.
- Replace 'znode' with store-neutral wording in the javadoc/comments this PR introduced; ZooKeeper is only one metadata backend.
The merge window derives from the load record's Stat modification time, so any rewrite of an effectively-unchanged record restarts the 'cold since' clock. Two layers fixed:
- SegmentLoadReporter.reportIfChanged: on a local cache miss (broker restart or ownership just moved here) the materiality baseline is now seeded from the record already in the store and the ±threshold gate applies against it, instead of writing the first sample unconditionally. Under frequent load-manager rebalancing this was enough to starve merges cluster-wide.
- ScalableTopicResources.reportSegmentLoadAsync: a bit-identical value is no longer rewritten (readModifyUpdateOrCreate always puts, even for an equal value), as a backstop for any caller bypassing the reporter.
Also documents the controller-clock vs store-clock skew on the coldEnough window check. forget() keeps its cache-hygiene role; its javadoc now reflects that a re-acquire re-seeds from the store rather than force-rewriting (test updated accordingly, plus new coverage for the new-owner seeding and the identical-value no-op).
Nothing previously enforced the invariants the evaluator depends on. In particular a split threshold configured as 0 makes the overload score rate/threshold yield Infinity for any positive rate (permanent split pressure) and NaN for a zero rate (silently ignored) — doubles never throw on division by zero, so the misbehavior was invisible. AutoScaleConfig.validated(), invoked from fromBrokerConfig (and reusable by the future namespace/topic override resolution), now rejects: minSegments < 1, maxSegments < minSegments, negative maxDagDepth/cooldowns/window, non-positive split thresholds, negative merge thresholds, and split thresholds not strictly above their merge counterparts (the hysteresis dead-band the javadoc already required). A violation surfaces as a warn-logged IllegalArgumentException on each evaluation until the operator fixes the config. Merge thresholds of 0 remain valid as an explicit 'never merge' setting.
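The division behavior and the threshold checks described above can be reproduced in isolation. The sketch below is illustrative only: `ThresholdCheck`, `overloadScore`, and `validate` are hypothetical names, and the real AutoScaleConfig.validated() covers more fields (segment caps, cooldowns, window) than the two shown here.

```java
/** Hypothetical sketch of threshold validation; names do not mirror the PR's AutoScaleConfig. */
final class ThresholdCheck {
    private ThresholdCheck() {}

    /** Overload score as described: rate divided by the split threshold. */
    static double overloadScore(double rate, double splitThreshold) {
        // Double division never throws: x/0 is Infinity for x > 0 and NaN for x == 0.
        return rate / splitThreshold;
    }

    /** Rejects threshold pairs that would break the score or the hysteresis dead-band. */
    static void validate(double splitThreshold, double mergeThreshold) {
        if (!(splitThreshold > 0)) { // negated form also rejects NaN
            throw new IllegalArgumentException("split threshold must be > 0: " + splitThreshold);
        }
        if (!(mergeThreshold >= 0)) { // 0 stays valid as an explicit "never merge"
            throw new IllegalArgumentException("merge threshold must be >= 0: " + mergeThreshold);
        }
        if (!(splitThreshold > mergeThreshold)) {
            throw new IllegalArgumentException("split threshold must be strictly above merge threshold");
        }
    }
}
```

The negated comparisons are deliberate: every comparison against NaN is false, so `!(x > 0)` catches NaN where `x <= 0` would let it through.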
… operations
Two cooldown-clock fixes:
- lastSplitAtMs/lastMergeAtMs were set at splitSegment/mergeSegments entry, so an attempt that failed (e.g. segment-topic creation error, CAS conflict) still consumed the cooldown and blocked the retry for the full window. They are now set only after the layout CAS has succeeded.
- The clocks are in-memory and used to reset on every leader failover, allowing e.g. an auto merge seconds after one ran on the previous leader. The new leader now seeds them from the layout itself: a split's children carry exactly one parentId and a merge's child carries two, so the most recent createdAtMs of each class is precisely when the last split/merge happened. No new persistent state.
New tests: split cooldown enforced across a close + re-elect cycle, and a failed split leaving the layout unchanged retries immediately once the injected error clears.
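The seeding rule (one parent means split child, two parents means merge child) can be sketched as below. The `Segment` record and the `CooldownSeed` helper are hypothetical stand-ins for the PR's layout types, and returning 0 when no matching segment exists is an assumption.

```java
import java.util.List;

/** Hypothetical sketch of seeding the cooldown clocks from the segment layout. */
final class CooldownSeed {
    private CooldownSeed() {}

    /** Minimal stand-in for a layout entry: its parents and creation time. */
    record Segment(long id, List<Long> parentIds, long createdAtMs) {}

    /** Most recent creation time among segments with exactly parentCount parents, or 0 if none. */
    static long lastOperationAtMs(List<Segment> layout, int parentCount) {
        long latest = 0L;
        for (Segment segment : layout) {
            if (segment.parentIds().size() == parentCount) {
                latest = Math.max(latest, segment.createdAtMs());
            }
        }
        return latest;
    }

    /** A split's children carry exactly one parent. */
    static long lastSplitAtMs(List<Segment> layout) {
        return lastOperationAtMs(layout, 1);
    }

    /** A merge's child carries two parents. */
    static long lastMergeAtMs(List<Segment> layout) {
        return lastOperationAtMs(layout, 2);
    }
}
```

Deriving the clocks from data the layout already holds is what lets a newly elected leader enforce the cooldowns without any extra persisted field.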
Several knobs were marked dynamic=true but read once:
- scalableTopicAutoScaleEnabled: turning it ON after it was off at leader election had no effect until the next leadership cycle, because the tick was never scheduled. The tick is now scheduled unconditionally (a disabled evaluation is a cheap no-op that re-reads the flag), making enable/disable genuinely dynamic.
- scalableTopicLoadReportRateChangeThreshold: was captured at broker start in the reporter constructor. The reporter now takes a DoubleSupplier and re-reads the config on every sample (the double constructor remains for tests).
- scalableTopicAutoScaleIntervalSeconds and scalableTopicLoadReportIntervalSeconds genuinely are read once (at leadership win / broker start respectively): marked dynamic=false and documented, rather than pretending.
The rate thresholds, cooldowns, windows and caps were already dynamic: AutoScaleConfig.fromBrokerConfig re-resolves them on every evaluation.
…low-ups
Two burst-behavior gaps:
- A trigger arriving while an evaluation was in flight (CAS busy) was silently dropped and only recovered by the next periodic tick (e.g. a consumer registering mid-evaluation). The busy path now sets a re-evaluate flag that the in-flight run honors on completion, so coalesced triggers are never lost.
- After a successful auto split, one follow-up evaluation is scheduled right after the split cooldown expires. A burst of N consumers on one segment now converges at one split per cooldown (the chain stops at the first NoAction decision) instead of one split per periodic tick, which is slower whenever the cooldown is shorter than the tick.
New test: 4 consumers registering on a 1-segment topic converge to 4 segments purely from event-driven evaluations + the follow-up chain, with no periodic tick and no manual evaluation calls.
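The coalescing behavior in the first item can be sketched with two flags. This is a simplified, synchronous illustration: `CoalescingTrigger` is a hypothetical name, and the PR's evaluation is asynchronous, so its actual re-evaluate handling will differ in shape.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: triggers that arrive mid-run are coalesced into one re-run. */
final class CoalescingTrigger {
    private final AtomicBoolean inFlight = new AtomicBoolean(false);
    private final AtomicBoolean rerunRequested = new AtomicBoolean(false);
    private final Runnable evaluation;

    CoalescingTrigger(Runnable evaluation) {
        this.evaluation = evaluation;
    }

    /** Requests an evaluation; runs it now if idle, otherwise leaves a flag for the running one. */
    void trigger() {
        rerunRequested.set(true);
        // Loop so a request recorded during a run is honored once that run finishes.
        while (rerunRequested.get() && inFlight.compareAndSet(false, true)) {
            try {
                rerunRequested.set(false); // this run covers every request seen so far
                evaluation.run();
            } finally {
                inFlight.set(false);
            }
        }
    }
}
```

Setting the request flag before attempting the CAS, and re-checking it after releasing the guard, closes the window in which a trigger could otherwise be lost between the in-flight run's last check and its release.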
The load reporter's significant-change gate is anchored at the last written value, not at the split/merge thresholds: a rate that settles within the band of the last record is never re-reported, so a segment can sustain up to the configured factor beyond a threshold without the controller seeing it. The effect is path-dependent but one-directional (delays action, never causes a spurious one) and bounded by the knob. Accepted as the cost of bounded metadata write volume; now documented on scalableTopicLoadReportRateChangeThreshold and in the SegmentLoadReporter javadoc so operators know to lower the threshold for tighter tracking.
Implements PIP-483: Scalable Topic Auto Split/Merge (sub-PIP of PIP-468). The controller now scales a scalable topic's segment count automatically, default-on cluster-wide, bounded by hard caps and asymmetric cooldowns.
End-to-end flow: each broker reports per-segment load to the metadata store → the controller leader reads load + consumer counts → a pure decision function decides → it dispatches to the existing split/merge protocols.
Increments (one commit each, each independently tested)
- AutoScalePolicyEvaluator.decide(...), a pure I/O-free function: split pass (consumer-count then traffic, gated by splitCooldown + maxSegments) then merge pass (cold adjacent pair, gated by mergeCooldown + mergeWindow + minSegments + maxDagDepth). Plus SegmentLoadStats, AutoScaleConfig, AutoScaleDecision, SegmentLoadSample, and SegmentLayout.mergeDepth().
- scalableTopic* knobs (auto-scale on, maxSegments=64, maxDagDepth=10, splitCooldown=60s, mergeCooldown/Window=300s, four split + four merge rate thresholds, etc.) and AutoScaleConfig.fromBrokerConfig().
- ScalableTopicResources load get/put/delete paths and SegmentLoadReporter, which writes a sample only when a rate changed materially (default ±25%) or crossed zero, keeping metadata write volume bounded.
- AutoScaleTick (traffic) + event-driven evaluation on stream/checkpoint consumer registration (consumer-count scale-up within seconds, no polling). Serialized by an in-flight guard; splitSegment/mergeSegments set the cooldowns so manual ops count too.
- BrokerService periodically sweeps the segment:// topics it hosts, reads their in/out msg+byte rates, and feeds the reporter. This populates the records the controller reads.
Design highlights
- … splitCooldown coalesces bursts); merges require a durable cold window and a longer cooldown.
Test plan
- AutoScalePolicyEvaluatorTest (20 cases): every split/merge rule, caps, cooldowns, hysteresis, adjacency, max-depth.
- AutoScaleConfigTest, SegmentLoadReporterTest, SegmentLayoutTest (mergeDepth).
- ScalableTopicControllerAutoScaleTest: load-driven split, consumer-driven split (event path), cold-pair merge, disabled no-op, cooldown blocks second split (against in-memory store + mocked admin).
- V5SegmentLoadReporterTest: end-to-end, producing across a scalable topic + running the sweep writes a load record per segment.
- org.apache.pulsar.broker.service.scalable.* suite + checkstyle.
Follow-up (separate PR)
Per-namespace and per-topic AutoScalePolicyOverride + admin.scalableTopics().set/get/removeAutoScalePolicy(...). The feature is fully functional via the cluster config without it.