diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java index 8af852316514f..228e2c25e6518 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java @@ -24,6 +24,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.scalable.SegmentInfo; /** @@ -51,4 +52,10 @@ public class ScalableTopicMetadata { /** User-defined topic properties. */ @Builder.Default private Map properties = Map.of(); + + /** + * Per-topic auto split/merge policy override (PIP-483). {@code null} means no override: + * the namespace policy and then the broker configuration apply. + */ + private AutoScalePolicyOverride autoScalePolicy; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index baff684bd08c8..5b14c909d7c16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -66,6 +66,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.scalable.AutoScaleConfig; import org.apache.pulsar.broker.topiclistlimit.TopicListMemoryLimiter; import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache; import org.apache.pulsar.broker.web.RestException; @@ -84,6 +85,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -1308,6 +1310,36 @@ protected CompletableFuture internalSetAutoTopicCreationAsync( })); } + protected CompletableFuture internalGetScalableTopicAutoScalePolicyAsync() { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SCALABLE_TOPIC_AUTO_SCALE, + PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenApply(policies -> policies.scalableTopicAutoScalePolicy); + } + + protected CompletableFuture internalSetScalableTopicAutoScalePolicyAsync( + AutoScalePolicyOverride override) { + return validateNamespacePolicyOperationAsync(namespaceName, + PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenAccept(__ -> { + if (override != null) { + // The override only has to be valid in combination with the broker + // defaults — resolve and let the invariant checks reject bad values + // (e.g. zero split thresholds, split <= merge hysteresis inversion). + try { + AutoScaleConfig.resolve(pulsar().getConfig(), override, null); + } catch (IllegalArgumentException e) { + throw new RestException(Status.PRECONDITION_FAILED, e.getMessage()); + } + } + }) + .thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { + policies.scalableTopicAutoScalePolicy = override; + return policies; + })); + } + protected CompletableFuture internalSetAutoSubscriptionCreationAsync(AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) { // Force to read the data s.t. the watch to the cache content is setup. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 77c18f4169d17..33f4eb76858b3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -60,6 +60,7 @@ import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -880,6 +881,102 @@ public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse }); } + @GET + @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy") + @Operation(summary = "Get the scalable-topic auto split/merge policy override for a namespace") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", + description = "The scalable-topic auto split/merge policy override for the namespace", + content = @Content(schema = @Schema(implementation = AutoScalePolicyOverride.class))), + @ApiResponse(responseCode = "204", description = "No override is set on this namespace"), + @ApiResponse(responseCode = "403", description = "Don't have admin permission"), + @ApiResponse(responseCode = "404", description = "Tenant or namespace doesn't exist")}) + public void getScalableTopicAutoScalePolicy(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalGetScalableTopicAutoScalePolicyAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error() + .attr("namespace", namespaceName) + .exception(ex) + .log("Failed to get scalableTopicAutoScalePolicy for namespace"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy") + @Operation(summary = "Override the broker's scalable-topic auto split/merge settings for a namespace") + @ApiResponses(value = { + @ApiResponse(responseCode = "204", description = "Operation successful"), + @ApiResponse(responseCode = "403", description = "Don't have admin permission"), + @ApiResponse(responseCode = "404", description = "Tenant or cluster or namespace doesn't exist"), + @ApiResponse(responseCode = "412", + description = "The resolved auto split/merge policy violates an invariant")}) + public void setScalableTopicAutoScalePolicy( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + @RequestBody(description = "Auto split/merge policy override", required = true) + AutoScalePolicyOverride override) { + validateNamespaceName(tenant, namespace); + internalSetScalableTopicAutoScalePolicyAsync(override) + .thenAccept(__ -> { + log.info() + .attr("namespace", namespaceName) + .log("Successfully set scalableTopicAutoScalePolicy on namespace"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); + log.error() + .attr("namespace", namespaceName) + .exception(ex) + .log("Failed to set scalableTopicAutoScalePolicy on namespace"); + if (ex instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, ex); + } + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy") + @Operation(summary = "Remove the scalable-topic auto split/merge policy override from a namespace") + @ApiResponses(value = { + @ApiResponse(responseCode = "204", description = "Operation successful"), + @ApiResponse(responseCode = "403", description = "Don't have admin permission"), + @ApiResponse(responseCode = "404", description = "Tenant or cluster or namespace doesn't exist") }) + public void removeScalableTopicAutoScalePolicy(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalSetScalableTopicAutoScalePolicyAsync(null) + .thenAccept(__ -> { + log.info() + .attr("namespace", namespaceName) + .log("Successfully removed scalableTopicAutoScalePolicy on namespace"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); + log.error() + .attr("namespace", namespaceName) + .exception(ex) + .log("Failed to remove scalableTopicAutoScalePolicy on namespace"); + if (ex instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, ex); + } + return null; + }); + } + @POST @Path("/{tenant}/{namespace}/autoSubscriptionCreation") @Operation(summary = "Override broker's allowAutoSubscriptionCreation setting for a namespace") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java index 13afb2abe2e62..5a365ffa5239a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java @@ -55,13 +55,17 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.resources.ScalableTopicMetadata; import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.broker.service.scalable.AutoScaleConfig; import org.apache.pulsar.broker.service.scalable.ScalableTopicController; import org.apache.pulsar.broker.service.scalable.ScalableTopicService; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.scalable.ScalableTopicConstants; @@ -477,6 +481,153 @@ public void getScalableTopicMetadata( }); } + // --- Auto split/merge policy override (PIP-483) --- + + @GET + @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy") + @Operation(summary = "Get the per-topic auto split/merge policy override.") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "The per-topic auto split/merge policy override.", + content = @Content(schema = @Schema(implementation = AutoScalePolicyOverride.class))), + @ApiResponse(responseCode = "204", description = "No override is set on this topic"), + @ApiResponse(responseCode = "401", + description = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(responseCode = "403", description = "Don't have admin permission on the namespace"), + @ApiResponse(responseCode = "404", description = "Scalable topic doesn't exist"), + @ApiResponse(responseCode = "500", description = "Internal server error")}) + public void getAutoScalePolicy( + @Suspended final AsyncResponse asyncResponse, + @Parameter(description = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @Parameter(description = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @Parameter(description = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic) { + validateNamespaceName(tenant, namespace); + TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, encodedTopic); + + validateTopicPolicyOperationAsync(tn, PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.READ) + .thenCompose(__ -> resources().getScalableTopicMetadataAsync(tn)) + .thenAccept(optMd -> { + if (optMd.isEmpty()) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, + "Scalable topic not found: " + tn)); + } else { + asyncResponse.resume(optMd.get().getAutoScalePolicy()); + } + }) + .exceptionally(ex -> { + log.error().attr("clientAppId", clientAppId()).attr("topic", tn) + .exception(ex).log("Failed to get autoScalePolicy for scalable topic"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy") + @Operation(summary = "Set the per-topic auto split/merge policy override.") + @ApiResponses(value = { + @ApiResponse(responseCode = "204", description = "Override set successfully"), + @ApiResponse(responseCode = "401", + description = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(responseCode = "403", description = "Don't have admin permission on the namespace"), + @ApiResponse(responseCode = "404", description = "Scalable topic doesn't exist"), + @ApiResponse(responseCode = "412", + description = "The resolved auto split/merge policy violates an invariant"), + @ApiResponse(responseCode = "500", description = "Internal server error")}) + public void setAutoScalePolicy( + @Suspended final AsyncResponse asyncResponse, + @Parameter(description = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @Parameter(description = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @Parameter(description = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @RequestBody(description = "Auto split/merge policy override", required = true) + AutoScalePolicyOverride override) { + validateNamespaceName(tenant, namespace); + TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, encodedTopic); + internalSetAutoScalePolicy(asyncResponse, tn, override); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy") + @Operation(summary = "Remove the per-topic auto split/merge policy override.") + @ApiResponses(value = { + @ApiResponse(responseCode = "204", description = "Override removed successfully"), + @ApiResponse(responseCode = "401", + description = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(responseCode = "403", description = "Don't have admin permission on the namespace"), + @ApiResponse(responseCode = "404", description = "Scalable topic doesn't exist"), + @ApiResponse(responseCode = "500", description = "Internal server error")}) + public void removeAutoScalePolicy( + @Suspended final AsyncResponse asyncResponse, + @Parameter(description = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @Parameter(description = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @Parameter(description = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic) { + validateNamespaceName(tenant, namespace); + TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, encodedTopic); + internalSetAutoScalePolicy(asyncResponse, tn, null); + } + + private void internalSetAutoScalePolicy(AsyncResponse asyncResponse, TopicName tn, + AutoScalePolicyOverride override) { + validateTopicPolicyOperationAsync(tn, PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.WRITE) + .thenCompose(__ -> { + if (override == null) { + return CompletableFuture.completedFuture(null); + } + // Validate the override in combination with the layers it will actually + // be resolved with: the broker defaults AND the current namespace + // override — two layers that are each valid against the defaults can + // still combine into an invalid policy (e.g. the namespace raises a + // merge threshold and the topic lowers the matching split threshold). + // This check is best-effort: the namespace override can still change + // afterwards, and broker defaults can differ across restarts — the + // controller handles a combination that has become invalid by falling + // back to disabled (see ScalableTopicController.resolveAutoScaleConfig). + return pulsar().getPulsarResources().getNamespaceResources() + .getPoliciesAsync(namespaceName) + .thenAccept(optPolicies -> { + AutoScalePolicyOverride nsOverride = optPolicies + .map(p -> p.scalableTopicAutoScalePolicy) + .orElse(null); + try { + AutoScaleConfig.resolve(pulsar().getConfig(), nsOverride, override); + } catch (IllegalArgumentException e) { + throw new RestException(Response.Status.PRECONDITION_FAILED, + e.getMessage()); + } + }); + }) + .thenCompose(__ -> resources().updateScalableTopicAsync(tn, md -> { + md.setAutoScalePolicy(override); + return md; + })) + .thenAccept(__ -> { + log.info().attr("clientAppId", clientAppId()).attr("topic", tn) + .attr("removed", override == null) + .log("Updated autoScalePolicy on scalable topic"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); + if (ex instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, + "Scalable topic not found: " + tn)); + return null; + } + log.error().attr("clientAppId", clientAppId()).attr("topic", tn) + .exception(ex).log("Failed to update autoScalePolicy for scalable topic"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + // --- Delete --- @DELETE diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java index b739b102a76b9..848b59cfa146b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java @@ -21,6 +21,7 @@ import java.time.Duration; import lombok.Builder; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; /** * Fully-resolved auto split/merge policy for a single scalable topic (PIP-483). @@ -81,6 +82,30 @@ public record AutoScaleConfig( * @return the resolved policy reflecting the {@code scalableTopic*} settings */ public static AutoScaleConfig fromBrokerConfig(ServiceConfiguration conf) { + return brokerDefaults(conf).validated(); + } + + /** + * Resolve the effective policy for a topic: broker defaults, overlaid with the namespace + * override, overlaid with the topic override (most-specific wins per field; {@code null} + * override fields fall through). + * + * @param conf the broker configuration (cluster-wide defaults) + * @param namespaceOverride the namespace-level override, or {@code null} if none + * @param topicOverride the topic-level override, or {@code null} if none + * @return the validated effective policy + * @throws IllegalArgumentException if the resolved policy violates an invariant + */ + public static AutoScaleConfig resolve(ServiceConfiguration conf, + AutoScalePolicyOverride namespaceOverride, + AutoScalePolicyOverride topicOverride) { + AutoScaleConfig config = brokerDefaults(conf); + config = applyOverride(config, namespaceOverride); + config = applyOverride(config, topicOverride); + return config.validated(); + } + + private static AutoScaleConfig brokerDefaults(ServiceConfiguration conf) { return AutoScaleConfig.builder() .enabled(conf.isScalableTopicAutoScaleEnabled()) .maxSegments(conf.getScalableTopicMaxSegments()) @@ -97,8 +122,60 @@ public static AutoScaleConfig fromBrokerConfig(ServiceConfiguration conf) { .mergeBytesRateIn(conf.getScalableTopicMergeBytesRateInThreshold()) .mergeMsgRateOut(conf.getScalableTopicMergeMsgRateOutThreshold()) .mergeBytesRateOut(conf.getScalableTopicMergeBytesRateOutThreshold()) - .build() - .validated(); + .build(); + } + + private static AutoScaleConfig applyOverride(AutoScaleConfig base, AutoScalePolicyOverride o) { + if (o == null) { + return base; + } + AutoScaleConfigBuilder b = base.toBuilder(); + if (o.getEnabled() != null) { + b.enabled(o.getEnabled()); + } + if (o.getMaxSegments() != null) { + b.maxSegments(o.getMaxSegments()); + } + if (o.getMinSegments() != null) { + b.minSegments(o.getMinSegments()); + } + if (o.getMaxDagDepth() != null) { + b.maxDagDepth(o.getMaxDagDepth()); + } + if (o.getSplitCooldownSeconds() != null) { + b.splitCooldown(Duration.ofSeconds(o.getSplitCooldownSeconds())); + } + if (o.getMergeCooldownSeconds() != null) { + b.mergeCooldown(Duration.ofSeconds(o.getMergeCooldownSeconds())); + } + if (o.getMergeWindowSeconds() != null) { + b.mergeWindow(Duration.ofSeconds(o.getMergeWindowSeconds())); + } + if (o.getSplitMsgRateInThreshold() != null) { + b.splitMsgRateIn(o.getSplitMsgRateInThreshold()); + } + if (o.getSplitBytesRateInThreshold() != null) { + b.splitBytesRateIn(o.getSplitBytesRateInThreshold()); + } + if (o.getSplitMsgRateOutThreshold() != null) { + b.splitMsgRateOut(o.getSplitMsgRateOutThreshold()); + } + if (o.getSplitBytesRateOutThreshold() != null) { + b.splitBytesRateOut(o.getSplitBytesRateOutThreshold()); + } + if (o.getMergeMsgRateInThreshold() != null) { + b.mergeMsgRateIn(o.getMergeMsgRateInThreshold()); + } + if (o.getMergeBytesRateInThreshold() != null) { + b.mergeBytesRateIn(o.getMergeBytesRateInThreshold()); + } + if (o.getMergeMsgRateOutThreshold() != null) { + b.mergeMsgRateOut(o.getMergeMsgRateOutThreshold()); + } + if (o.getMergeBytesRateOutThreshold() != null) { + b.mergeBytesRateOut(o.getMergeBytesRateOutThreshold()); + } + return b.build(); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java index 0b8c70d66b19b..a85187df28c52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java @@ -44,6 +44,7 @@ import org.apache.pulsar.common.api.proto.ScalableConsumerType; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.scalable.HashRange; @@ -359,10 +360,6 @@ private CompletableFuture evaluateAndAct(String trigger) { if (brokerConfig == null) { return CompletableFuture.completedFuture(null); } - AutoScaleConfig config = AutoScaleConfig.fromBrokerConfig(brokerConfig); - if (!config.enabled()) { - return CompletableFuture.completedFuture(null); - } if (!autoScaleInFlight.compareAndSet(false, true)) { // Another evaluation or auto operation is already running. Don't drop the // trigger: mark it pending so the in-flight run re-evaluates on completion — @@ -372,11 +369,18 @@ private CompletableFuture evaluateAndAct(String trigger) { return CompletableFuture.completedFuture(null); } try { - return collectConsumerCounts() - .thenCombine(collectLoadSamples(), (consumers, load) -> - AutoScalePolicyEvaluator.decide(currentLayout, load, consumers, config, - clock.millis(), lastSplitAtMs, lastMergeAtMs)) - .thenCompose(decision -> dispatch(decision, config, trigger)) + return resolveAutoScaleConfig(brokerConfig) + .thenCompose(config -> { + if (!config.enabled()) { + return CompletableFuture.completedFuture(null); + } + return collectConsumerCounts() + .thenCombine(collectLoadSamples(), (consumers, load) -> + AutoScalePolicyEvaluator.decide(currentLayout, load, + consumers, config, clock.millis(), + lastSplitAtMs, lastMergeAtMs)) + .thenCompose(decision -> dispatch(decision, config, trigger)); + }) .whenComplete((__, ex) -> { autoScaleInFlight.set(false); if (autoScaleReEvaluate.getAndSet(false)) { @@ -393,6 +397,45 @@ private CompletableFuture evaluateAndAct(String trigger) { } } + /** + * Resolve the effective auto split/merge policy for this topic: broker defaults overlaid + * with the namespace-level override ({@code Policies.scalableTopicAutoScalePolicy}) and + * then the per-topic override ({@code ScalableTopicMetadata.autoScalePolicy}). Both reads + * are metadata-cache-backed, so this is cheap per evaluation and override changes take + * effect on the next tick without controller restarts. + * + *

Set-time validation is best-effort only (the namespace override can change after a + * topic override was validated against it, and broker defaults can change across + * restarts), so the stored combination can be invalid here. In that case auto split/merge + * is treated as disabled for the topic — predictable, and loudly logged on every + * evaluation until an operator fixes the overrides — rather than failing the evaluation + * chain. + */ + private CompletableFuture resolveAutoScaleConfig( + ServiceConfiguration brokerConfig) { + CompletableFuture namespaceOverride = + brokerService.getPulsar().getPulsarResources().getNamespaceResources() + .getPoliciesAsync(topicName.getNamespaceObject()) + .thenApply(opt -> opt.map(p -> p.scalableTopicAutoScalePolicy) + .orElse(null)); + CompletableFuture topicOverride = + resources.getScalableTopicMetadataAsync(topicName) + .thenApply(opt -> opt.map(ScalableTopicMetadata::getAutoScalePolicy) + .orElse(null)); + return namespaceOverride.thenCombine(topicOverride, (ns, topic) -> { + try { + return AutoScaleConfig.resolve(brokerConfig, ns, topic); + } catch (IllegalArgumentException e) { + log.warn().attr("reason", e.getMessage()) + .log("Resolved auto split/merge policy is invalid; treating auto " + + "split/merge as disabled for this topic until the namespace " + + "or topic override is fixed"); + return AutoScaleConfig.fromBrokerConfig(brokerConfig) + .toBuilder().enabled(false).build(); + } + }); + } + private CompletableFuture dispatch(AutoScaleDecision decision, AutoScaleConfig config, String trigger) { if (decision instanceof AutoScaleDecision.Split split) { @@ -657,7 +700,7 @@ public CompletableFuture splitSegment(long segmentId) { .thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md -> { SegmentLayout latest = SegmentLayout.fromMetadata(md); SegmentLayout updated = latest.splitSegment(segmentId, nowMs); - return updated.toMetadata(md.getProperties()); + return updated.toMetadata(md); })) .thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true)) .thenCompose(optMd -> { @@ -706,7 +749,7 @@ public CompletableFuture mergeSegments(long segmentId1, long segm .thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md -> { SegmentLayout latest = SegmentLayout.fromMetadata(md); SegmentLayout updated = latest.mergeSegments(segmentId1, segmentId2, nowMs); - return updated.toMetadata(md.getProperties()); + return updated.toMetadata(md); })) .thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true)) .thenCompose(optMd -> { @@ -1192,7 +1235,7 @@ private CompletableFuture pruneAllAsync(List drained) { updated = updated.pruneSegment(s.segmentId()); } } - return updated == latest ? md : updated.toMetadata(md.getProperties()); + return updated == latest ? md : updated.toMetadata(md); }).thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true)) .thenCompose(optMd -> { currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java index a3f1f0c5ef101..c823fb75e918a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java @@ -276,14 +276,17 @@ public SegmentLayout pruneSegment(long segmentId) { } /** - * Convert back to metadata for persistence. + * Convert back to metadata for persistence, carrying over the non-layout fields + * (properties, per-topic auto-scale policy) from the record being replaced. Layout + * mutations must never silently drop fields they don't model. */ - public ScalableTopicMetadata toMetadata(Map properties) { + public ScalableTopicMetadata toMetadata(ScalableTopicMetadata original) { return ScalableTopicMetadata.builder() .epoch(epoch) .nextSegmentId(nextSegmentId) .segments(new LinkedHashMap<>(allSegments)) - .properties(properties) + .properties(original.getProperties()) + .autoScalePolicy(original.getAutoScalePolicy()) .build(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java new file mode 100644 index 0000000000000..85c65f9b9ec8d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; +import java.util.UUID; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; +import org.testng.annotations.Test; + +/** + * End-to-end coverage for the auto split/merge policy override admin API (PIP-483), at both + * levels, through the full HTTP path against a real shared broker. + * + *

Endpoints under test: + * + *

    + *
  • namespace: {@code /admin/v2/namespaces/{tenant}/{ns}/scalableTopicAutoScalePolicy}
  • + *
  • topic: {@code /admin/v2/scalable/{tenant}/{ns}/{topic}/autoScalePolicy}
  • + *
+ */ +public class ScalableTopicAutoScalePolicyTest extends SharedPulsarBaseTest { + + private String newScalableTopic() throws Exception { + String topic = "topic://" + getNamespace() + "/autoscale-" + + UUID.randomUUID().toString().substring(0, 8); + admin.scalableTopics().createScalableTopic(topic, 1); + return topic; + } + + @Test + public void testTopicLevelRoundTrip() throws Exception { + String topic = newScalableTopic(); + + // No override initially. + assertNull(admin.scalableTopics().getAutoScalePolicy(topic)); + + AutoScalePolicyOverride override = AutoScalePolicyOverride.builder() + .enabled(true) + .maxSegments(8) + .splitMsgRateInThreshold(5_000.0) + .build(); + admin.scalableTopics().setAutoScalePolicy(topic, override); + assertEquals(admin.scalableTopics().getAutoScalePolicy(topic), override); + + admin.scalableTopics().removeAutoScalePolicy(topic); + assertNull(admin.scalableTopics().getAutoScalePolicy(topic)); + } + + @Test + public void testNamespaceLevelRoundTrip() throws Exception { + String namespace = getNamespace(); + + assertNull(admin.namespaces().getScalableTopicAutoScalePolicy(namespace)); + + AutoScalePolicyOverride override = AutoScalePolicyOverride.builder() + .enabled(false) + .build(); + admin.namespaces().setScalableTopicAutoScalePolicy(namespace, override); + assertEquals(admin.namespaces().getScalableTopicAutoScalePolicy(namespace), override); + + admin.namespaces().removeScalableTopicAutoScalePolicy(namespace); + assertNull(admin.namespaces().getScalableTopicAutoScalePolicy(namespace)); + } + + @Test + public void testInvalidOverrideRejected() throws Exception { + String topic = newScalableTopic(); + + // minSegments above the broker-default maxSegments breaks min <= max on resolution. + AutoScalePolicyOverride bad = AutoScalePolicyOverride.builder() + .minSegments(1_000_000) + .build(); + assertThrows(PulsarAdminException.PreconditionFailedException.class, + () -> admin.scalableTopics().setAutoScalePolicy(topic, bad)); + assertThrows(PulsarAdminException.PreconditionFailedException.class, + () -> admin.namespaces().setScalableTopicAutoScalePolicy(getNamespace(), bad)); + } + + @Test + public void testTopicOverrideValidatedAgainstNamespaceOverride() throws Exception { + // Each layer is valid against the broker defaults, but combined they invert the + // hysteresis invariant: ns raises mergeMsgRateIn to 5000, topic lowers + // splitMsgRateIn to 2000 → split <= merge. The topic-level set must see the + // current namespace override and reject with 412. + String namespace = getNamespace(); + String topic = newScalableTopic(); + try { + admin.namespaces().setScalableTopicAutoScalePolicy(namespace, + AutoScalePolicyOverride.builder().mergeMsgRateInThreshold(5_000.0).build()); + + assertThrows(PulsarAdminException.PreconditionFailedException.class, + () -> admin.scalableTopics().setAutoScalePolicy(topic, + AutoScalePolicyOverride.builder() + .splitMsgRateInThreshold(2_000.0).build())); + + // The same topic override is accepted once the conflicting namespace layer + // is gone. + admin.namespaces().removeScalableTopicAutoScalePolicy(namespace); + admin.scalableTopics().setAutoScalePolicy(topic, + AutoScalePolicyOverride.builder().splitMsgRateInThreshold(2_000.0).build()); + } finally { + admin.namespaces().removeScalableTopicAutoScalePolicy(namespace); + } + } + + @Test + public void testTopicLevelOnMissingTopicIs404() { + String missing = "topic://" + getNamespace() + "/does-not-exist"; + assertThrows(PulsarAdminException.NotFoundException.class, + () -> admin.scalableTopics().setAutoScalePolicy(missing, + AutoScalePolicyOverride.builder().enabled(false).build())); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java index acc104c2773c5..7a991834461eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java @@ -55,6 +55,52 @@ public void testDefaultsMatchBrokerConfig() { assertTrue(c.splitBytesRateOut() > c.mergeBytesRateOut()); } + @Test + public void testResolveLayersOverrides() { + ServiceConfiguration conf = new ServiceConfiguration(); + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride ns = + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder() + .maxSegments(16) + .splitMsgRateInThreshold(20_000.0) + .build(); + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride topic = + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder() + .maxSegments(4) + .splitCooldownSeconds(5L) + .build(); + + AutoScaleConfig c = AutoScaleConfig.resolve(conf, ns, topic); + // Topic wins where both set. + assertEquals(c.maxSegments(), 4); + // Namespace applies where the topic is silent. + assertEquals(c.splitMsgRateIn(), 20_000.0); + // Topic-only field applies. + assertEquals(c.splitCooldown(), Duration.ofSeconds(5)); + // Untouched fields fall through to the broker defaults. + assertEquals(c.mergeCooldown(), Duration.ofMinutes(5)); + assertTrue(c.enabled()); + } + + @Test + public void testResolveNullOverridesEqualsBrokerConfig() { + ServiceConfiguration conf = new ServiceConfiguration(); + assertEquals(AutoScaleConfig.resolve(conf, null, null), + AutoScaleConfig.fromBrokerConfig(conf)); + } + + @Test + public void testResolveRejectsInvalidCombination() { + // The override is only invalid in combination: a merge threshold raised above the + // broker-default split threshold breaks the hysteresis invariant. + ServiceConfiguration conf = new ServiceConfiguration(); + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride bad = + org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder() + .mergeMsgRateInThreshold(conf.getScalableTopicSplitMsgRateInThreshold()) + .build(); + assertThrows(IllegalArgumentException.class, + () -> AutoScaleConfig.resolve(conf, null, bad)); + } + @Test public void testValidationRejectsBadConfig() { // Zero split threshold: the evaluator scores rate/threshold, so 0 would yield diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java index 74ad5e8a5a347..6b1fc90769615 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java @@ -32,6 +32,8 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.resources.ScalableTopicResources; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.TransportCnx; @@ -39,7 +41,10 @@ import org.apache.pulsar.client.admin.ScalableTopics; import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.common.api.proto.ScalableConsumerType; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.scalable.SegmentLoadStats; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.coordination.CoordinationService; @@ -70,6 +75,10 @@ public class ScalableTopicControllerAutoScaleTest { private PulsarService pulsar; private ServiceConfiguration config; private ScalableTopics scalableTopics; + private PulsarResources pulsarResources; + private NamespaceResources namespaceResources; + /** Namespace policies served by the mocked resources; null = namespace has no policies. */ + private Policies namespacePolicies; private ScalableTopicController controller; private TopicName topicName; @@ -104,6 +113,19 @@ public void setUp() throws Exception { when(pulsar.getBrokerId()).thenReturn(BROKER_ID); when(pulsar.getExecutor()).thenReturn(scheduler); when(pulsar.getConfig()).thenReturn(config); + + // Namespace policies feed the per-namespace auto-scale override resolution. + // Default: no policies → broker config applies. Tests install overrides via + // namespacePolicies. Reset explicitly — TestNG reuses the test instance. + namespacePolicies = null; + pulsarResources = mock(PulsarResources.class); + namespaceResources = mock(NamespaceResources.class); + when(pulsar.getPulsarResources()).thenReturn(pulsarResources); + when(pulsarResources.getNamespaceResources()).thenReturn(namespaceResources); + when(namespaceResources.getPoliciesAsync(any(NamespaceName.class))) + .thenAnswer(__ -> CompletableFuture.completedFuture( + Optional.ofNullable(namespacePolicies))); + when(pulsar.getAdminClient()).thenReturn(admin); when(admin.topics()).thenReturn(topics); when(admin.scalableTopics()).thenReturn(scalableTopics); @@ -205,6 +227,108 @@ public void testConsumerDrivenSplit() throws Exception { "2 consumers on 1 segment should drive a split")); } + @Test + public void testNamespaceOverrideDisablesAutoScale() throws Exception { + namespacePolicies = new Policies(); + namespacePolicies.scalableTopicAutoScalePolicy = + AutoScalePolicyOverride.builder().enabled(false).build(); + + startController(2); + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 2, + "namespace override enabled=false must suppress the split"); + } + + @Test + public void testTopicOverrideWinsOverNamespace() throws Exception { + // Namespace disables auto-scale; the topic explicitly re-enables it. + namespacePolicies = new Policies(); + namespacePolicies.scalableTopicAutoScalePolicy = + AutoScalePolicyOverride.builder().enabled(false).build(); + + startController(2); + resources.updateScalableTopicAsync(topicName, md -> { + md.setAutoScalePolicy(AutoScalePolicyOverride.builder().enabled(true).build()); + return md; + }).get(); + + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, + "topic override enabled=true must win over the namespace disable"); + } + + @Test + public void testTopicOverrideMaxSegmentsCapsSplit() throws Exception { + startController(2); + resources.updateScalableTopicAsync(topicName, md -> { + md.setAutoScalePolicy(AutoScalePolicyOverride.builder().maxSegments(2).build()); + return md; + }).get(); + + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 2, + "per-topic maxSegments=2 must cap the split despite the hot segment"); + } + + @Test + public void testTopicOverrideSurvivesSplit() throws Exception { + // The override must survive a layout mutation (toMetadata must carry it over). + startController(2); + AutoScalePolicyOverride override = + AutoScalePolicyOverride.builder().maxSegments(3).build(); + resources.updateScalableTopicAsync(topicName, md -> { + md.setAutoScalePolicy(override); + return md; + }).get(); + + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, "first split allowed up to maxSegments=3"); + assertEquals(resources.getScalableTopicMetadataAsync(topicName).get() + .orElseThrow().getAutoScalePolicy(), override, + "the per-topic override must survive the split's metadata rewrite"); + + // At the cap now — further hot reports must not split. + resources.reportSegmentLoadAsync(topicName, 1, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 3, "capped at the per-topic maxSegments"); + } + + @Test + public void testInvalidOverrideCombinationFallsBackToDisabled() throws Exception { + // Namespace and topic overrides that are each valid against the broker defaults but + // invalid combined: the namespace raises the merge threshold, the topic lowers the + // matching split threshold below it (hysteresis inversion). The controller must not + // fail the evaluation chain — it treats auto split/merge as disabled until fixed. + namespacePolicies = new Policies(); + namespacePolicies.scalableTopicAutoScalePolicy = AutoScalePolicyOverride.builder() + .mergeMsgRateInThreshold(5_000.0) + .build(); + + startController(2); + resources.updateScalableTopicAsync(topicName, md -> { + md.setAutoScalePolicy(AutoScalePolicyOverride.builder() + .splitMsgRateInThreshold(2_000.0) + .build()); + return md; + }).get(); + + resources.reportSegmentLoadAsync(topicName, 0, + new SegmentLoadStats(20_000, 0, 0, 0)).get(); + // Must complete normally (no IllegalArgumentException) and take no action. + controller.evaluateAutoScaleForTest().get(); + assertEquals(activeSegmentCount(), 2, + "invalid override combination must disable auto split/merge, not split"); + } + @Test public void testConsumerBurstConvergesWithoutTicks() throws Exception { // A group of consumers joining in quick succession must converge to one segment diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java index 364c9299a1124..afcab4c8b9861 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java @@ -269,14 +269,19 @@ public void testGetLineage() { @Test public void testToMetadata() { ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(2, Map.of("key", "value")); + // A layout mutation must round-trip every non-layout field, not just properties. + metadata.setAutoScalePolicy(org.apache.pulsar.common.policies.data.AutoScalePolicyOverride + .builder().enabled(false).maxSegments(8).build()); SegmentLayout layout = SegmentLayout.fromMetadata(metadata); SegmentLayout afterSplit = layout.splitSegment(0, 0L); - ScalableTopicMetadata restored = afterSplit.toMetadata(Map.of("key", "value")); + ScalableTopicMetadata restored = afterSplit.toMetadata(metadata); assertEquals(restored.getEpoch(), 1); assertEquals(restored.getNextSegmentId(), 4); assertEquals(restored.getSegments().size(), 4); assertEquals(restored.getProperties().get("key"), "value"); + assertEquals(restored.getAutoScalePolicy(), metadata.getAutoScalePolicy(), + "split/merge must not drop the per-topic auto-scale policy"); } @Test diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index cc3f5a9296403..b1063e8b542cb 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -1366,6 +1367,72 @@ CompletableFuture setAutoTopicCreationAsync( */ CompletableFuture removeAutoTopicCreationAsync(String namespace); + /** + * Sets the scalable-topic auto split/merge policy override for a namespace (PIP-483), + * overriding the broker's defaults for every scalable topic in the namespace that does + * not carry its own per-topic override. + * + * @param namespace + * Namespace name + * @param override + * the override; unset fields fall through to the broker configuration + * @throws PulsarAdminException + * Unexpected error + */ + void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride override) + throws PulsarAdminException; + + /** + * Sets the scalable-topic auto split/merge policy override for a namespace asynchronously. + * + * @param namespace + * Namespace name + * @param override + * the override; unset fields fall through to the broker configuration + */ + CompletableFuture setScalableTopicAutoScalePolicyAsync( + String namespace, AutoScalePolicyOverride override); + + /** + * Get the scalable-topic auto split/merge policy override for a namespace. + * + * @param namespace + * Namespace name + * @return the override, or {@code null} if none is set + * @throws PulsarAdminException + * Unexpected error + */ + AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) throws PulsarAdminException; + + /** + * Get the scalable-topic auto split/merge policy override for a namespace asynchronously. + * + * @param namespace + * Namespace name + * @return the override, or {@code null} if none is set + */ + CompletableFuture getScalableTopicAutoScalePolicyAsync(String namespace); + + /** + * Removes the scalable-topic auto split/merge policy override from a namespace, letting + * the broker configuration apply. + * + * @param namespace + * Namespace name + * @throws PulsarAdminException + * Unexpected error + */ + void removeScalableTopicAutoScalePolicy(String namespace) throws PulsarAdminException; + + /** + * Removes the scalable-topic auto split/merge policy override from a namespace + * asynchronously. + * + * @param namespace + * Namespace name + */ + CompletableFuture removeScalableTopicAutoScalePolicyAsync(String namespace); + /** * Sets the autoSubscriptionCreation policy for a given namespace, overriding broker settings. *

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java index debf54231b951..b2f7632951e7c 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.ScalableSubscriptionType; import org.apache.pulsar.common.policies.data.ScalableTopicMetadata; import org.apache.pulsar.common.policies.data.ScalableTopicStats; @@ -145,6 +146,54 @@ CompletableFuture createScalableTopicAsync(String topic, int numInitialSeg */ CompletableFuture getMetadataAsync(String topic); + /** + * Set the per-topic auto split/merge policy override (PIP-483). Overrides the namespace + * policy and the broker defaults for this topic; unset fields fall through. + * + * @param topic Topic name in the format "tenant/namespace/topic" + * @param override the override to apply + */ + void setAutoScalePolicy(String topic, AutoScalePolicyOverride override) throws PulsarAdminException; + + /** + * Set the per-topic auto split/merge policy override asynchronously. + * + * @param topic Topic name in the format "tenant/namespace/topic" + * @param override the override to apply + */ + CompletableFuture setAutoScalePolicyAsync(String topic, AutoScalePolicyOverride override); + + /** + * Get the per-topic auto split/merge policy override. + * + * @param topic Topic name in the format "tenant/namespace/topic" + * @return the override, or {@code null} if none is set + */ + AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException; + + /** + * Get the per-topic auto split/merge policy override asynchronously. + * + * @param topic Topic name in the format "tenant/namespace/topic" + * @return the override, or {@code null} if none is set + */ + CompletableFuture getAutoScalePolicyAsync(String topic); + + /** + * Remove the per-topic auto split/merge policy override, letting the namespace policy + * and broker defaults apply. + * + * @param topic Topic name in the format "tenant/namespace/topic" + */ + void removeAutoScalePolicy(String topic) throws PulsarAdminException; + + /** + * Remove the per-topic auto split/merge policy override asynchronously. + * + * @param topic Topic name in the format "tenant/namespace/topic" + */ + CompletableFuture removeAutoScalePolicyAsync(String topic); + /** * Delete a scalable topic and all its underlying segment topics. * diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java new file mode 100644 index 0000000000000..30ecd78e5a7ab --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Override of the scalable-topic auto split/merge policy (PIP-483), settable at the + * namespace level (field on {@link Policies}) and at the topic level (field on the + * scalable-topic metadata). + * + *

Every field is optional: an unset ({@code null}) field falls through to the next + * resolution layer — topic override → namespace override → broker configuration. Setting + * {@code enabled = false} opts the namespace or topic out of auto split/merge entirely. + * + *

The resolved policy must satisfy the same invariants as the broker configuration + * (positive split thresholds, split thresholds strictly above merge thresholds, + * {@code minSegments <= maxSegments}, non-negative cooldowns); an override that would + * violate them is rejected when it is set. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public final class AutoScalePolicyOverride { + + /** Master switch; {@code false} opts this namespace/topic out of auto split/merge. */ + private Boolean enabled; + + /** Hard ceiling on active segments; splits stop once reached. */ + private Integer maxSegments; + + /** Hard floor on active segments; merges stop once reached. */ + private Integer minSegments; + + /** Max merges in a segment's lineage before merges are disabled for it. */ + private Integer maxDagDepth; + + /** Minimum seconds between automatic splits (coalesces bursts). */ + private Long splitCooldownSeconds; + + /** Minimum seconds between automatic merges. */ + private Long mergeCooldownSeconds; + + /** Seconds a segment pair must stay cold before becoming merge-eligible. */ + private Long mergeWindowSeconds; + + /** Inbound msg/s above which a segment is split. */ + private Double splitMsgRateInThreshold; + + /** Inbound bytes/s above which a segment is split. */ + private Long splitBytesRateInThreshold; + + /** Outbound (dispatched) msg/s above which a segment is split. */ + private Double splitMsgRateOutThreshold; + + /** Outbound bytes/s above which a segment is split. */ + private Long splitBytesRateOutThreshold; + + /** Inbound msg/s below which a segment counts as cold for merging. */ + private Double mergeMsgRateInThreshold; + + /** Inbound bytes/s below which a segment counts as cold for merging. */ + private Long mergeBytesRateInThreshold; + + /** Outbound msg/s below which a segment counts as cold for merging. */ + private Double mergeMsgRateOutThreshold; + + /** Outbound bytes/s below which a segment counts as cold for merging. */ + private Long mergeBytesRateOutThreshold; +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index db0aa0620d56b..df3cf53a3ca59 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -61,6 +61,8 @@ public class Policies { public AutoTopicCreationOverride autoTopicCreationOverride = null; // If set, it will override the broker settings for allowing auto subscription creation public AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = null; + // If set, it will override the broker's scalable-topic auto split/merge settings (PIP-483) + public AutoScalePolicyOverride scalableTopicAutoScalePolicy = null; public Map publishMaxMessageRate = new HashMap<>(); @SuppressWarnings("checkstyle:MemberName") @@ -158,7 +160,7 @@ public int hashCode() { backlog_quota_map, publishMaxMessageRate, clusterDispatchRate, topicDispatchRate, subscriptionDispatchRate, replicatorDispatchRate, clusterSubscribeRate, deduplicationEnabled, autoTopicCreationOverride, - autoSubscriptionCreationOverride, persistence, + autoSubscriptionCreationOverride, scalableTopicAutoScalePolicy, persistence, bundles, latency_stats_sample_rate, message_ttl_in_seconds, subscription_expiration_time_minutes, retention_policies, encryption_required, delayed_delivery_policies, inactive_topic_policies, @@ -198,6 +200,7 @@ public boolean equals(Object obj) { && Objects.equals(deduplicationEnabled, other.deduplicationEnabled) && Objects.equals(autoTopicCreationOverride, other.autoTopicCreationOverride) && Objects.equals(autoSubscriptionCreationOverride, other.autoSubscriptionCreationOverride) + && Objects.equals(scalableTopicAutoScalePolicy, other.scalableTopicAutoScalePolicy) && Objects.equals(persistence, other.persistence) && Objects.equals(bundles, other.bundles) && Objects.equals(latency_stats_sample_rate, other.latency_stats_sample_rate) && Objects.equals(message_ttl_in_seconds, diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java index 6f3123228f167..ce246a1340d66 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java @@ -54,6 +54,12 @@ public class ScalableTopicMetadata { @Builder.Default private Map properties = Map.of(); + /** + * Per-topic auto split/merge policy override (PIP-483). {@code null} means no override: + * the namespace policy and then the broker configuration apply. + */ + private AutoScalePolicyOverride autoScalePolicy; + /** * Describes a single segment in a scalable topic's DAG. */ diff --git a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java index 4784ea9b1b985..6259d9799a46c 100644 --- a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java +++ b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java @@ -79,13 +79,16 @@ public void testAllArgsConstructor() { segments.put(0L, activeSegment(0L, 0, 0xFFFF, 0L)); Map props = Map.of("retention", "7d"); + AutoScalePolicyOverride autoScalePolicy = + AutoScalePolicyOverride.builder().enabled(false).build(); - ScalableTopicMetadata md = new ScalableTopicMetadata(3L, 1L, segments, props); + ScalableTopicMetadata md = new ScalableTopicMetadata(3L, 1L, segments, props, autoScalePolicy); assertEquals(md.getEpoch(), 3L); assertEquals(md.getNextSegmentId(), 1L); assertEquals(md.getSegments(), segments); assertEquals(md.getProperties(), props); + assertEquals(md.getAutoScalePolicy(), autoScalePolicy); } @Test diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 75a331a45dbd6..fd0e5c5bec00c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -566,6 +567,45 @@ public CompletableFuture removeAutoTopicCreationAsync(String namespace) { return asyncDeleteRequest(path); } + @Override + public void setScalableTopicAutoScalePolicy(String namespace, + AutoScalePolicyOverride override) throws PulsarAdminException { + sync(() -> setScalableTopicAutoScalePolicyAsync(namespace, override)); + } + + @Override + public CompletableFuture setScalableTopicAutoScalePolicyAsync( + String namespace, AutoScalePolicyOverride override) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "scalableTopicAutoScalePolicy"); + return asyncPostRequest(path, Entity.entity(override, MediaType.APPLICATION_JSON)); + } + + @Override + public AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) + throws PulsarAdminException { + return sync(() -> getScalableTopicAutoScalePolicyAsync(namespace)); + } + + @Override + public CompletableFuture getScalableTopicAutoScalePolicyAsync(String namespace) { + return asyncGetNamespaceParts(new FutureCallback() { + }, namespace, + "scalableTopicAutoScalePolicy"); + } + + @Override + public void removeScalableTopicAutoScalePolicy(String namespace) throws PulsarAdminException { + sync(() -> removeScalableTopicAutoScalePolicyAsync(namespace)); + } + + @Override + public CompletableFuture removeScalableTopicAutoScalePolicyAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "scalableTopicAutoScalePolicy"); + return asyncDeleteRequest(path); + } + @Override public void setAutoSubscriptionCreation(String namespace, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) throws PulsarAdminException { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java index e11c6cfe74711..e4905aa63c82d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride; import org.apache.pulsar.common.policies.data.ScalableTopicMetadata; import org.apache.pulsar.common.policies.data.ScalableTopicStats; @@ -141,6 +142,43 @@ public CompletableFuture getMetadataAsync(String topic) { return asyncGetRequest(topicPath(tn), ScalableTopicMetadata.class); } + // --- Auto split/merge policy override (PIP-483) --- + + @Override + public void setAutoScalePolicy(String topic, AutoScalePolicyOverride override) + throws PulsarAdminException { + sync(() -> setAutoScalePolicyAsync(topic, override)); + } + + @Override + public CompletableFuture setAutoScalePolicyAsync(String topic, AutoScalePolicyOverride override) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn).path("autoScalePolicy"); + return asyncPostRequest(path, Entity.entity(override, MediaType.APPLICATION_JSON)); + } + + @Override + public AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException { + return sync(() -> getAutoScalePolicyAsync(topic)); + } + + @Override + public CompletableFuture getAutoScalePolicyAsync(String topic) { + TopicName tn = validateTopic(topic); + return asyncGetRequest(topicPath(tn).path("autoScalePolicy"), AutoScalePolicyOverride.class); + } + + @Override + public void removeAutoScalePolicy(String topic) throws PulsarAdminException { + sync(() -> removeAutoScalePolicyAsync(topic)); + } + + @Override + public CompletableFuture removeAutoScalePolicyAsync(String topic) { + TopicName tn = validateTopic(topic); + return asyncDeleteRequest(topicPath(tn).path("autoScalePolicy")); + } + // --- Delete --- @Override diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java index fd05ee40e76b7..2a2b56c599b32 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java @@ -55,6 +55,7 @@ public enum PolicyName { DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, ALLOW_CLUSTERS, ALLOW_CUSTOM_METRIC_LABELS, + SCALABLE_TOPIC_AUTO_SCALE, // cluster policies CLUSTER_MIGRATION,