Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@
import org.apache.helix.rest.server.filters.ClusterAuth;
import org.apache.helix.rest.server.json.instance.InstanceInfo;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.util.InstanceUtil;
import org.apache.helix.util.InstanceValidationUtil;
import org.apache.helix.util.MinActiveReplicaCheckResult;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -564,10 +568,30 @@ public Response updateInstance(@PathParam("clusterId") String clusterId,
@Timed(name = HttpConstants.WRITE_REQUEST)
@DELETE
public Response deleteInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName) {
@PathParam("instanceName") String instanceName,
@DefaultValue("false") @QueryParam("force") boolean force) {
HelixAdmin admin = getHelixAdmin();
try {
InstanceConfig instanceConfig = admin.getInstanceConfig(clusterId, instanceName);
if (instanceConfig == null) {
return notFound(
String.format("Instance %s not found in cluster %s", instanceName, clusterId));
}

if (!force) {
HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
MinActiveReplicaCheckResult checkResult =
InstanceValidationUtil.siblingNodesActiveReplicaCheckWithDetails(
dataAccessor, instanceName, Collections.emptySet());
if (!checkResult.isPassed()) {
LOG.warn("Guard rail blocked deletion of instance {} in cluster {}: {}",
instanceName, clusterId, checkResult);
return badRequest(String.format(
"Cannot drop instance %s: %s. Use force=true query param to override.",
instanceName, checkResult));
}
}

admin.dropInstance(clusterId, instanceConfig);
} catch (HelixException e) {
return badRequest(e.getMessage());
Expand Down Expand Up @@ -599,6 +623,7 @@ public Response getInstanceConfig(@PathParam("clusterId") String clusterId,
@Path("configs")
public Response updateInstanceConfig(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName, @QueryParam("command") String commandStr,
@DefaultValue("false") @QueryParam("force") boolean force,
String content) {
Command command;
if (commandStr == null || commandStr.isEmpty()) {
Expand Down Expand Up @@ -631,6 +656,10 @@ record = toZNRecord(content);
*/
validateDeltaTopologySettingInInstanceConfig(clusterId, instanceName, configAccessor,
instanceConfig, command);
if (!force) {
validateInstanceCapacityChange(clusterId, instanceName, configAccessor,
instanceConfig);
}
configAccessor.updateInstanceConfig(clusterId, instanceName, instanceConfig);
break;
case delete:
Expand All @@ -645,8 +674,8 @@ record = toZNRecord(content);
return badRequest(String.format("Unsupported command: %s", command));
}
} catch (IllegalArgumentException ex) {
LOG.error("Invalid topology setting for Instance : {}. Fail the config update", instanceName, ex);
return serverError(ex);
LOG.error("Invalid config for Instance: {}. Fail the config update", instanceName, ex);
return badRequest(ex.getMessage());
} catch (HelixException ex) {
return notFound(ex.getMessage());
} catch (Exception ex) {
Expand Down Expand Up @@ -929,4 +958,53 @@ private boolean validateDeltaTopologySettingInInstanceConfig(String clusterName,
instanceName);
}

/**
 * Guards an instance config update against capacity reductions that would leave the instance
 * invalid for the WAGED rebalancer.
 *
 * <p>The check returns without doing anything when the update carries no capacity map, when the
 * instance has no stored config, when no capacity key that already exists on the instance is
 * being lowered, or when the cluster config cannot be read. Otherwise a new
 * {@link InstanceConfig} is built from the stored record, the incoming record is applied on top
 * of it, and the result is handed to
 * {@link WagedValidationUtil#validateAndGetInstanceCapacity}.
 *
 * @param clusterName cluster that owns the instance
 * @param instanceName instance whose config is being updated
 * @param configAccessor accessor used to read the stored instance and cluster configs
 * @param newInstanceConfig config built from the update payload
 * @throws IllegalArgumentException if WAGED validation rejects the simulated config; the
 *         underlying {@link HelixException} is attached as the cause
 */
private void validateInstanceCapacityChange(String clusterName, String instanceName,
    ConfigAccessor configAccessor, InstanceConfig newInstanceConfig) {
  Map<String, Integer> newCapacityMap = newInstanceConfig.getInstanceCapacityMap();
  if (newCapacityMap.isEmpty()) {
    // The payload does not touch capacity at all.
    return;
  }

  InstanceConfig existingConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
  if (existingConfig == null) {
    return;
  }
  Map<String, Integer> currentCapacityMap = existingConfig.getInstanceCapacityMap();

  // Only a lowered value on a key the instance already has triggers the validation;
  // keys that are new to the instance are ignored here.
  boolean capacityReduced = false;
  for (Map.Entry<String, Integer> entry : newCapacityMap.entrySet()) {
    Integer currentValue = currentCapacityMap.get(entry.getKey());
    if (currentValue != null && entry.getValue() < currentValue) {
      capacityReduced = true;
      break;
    }
  }

  if (!capacityReduced) {
    return;
  }

  ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
  if (clusterConfig == null) {
    return;
  }

  try {
    // Simulate the post-update config and let the WAGED utility validate it.
    InstanceConfig mergedConfig = new InstanceConfig(existingConfig.getRecord());
    mergedConfig.getRecord().update(newInstanceConfig.getRecord());
    WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, mergedConfig);
  } catch (HelixException e) {
    // Keep the original exception as the cause so the logged stack trace shows which
    // WAGED check failed; the message text itself is unchanged.
    throw new IllegalArgumentException(String.format(
        "Capacity update for instance %s would break WAGED rebalancer constraints: %s. "
            + "Use force=true query param to override.", instanceName, e.getMessage()), e);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1547,4 +1547,225 @@ private Map<String, String> getInstanceCurrentStates(String instanceName) {
}
return assignment;
}

// --- Guard Rail Tests for deleteInstance ---

/**
 * Verifies that the delete guard rail lets through an instance that was only registered in the
 * cluster: the instance is added, never started, and then deleted without the force flag,
 * expecting HTTP 200.
 */
@Test
public void testDeleteInstanceGuardRailPassesForUnassignedInstance() {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  final String unassignedInstance = CLUSTER_NAME + "localhost_19999";
  _gSetupTool.addInstanceToCluster(CLUSTER_NAME, unassignedInstance);

  // No force flag: the request has to get past the guard rail on its own.
  final String instancePath = "clusters/" + CLUSTER_NAME + "/instances/" + unassignedInstance;
  final int expectedStatus = Response.Status.OK.getStatusCode();
  delete(instancePath, expectedStatus);
  System.out.println("End test :" + TestHelper.getTestMethodName());
}

/**
 * Verifies that deleting an instance is rejected with HTTP 400 when the min-active-replica
 * guard rail fails.
 *
 * <p>A fresh instance is added to TestCluster_0 and injected as SLAVE into the first partition
 * of the resource's ExternalView so that it looks like it hosts an active replica. The test
 * relies on TestCluster_0 being configured so that removing such an instance violates
 * minActiveReplicas (NOTE(review): that configuration lives in the test base class; confirm).
 *
 * <p>Cleanup runs in a finally block so a failed assertion does not leave the injected
 * ExternalView entry or the extra instance behind in the shared cluster.
 */
@Test
public void testDeleteInstanceGuardRailBlocksWhenMinActiveReplicaViolated() {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  // Use a separate cluster (TestCluster_0) to avoid interfering with other tests.
  String clusterName = "TestCluster_0";
  String testInstance = clusterName + "localhost_19998";
  String resource = clusterName + "_db_0";

  _gSetupTool.addInstanceToCluster(clusterName, testInstance);
  HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
  ExternalView ev = null;
  String partition = null;
  try {
    ev = accessor.getProperty(accessor.keyBuilder().externalView(resource));
    // Fail fast instead of silently skipping the setup: without an ExternalView the
    // guard rail has nothing to block on and the test would be meaningless.
    Assert.assertNotNull(ev, "ExternalView for resource " + resource + " is required");
    Assert.assertFalse(ev.getPartitionSet().isEmpty(),
        "ExternalView for resource " + resource + " has no partitions");

    // Inject the test instance into a partition's state map with SLAVE state
    partition = ev.getPartitionSet().iterator().next();
    Map<String, String> injectedStateMap = new HashMap<>(ev.getStateMap(partition));
    injectedStateMap.put(testInstance, "SLAVE");
    ev.setStateMap(partition, injectedStateMap);
    accessor.setProperty(accessor.keyBuilder().externalView(resource), ev);

    // Attempt to delete without force -- should be blocked
    Response response = target("clusters/" + clusterName + "/instances/" + testInstance)
        .request().delete();
    Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
    String body = response.readEntity(String.class);
    Assert.assertTrue(body.contains("MIN_ACTIVE_REPLICA_CHECK_FAILED"),
        "Error should contain min active replica check failure details. Got: " + body);
  } finally {
    // Remove the injected entry only if it was actually written.
    if (partition != null) {
      Map<String, String> restoredStateMap = new HashMap<>(ev.getStateMap(partition));
      restoredStateMap.remove(testInstance);
      ev.setStateMap(partition, restoredStateMap);
      accessor.setProperty(accessor.keyBuilder().externalView(resource), ev);
    }
    // The instance may already be gone if the guard rail unexpectedly let the delete
    // through; check first so cleanup does not mask the assertion failure.
    if (_gSetupTool.getClusterManagementTool().getInstancesInCluster(clusterName)
        .contains(testInstance)) {
      _gSetupTool.getClusterManagementTool().dropInstance(clusterName,
          _gSetupTool.getClusterManagementTool().getInstanceConfig(clusterName, testInstance));
    }
  }
  System.out.println("End test :" + TestHelper.getTestMethodName());
}

/**
 * Verifies that deleteInstance accepts the {@code force=true} query parameter and returns
 * HTTP 200.
 *
 * <p>The instance used here is freshly added and has no assignments, so this test only covers
 * the forced code path being reachable; it does not set up a minActiveReplica violation.
 */
@Test
public void testDeleteInstanceGuardRailBypassWithForce() {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  String testInstance = CLUSTER_NAME + "localhost_19997";
  _gSetupTool.addInstanceToCluster(CLUSTER_NAME, testInstance);

  // Pass "force" through the JAX-RS query-parameter API. Appending "?force=true" to the
  // path string risks the '?' being percent-encoded as part of the instance name.
  Response response = target("clusters/" + CLUSTER_NAME + "/instances/" + testInstance)
      .queryParam("force", "true")
      .request()
      .delete();
  Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
  System.out.println("End test :" + TestHelper.getTestMethodName());
}

// --- Guard Rail Tests for updateInstanceConfig capacity changes ---

/**
 * Verifies that an instance config update which lowers capacity values and omits a required
 * capacity key is rejected with HTTP 400 when force is not set.
 *
 * <p>The cluster is given required capacity keys cpu/memory/disk and the instance a full
 * capacity map. The update payload then carries only cpu and memory, both lowered.
 *
 * <p>Cleanup runs in a finally block so a failed assertion does not leave the required
 * capacity keys on the shared cluster config.
 */
@Test
public void testUpdateInstanceConfigCapacityReductionBlocked() throws IOException {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  String clusterName = "TestCluster_1";
  String instanceName = clusterName + "localhost_12918";

  Map<String, Integer> capacityMap = new HashMap<>();
  capacityMap.put("cpu", 100);
  capacityMap.put("memory", 256);
  capacityMap.put("disk", 500);

  try {
    // Set up cluster config with required capacity keys
    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
    clusterConfig.setInstanceCapacityKeys(Arrays.asList("cpu", "memory", "disk"));
    _configAccessor.setClusterConfig(clusterName, clusterConfig);

    // Set instance capacity with all required keys
    InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(clusterName, instanceName);
    instanceConfig.setInstanceCapacityMap(capacityMap);
    _configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig);

    // Payload lowers cpu and memory and leaves out "disk".
    ZNRecord updateRecord = new ZNRecord(instanceName);
    Map<String, String> reducedCapacity = new HashMap<>();
    reducedCapacity.put("cpu", "50");
    reducedCapacity.put("memory", "128");
    updateRecord.setMapField("INSTANCE_CAPACITY_MAP", reducedCapacity);

    Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(updateRecord),
        MediaType.APPLICATION_JSON_TYPE);

    // Expect 400: the lowered cpu value triggers the capacity guard rail, and the simulated
    // config is then missing the required "disk" key. This holds only if the update replaces
    // the whole INSTANCE_CAPACITY_MAP map field rather than merging it key by key.
    // NOTE(review): confirm against ZNRecord.update semantics.
    new JerseyUriRequestBuilder("clusters/{}/instances/{}/configs?command=update")
        .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode())
        .format(clusterName, instanceName).post(this, entity);
  } finally {
    // Clean up: reset the instance to the full capacity map used by this test and drop the
    // required-capacity-keys setting from the cluster config.
    InstanceConfig instanceToReset = _configAccessor.getInstanceConfig(clusterName, instanceName);
    instanceToReset.setInstanceCapacityMap(capacityMap);
    _configAccessor.setInstanceConfig(clusterName, instanceName, instanceToReset);
    ClusterConfig clusterToReset = _configAccessor.getClusterConfig(clusterName);
    clusterToReset.setInstanceCapacityKeys(Collections.emptyList());
    _configAccessor.setClusterConfig(clusterName, clusterToReset);
  }
  System.out.println("End test :" + TestHelper.getTestMethodName());
}

/**
 * Verifies that the same capacity-reducing update that the guard rail would otherwise inspect
 * is accepted with HTTP 200 when {@code force=true} is passed, and that the lowered value is
 * actually stored.
 *
 * <p>Cleanup runs in a finally block so a failed assertion does not leave the required
 * capacity keys on the shared cluster config.
 */
@Test
public void testUpdateInstanceConfigCapacityReductionForceBypass() throws IOException {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  String clusterName = "TestCluster_2";
  String instanceName = clusterName + "localhost_12918";

  Map<String, Integer> capacityMap = new HashMap<>();
  capacityMap.put("cpu", 100);
  capacityMap.put("memory", 256);
  capacityMap.put("disk", 500);

  try {
    // Set up cluster config with required capacity keys
    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
    clusterConfig.setInstanceCapacityKeys(Arrays.asList("cpu", "memory", "disk"));
    _configAccessor.setClusterConfig(clusterName, clusterConfig);

    // Set instance capacity with all required keys
    InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(clusterName, instanceName);
    instanceConfig.setInstanceCapacityMap(capacityMap);
    _configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig);

    // Update with reduced capacity but force=true should succeed
    ZNRecord updateRecord = new ZNRecord(instanceName);
    Map<String, String> reducedCapacity = new HashMap<>();
    reducedCapacity.put("cpu", "50");
    reducedCapacity.put("memory", "128");
    updateRecord.setMapField("INSTANCE_CAPACITY_MAP", reducedCapacity);

    Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(updateRecord),
        MediaType.APPLICATION_JSON_TYPE);
    new JerseyUriRequestBuilder("clusters/{}/instances/{}/configs?command=update&force=true")
        .expectedReturnStatusCode(Response.Status.OK.getStatusCode())
        .format(clusterName, instanceName).post(this, entity);

    // A 200 alone does not prove the write happened; check the stored value too.
    InstanceConfig updatedConfig = _configAccessor.getInstanceConfig(clusterName, instanceName);
    Assert.assertEquals(updatedConfig.getInstanceCapacityMap().get("cpu"), Integer.valueOf(50));
  } finally {
    // Clean up: reset the instance to the full capacity map used by this test and drop the
    // required-capacity-keys setting from the cluster config.
    InstanceConfig instanceToReset = _configAccessor.getInstanceConfig(clusterName, instanceName);
    instanceToReset.setInstanceCapacityMap(capacityMap);
    _configAccessor.setInstanceConfig(clusterName, instanceName, instanceToReset);
    ClusterConfig clusterToReset = _configAccessor.getClusterConfig(clusterName);
    clusterToReset.setInstanceCapacityKeys(Collections.emptyList());
    _configAccessor.setClusterConfig(clusterName, clusterToReset);
  }
  System.out.println("End test :" + TestHelper.getTestMethodName());
}

/**
 * Verifies that an update touching only a simple field goes through without the force flag:
 * the field is written via command=update, read back and compared, then removed again via
 * command=delete.
 */
@Test
public void testUpdateInstanceConfigNonCapacityChangePasses() throws IOException {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  final String clusterName = "TestCluster_3";
  final String instanceName = clusterName + "localhost_12918";
  final String tagKey = "CUSTOM_TAG";
  final String tagValue = "test-value";

  // Payload carries a single simple field and no capacity map.
  ZNRecord addTagRecord = new ZNRecord(instanceName);
  addTagRecord.getSimpleFields().put(tagKey, tagValue);
  String addTagJson = OBJECT_MAPPER.writeValueAsString(addTagRecord);
  Entity addTagEntity = Entity.entity(addTagJson, MediaType.APPLICATION_JSON_TYPE);

  // No expected status is set here, so the request builder's default applies.
  JerseyUriRequestBuilder updateRequest =
      new JerseyUriRequestBuilder("clusters/{}/instances/{}/configs?command=update");
  updateRequest.format(clusterName, instanceName).post(this, addTagEntity);

  // Read the config back and confirm the field landed.
  InstanceConfig storedConfig = _configAccessor.getInstanceConfig(clusterName, instanceName);
  String storedTag = storedConfig.getRecord().getSimpleField(tagKey);
  Assert.assertEquals(storedTag, tagValue);

  // Clean up: remove the field again through the delete command.
  ZNRecord removeTagRecord = new ZNRecord(instanceName);
  removeTagRecord.getSimpleFields().put(tagKey, tagValue);
  String removeTagJson = OBJECT_MAPPER.writeValueAsString(removeTagRecord);
  Entity removeTagEntity = Entity.entity(removeTagJson, MediaType.APPLICATION_JSON_TYPE);
  JerseyUriRequestBuilder deleteRequest =
      new JerseyUriRequestBuilder("clusters/{}/instances/{}/configs?command=delete");
  deleteRequest.format(clusterName, instanceName).post(this, removeTagEntity);
  System.out.println("End test :" + TestHelper.getTestMethodName());
}
}
Loading