From b35516c22cf864e117d67017e7110f41fd9245e1 Mon Sep 17 00:00:00 2001 From: Karan Tripathi Date: Tue, 14 Apr 2026 11:44:06 +0530 Subject: [PATCH] Add guard rails to PerInstanceAccessor for instance deletion and capacity updates Guard rails prevent known failure scenarios where instance operations cause rebalance failures: 1. deleteInstance(): Run siblingNodesActiveReplicaCheckWithDetails() before dropping an instance to verify min-active-replica constraints won't be violated. Returns 400 with specific resource/partition/replica details when blocked. 2. updateInstanceConfig(): Validate INSTANCE_CAPACITY_MAP changes don't break WAGED rebalancer constraints by simulating the merged config and checking required capacity keys remain present. Both guard rails support ?force=true query parameter to allow operators to bypass validation when they are certain the operation is safe. Also changed IllegalArgumentException handling in updateInstanceConfig() from 500 to 400 status code since these are client input validation errors. CICP-3788 Made-with: Cursor --- .../resources/helix/PerInstanceAccessor.java | 84 ++++++- .../rest/server/TestPerInstanceAccessor.java | 221 ++++++++++++++++++ 2 files changed, 302 insertions(+), 3 deletions(-) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java index cdcfecdaa6..7587b87900 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java @@ -70,7 +70,11 @@ import org.apache.helix.rest.server.filters.ClusterAuth; import org.apache.helix.rest.server.json.instance.InstanceInfo; import org.apache.helix.rest.server.json.instance.StoppableCheck; +import org.apache.helix.controller.rebalancer.util.WagedValidationUtil; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.util.InstanceUtil; +import org.apache.helix.util.InstanceValidationUtil; +import org.apache.helix.util.MinActiveReplicaCheckResult; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.eclipse.jetty.util.StringUtil; import org.slf4j.Logger; @@ -564,10 +568,30 @@ public Response updateInstance(@PathParam("clusterId") String clusterId, @Timed(name = HttpConstants.WRITE_REQUEST) @DELETE public Response deleteInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName) { + @PathParam("instanceName") String instanceName, + @DefaultValue("false") @QueryParam("force") boolean force) { HelixAdmin admin = getHelixAdmin(); try { InstanceConfig instanceConfig = admin.getInstanceConfig(clusterId, instanceName); + if (instanceConfig == null) { + return notFound( + String.format("Instance %s not found in cluster %s", instanceName, clusterId)); + } + + if (!force) { + HelixDataAccessor dataAccessor = getDataAccssor(clusterId); + MinActiveReplicaCheckResult checkResult = + InstanceValidationUtil.siblingNodesActiveReplicaCheckWithDetails( + dataAccessor, instanceName, Collections.emptySet()); + if (!checkResult.isPassed()) { + LOG.warn("Guard rail blocked deletion of instance {} in cluster {}: {}", + instanceName, clusterId, checkResult); + return badRequest(String.format( + "Cannot drop instance %s: %s. Use force=true query param to override.", + instanceName, checkResult)); + } + } + admin.dropInstance(clusterId, instanceConfig); } catch (HelixException e) { return badRequest(e.getMessage()); @@ -599,6 +623,7 @@ public Response getInstanceConfig(@PathParam("clusterId") String clusterId, @Path("configs") public Response updateInstanceConfig(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, @QueryParam("command") String commandStr, + @DefaultValue("false") @QueryParam("force") boolean force, String content) { Command command; if (commandStr == null || commandStr.isEmpty()) { @@ -631,6 +656,10 @@ record = toZNRecord(content); */ validateDeltaTopologySettingInInstanceConfig(clusterId, instanceName, configAccessor, instanceConfig, command); + if (!force) { + validateInstanceCapacityChange(clusterId, instanceName, configAccessor, + instanceConfig); + } configAccessor.updateInstanceConfig(clusterId, instanceName, instanceConfig); break; case delete: @@ -645,8 +674,8 @@ record = toZNRecord(content); return badRequest(String.format("Unsupported command: %s", command)); } } catch (IllegalArgumentException ex) { - LOG.error("Invalid topology setting for Instance : {}. Fail the config update", instanceName, ex); - return serverError(ex); + LOG.error("Invalid config for Instance: {}. Fail the config update", instanceName, ex); + return badRequest(ex.getMessage()); } catch (HelixException ex) { return notFound(ex.getMessage()); } catch (Exception ex) { @@ -929,4 +958,53 @@ private boolean validateDeltaTopologySettingInInstanceConfig(String clusterName, instanceName); } + /** + * Validates that an instance config update does not break WAGED rebalancer capacity constraints. + * Only triggers when the update includes INSTANCE_CAPACITY_MAP changes that reduce a capacity + * value. Simulates the merged config and checks that all required capacity keys remain present. + * + * @throws IllegalArgumentException if the capacity change would break WAGED constraints + */ + private void validateInstanceCapacityChange(String clusterName, String instanceName, + ConfigAccessor configAccessor, InstanceConfig newInstanceConfig) { + Map newCapacityMap = newInstanceConfig.getInstanceCapacityMap(); + if (newCapacityMap.isEmpty()) { + return; + } + + InstanceConfig existingConfig = configAccessor.getInstanceConfig(clusterName, instanceName); + if (existingConfig == null) { + return; + } + Map currentCapacityMap = existingConfig.getInstanceCapacityMap(); + + boolean capacityReduced = false; + for (Map.Entry entry : newCapacityMap.entrySet()) { + Integer currentValue = currentCapacityMap.get(entry.getKey()); + if (currentValue != null && entry.getValue() < currentValue) { + capacityReduced = true; + break; + } + } + + if (!capacityReduced) { + return; + } + + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + if (clusterConfig == null) { + return; + } + + try { + InstanceConfig mergedConfig = new InstanceConfig(existingConfig.getRecord()); + mergedConfig.getRecord().update(newInstanceConfig.getRecord()); + WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, mergedConfig); + } catch (HelixException e) { + throw new IllegalArgumentException(String.format( + "Capacity update for instance %s would break WAGED rebalancer constraints: %s. " + + "Use force=true query param to override.", instanceName, e.getMessage())); + } + } + } diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java index 468c8a527d..6ad3bb5ce8 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java @@ -1547,4 +1547,225 @@ private Map getInstanceCurrentStates(String instanceName) { } return assignment; } + + // --- Guard Rail Tests for deleteInstance --- + + /** + * Test that deleting an instance that has no partitions in ExternalView passes the guard rail. + * We add a fresh instance (not live, not in any ExternalView) and verify delete succeeds. + */ + @Test + public void testDeleteInstanceGuardRailPassesForUnassignedInstance() { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String testInstance = CLUSTER_NAME + "localhost_19999"; + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, testInstance); + + // Instance is not live and has no ExternalView assignments, guard rail should pass + delete("clusters/" + CLUSTER_NAME + "/instances/" + testInstance, + Response.Status.OK.getStatusCode()); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + /** + * Test that deleting an instance with active replicas that would violate minActiveReplicas + * is blocked by the guard rail. The test cluster has minActiveReplicas=3 with only 2 replicas, + * so removing any instance with active partitions should fail. + */ + @Test + public void testDeleteInstanceGuardRailBlocksWhenMinActiveReplicaViolated() { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + // Use a separate cluster (TestCluster_0) to avoid interfering with other tests. + // TestCluster_0 has resources with minActiveReplicas set and active instances. + String clusterName = "TestCluster_0"; + String testInstance = clusterName + "localhost_19998"; + + // Add the instance and set up an ExternalView entry that makes it look active + _gSetupTool.addInstanceToCluster(clusterName, testInstance); + String resource = clusterName + "_db_0"; + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, + _baseAccessor); + ExternalView ev = accessor.getProperty(accessor.keyBuilder().externalView(resource)); + if (ev != null) { + // Inject the test instance into a partition's state map with SLAVE state + String partition = ev.getPartitionSet().iterator().next(); + Map stateMap = new HashMap<>(ev.getStateMap(partition)); + stateMap.put(testInstance, "SLAVE"); + ev.setStateMap(partition, stateMap); + accessor.setProperty(accessor.keyBuilder().externalView(resource), ev); + } + + // Attempt to delete without force -- should be blocked + Response response = target("clusters/" + clusterName + "/instances/" + testInstance) + .request().delete(); + Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode()); + String body = response.readEntity(String.class); + Assert.assertTrue(body.contains("MIN_ACTIVE_REPLICA_CHECK_FAILED"), + "Error should contain min active replica check failure details. Got: " + body); + + // Clean up: remove the injected entry and drop the instance + if (ev != null) { + String partition = ev.getPartitionSet().iterator().next(); + Map stateMap = new HashMap<>(ev.getStateMap(partition)); + stateMap.remove(testInstance); + ev.setStateMap(partition, stateMap); + accessor.setProperty(accessor.keyBuilder().externalView(resource), ev); + } + _gSetupTool.getClusterManagementTool().dropInstance(clusterName, + _gSetupTool.getClusterManagementTool().getInstanceConfig(clusterName, testInstance)); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + /** + * Test that deleteInstance with force=true bypasses the guard rail even when + * minActiveReplica would be violated. + */ + @Test + public void testDeleteInstanceGuardRailBypassWithForce() { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String testInstance = CLUSTER_NAME + "localhost_19997"; + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, testInstance); + + // Delete with force=true should always succeed for an offline instance + delete("clusters/" + CLUSTER_NAME + "/instances/" + testInstance + "?force=true", + Response.Status.OK.getStatusCode()); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + // --- Guard Rail Tests for updateInstanceConfig capacity changes --- + + /** + * Test that updating instance config with a capacity map reduction that breaks + * WAGED rebalancer constraints is blocked. + */ + @Test + public void testUpdateInstanceConfigCapacityReductionBlocked() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String clusterName = "TestCluster_1"; + String instanceName = clusterName + "localhost_12918"; + + // Set up cluster config with required capacity keys + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); + clusterConfig.setInstanceCapacityKeys(Arrays.asList("cpu", "memory", "disk")); + _configAccessor.setClusterConfig(clusterName, clusterConfig); + + // Set instance capacity with all required keys + InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(clusterName, instanceName); + Map capacityMap = new HashMap<>(); + capacityMap.put("cpu", 100); + capacityMap.put("memory", 256); + capacityMap.put("disk", 500); + instanceConfig.setInstanceCapacityMap(capacityMap); + _configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig); + + // Now try to update capacity map to remove the "disk" key (reduce to only cpu and memory) + // with a reduced cpu value. The cluster has no default for "disk", so after merge + // the required key "disk" would be missing from the update payload. + ZNRecord updateRecord = new ZNRecord(instanceName); + Map reducedCapacity = new HashMap<>(); + reducedCapacity.put("cpu", "50"); + reducedCapacity.put("memory", "128"); + updateRecord.setMapField("INSTANCE_CAPACITY_MAP", reducedCapacity); + + Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(updateRecord), + MediaType.APPLICATION_JSON_TYPE); + + // This should return 400 because the merged config would be missing the "disk" key + // Note: the update merges maps, so the existing "disk" key should remain. + // Instead, let's test with a cluster that has no defaults and the instance loses a key. + // Actually, the merge in configAccessor.updateInstanceConfig merges at the ZNRecord level, + // and map fields merge at the map level too. So setting a new map with only cpu+memory + // would replace the INSTANCE_CAPACITY_MAP field entirely. + new JerseyUriRequestBuilder("clusters/{}/instances/{}/configs?command=update") + .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode()) + .format(clusterName, instanceName).post(this, entity); + + // Clean up: restore the original capacity and remove capacity keys requirement + instanceConfig = _configAccessor.getInstanceConfig(clusterName, instanceName); + instanceConfig.setInstanceCapacityMap(capacityMap); + _configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig); + clusterConfig = _configAccessor.getClusterConfig(clusterName); + clusterConfig.setInstanceCapacityKeys(Collections.emptyList()); + _configAccessor.setClusterConfig(clusterName, clusterConfig); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + /** + * Test that updating instance config with force=true bypasses the capacity guard rail. + */ + @Test + public void testUpdateInstanceConfigCapacityReductionForceBypass() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String clusterName = "TestCluster_2"; + String instanceName = clusterName + "localhost_12918"; + + // Set up cluster config with required capacity keys + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); + clusterConfig.setInstanceCapacityKeys(Arrays.asList("cpu", "memory", "disk")); + _configAccessor.setClusterConfig(clusterName, clusterConfig); + + // Set instance capacity with all required keys + InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(clusterName, instanceName); + Map capacityMap = new HashMap<>(); + capacityMap.put("cpu", 100); + capacityMap.put("memory", 256); + capacityMap.put("disk", 500); + instanceConfig.setInstanceCapacityMap(capacityMap); + _configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig); + + // Update with reduced capacity but force=true should succeed + ZNRecord updateRecord = new ZNRecord(instanceName); + Map reducedCapacity = new HashMap<>(); + reducedCapacity.put("cpu", "50"); + reducedCapacity.put("memory", "128"); + updateRecord.setMapField("INSTANCE_CAPACITY_MAP", reducedCapacity); + + Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(updateRecord), + MediaType.APPLICATION_JSON_TYPE); + new JerseyUriRequestBuilder("clusters/{}/instances/{}/configs?command=update&force=true") + .format(clusterName, instanceName).post(this, entity); + + // Clean up + instanceConfig = _configAccessor.getInstanceConfig(clusterName, instanceName); + instanceConfig.setInstanceCapacityMap(capacityMap); + _configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig); + clusterConfig = _configAccessor.getClusterConfig(clusterName); + clusterConfig.setInstanceCapacityKeys(Collections.emptyList()); + _configAccessor.setClusterConfig(clusterName, clusterConfig); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + /** + * Test that updating a non-capacity field in instance config does not trigger + * the capacity guard rail. + */ + @Test + public void testUpdateInstanceConfigNonCapacityChangePasses() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String clusterName = "TestCluster_3"; + String instanceName = clusterName + "localhost_12918"; + + // Update a simple field (not capacity-related) + ZNRecord updateRecord = new ZNRecord(instanceName); + updateRecord.getSimpleFields().put("CUSTOM_TAG", "test-value"); + + Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(updateRecord), + MediaType.APPLICATION_JSON_TYPE); + + // Should succeed since no capacity change + new JerseyUriRequestBuilder("clusters/{}/instances/{}/configs?command=update") + .format(clusterName, instanceName).post(this, entity); + + // Verify the field was set + InstanceConfig config = _configAccessor.getInstanceConfig(clusterName, instanceName); + Assert.assertEquals(config.getRecord().getSimpleField("CUSTOM_TAG"), "test-value"); + + // Clean up + ZNRecord deleteRecord = new ZNRecord(instanceName); + deleteRecord.getSimpleFields().put("CUSTOM_TAG", "test-value"); + entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(deleteRecord), + MediaType.APPLICATION_JSON_TYPE); + new JerseyUriRequestBuilder("clusters/{}/instances/{}/configs?command=delete") + .format(clusterName, instanceName).post(this, entity); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } }