From 4ddfd6f4363ba795e527b59e253f4103ae4fd170 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 11 Jun 2026 20:29:38 -0700 Subject: [PATCH 1/4] [improve][broker] PIP-483: namespace + topic auto split/merge policy overrides MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the auto split/merge core (#25980): per-namespace and per-topic policy overrides, resolved most-specific-wins on top of the broker defaults. - AutoScalePolicyOverride: all-optional override carrying the same knobs as the broker config (caps, cooldowns, window, the eight rate thresholds, enabled). Unset fields fall through; enabled=false opts a namespace or topic out entirely. - Storage: Policies.scalableTopicAutoScalePolicy (namespace) and ScalableTopicMetadata.autoScalePolicy (topic, both broker-internal and admin wire shape). SegmentLayout.toMetadata now takes the original record and carries over all non-layout fields — without this, every split/merge CAS would silently drop the topic override. - Resolution: AutoScaleConfig.resolve(conf, nsOverride, topicOverride) layers the overrides via toBuilder and runs the existing invariant validation on the result, so an override that is only invalid in combination (e.g. merge threshold raised above the default split threshold) is rejected. - Controller: evaluateAndAct resolves the effective policy per evaluation from the (cached) namespace policies + topic metadata, so override changes take effect on the next tick with no controller restart. - Admin API: admin.scalableTopics().set/get/removeAutoScalePolicy(topic) and admin.namespaces().set/get/removeScalableTopicAutoScalePolicy(ns), with REST endpoints, PolicyName.SCALABLE_TOPIC_AUTO_SCALE authorization, and 412 on invariant violations at set time. Tests: resolve-layering + invalid-combination units; controller integration (namespace disable, topic-wins-over-namespace, per-topic maxSegments cap, override survives a split's metadata rewrite); end-to-end admin round-trips at both levels incl. 412 and 404 paths. --- .../resources/ScalableTopicMetadata.java | 7 + .../broker/admin/impl/NamespacesBase.java | 32 +++++ .../pulsar/broker/admin/v2/Namespaces.java | 96 +++++++++++++ .../broker/admin/v2/ScalableTopics.java | 136 ++++++++++++++++++ .../service/scalable/AutoScaleConfig.java | 81 ++++++++++- .../scalable/ScalableTopicController.java | 50 +++++-- .../service/scalable/SegmentLayout.java | 9 +- .../ScalableTopicAutoScalePolicyTest.java | 106 ++++++++++++++ .../service/scalable/AutoScaleConfigTest.java | 46 ++++++ .../ScalableTopicControllerAutoScaleTest.java | 97 +++++++++++++ .../service/scalable/SegmentLayoutTest.java | 7 +- .../pulsar/client/admin/Namespaces.java | 67 +++++++++ .../pulsar/client/admin/ScalableTopics.java | 49 +++++++ .../data/AutoScalePolicyOverride.java | 90 ++++++++++++ .../pulsar/common/policies/data/Policies.java | 5 +- .../policies/data/ScalableTopicMetadata.java | 6 + .../client/admin/internal/NamespacesImpl.java | 40 ++++++ .../admin/internal/ScalableTopicsImpl.java | 38 +++++ .../common/policies/data/PolicyName.java | 1 + 19 files changed, 944 insertions(+), 19 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java create mode 100644 pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java index 8af852316514f..228e2c25e6518 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java @@ -24,6 +24,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.scalable.SegmentInfo; /** @@ -51,4 +52,10 @@ public class ScalableTopicMetadata { /** User-defined topic properties. */ @Builder.Default private Map properties = Map.of(); + + /** + * Per-topic auto split/merge policy override (PIP-483). {@code null} means no override: + * the namespace policy and then the broker configuration apply. + */ + private AutoScalePolicyOverride autoScalePolicy; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index baff684bd08c8..5b14c909d7c16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -66,6 +66,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.scalable.AutoScaleConfig; import org.apache.pulsar.broker.topiclistlimit.TopicListMemoryLimiter; import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache; import org.apache.pulsar.broker.web.RestException; @@ -84,6 +85,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -1308,6 +1310,36 @@ protected CompletableFuture internalSetAutoTopicCreationAsync( })); } + protected CompletableFuture internalGetScalableTopicAutoScalePolicyAsync() { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SCALABLE_TOPIC_AUTO_SCALE, + PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenApply(policies -> policies.scalableTopicAutoScalePolicy); + } + + protected CompletableFuture internalSetScalableTopicAutoScalePolicyAsync( + AutoScalePolicyOverride override) { + return validateNamespacePolicyOperationAsync(namespaceName, + PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenAccept(__ -> { + if (override != null) { + // The override only has to be valid in combination with the broker + // defaults — resolve and let the invariant checks reject bad values + // (e.g. zero split thresholds, split <= merge hysteresis inversion). + try { + AutoScaleConfig.resolve(pulsar().getConfig(), override, null); + } catch (IllegalArgumentException e) { + throw new RestException(Status.PRECONDITION_FAILED, e.getMessage()); + } + } + }) + .thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { + policies.scalableTopicAutoScalePolicy = override; + return policies; + })); + } + protected CompletableFuture internalSetAutoSubscriptionCreationAsync(AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) { // Force to read the data s.t. the watch to the cache content is setup. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 77c18f4169d17..ac0fa75292d47 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -60,6 +60,7 @@ import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -880,6 +881,101 @@ public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse }); } + @GET + @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy") + @Operation(summary = "Get the scalable-topic auto split/merge policy override for a namespace") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", + description = "Get the scalable-topic auto split/merge policy override for a namespace", + content = @Content(schema = @Schema(implementation = AutoScalePolicyOverride.class))), + @ApiResponse(responseCode = "403", description = "Don't have admin permission"), + @ApiResponse(responseCode = "404", description = "Tenant or namespace doesn't exist")}) + public void getScalableTopicAutoScalePolicy(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalGetScalableTopicAutoScalePolicyAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error() + .attr("namespace", namespaceName) + .exception(ex) + .log("Failed to get scalableTopicAutoScalePolicy for namespace"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy") + @Operation(summary = "Override the broker's scalable-topic auto split/merge settings for a namespace") + @ApiResponses(value = { + @ApiResponse(responseCode = "204", description = "Operation successful"), + @ApiResponse(responseCode = "403", description = "Don't have admin permission"), + @ApiResponse(responseCode = "404", description = "Tenant or cluster or namespace doesn't exist"), + @ApiResponse(responseCode = "412", + description = "The resolved auto split/merge policy violates an invariant")}) + public void setScalableTopicAutoScalePolicy( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + @RequestBody(description = "Auto split/merge policy override", required = true) + AutoScalePolicyOverride override) { + validateNamespaceName(tenant, namespace); + internalSetScalableTopicAutoScalePolicyAsync(override) + .thenAccept(__ -> { + log.info() + .attr("namespace", namespaceName) + .log("Successfully set scalableTopicAutoScalePolicy on namespace"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); + log.error() + .attr("namespace", namespaceName) + .exception(ex) + .log("Failed to set scalableTopicAutoScalePolicy on namespace"); + if (ex instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, ex); + } + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy") + @Operation(summary = "Remove the scalable-topic auto split/merge policy override from a namespace") + @ApiResponses(value = { + @ApiResponse(responseCode = "204", description = "Operation successful"), + @ApiResponse(responseCode = "403", description = "Don't have admin permission"), + @ApiResponse(responseCode = "404", description = "Tenant or cluster or namespace doesn't exist") }) + public void removeScalableTopicAutoScalePolicy(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalSetScalableTopicAutoScalePolicyAsync(null) + .thenAccept(__ -> { + log.info() + .attr("namespace", namespaceName) + .log("Successfully removed scalableTopicAutoScalePolicy on namespace"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); + log.error() + .attr("namespace", namespaceName) + .exception(ex) + .log("Failed to remove scalableTopicAutoScalePolicy on namespace"); + if (ex instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, ex); + } + return null; + }); + } + @POST @Path("/{tenant}/{namespace}/autoSubscriptionCreation") @Operation(summary = "Override broker's allowAutoSubscriptionCreation setting for a namespace") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java index 13afb2abe2e62..838c22e776707 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java @@ -55,13 +55,17 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.resources.ScalableTopicMetadata; import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.broker.service.scalable.AutoScaleConfig; import org.apache.pulsar.broker.service.scalable.ScalableTopicController; import org.apache.pulsar.broker.service.scalable.ScalableTopicService; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.scalable.ScalableTopicConstants; @@ -477,6 +481,138 @@ public void getScalableTopicMetadata( }); } + // --- Auto split/merge policy override (PIP-483) --- + + @GET + @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy") + @Operation(summary = "Get the per-topic auto split/merge policy override.") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "The per-topic auto split/merge policy override; " + + "empty body if no override is set.", + content = @Content(schema = @Schema(implementation = AutoScalePolicyOverride.class))), + @ApiResponse(responseCode = "401", + description = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(responseCode = "403", description = "Don't have admin permission on the namespace"), + @ApiResponse(responseCode = "404", description = "Scalable topic doesn't exist"), + @ApiResponse(responseCode = "500", description = "Internal server error")}) + public void getAutoScalePolicy( + @Suspended final AsyncResponse asyncResponse, + @Parameter(description = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @Parameter(description = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @Parameter(description = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic) { + validateNamespaceName(tenant, namespace); + TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, encodedTopic); + + validateTopicPolicyOperationAsync(tn, PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.READ) + .thenCompose(__ -> resources().getScalableTopicMetadataAsync(tn)) + .thenAccept(optMd -> { + if (optMd.isEmpty()) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, + "Scalable topic not found: " + tn)); + } else { + asyncResponse.resume(optMd.get().getAutoScalePolicy()); + } + }) + .exceptionally(ex -> { + log.error().attr("clientAppId", clientAppId()).attr("topic", tn) + .exception(ex).log("Failed to get autoScalePolicy for scalable topic"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy") + @Operation(summary = "Set the per-topic auto split/merge policy override.") + @ApiResponses(value = { + @ApiResponse(responseCode = "204", description = "Override set successfully"), + @ApiResponse(responseCode = "401", + description = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(responseCode = "403", description = "Don't have admin permission on the namespace"), + @ApiResponse(responseCode = "404", description = "Scalable topic doesn't exist"), + @ApiResponse(responseCode = "412", + description = "The resolved auto split/merge policy violates an invariant"), + @ApiResponse(responseCode = "500", description = "Internal server error")}) + public void setAutoScalePolicy( + @Suspended final AsyncResponse asyncResponse, + @Parameter(description = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @Parameter(description = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @Parameter(description = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @RequestBody(description = "Auto split/merge policy override", required = true) + AutoScalePolicyOverride override) { + validateNamespaceName(tenant, namespace); + TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, encodedTopic); + internalSetAutoScalePolicy(asyncResponse, tn, override); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy") + @Operation(summary = "Remove the per-topic auto split/merge policy override.") + @ApiResponses(value = { + @ApiResponse(responseCode = "204", description = "Override removed successfully"), + @ApiResponse(responseCode = "401", + description = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(responseCode = "403", description = "Don't have admin permission on the namespace"), + @ApiResponse(responseCode = "404", description = "Scalable topic doesn't exist"), + @ApiResponse(responseCode = "500", description = "Internal server error")}) + public void removeAutoScalePolicy( + @Suspended final AsyncResponse asyncResponse, + @Parameter(description = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @Parameter(description = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @Parameter(description = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic) { + validateNamespaceName(tenant, namespace); + TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, encodedTopic); + internalSetAutoScalePolicy(asyncResponse, tn, null); + } + + private void internalSetAutoScalePolicy(AsyncResponse asyncResponse, TopicName tn, + AutoScalePolicyOverride override) { + validateTopicPolicyOperationAsync(tn, PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.WRITE) + .thenAccept(__ -> { + if (override != null) { + // The override must produce a valid policy in combination with the + // broker defaults (the namespace layer can only have been valid the + // same way). Reject invariant violations at set time. + try { + AutoScaleConfig.resolve(pulsar().getConfig(), null, override); + } catch (IllegalArgumentException e) { + throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage()); + } + } + }) + .thenCompose(__ -> resources().updateScalableTopicAsync(tn, md -> { + md.setAutoScalePolicy(override); + return md; + })) + .thenAccept(__ -> { + log.info().attr("clientAppId", clientAppId()).attr("topic", tn) + .attr("removed", override == null) + .log("Updated autoScalePolicy on scalable topic"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); + if (ex instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, + "Scalable topic not found: " + tn)); + return null; + } + log.error().attr("clientAppId", clientAppId()).attr("topic", tn) + .exception(ex).log("Failed to update autoScalePolicy for scalable topic"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + // --- Delete --- @DELETE diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java index b739b102a76b9..848b59cfa146b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java @@ -21,6 +21,7 @@ import java.time.Duration; import lombok.Builder; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; /** * Fully-resolved auto split/merge policy for a single scalable topic (PIP-483). @@ -81,6 +82,30 @@ public record AutoScaleConfig( * @return the resolved policy reflecting the {@code scalableTopic*} settings */ public static AutoScaleConfig fromBrokerConfig(ServiceConfiguration conf) { + return brokerDefaults(conf).validated(); + } + + /** + * Resolve the effective policy for a topic: broker defaults, overlaid with the namespace + * override, overlaid with the topic override (most-specific wins per field; {@code null} + * override fields fall through). + * + * @param conf the broker configuration (cluster-wide defaults) + * @param namespaceOverride the namespace-level override, or {@code null} if none + * @param topicOverride the topic-level override, or {@code null} if none + * @return the validated effective policy + * @throws IllegalArgumentException if the resolved policy violates an invariant + */ + public static AutoScaleConfig resolve(ServiceConfiguration conf, + AutoScalePolicyOverride namespaceOverride, + AutoScalePolicyOverride topicOverride) { + AutoScaleConfig config = brokerDefaults(conf); + config = applyOverride(config, namespaceOverride); + config = applyOverride(config, topicOverride); + return config.validated(); + } + + private static AutoScaleConfig brokerDefaults(ServiceConfiguration conf) { return AutoScaleConfig.builder() .enabled(conf.isScalableTopicAutoScaleEnabled()) .maxSegments(conf.getScalableTopicMaxSegments()) @@ -97,8 +122,60 @@ public static AutoScaleConfig fromBrokerConfig(ServiceConfiguration conf) { .mergeBytesRateIn(conf.getScalableTopicMergeBytesRateInThreshold()) .mergeMsgRateOut(conf.getScalableTopicMergeMsgRateOutThreshold()) .mergeBytesRateOut(conf.getScalableTopicMergeBytesRateOutThreshold()) - .build() - .validated(); + .build(); + } + + private static AutoScaleConfig applyOverride(AutoScaleConfig base, AutoScalePolicyOverride o) { + if (o == null) { + return base; + } + AutoScaleConfigBuilder b = base.toBuilder(); + if (o.getEnabled() != null) { + b.enabled(o.getEnabled()); + } + if (o.getMaxSegments() != null) { + b.maxSegments(o.getMaxSegments()); + } + if (o.getMinSegments() != null) { + b.minSegments(o.getMinSegments()); + } + if (o.getMaxDagDepth() != null) { + b.maxDagDepth(o.getMaxDagDepth()); + } + if (o.getSplitCooldownSeconds() != null) { + b.splitCooldown(Duration.ofSeconds(o.getSplitCooldownSeconds())); + } + if (o.getMergeCooldownSeconds() != null) { + b.mergeCooldown(Duration.ofSeconds(o.getMergeCooldownSeconds())); + } + if (o.getMergeWindowSeconds() != null) { + b.mergeWindow(Duration.ofSeconds(o.getMergeWindowSeconds())); + } + if (o.getSplitMsgRateInThreshold() != null) { + b.splitMsgRateIn(o.getSplitMsgRateInThreshold()); + } + if (o.getSplitBytesRateInThreshold() != null) { + b.splitBytesRateIn(o.getSplitBytesRateInThreshold()); + } + if (o.getSplitMsgRateOutThreshold() != null) { + b.splitMsgRateOut(o.getSplitMsgRateOutThreshold()); + } + if (o.getSplitBytesRateOutThreshold() != null) { + b.splitBytesRateOut(o.getSplitBytesRateOutThreshold()); + } + if (o.getMergeMsgRateInThreshold() != null) { + b.mergeMsgRateIn(o.getMergeMsgRateInThreshold()); + } + if (o.getMergeBytesRateInThreshold() != null) { + b.mergeBytesRateIn(o.getMergeBytesRateInThreshold()); + } + if (o.getMergeMsgRateOutThreshold() != null) { + b.mergeMsgRateOut(o.getMergeMsgRateOutThreshold()); + } + if (o.getMergeBytesRateOutThreshold() != null) { + b.mergeBytesRateOut(o.getMergeBytesRateOutThreshold()); + } + return b.build(); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java index 0b8c70d66b19b..952199279eda2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java @@ -44,6 +44,7 @@ import org.apache.pulsar.common.api.proto.ScalableConsumerType; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.scalable.HashRange; @@ -359,10 +360,6 @@ private CompletableFuture evaluateAndAct(String trigger) { if (brokerConfig == null) { return CompletableFuture.completedFuture(null); } - AutoScaleConfig config = AutoScaleConfig.fromBrokerConfig(brokerConfig); - if (!config.enabled()) { - return CompletableFuture.completedFuture(null); - } if (!autoScaleInFlight.compareAndSet(false, true)) { // Another evaluation or auto operation is already running. Don't drop the // trigger: mark it pending so the in-flight run re-evaluates on completion — @@ -372,11 +369,18 @@ private CompletableFuture evaluateAndAct(String trigger) { return CompletableFuture.completedFuture(null); } try { - return collectConsumerCounts() - .thenCombine(collectLoadSamples(), (consumers, load) -> - AutoScalePolicyEvaluator.decide(currentLayout, load, consumers, config, - clock.millis(), lastSplitAtMs, lastMergeAtMs)) - .thenCompose(decision -> dispatch(decision, config, trigger)) + return resolveAutoScaleConfig(brokerConfig) + .thenCompose(config -> { + if (!config.enabled()) { + return CompletableFuture.completedFuture(null); + } + return collectConsumerCounts() + .thenCombine(collectLoadSamples(), (consumers, load) -> + AutoScalePolicyEvaluator.decide(currentLayout, load, + consumers, config, clock.millis(), + lastSplitAtMs, lastMergeAtMs)) + .thenCompose(decision -> dispatch(decision, config, trigger)); + }) .whenComplete((__, ex) -> { autoScaleInFlight.set(false); if (autoScaleReEvaluate.getAndSet(false)) { @@ -393,6 +397,28 @@ private CompletableFuture evaluateAndAct(String trigger) { } } + /** + * Resolve the effective auto split/merge policy for this topic: broker defaults overlaid + * with the namespace-level override ({@code Policies.scalableTopicAutoScalePolicy}) and + * then the per-topic override ({@code ScalableTopicMetadata.autoScalePolicy}). Both reads + * are metadata-cache-backed, so this is cheap per evaluation and override changes take + * effect on the next tick without controller restarts. + */ + private CompletableFuture resolveAutoScaleConfig( + ServiceConfiguration brokerConfig) { + CompletableFuture namespaceOverride = + brokerService.getPulsar().getPulsarResources().getNamespaceResources() + .getPoliciesAsync(topicName.getNamespaceObject()) + .thenApply(opt -> opt.map(p -> p.scalableTopicAutoScalePolicy) + .orElse(null)); + CompletableFuture topicOverride = + resources.getScalableTopicMetadataAsync(topicName) + .thenApply(opt -> opt.map(ScalableTopicMetadata::getAutoScalePolicy) + .orElse(null)); + return namespaceOverride.thenCombine(topicOverride, + (ns, topic) -> AutoScaleConfig.resolve(brokerConfig, ns, topic)); + } + private CompletableFuture dispatch(AutoScaleDecision decision, AutoScaleConfig config, String trigger) { if (decision instanceof AutoScaleDecision.Split split) { @@ -657,7 +683,7 @@ public CompletableFuture splitSegment(long segmentId) { .thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md -> { SegmentLayout latest = SegmentLayout.fromMetadata(md); SegmentLayout updated = latest.splitSegment(segmentId, nowMs); - return updated.toMetadata(md.getProperties()); + return updated.toMetadata(md); })) .thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true)) .thenCompose(optMd -> { @@ -706,7 +732,7 @@ public CompletableFuture mergeSegments(long segmentId1, long segm .thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md -> { SegmentLayout latest = SegmentLayout.fromMetadata(md); SegmentLayout updated = latest.mergeSegments(segmentId1, segmentId2, nowMs); - return updated.toMetadata(md.getProperties()); + return updated.toMetadata(md); })) .thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true)) .thenCompose(optMd -> { @@ -1192,7 +1218,7 @@ private CompletableFuture pruneAllAsync(List drained) { updated = updated.pruneSegment(s.segmentId()); } } - return updated == latest ? md : updated.toMetadata(md.getProperties()); + return updated == latest ? md : updated.toMetadata(md); }).thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true)) .thenCompose(optMd -> { currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java index a3f1f0c5ef101..c823fb75e918a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java @@ -276,14 +276,17 @@ public SegmentLayout pruneSegment(long segmentId) { } /** - * Convert back to metadata for persistence. + * Convert back to metadata for persistence, carrying over the non-layout fields + * (properties, per-topic auto-scale policy) from the record being replaced. Layout + * mutations must never silently drop fields they don't model. */ - public ScalableTopicMetadata toMetadata(Map properties) { + public ScalableTopicMetadata toMetadata(ScalableTopicMetadata original) { return ScalableTopicMetadata.builder() .epoch(epoch) .nextSegmentId(nextSegmentId) .segments(new LinkedHashMap<>(allSegments)) - .properties(properties) + .properties(original.getProperties()) + .autoScalePolicy(original.getAutoScalePolicy()) .build(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java new file mode 100644 index 0000000000000..6e867d05f4aa1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; +import java.util.UUID; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; +import org.testng.annotations.Test; + +/** + * End-to-end coverage for the auto split/merge policy override admin API (PIP-483), at both + * levels, through the full HTTP path against a real shared broker. + * + *

Endpoints under test: + * + *

    + *
  • namespace: {@code /admin/v2/namespaces/{tenant}/{ns}/scalableTopicAutoScalePolicy}
  • + *
  • topic: {@code /admin/v2/scalable/{tenant}/{ns}/{topic}/autoScalePolicy}
  • + *
+ */ +public class ScalableTopicAutoScalePolicyTest extends SharedPulsarBaseTest { + + private String newScalableTopic() throws Exception { + String topic = "topic://" + getNamespace() + "/autoscale-" + + UUID.randomUUID().toString().substring(0, 8); + admin.scalableTopics().createScalableTopic(topic, 1); + return topic; + } + + @Test + public void testTopicLevelRoundTrip() throws Exception { + String topic = newScalableTopic(); + + // No override initially. + assertNull(admin.scalableTopics().getAutoScalePolicy(topic)); + + AutoScalePolicyOverride override = AutoScalePolicyOverride.builder() + .enabled(true) + .maxSegments(8) + .splitMsgRateInThreshold(5_000.0) + .build(); + admin.scalableTopics().setAutoScalePolicy(topic, override); + assertEquals(admin.scalableTopics().getAutoScalePolicy(topic), override); + + admin.scalableTopics().removeAutoScalePolicy(topic); + assertNull(admin.scalableTopics().getAutoScalePolicy(topic)); + } + + @Test + public void testNamespaceLevelRoundTrip() throws Exception { + String namespace = getNamespace(); + + assertNull(admin.namespaces().getScalableTopicAutoScalePolicy(namespace)); + + AutoScalePolicyOverride override = AutoScalePolicyOverride.builder() + .enabled(false) + .build(); + admin.namespaces().setScalableTopicAutoScalePolicy(namespace, override); + assertEquals(admin.namespaces().getScalableTopicAutoScalePolicy(namespace), override); + + admin.namespaces().removeScalableTopicAutoScalePolicy(namespace); + assertNull(admin.namespaces().getScalableTopicAutoScalePolicy(namespace)); + } + + @Test + public void testInvalidOverrideRejected() throws Exception { + String topic = newScalableTopic(); + + // minSegments above the broker-default maxSegments breaks min <= max on resolution. + AutoScalePolicyOverride bad = AutoScalePolicyOverride.builder() + .minSegments(1_000_000) + .build(); + assertThrows(PulsarAdminException.PreconditionFailedException.class, + () -> admin.scalableTopics().setAutoScalePolicy(topic, bad)); + assertThrows(PulsarAdminException.PreconditionFailedException.class, + () -> admin.namespaces().setScalableTopicAutoScalePolicy(getNamespace(), bad)); + } + + @Test + public void testTopicLevelOnMissingTopicIs404() { + String missing = "topic://" + getNamespace() + "/does-not-exist"; + assertThrows(PulsarAdminException.NotFoundException.class, + () -> admin.scalableTopics().setAutoScalePolicy(missing, + AutoScalePolicyOverride.builder().enabled(false).build())); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java index acc104c2773c5..7a991834461eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java @@ -55,6 +55,52 @@ public void testDefaultsMatchBrokerConfig() { assertTrue(c.splitBytesRateOut() > c.mergeBytesRateOut()); } + @Test + public void testResolveLayersOverrides() { + ServiceConfiguration conf = new ServiceConfiguration(); + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride ns = + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder() + .maxSegments(16) + .splitMsgRateInThreshold(20_000.0) + .build(); + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride topic = + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder() + .maxSegments(4) + .splitCooldownSeconds(5L) + .build(); + + AutoScaleConfig c = AutoScaleConfig.resolve(conf, ns, topic); + // Topic wins where both set. + assertEquals(c.maxSegments(), 4); + // Namespace applies where the topic is silent. + assertEquals(c.splitMsgRateIn(), 20_000.0); + // Topic-only field applies. + assertEquals(c.splitCooldown(), Duration.ofSeconds(5)); + // Untouched fields fall through to the broker defaults. + assertEquals(c.mergeCooldown(), Duration.ofMinutes(5)); + assertTrue(c.enabled()); + } + + @Test + public void testResolveNullOverridesEqualsBrokerConfig() { + ServiceConfiguration conf = new ServiceConfiguration(); + assertEquals(AutoScaleConfig.resolve(conf, null, null), + AutoScaleConfig.fromBrokerConfig(conf)); + } + + @Test + public void testResolveRejectsInvalidCombination() { + // The override is only invalid in combination: a merge threshold raised above the + // broker-default split threshold breaks the hysteresis invariant. + ServiceConfiguration conf = new ServiceConfiguration(); + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride bad = + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder() + .mergeMsgRateInThreshold(conf.getScalableTopicSplitMsgRateInThreshold()) + .build(); + assertThrows(IllegalArgumentException.class, + () -> AutoScaleConfig.resolve(conf, null, bad)); + } + @Test public void testValidationRejectsBadConfig() { // Zero split threshold: the evaluator scores rate/threshold, so 0 would yield diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java index 74ad5e8a5a347..3457155085f2d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java @@ -32,6 +32,8 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.resources.ScalableTopicResources; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.TransportCnx; @@ -39,7 +41,10 @@ import org.apache.pulsar.client.admin.ScalableTopics; import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.common.api.proto.ScalableConsumerType; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.scalable.SegmentLoadStats; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.coordination.CoordinationService; @@ -70,6 +75,10 @@ public class ScalableTopicControllerAutoScaleTest { private PulsarService pulsar; private ServiceConfiguration config; private ScalableTopics scalableTopics; + private PulsarResources pulsarResources; + private NamespaceResources namespaceResources; + /** Namespace policies served by the mocked resources; null = namespace has no policies. */ + private Policies namespacePolicies; private ScalableTopicController controller; private TopicName topicName; @@ -104,6 +113,19 @@ public void setUp() throws Exception { when(pulsar.getBrokerId()).thenReturn(BROKER_ID); when(pulsar.getExecutor()).thenReturn(scheduler); when(pulsar.getConfig()).thenReturn(config); + + // Namespace policies feed the per-namespace auto-scale override resolution. + // Default: no policies → broker config applies. Tests install overrides via + // namespacePolicies. Reset explicitly — TestNG reuses the test instance. + namespacePolicies = null; + pulsarResources = mock(PulsarResources.class); + namespaceResources = mock(NamespaceResources.class); + when(pulsar.getPulsarResources()).thenReturn(pulsarResources); + when(pulsarResources.getNamespaceResources()).thenReturn(namespaceResources); + when(namespaceResources.getPoliciesAsync(any(NamespaceName.class))) + .thenAnswer(__ -> CompletableFuture.completedFuture( + Optional.ofNullable(namespacePolicies))); + when(pulsar.getAdminClient()).thenReturn(admin); when(admin.topics()).thenReturn(topics); when(admin.scalableTopics()).thenReturn(scalableTopics); @@ -205,6 +227,81 @@ public void testConsumerDrivenSplit() throws Exception { "2 consumers on 1 segment should drive a split")); } + @Test + public void testNamespaceOverrideDisablesAutoScale() throws Exception { + namespacePolicies = new Policies(); + namespacePolicies.scalableTopicAutoScalePolicy = + AutoScalePolicyOverride.builder().enabled(false).build(); + + startController(2); + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 2, + "namespace override enabled=false must suppress the split"); + } + + @Test + public void testTopicOverrideWinsOverNamespace() throws Exception { + // Namespace disables auto-scale; the topic explicitly re-enables it. + namespacePolicies = new Policies(); + namespacePolicies.scalableTopicAutoScalePolicy = + AutoScalePolicyOverride.builder().enabled(false).build(); + + startController(2); + resources.updateScalableTopicAsync(topicName, md -> { + md.setAutoScalePolicy(AutoScalePolicyOverride.builder().enabled(true).build()); + return md; + }).get(); + + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, + "topic override enabled=true must win over the namespace disable"); + } + + @Test + public void testTopicOverrideMaxSegmentsCapsSplit() throws Exception { + startController(2); + resources.updateScalableTopicAsync(topicName, md -> { + md.setAutoScalePolicy(AutoScalePolicyOverride.builder().maxSegments(2).build()); + return md; + }).get(); + + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 2, + "per-topic maxSegments=2 must cap the split despite the hot segment"); + } + + @Test + public void testTopicOverrideSurvivesSplit() throws Exception { + // The override must survive a layout mutation (toMetadata must carry it over). + startController(2); + AutoScalePolicyOverride override = + AutoScalePolicyOverride.builder().maxSegments(3).build(); + resources.updateScalableTopicAsync(topicName, md -> { + md.setAutoScalePolicy(override); + return md; + }).get(); + + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, "first split allowed up to maxSegments=3"); + assertEquals(resources.getScalableTopicMetadataAsync(topicName).get() + .orElseThrow().getAutoScalePolicy(), override, + "the per-topic override must survive the split's metadata rewrite"); + + // At the cap now — further hot reports must not split. + resources.reportSegmentLoadAsync(topicName, 1, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, "capped at the per-topic maxSegments"); + } + @Test public void testConsumerBurstConvergesWithoutTicks() throws Exception { // A group of consumers joining in quick succession must converge to one segment diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java index 364c9299a1124..afcab4c8b9861 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java @@ -269,14 +269,19 @@ public void testGetLineage() { @Test public void testToMetadata() { ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(2, Map.of("key", "value")); + // A layout mutation must round-trip every non-layout field, not just properties. + metadata.setAutoScalePolicy(org.apache.pulsar.common.policies.data.AutoScalePolicyOverride + .builder().enabled(false).maxSegments(8).build()); SegmentLayout layout = SegmentLayout.fromMetadata(metadata); SegmentLayout afterSplit = layout.splitSegment(0, 0L); - ScalableTopicMetadata restored = afterSplit.toMetadata(Map.of("key", "value")); + ScalableTopicMetadata restored = afterSplit.toMetadata(metadata); assertEquals(restored.getEpoch(), 1); assertEquals(restored.getNextSegmentId(), 4); assertEquals(restored.getSegments().size(), 4); assertEquals(restored.getProperties().get("key"), "value"); + assertEquals(restored.getAutoScalePolicy(), metadata.getAutoScalePolicy(), + "split/merge must not drop the per-topic auto-scale policy"); } @Test diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index cc3f5a9296403..b1063e8b542cb 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -1366,6 +1367,72 @@ CompletableFuture setAutoTopicCreationAsync( */ CompletableFuture removeAutoTopicCreationAsync(String namespace); + /** + * Sets the scalable-topic auto split/merge policy override for a namespace (PIP-483), + * overriding the broker's defaults for every scalable topic in the namespace that does + * not carry its own per-topic override. + * + * @param namespace + * Namespace name + * @param override + * the override; unset fields fall through to the broker configuration + * @throws PulsarAdminException + * Unexpected error + */ + void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride override) + throws PulsarAdminException; + + /** + * Sets the scalable-topic auto split/merge policy override for a namespace asynchronously. + * + * @param namespace + * Namespace name + * @param override + * the override; unset fields fall through to the broker configuration + */ + CompletableFuture setScalableTopicAutoScalePolicyAsync( + String namespace, AutoScalePolicyOverride override); + + /** + * Get the scalable-topic auto split/merge policy override for a namespace. + * + * @param namespace + * Namespace name + * @return the override, or {@code null} if none is set + * @throws PulsarAdminException + * Unexpected error + */ + AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) throws PulsarAdminException; + + /** + * Get the scalable-topic auto split/merge policy override for a namespace asynchronously. + * + * @param namespace + * Namespace name + * @return the override, or {@code null} if none is set + */ + CompletableFuture getScalableTopicAutoScalePolicyAsync(String namespace); + + /** + * Removes the scalable-topic auto split/merge policy override from a namespace, letting + * the broker configuration apply. + * + * @param namespace + * Namespace name + * @throws PulsarAdminException + * Unexpected error + */ + void removeScalableTopicAutoScalePolicy(String namespace) throws PulsarAdminException; + + /** + * Removes the scalable-topic auto split/merge policy override from a namespace + * asynchronously. + * + * @param namespace + * Namespace name + */ + CompletableFuture removeScalableTopicAutoScalePolicyAsync(String namespace); + /** * Sets the autoSubscriptionCreation policy for a given namespace, overriding broker settings. *

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java index debf54231b951..b2f7632951e7c 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.ScalableSubscriptionType; import org.apache.pulsar.common.policies.data.ScalableTopicMetadata; import org.apache.pulsar.common.policies.data.ScalableTopicStats; @@ -145,6 +146,54 @@ CompletableFuture createScalableTopicAsync(String topic, int numInitialSeg */ CompletableFuture getMetadataAsync(String topic); + /** + * Set the per-topic auto split/merge policy override (PIP-483). Overrides the namespace + * policy and the broker defaults for this topic; unset fields fall through. + * + * @param topic Topic name in the format "tenant/namespace/topic" + * @param override the override to apply + */ + void setAutoScalePolicy(String topic, AutoScalePolicyOverride override) throws PulsarAdminException; + + /** + * Set the per-topic auto split/merge policy override asynchronously. + * + * @param topic Topic name in the format "tenant/namespace/topic" + * @param override the override to apply + */ + CompletableFuture setAutoScalePolicyAsync(String topic, AutoScalePolicyOverride override); + + /** + * Get the per-topic auto split/merge policy override. + * + * @param topic Topic name in the format "tenant/namespace/topic" + * @return the override, or {@code null} if none is set + */ + AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException; + + /** + * Get the per-topic auto split/merge policy override asynchronously. + * + * @param topic Topic name in the format "tenant/namespace/topic" + * @return the override, or {@code null} if none is set + */ + CompletableFuture getAutoScalePolicyAsync(String topic); + + /** + * Remove the per-topic auto split/merge policy override, letting the namespace policy + * and broker defaults apply. + * + * @param topic Topic name in the format "tenant/namespace/topic" + */ + void removeAutoScalePolicy(String topic) throws PulsarAdminException; + + /** + * Remove the per-topic auto split/merge policy override asynchronously. + * + * @param topic Topic name in the format "tenant/namespace/topic" + */ + CompletableFuture removeAutoScalePolicyAsync(String topic); + /** * Delete a scalable topic and all its underlying segment topics. * diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java new file mode 100644 index 0000000000000..30ecd78e5a7ab --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Override of the scalable-topic auto split/merge policy (PIP-483), settable at the + * namespace level (field on {@link Policies}) and at the topic level (field on the + * scalable-topic metadata). + * + *

Every field is optional: an unset ({@code null}) field falls through to the next + * resolution layer — topic override → namespace override → broker configuration. Setting + * {@code enabled = false} opts the namespace or topic out of auto split/merge entirely. + * + *

The resolved policy must satisfy the same invariants as the broker configuration + * (positive split thresholds, split thresholds strictly above merge thresholds, + * {@code minSegments <= maxSegments}, non-negative cooldowns); an override that would + * violate them is rejected when it is set. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public final class AutoScalePolicyOverride { + + /** Master switch; {@code false} opts this namespace/topic out of auto split/merge. */ + private Boolean enabled; + + /** Hard ceiling on active segments; splits stop once reached. */ + private Integer maxSegments; + + /** Hard floor on active segments; merges stop once reached. */ + private Integer minSegments; + + /** Max merges in a segment's lineage before merges are disabled for it. */ + private Integer maxDagDepth; + + /** Minimum seconds between automatic splits (coalesces bursts). */ + private Long splitCooldownSeconds; + + /** Minimum seconds between automatic merges. */ + private Long mergeCooldownSeconds; + + /** Seconds a segment pair must stay cold before becoming merge-eligible. */ + private Long mergeWindowSeconds; + + /** Inbound msg/s above which a segment is split. */ + private Double splitMsgRateInThreshold; + + /** Inbound bytes/s above which a segment is split. */ + private Long splitBytesRateInThreshold; + + /** Outbound (dispatched) msg/s above which a segment is split. */ + private Double splitMsgRateOutThreshold; + + /** Outbound bytes/s above which a segment is split. */ + private Long splitBytesRateOutThreshold; + + /** Inbound msg/s below which a segment counts as cold for merging. */ + private Double mergeMsgRateInThreshold; + + /** Inbound bytes/s below which a segment counts as cold for merging. */ + private Long mergeBytesRateInThreshold; + + /** Outbound msg/s below which a segment counts as cold for merging. */ + private Double mergeMsgRateOutThreshold; + + /** Outbound bytes/s below which a segment counts as cold for merging. */ + private Long mergeBytesRateOutThreshold; +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index db0aa0620d56b..df3cf53a3ca59 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -61,6 +61,8 @@ public class Policies { public AutoTopicCreationOverride autoTopicCreationOverride = null; // If set, it will override the broker settings for allowing auto subscription creation public AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = null; + // If set, it will override the broker's scalable-topic auto split/merge settings (PIP-483) + public AutoScalePolicyOverride scalableTopicAutoScalePolicy = null; public Map publishMaxMessageRate = new HashMap<>(); @SuppressWarnings("checkstyle:MemberName") @@ -158,7 +160,7 @@ public int hashCode() { backlog_quota_map, publishMaxMessageRate, clusterDispatchRate, topicDispatchRate, subscriptionDispatchRate, replicatorDispatchRate, clusterSubscribeRate, deduplicationEnabled, autoTopicCreationOverride, - autoSubscriptionCreationOverride, persistence, + autoSubscriptionCreationOverride, scalableTopicAutoScalePolicy, persistence, bundles, latency_stats_sample_rate, message_ttl_in_seconds, subscription_expiration_time_minutes, retention_policies, encryption_required, delayed_delivery_policies, inactive_topic_policies, @@ -198,6 +200,7 @@ public boolean equals(Object obj) { && Objects.equals(deduplicationEnabled, other.deduplicationEnabled) && Objects.equals(autoTopicCreationOverride, other.autoTopicCreationOverride) && Objects.equals(autoSubscriptionCreationOverride, other.autoSubscriptionCreationOverride) + && Objects.equals(scalableTopicAutoScalePolicy, other.scalableTopicAutoScalePolicy) && Objects.equals(persistence, other.persistence) && Objects.equals(bundles, other.bundles) && Objects.equals(latency_stats_sample_rate, other.latency_stats_sample_rate) && Objects.equals(message_ttl_in_seconds, diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java index 6f3123228f167..ce246a1340d66 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java @@ -54,6 +54,12 @@ public class ScalableTopicMetadata { @Builder.Default private Map properties = Map.of(); + /** + * Per-topic auto split/merge policy override (PIP-483). {@code null} means no override: + * the namespace policy and then the broker configuration apply. + */ + private AutoScalePolicyOverride autoScalePolicy; + /** * Describes a single segment in a scalable topic's DAG. */ diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 75a331a45dbd6..fd0e5c5bec00c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -566,6 +567,45 @@ public CompletableFuture removeAutoTopicCreationAsync(String namespace) { return asyncDeleteRequest(path); } + @Override + public void setScalableTopicAutoScalePolicy(String namespace, + AutoScalePolicyOverride override) throws PulsarAdminException { + sync(() -> setScalableTopicAutoScalePolicyAsync(namespace, override)); + } + + @Override + public CompletableFuture setScalableTopicAutoScalePolicyAsync( + String namespace, AutoScalePolicyOverride override) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "scalableTopicAutoScalePolicy"); + return asyncPostRequest(path, Entity.entity(override, MediaType.APPLICATION_JSON)); + } + + @Override + public AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) + throws PulsarAdminException { + return sync(() -> getScalableTopicAutoScalePolicyAsync(namespace)); + } + + @Override + public CompletableFuture getScalableTopicAutoScalePolicyAsync(String namespace) { + return asyncGetNamespaceParts(new FutureCallback() { + }, namespace, + "scalableTopicAutoScalePolicy"); + } + + @Override + public void removeScalableTopicAutoScalePolicy(String namespace) throws PulsarAdminException { + sync(() -> removeScalableTopicAutoScalePolicyAsync(namespace)); + } + + @Override + public CompletableFuture removeScalableTopicAutoScalePolicyAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "scalableTopicAutoScalePolicy"); + return asyncDeleteRequest(path); + } + @Override public void setAutoSubscriptionCreation(String namespace, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) throws PulsarAdminException { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java index e11c6cfe74711..e4905aa63c82d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.ScalableTopicMetadata; import org.apache.pulsar.common.policies.data.ScalableTopicStats; @@ -141,6 +142,43 @@ public CompletableFuture getMetadataAsync(String topic) { return asyncGetRequest(topicPath(tn), ScalableTopicMetadata.class); } + // --- Auto split/merge policy override (PIP-483) --- + + @Override + public void setAutoScalePolicy(String topic, AutoScalePolicyOverride override) + throws PulsarAdminException { + sync(() -> setAutoScalePolicyAsync(topic, override)); + } + + @Override + public CompletableFuture setAutoScalePolicyAsync(String topic, AutoScalePolicyOverride override) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn).path("autoScalePolicy"); + return asyncPostRequest(path, Entity.entity(override, MediaType.APPLICATION_JSON)); + } + + @Override + public AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException { + return sync(() -> getAutoScalePolicyAsync(topic)); + } + + @Override + public CompletableFuture getAutoScalePolicyAsync(String topic) { + TopicName tn = validateTopic(topic); + return asyncGetRequest(topicPath(tn).path("autoScalePolicy"), AutoScalePolicyOverride.class); + } + + @Override + public void removeAutoScalePolicy(String topic) throws PulsarAdminException { + sync(() -> removeAutoScalePolicyAsync(topic)); + } + + @Override + public CompletableFuture removeAutoScalePolicyAsync(String topic) { + TopicName tn = validateTopic(topic); + return asyncDeleteRequest(topicPath(tn).path("autoScalePolicy")); + } + // --- Delete --- @Override diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java index fd05ee40e76b7..2a2b56c599b32 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java @@ -55,6 +55,7 @@ public enum PolicyName { DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, ALLOW_CLUSTERS, ALLOW_CUSTOM_METRIC_LABELS, + SCALABLE_TOPIC_AUTO_SCALE, // cluster policies CLUSTER_MIGRATION, From c04de645eb6faf49d7e3b1ec6d679f2641519f62 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 12 Jun 2026 07:48:43 -0700 Subject: [PATCH 2/4] Fix ScalableTopicMetadataTest for the new autoScalePolicy constructor arg The all-args constructor grew a parameter when autoScalePolicy was added to ScalableTopicMetadata; update the admin-api test accordingly and assert the new field round-trips. --- .../common/policies/data/ScalableTopicMetadataTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java index 4784ea9b1b985..6259d9799a46c 100644 --- a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java +++ b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java @@ -79,13 +79,16 @@ public void testAllArgsConstructor() { segments.put(0L, activeSegment(0L, 0, 0xFFFF, 0L)); Map props = Map.of("retention", "7d"); + AutoScalePolicyOverride autoScalePolicy = + AutoScalePolicyOverride.builder().enabled(false).build(); - ScalableTopicMetadata md = new ScalableTopicMetadata(3L, 1L, segments, props); + ScalableTopicMetadata md = new ScalableTopicMetadata(3L, 1L, segments, props, autoScalePolicy); assertEquals(md.getEpoch(), 3L); assertEquals(md.getNextSegmentId(), 1L); assertEquals(md.getSegments(), segments); assertEquals(md.getProperties(), props); + assertEquals(md.getAutoScalePolicy(), autoScalePolicy); } @Test From c09f6653b7cfc87b19713e98d597a49341f4841c Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 12 Jun 2026 08:39:44 -0700 Subject: [PATCH 3/4] Address review: validate override combinations; fall back to disabled on invalid resolve MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The namespace and topic overrides were each validated only against the broker defaults, never against each other — two individually-valid layers could combine into an invalid policy (e.g. ns raises a merge threshold, topic lowers the matching split threshold), after which the controller's per-evaluation resolve threw on every tick and auto split/merge was silently dead for the topic. Two-part fix, as suggested in review: - The topic-level set now resolves against the current namespace override (cached read) and rejects the combination with 412. The misleading comment claiming the namespace layer 'can only have been valid the same way' is gone; the check is documented as best-effort since the namespace override can change afterwards and broker defaults can change across restarts. - The controller is resilient to a stored combination that is (or has become) invalid: resolveAutoScaleConfig catches the invariant violation, warn-logs it on each evaluation, and treats auto split/merge as disabled for the topic — predictable and visible, instead of failing the evaluation chain. Tests: REST-level 412 for a cross-layer conflict (and acceptance of the same override once the conflicting layer is removed); controller-level fallback (evaluation completes, no action taken) for an invalid stored combination. --- .../broker/admin/v2/ScalableTopics.java | 35 +++++++++++++------ .../scalable/ScalableTopicController.java | 21 +++++++++-- .../ScalableTopicAutoScalePolicyTest.java | 27 ++++++++++++++ .../ScalableTopicControllerAutoScaleTest.java | 27 ++++++++++++++ 4 files changed, 98 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java index 838c22e776707..86f9be6a73c62 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java @@ -577,17 +577,32 @@ public void removeAutoScalePolicy( private void internalSetAutoScalePolicy(AsyncResponse asyncResponse, TopicName tn, AutoScalePolicyOverride override) { validateTopicPolicyOperationAsync(tn, PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.WRITE) - .thenAccept(__ -> { - if (override != null) { - // The override must produce a valid policy in combination with the - // broker defaults (the namespace layer can only have been valid the - // same way). Reject invariant violations at set time. - try { - AutoScaleConfig.resolve(pulsar().getConfig(), null, override); - } catch (IllegalArgumentException e) { - throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage()); - } + .thenCompose(__ -> { + if (override == null) { + return CompletableFuture.completedFuture(null); } + // Validate the override in combination with the layers it will actually + // be resolved with: the broker defaults AND the current namespace + // override — two layers that are each valid against the defaults can + // still combine into an invalid policy (e.g. the namespace raises a + // merge threshold and the topic lowers the matching split threshold). + // This check is best-effort: the namespace override can still change + // afterwards, and broker defaults can differ across restarts — the + // controller handles a combination that has become invalid by falling + // back to disabled (see ScalableTopicController.resolveAutoScaleConfig). + return pulsar().getPulsarResources().getNamespaceResources() + .getPoliciesAsync(namespaceName) + .thenAccept(optPolicies -> { + AutoScalePolicyOverride nsOverride = optPolicies + .map(p -> p.scalableTopicAutoScalePolicy) + .orElse(null); + try { + AutoScaleConfig.resolve(pulsar().getConfig(), nsOverride, override); + } catch (IllegalArgumentException e) { + throw new RestException(Response.Status.PRECONDITION_FAILED, + e.getMessage()); + } + }); }) .thenCompose(__ -> resources().updateScalableTopicAsync(tn, md -> { md.setAutoScalePolicy(override); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java index 952199279eda2..a85187df28c52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java @@ -403,6 +403,13 @@ private CompletableFuture evaluateAndAct(String trigger) { * then the per-topic override ({@code ScalableTopicMetadata.autoScalePolicy}). Both reads * are metadata-cache-backed, so this is cheap per evaluation and override changes take * effect on the next tick without controller restarts. + * + *

Set-time validation is best-effort only (the namespace override can change after a + * topic override was validated against it, and broker defaults can change across + * restarts), so the stored combination can be invalid here. In that case auto split/merge + * is treated as disabled for the topic — predictable, and loudly logged on every + * evaluation until an operator fixes the overrides — rather than failing the evaluation + * chain. */ private CompletableFuture resolveAutoScaleConfig( ServiceConfiguration brokerConfig) { @@ -415,8 +422,18 @@ private CompletableFuture resolveAutoScaleConfig( resources.getScalableTopicMetadataAsync(topicName) .thenApply(opt -> opt.map(ScalableTopicMetadata::getAutoScalePolicy) .orElse(null)); - return namespaceOverride.thenCombine(topicOverride, - (ns, topic) -> AutoScaleConfig.resolve(brokerConfig, ns, topic)); + return namespaceOverride.thenCombine(topicOverride, (ns, topic) -> { + try { + return AutoScaleConfig.resolve(brokerConfig, ns, topic); + } catch (IllegalArgumentException e) { + log.warn().attr("reason", e.getMessage()) + .log("Resolved auto split/merge policy is invalid; treating auto " + + "split/merge as disabled for this topic until the namespace " + + "or topic override is fixed"); + return AutoScaleConfig.fromBrokerConfig(brokerConfig) + .toBuilder().enabled(false).build(); + } + }); } private CompletableFuture dispatch(AutoScaleDecision decision, AutoScaleConfig config, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java index 6e867d05f4aa1..85c65f9b9ec8d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java @@ -96,6 +96,33 @@ public void testInvalidOverrideRejected() throws Exception { () -> admin.namespaces().setScalableTopicAutoScalePolicy(getNamespace(), bad)); } + @Test + public void testTopicOverrideValidatedAgainstNamespaceOverride() throws Exception { + // Each layer is valid against the broker defaults, but combined they invert the + // hysteresis invariant: ns raises mergeMsgRateIn to 5000, topic lowers + // splitMsgRateIn to 2000 → split <= merge. The topic-level set must see the + // current namespace override and reject with 412. + String namespace = getNamespace(); + String topic = newScalableTopic(); + try { + admin.namespaces().setScalableTopicAutoScalePolicy(namespace, + AutoScalePolicyOverride.builder().mergeMsgRateInThreshold(5_000.0).build()); + + assertThrows(PulsarAdminException.PreconditionFailedException.class, + () -> admin.scalableTopics().setAutoScalePolicy(topic, + AutoScalePolicyOverride.builder() + .splitMsgRateInThreshold(2_000.0).build())); + + // The same topic override is accepted once the conflicting namespace layer + // is gone. + admin.namespaces().removeScalableTopicAutoScalePolicy(namespace); + admin.scalableTopics().setAutoScalePolicy(topic, + AutoScalePolicyOverride.builder().splitMsgRateInThreshold(2_000.0).build()); + } finally { + admin.namespaces().removeScalableTopicAutoScalePolicy(namespace); + } + } + @Test public void testTopicLevelOnMissingTopicIs404() { String missing = "topic://" + getNamespace() + "/does-not-exist"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java index 3457155085f2d..6b1fc90769615 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java @@ -302,6 +302,33 @@ public void testTopicOverrideSurvivesSplit() throws Exception { assertEquals(activeSegmentCount(), 3, "capped at the per-topic maxSegments"); } + @Test + public void testInvalidOverrideCombinationFallsBackToDisabled() throws Exception { + // Namespace and topic overrides that are each valid against the broker defaults but + // invalid combined: the namespace raises the merge threshold, the topic lowers the + // matching split threshold below it (hysteresis inversion). The controller must not + // fail the evaluation chain — it treats auto split/merge as disabled until fixed. + namespacePolicies = new Policies(); + namespacePolicies.scalableTopicAutoScalePolicy = AutoScalePolicyOverride.builder() + .mergeMsgRateInThreshold(5_000.0) + .build(); + + startController(2); + resources.updateScalableTopicAsync(topicName, md -> { + md.setAutoScalePolicy(AutoScalePolicyOverride.builder() + .splitMsgRateInThreshold(2_000.0) + .build()); + return md; + }).get(); + + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + // Must complete normally (no IllegalArgumentException) and take no action. + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 2, + "invalid override combination must disable auto split/merge, not split"); + } + @Test public void testConsumerBurstConvergesWithoutTicks() throws Exception { // A group of consumers joining in quick succession must converge to one segment From 215234013c7de1803b5bb0c0a51fa0ee3dfc358b Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 12 Jun 2026 08:40:50 -0700 Subject: [PATCH 4/4] Address review: document the 204 no-override response on the policy GET endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both GET endpoints resume with null when no override is set, which the JAX-RS layer turns into a 204 with no body — the OpenAPI text claimed a 200 with an empty body. Document the 204 explicitly at both levels (the Java admin client already maps it to null). --- .../java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 3 ++- .../org/apache/pulsar/broker/admin/v2/ScalableTopics.java | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index ac0fa75292d47..33f4eb76858b3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -886,8 +886,9 @@ public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse @Operation(summary = "Get the scalable-topic auto split/merge policy override for a namespace") @ApiResponses(value = { @ApiResponse(responseCode = "200", - description = "Get the scalable-topic auto split/merge policy override for a namespace", + description = "The scalable-topic auto split/merge policy override for the namespace", content = @Content(schema = @Schema(implementation = AutoScalePolicyOverride.class))), + @ApiResponse(responseCode = "204", description = "No override is set on this namespace"), @ApiResponse(responseCode = "403", description = "Don't have admin permission"), @ApiResponse(responseCode = "404", description = "Tenant or namespace doesn't exist")}) public void getScalableTopicAutoScalePolicy(@Suspended AsyncResponse asyncResponse, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java index 86f9be6a73c62..5a365ffa5239a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java @@ -487,9 +487,9 @@ public void getScalableTopicMetadata( @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy") @Operation(summary = "Get the per-topic auto split/merge policy override.") @ApiResponses(value = { - @ApiResponse(responseCode = "200", description = "The per-topic auto split/merge policy override; " - + "empty body if no override is set.", + @ApiResponse(responseCode = "200", description = "The per-topic auto split/merge policy override.", content = @Content(schema = @Schema(implementation = AutoScalePolicyOverride.class))), + @ApiResponse(responseCode = "204", description = "No override is set on this topic"), @ApiResponse(responseCode = "401", description = "Don't have permission to administrate resources on this tenant"), @ApiResponse(responseCode = "403", description = "Don't have admin permission on the namespace"),