Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.scalable.SegmentInfo;

/**
Expand Down Expand Up @@ -51,4 +52,10 @@ public class ScalableTopicMetadata {
/** User-defined topic properties. */
@Builder.Default
private Map<String, String> properties = Map.of();

/**
* Per-topic auto split/merge policy override (PIP-483). {@code null} means no override:
* the namespace policy and then the broker configuration apply.
*/
private AutoScalePolicyOverride autoScalePolicy;
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.scalable.AutoScaleConfig;
import org.apache.pulsar.broker.topiclistlimit.TopicListMemoryLimiter;
import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache;
import org.apache.pulsar.broker.web.RestException;
Expand All @@ -84,6 +85,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -1308,6 +1310,36 @@ protected CompletableFuture<Void> internalSetAutoTopicCreationAsync(
}));
}

protected CompletableFuture<AutoScalePolicyOverride> internalGetScalableTopicAutoScalePolicyAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SCALABLE_TOPIC_AUTO_SCALE,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.scalableTopicAutoScalePolicy);
}

protected CompletableFuture<Void> internalSetScalableTopicAutoScalePolicyAsync(
AutoScalePolicyOverride override) {
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> {
if (override != null) {
// The override only has to be valid in combination with the broker
// defaults — resolve and let the invariant checks reject bad values
// (e.g. zero split thresholds, split <= merge hysteresis inversion).
try {
AutoScaleConfig.resolve(pulsar().getConfig(), override, null);
} catch (IllegalArgumentException e) {
throw new RestException(Status.PRECONDITION_FAILED, e.getMessage());
}
}
})
.thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
policies.scalableTopicAutoScalePolicy = override;
return policies;
}));
}

protected CompletableFuture<Void> internalSetAutoSubscriptionCreationAsync(AutoSubscriptionCreationOverride
autoSubscriptionCreationOverride) {
// Force to read the data s.t. the watch to the cache content is setup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -880,6 +881,102 @@ public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse
});
}

@GET
@Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy")
@Operation(summary = "Get the scalable-topic auto split/merge policy override for a namespace")
@ApiResponses(value = {
@ApiResponse(responseCode = "200",
description = "The scalable-topic auto split/merge policy override for the namespace",
content = @Content(schema = @Schema(implementation = AutoScalePolicyOverride.class))),
@ApiResponse(responseCode = "204", description = "No override is set on this namespace"),
@ApiResponse(responseCode = "403", description = "Don't have admin permission"),
@ApiResponse(responseCode = "404", description = "Tenant or namespace doesn't exist")})
public void getScalableTopicAutoScalePolicy(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalGetScalableTopicAutoScalePolicyAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error()
.attr("namespace", namespaceName)
.exception(ex)
.log("Failed to get scalableTopicAutoScalePolicy for namespace");
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy")
@Operation(summary = "Override the broker's scalable-topic auto split/merge settings for a namespace")
@ApiResponses(value = {
@ApiResponse(responseCode = "204", description = "Operation successful"),
@ApiResponse(responseCode = "403", description = "Don't have admin permission"),
@ApiResponse(responseCode = "404", description = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(responseCode = "412",
description = "The resolved auto split/merge policy violates an invariant")})
public void setScalableTopicAutoScalePolicy(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@RequestBody(description = "Auto split/merge policy override", required = true)
AutoScalePolicyOverride override) {
validateNamespaceName(tenant, namespace);
internalSetScalableTopicAutoScalePolicyAsync(override)
.thenAccept(__ -> {
log.info()
.attr("namespace", namespaceName)
.log("Successfully set scalableTopicAutoScalePolicy on namespace");
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error()
.attr("namespace", namespaceName)
.exception(ex)
.log("Failed to set scalableTopicAutoScalePolicy on namespace");
if (ex instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy")
@Operation(summary = "Remove the scalable-topic auto split/merge policy override from a namespace")
@ApiResponses(value = {
@ApiResponse(responseCode = "204", description = "Operation successful"),
@ApiResponse(responseCode = "403", description = "Don't have admin permission"),
@ApiResponse(responseCode = "404", description = "Tenant or cluster or namespace doesn't exist") })
public void removeScalableTopicAutoScalePolicy(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetScalableTopicAutoScalePolicyAsync(null)
.thenAccept(__ -> {
log.info()
.attr("namespace", namespaceName)
.log("Successfully removed scalableTopicAutoScalePolicy on namespace");
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error()
.attr("namespace", namespaceName)
.exception(ex)
.log("Failed to remove scalableTopicAutoScalePolicy on namespace");
if (ex instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/autoSubscriptionCreation")
@Operation(summary = "Override broker's allowAutoSubscriptionCreation setting for a namespace")
Expand Down
Loading
Loading