From b3b81c5489fbac70dbaa514936a1adb7cd70be71 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Fri, 8 Aug 2025 17:58:12 +0530 Subject: [PATCH 1/7] Fixed the bug if virtual topology zone config not present then exclude that instance from selection --- .vscode/settings.json | 3 + ZkDataExplorer.java | 0 config/external | 1 + .../topology/MissingFaultZoneException.java | 36 +++ .../rebalancer/topology/Topology.java | 14 +- .../apache/helix/examples/ZkDataExplorer.java | 0 ...tCrushRebalanceMissingVirtualTopology.java | 264 +++++++++++++++++ ...hRebalanceMissingVirtualTopologyFixed.java | 274 ++++++++++++++++++ .../TestDistClusterControllerStateModel.java | 0 9 files changed, 590 insertions(+), 2 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 ZkDataExplorer.java create mode 160000 config/external create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/MissingFaultZoneException.java create mode 100644 helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java create mode 100644 helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java create mode 100644 helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java create mode 100644 helix-core/src/test/java/org/apache/helix/participant/TestDistClusterControllerStateModel.java diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000000..7b016a89fb --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.compile.nullAnalysis.mode": "automatic" +} \ No newline at end of file diff --git a/ZkDataExplorer.java b/ZkDataExplorer.java new file mode 100644 index 0000000000..e69de29bb2 diff --git a/config/external b/config/external new file mode 160000 index 0000000000..c98a5a57fc --- /dev/null +++ b/config/external @@ -0,0 +1 @@ +Subproject commit c98a5a57fc0763c010233517c019f87a1c6bc428 diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/MissingFaultZoneException.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/MissingFaultZoneException.java new file mode 100644 index 0000000000..bb5520a4d7 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/MissingFaultZoneException.java @@ -0,0 +1,36 @@ +package org.apache.helix.controller.rebalancer.topology; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Exception thrown when an instance is missing required fault zone configuration. + * This exception is used to exclude instances from rebalancing to prevent + * singleton fault zones that can cause partition imbalance. + */ +public class MissingFaultZoneException extends IllegalArgumentException { + + public MissingFaultZoneException(String message) { + super(message); + } + + public MissingFaultZoneException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java index 2618275b13..38e8da5292 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java @@ -19,6 +19,7 @@ * under the License. */ +import io.netty.util.internal.StringUtil; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -203,6 +204,16 @@ private Node createClusterTree(ClusterConfig clusterConfig, boolean faultZoneLev // the topology tree. The topology tree only requires FaultZoneType and EndNodeType. unnecessaryTopoKeys.forEach(instanceTopologyMap::remove); } + // Check if fault zone is missing from original domain configuration - exclude instance to prevent singleton fault zones + String faultZoneType = _clusterTopologyConfig.getFaultZoneType(); + Map originalDomain = insConfig.getDomainAsMap(); + + if (!originalDomain.containsKey(faultZoneType) || StringUtil.isNullOrEmpty(originalDomain.get(faultZoneType))) { + logger.warn("Instance '{}' excluded from topology: fault zone '{}' is missing from original domain configuration. " + + "Domain: '{}'. This prevents singleton fault zones.", + instanceName, faultZoneType, insConfig.getDomainAsString()); + continue; // Skip this instance + } addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances); } catch (IllegalArgumentException e) { if (insConfig.getInstanceEnabled()) { @@ -257,9 +268,8 @@ private static LinkedHashMap computeInstanceTopologyMapHelper( } int numOfMatchedKeys = 0; for (String key : clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()) { - // if a key does not exist in the instance domain config, using the default domain value. String value = domainAsMap.get(key); - if (value == null || value.length() == 0) { + if (value == null || value.isEmpty()) { value = clusterTopologyConfig.getTopologyKeyDefaultValue().get(key); } else { numOfMatchedKeys++; diff --git a/helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java b/helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java new file mode 100644 index 0000000000..e69de29bb2 diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java new file mode 100644 index 0000000000..1425e3444b --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java @@ -0,0 +1,264 @@ +package org.apache.helix.integration.rebalancer.CrushRebalancers; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Test case reproducing the Venice production incident where missing virtual topology configuration + * leads to disproportionate partition assignment in CRUSH rebalancer. + * + * Background: Venice HAR cluster with topology /mz_virtualZone/host/applicationInstanceId + * had one instance join with missing mz_virtualZone key, causing it to receive 3.26x more + * partitions than expected (2,166 vs 664) due to becoming a singleton fault zone. + */ +public class TestCrushRebalanceMissingVirtualTopology extends ZkTestBase { + + private final String className = getShortClassName(); + private final String clusterName = CLUSTER_PREFIX + "_" + className; + + // Venice-like configuration + private static final String TOPOLOGY = "/mz_virtualZone/host/applicationInstanceId"; + private static final String FAULT_ZONE_TYPE = "mz_virtualZone"; + private static final int PARTITIONS = 500; // Reduced for more dramatic effect + private static final int REPLICAS = 3; + private static final String DB_NAME = "VeniceTest"; + + private ClusterControllerManager controller; + private List participants = new ArrayList<>(); + private List nodes = new ArrayList<>(); + private Set allDBs = new HashSet<>(); + + @BeforeClass + @Override + public void beforeClass() throws Exception { + super.beforeClass(); + System.out.println("START " + className + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(clusterName, true); + + // Configure Venice-like virtual topology cluster + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setTopology(TOPOLOGY); + clusterConfig.setFaultZoneType(FAULT_ZONE_TYPE); + clusterConfig.setTopologyAwareEnabled(true); + configAccessor.setClusterConfig(clusterName, clusterConfig); + + // Add "good" instances with proper virtual topology configuration + addGoodInstances(); + + // Add the "bad" instance with missing virtual topology key + addBadInstance(); + + // Start all participants + for (String node : nodes) { + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, clusterName, node); + participant.syncStart(); + participants.add(participant); + } + + // Start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + controller = new ClusterControllerManager(ZK_ADDR, clusterName, controllerName); + controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, clusterName, true); + enableTopologyAwareRebalance(_gZkClient, clusterName, true); + } + + /** + * Add "good" instances that have proper virtual topology configuration. + * These instances will be distributed across multiple virtual zones as expected. + */ + private void addGoodInstances() { + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + + // Add instances across 2 virtual zones with fewer instances to amplify the singleton effect + int portBase = 12000; + int globalAppId = 0; // Global counter to ensure unique applicationInstanceId + for (int zoneId = 0; zoneId < 2; zoneId++) { + for (int hostId = 0; hostId < 2; hostId++) { + // Use hostname_port format so Helix can properly parse hostname and port + int port = portBase + (zoneId * 100) + (hostId * 10); + String instanceName = "host_" + zoneId + "_" + hostId + "_" + port; + nodes.add(instanceName); + + _gSetupTool.addInstanceToCluster(clusterName, instanceName); + + // Get the existing config and modify it + InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName); + + // Set the critical virtual topology configuration + // Use globalAppId to ensure unique applicationInstanceId (leaf node must be globally unique) + instanceConfig.setDomain("mz_virtualZone=zone_" + zoneId + ",host=host_" + zoneId + "_" + hostId + + ",applicationInstanceId=app_" + globalAppId); + + _gSetupTool.getClusterManagementTool().setInstanceConfig(clusterName, instanceName, instanceConfig); + globalAppId++; // Increment for next instance + } + } + } + + /** + * Add the "bad" instance that is missing the virtual topology key. + * This simulates the Venice production incident where one instance joined + * without proper mz_virtualZone configuration. + */ + private void addBadInstance() { + // Use hostname_port format so Helix can properly parse hostname and port + String badInstanceName = "bad_host_9999"; + nodes.add(badInstanceName); + + _gSetupTool.addInstanceToCluster(clusterName, badInstanceName); + + // Get the existing config and modify it + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + InstanceConfig badInstanceConfig = configAccessor.getInstanceConfig(clusterName, badInstanceName); + + // This is the problem: missing mz_virtualZone in domain configuration + // When mz_virtualZone is missing, Topology.java will assign default value + // "Helix_default_mz_virtualZone", creating a singleton fault zone + // Use unique applicationInstanceId (leaf node must be globally unique) + badInstanceConfig.setDomain("host=bad_host,applicationInstanceId=bad_app_999"); + + _gSetupTool.getClusterManagementTool().setInstanceConfig(clusterName, badInstanceName, badInstanceConfig); + } + + @Test + public void testMissingVirtualTopologyOverAllocation() { + // Add resource with CRUSH rebalancer strategy + _gSetupTool.addResourceToCluster(clusterName, DB_NAME, PARTITIONS, + BuiltInStateModelDefinitions.OnlineOffline.name(), + RebalanceMode.FULL_AUTO.name(), CrushRebalanceStrategy.class.getName()); + allDBs.add(DB_NAME); + + _gSetupTool.rebalanceStorageCluster(clusterName, DB_NAME, REPLICAS); + + // Wait for cluster to stabilize + ZkHelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(clusterName) + .setZkClient(_gZkClient).setResources(allDBs).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + // Analyze partition distribution + ExternalView externalView = _gSetupTool.getClusterManagementTool().getResourceExternalView(clusterName, DB_NAME); + Map partitionCounts = countPartitionsPerInstance(externalView); + + // Print distribution for debugging + System.out.println("Partition distribution:"); + for (Map.Entry entry : partitionCounts.entrySet()) { + System.out.println(" " + entry.getKey() + ": " + entry.getValue() + " partitions"); + } + + // Validate the Venice incident: bad instance should get disproportionate allocation + String badInstanceName = "bad_host_9999"; + int badInstancePartitions = partitionCounts.getOrDefault(badInstanceName, 0); + + // Calculate expected partitions per instance if distributed evenly + int totalInstances = nodes.size(); + int expectedPartitionsPerInstance = (PARTITIONS * REPLICAS) / totalInstances; + + System.out.println("Expected partitions per instance: " + expectedPartitionsPerInstance); + System.out.println("Bad instance got: " + badInstancePartitions); + + // The bad instance should receive significantly more than expected due to singleton fault zone + // In Venice's case, it was 3.26x more (2,166 vs 664) + // With CRUSH and singleton fault zone, we expect at least 1.5x over-allocation + double overAllocationRatio = (double) badInstancePartitions / expectedPartitionsPerInstance; + System.out.println("Over-allocation ratio: " + overAllocationRatio + "x"); + + Assert.assertTrue(overAllocationRatio > 1.5, + "Bad instance should receive significantly more partitions due to singleton fault zone. " + + "Expected >1.5x, got " + overAllocationRatio + "x"); + + // Verify that good instances have more balanced distribution + for (String instance : nodes) { + if (!instance.equals(badInstanceName)) { + int goodInstancePartitions = partitionCounts.getOrDefault(instance, 0); + double goodRatio = (double) goodInstancePartitions / expectedPartitionsPerInstance; + Assert.assertTrue(goodRatio < 1.5, + "Good instances should have more balanced allocation. Instance " + instance + + " has ratio " + goodRatio + "x"); + } + } + + // Demonstrate the root cause: verify topology mapping shows default values + validateTopologyMapping(); + } + + /** + * Count how many partitions each instance is assigned to. + */ + private Map countPartitionsPerInstance(ExternalView externalView) { + Map counts = new HashMap<>(); + + for (String partition : externalView.getPartitionSet()) { + Map stateMap = externalView.getStateMap(partition); + for (String instance : stateMap.keySet()) { + counts.put(instance, counts.getOrDefault(instance, 0) + 1); + } + } + + return counts; + } + + /** + * Validate that the topology mapping shows the root cause of the issue. + * The bad instance should have default values for missing topology keys. + */ + private void validateTopologyMapping() { + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + + // Check instance configurations to see the topology mapping + for (String instance : nodes) { + InstanceConfig config = configAccessor.getInstanceConfig(clusterName, instance); + String domain = config.getDomainAsString(); + System.out.println("Instance " + instance + " domain: " + domain); + + if (instance.contains("bad_host")) { + // The bad instance should be missing mz_virtualZone in its domain + Assert.assertFalse(domain.contains("mz_virtualZone"), + "Bad instance should not have mz_virtualZone configured"); + } else { + // Good instances should have proper mz_virtualZone + Assert.assertTrue(domain.contains("mz_virtualZone"), + "Good instances should have mz_virtualZone configured"); + } + } + } + + @AfterClass + public void afterClass() { + if (controller != null && controller.isConnected()) { + controller.syncStop(); + } + for (MockParticipantManager participant : participants) { + if (participant != null && participant.isConnected()) { + participant.syncStop(); + } + } + deleteCluster(clusterName); + System.out.println("END " + className + " at " + new Date(System.currentTimeMillis())); + } +} \ No newline at end of file diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java new file mode 100644 index 0000000000..72377f7ca8 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java @@ -0,0 +1,274 @@ +package org.apache.helix.integration.rebalancer.CrushRebalancers; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Test case verifying the fix for the Venice production incident where missing virtual topology + * configuration led to disproportionate partition assignment in CRUSH rebalancer. + * + * Background: Venice HAR cluster with topology /mz_virtualZone/host/applicationInstanceId + * had one instance join with missing mz_virtualZone key, causing it to receive 3.26x more + * partitions than expected (2,166 vs 664) due to becoming a singleton fault zone. + * + * Fix: Instances missing required fault zone configuration are now excluded from rebalancing + * instead of creating singleton fault zones, preventing partition imbalance. + */ +public class TestCrushRebalanceMissingVirtualTopologyFixed extends ZkTestBase { + + private final String className = getShortClassName(); + private final String clusterName = CLUSTER_PREFIX + "_" + className; + + // Venice-like configuration + private static final String TOPOLOGY = "/mz_virtualZone/host/applicationInstanceId"; + private static final String FAULT_ZONE_TYPE = "mz_virtualZone"; + private static final int PARTITIONS = 500; // Reduced for more dramatic effect + private static final int REPLICAS = 3; + private static final String DB_NAME = "VeniceTest"; + + private ClusterControllerManager controller; + private List participants = new ArrayList<>(); + private List nodes = new ArrayList<>(); + private Set allDBs = new HashSet<>(); + + @BeforeClass + @Override + public void beforeClass() throws Exception { + super.beforeClass(); + System.out.println("START " + className + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(clusterName, true); + + // Configure Venice-like virtual topology cluster + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setTopology(TOPOLOGY); + clusterConfig.setFaultZoneType(FAULT_ZONE_TYPE); + clusterConfig.setTopologyAwareEnabled(true); + configAccessor.setClusterConfig(clusterName, clusterConfig); + + // Add "good" instances with proper virtual topology configuration + addGoodInstances(); + + // Add the "bad" instance with missing virtual topology key + addBadInstance(); + + // Start all participants + for (String node : nodes) { + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, clusterName, node); + participant.syncStart(); + participants.add(participant); + } + + // Start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + controller = new ClusterControllerManager(ZK_ADDR, clusterName, controllerName); + controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, clusterName, true); + enableTopologyAwareRebalance(_gZkClient, clusterName, true); + } + + /** + * Add "good" instances that have proper virtual topology configuration. + * These instances will be distributed across multiple virtual zones as expected. + * Creates 3 zones with 2 instances each to ensure we have enough fault zones for 3 replicas. + */ + private void addGoodInstances() { + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + + // Add instances across 3 virtual zones to support 3 replicas + int portBase = 12000; + int globalAppId = 0; // Global counter to ensure unique applicationInstanceId + for (int zoneId = 0; zoneId < 3; zoneId++) { + for (int hostId = 0; hostId < 2; hostId++) { + // Use hostname_port format so Helix can properly parse hostname and port + int port = portBase + (zoneId * 100) + (hostId * 10); + String instanceName = "host_" + zoneId + "_" + hostId + "_" + port; + nodes.add(instanceName); + + _gSetupTool.addInstanceToCluster(clusterName, instanceName); + + // Get the existing config and modify it + InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName); + + // Set the critical virtual topology configuration + // Use globalAppId to ensure unique applicationInstanceId (leaf node must be globally unique) + instanceConfig.setDomain("mz_virtualZone=zone_" + zoneId + ",host=host_" + zoneId + "_" + hostId + + ",applicationInstanceId=app_" + globalAppId); + + _gSetupTool.getClusterManagementTool().setInstanceConfig(clusterName, instanceName, instanceConfig); + globalAppId++; // Increment for next instance + } + } + } + + /** + * Add the "bad" instance that is missing the virtual topology key. + * This simulates the Venice production incident where one instance joined + * without proper mz_virtualZone configuration. + */ + private void addBadInstance() { + // Use hostname_port format so Helix can properly parse hostname and port + String badInstanceName = "bad_host_9999"; + nodes.add(badInstanceName); + + _gSetupTool.addInstanceToCluster(clusterName, badInstanceName); + + // Get the existing config and modify it + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + InstanceConfig badInstanceConfig = configAccessor.getInstanceConfig(clusterName, badInstanceName); + + // This is the problem: missing mz_virtualZone in domain configuration + // With the fix, this instance will be excluded from rebalancing + // Use unique applicationInstanceId (leaf node must be globally unique) + badInstanceConfig.setDomain("host=bad_host,applicationInstanceId=bad_app_999"); + + _gSetupTool.getClusterManagementTool().setInstanceConfig(clusterName, badInstanceName, badInstanceConfig); + } + + @Test + public void testMissingVirtualTopologyExcluded() { + // Add resource with CRUSH rebalancer strategy + _gSetupTool.addResourceToCluster(clusterName, DB_NAME, PARTITIONS, + BuiltInStateModelDefinitions.OnlineOffline.name(), + RebalanceMode.FULL_AUTO.name(), CrushRebalanceStrategy.class.getName()); + allDBs.add(DB_NAME); + + _gSetupTool.rebalanceStorageCluster(clusterName, DB_NAME, REPLICAS); + + // Wait for cluster to stabilize + ZkHelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(clusterName) + .setZkClient(_gZkClient).setResources(allDBs).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + // Analyze partition distribution + ExternalView externalView = _gSetupTool.getClusterManagementTool().getResourceExternalView(clusterName, DB_NAME); + Map partitionCounts = countPartitionsPerInstance(externalView); + + // Print distribution for debugging + System.out.println("Partition distribution (with fix applied):"); + for (Map.Entry entry : partitionCounts.entrySet()) { + System.out.println(" " + entry.getKey() + ": " + entry.getValue() + " partitions"); + } + + // Validate the fix: bad instance should be excluded from rebalancing (get 0 partitions) + String badInstanceName = "bad_host_9999"; + int badInstancePartitions = partitionCounts.getOrDefault(badInstanceName, 0); + + // Calculate expected partitions per instance if distributed evenly among valid instances + int validInstances = nodes.size() - 1; // Exclude the bad instance + int expectedPartitionsPerInstance = (PARTITIONS * REPLICAS) / validInstances; + + System.out.println("Total instances: " + nodes.size()); + System.out.println("Valid instances: " + validInstances); + System.out.println("Expected partitions per valid instance: " + expectedPartitionsPerInstance); + System.out.println("Bad instance got: " + badInstancePartitions + " partitions"); + + // The bad instance should receive 0 partitions since it's excluded from rebalancing + Assert.assertEquals(badInstancePartitions, 0, + "Bad instance should be excluded from rebalancing and receive 0 partitions due to missing fault zone configuration"); + + // Verify that good instances have balanced distribution + int totalPartitionsAssigned = 0; + for (String instance : nodes) { + if (!instance.equals(badInstanceName)) { + int goodInstancePartitions = partitionCounts.getOrDefault(instance, 0); + totalPartitionsAssigned += goodInstancePartitions; + double goodRatio = (double) goodInstancePartitions / expectedPartitionsPerInstance; + System.out.println("Instance " + instance + ": " + goodInstancePartitions + " partitions (ratio: " + goodRatio + "x)"); + + Assert.assertTrue(goodRatio > 0.5 && goodRatio < 1.5, + "Good instances should have balanced allocation. Instance " + instance + + " has ratio " + goodRatio + "x, partitions: " + goodInstancePartitions + + ", expected: " + expectedPartitionsPerInstance); + } + } + + // Verify all partitions are still assigned (just distributed among valid instances) + int expectedTotalPartitions = PARTITIONS * REPLICAS; + Assert.assertEquals(totalPartitionsAssigned, expectedTotalPartitions, + "All partitions should be assigned to valid instances. Expected: " + expectedTotalPartitions + + ", Actual: " + totalPartitionsAssigned); + + // Demonstrate the root cause: verify topology mapping shows default values + validateTopologyMapping(); + } + + /** + * Count how many partitions each instance is assigned to. + */ + private Map countPartitionsPerInstance(ExternalView externalView) { + Map counts = new HashMap<>(); + + for (String partition : externalView.getPartitionSet()) { + Map stateMap = externalView.getStateMap(partition); + for (String instance : stateMap.keySet()) { + counts.put(instance, counts.getOrDefault(instance, 0) + 1); + } + } + + return counts; + } + + /** + * Validate that the topology mapping shows the root cause of the issue. + * The bad instance should have default values for missing topology keys. + */ + private void validateTopologyMapping() { + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + + // Check instance configurations to see the topology mapping + for (String instance : nodes) { + InstanceConfig config = configAccessor.getInstanceConfig(clusterName, instance); + String domain = config.getDomainAsString(); + System.out.println("Instance " + instance + " domain: " + domain); + + if (instance.contains("bad_host")) { + // The bad instance should be missing mz_virtualZone in its domain + Assert.assertFalse(domain.contains("mz_virtualZone"), + "Bad instance should not have mz_virtualZone configured"); + } else { + // Good instances should have proper mz_virtualZone + Assert.assertTrue(domain.contains("mz_virtualZone"), + "Good instances should have mz_virtualZone configured"); + } + } + } + + @AfterClass + public void afterClass() { + if (controller != null && controller.isConnected()) { + controller.syncStop(); + } + for (MockParticipantManager participant : participants) { + if (participant != null && participant.isConnected()) { + participant.syncStop(); + } + } + deleteCluster(clusterName); + System.out.println("END " + className + " at " + new Date(System.currentTimeMillis())); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/participant/TestDistClusterControllerStateModel.java b/helix-core/src/test/java/org/apache/helix/participant/TestDistClusterControllerStateModel.java new file mode 100644 index 0000000000..e69de29bb2 From ee4f90a222c5397203f66b08a24807e3006433a2 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Mon, 18 Aug 2025 13:28:51 +0530 Subject: [PATCH 2/7] rebased with dev --- .vscode/settings.json | 3 --- ZkDataExplorer.java | 0 config/external | 1 - .../helix/participant/TestDistClusterControllerStateModel.java | 0 4 files changed, 4 deletions(-) delete mode 100644 .vscode/settings.json delete mode 100644 ZkDataExplorer.java delete mode 160000 config/external delete mode 100644 helix-core/src/test/java/org/apache/helix/participant/TestDistClusterControllerStateModel.java diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 7b016a89fb..0000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "java.compile.nullAnalysis.mode": "automatic" -} \ No newline at end of file diff --git a/ZkDataExplorer.java b/ZkDataExplorer.java deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/config/external b/config/external deleted file mode 160000 index c98a5a57fc..0000000000 --- a/config/external +++ /dev/null @@ -1 +0,0 @@ -Subproject commit c98a5a57fc0763c010233517c019f87a1c6bc428 diff --git a/helix-core/src/test/java/org/apache/helix/participant/TestDistClusterControllerStateModel.java b/helix-core/src/test/java/org/apache/helix/participant/TestDistClusterControllerStateModel.java deleted file mode 100644 index e69de29bb2..0000000000 From f37af989abfefc6d0440c0d081ec5971b35bde87 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Mon, 18 Aug 2025 13:30:14 +0530 Subject: [PATCH 3/7] rebased with dev --- .../src/main/java/org/apache/helix/examples/ZkDataExplorer.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java diff --git a/helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java b/helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java deleted file mode 100644 index e69de29bb2..0000000000 From 11d2aec12fae5152fb7133414b69cfab6b55f974 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Mon, 18 Aug 2025 14:00:27 +0530 Subject: [PATCH 4/7] fixed the test cases --- .../topology/MissingFaultZoneException.java | 36 ------------------- .../rebalancer/topology/Topology.java | 11 ------ ...hRebalanceMissingVirtualTopologyFixed.java | 8 ++--- 3 files changed, 4 insertions(+), 51 deletions(-) delete mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/MissingFaultZoneException.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/MissingFaultZoneException.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/MissingFaultZoneException.java deleted file mode 100644 index bb5520a4d7..0000000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/MissingFaultZoneException.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.apache.helix.controller.rebalancer.topology; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * Exception thrown when an instance is missing required fault zone configuration. - * This exception is used to exclude instances from rebalancing to prevent - * singleton fault zones that can cause partition imbalance. - */ -public class MissingFaultZoneException extends IllegalArgumentException { - - public MissingFaultZoneException(String message) { - super(message); - } - - public MissingFaultZoneException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java index 62c7847659..66e684e718 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java @@ -19,7 +19,6 @@ * under the License. */ -import io.netty.util.internal.StringUtil; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -204,16 +203,6 @@ private Node createClusterTree(ClusterConfig clusterConfig, boolean faultZoneLev // the topology tree. The topology tree only requires FaultZoneType and EndNodeType. unnecessaryTopoKeys.forEach(instanceTopologyMap::remove); } - // Check if fault zone is missing from original domain configuration - exclude instance to prevent singleton fault zones - String faultZoneType = _clusterTopologyConfig.getFaultZoneType(); - Map originalDomain = insConfig.getDomainAsMap(); - - if (!originalDomain.containsKey(faultZoneType) || StringUtil.isNullOrEmpty(originalDomain.get(faultZoneType))) { - logger.warn("Instance '{}' excluded from topology: fault zone '{}' is missing from original domain configuration. " + - "Domain: '{}'. This prevents singleton fault zones.", - instanceName, faultZoneType, insConfig.getDomainAsString()); - continue; // Skip this instance - } addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances); } catch (InstanceConfigMismatchException e) { logger.warn("Topology setting {} for instance {} is unset or invalid due to mismatch with cluster topology " diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java index 72377f7ca8..e2e23f7990 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java @@ -41,7 +41,6 @@ public class TestCrushRebalanceMissingVirtualTopologyFixed extends ZkTestBase { private final String className = getShortClassName(); private final String clusterName = CLUSTER_PREFIX + "_" + className; - // Venice-like configuration private static final String TOPOLOGY = "/mz_virtualZone/host/applicationInstanceId"; private static final String FAULT_ZONE_TYPE = "mz_virtualZone"; private static final int PARTITIONS = 500; // Reduced for more dramatic effect @@ -67,6 +66,7 @@ public void beforeClass() throws Exception { clusterConfig.setTopology(TOPOLOGY); clusterConfig.setFaultZoneType(FAULT_ZONE_TYPE); clusterConfig.setTopologyAwareEnabled(true); + clusterConfig.setRequiredInstanceTopologyKeys(List.of(FAULT_ZONE_TYPE)); configAccessor.setClusterConfig(clusterName, clusterConfig); // Add "good" instances with proper virtual topology configuration @@ -199,10 +199,10 @@ public void testMissingVirtualTopologyExcluded() { totalPartitionsAssigned += goodInstancePartitions; double goodRatio = (double) goodInstancePartitions / expectedPartitionsPerInstance; System.out.println("Instance " + instance + ": " + goodInstancePartitions + " partitions (ratio: " + goodRatio + "x)"); - + Assert.assertTrue(goodRatio > 0.5 && goodRatio < 1.5, "Good instances should have balanced allocation. Instance " + instance + - " has ratio " + goodRatio + "x, partitions: " + goodInstancePartitions + + " has ratio " + goodRatio + "x, partitions: " + goodInstancePartitions + ", expected: " + expectedPartitionsPerInstance); } } @@ -210,7 +210,7 @@ public void testMissingVirtualTopologyExcluded() { // Verify all partitions are still assigned (just distributed among valid instances) int expectedTotalPartitions = PARTITIONS * REPLICAS; Assert.assertEquals(totalPartitionsAssigned, expectedTotalPartitions, - "All partitions should be assigned to valid instances. Expected: " + expectedTotalPartitions + + "All partitions should be assigned to valid instances. Expected: " + expectedTotalPartitions + ", Actual: " + totalPartitionsAssigned); // Demonstrate the root cause: verify topology mapping shows default values From c57bd48fcbec836d011350d88287319fac500df4 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Mon, 18 Aug 2025 14:01:21 +0530 Subject: [PATCH 5/7] fixed the test cases --- .../apache/helix/controller/rebalancer/topology/Topology.java | 1 + 1 file changed, 1 insertion(+) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java index 66e684e718..170464f218 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java @@ -261,6 +261,7 @@ private static LinkedHashMap computeInstanceTopologyMapHelper( int numOfMatchedKeys = 0; boolean shouldThrowExceptionDueToMissingConfigs = false; for (String key : clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()) { + // if a key does not exist in the instance domain config, using the default domain value. String value = domainAsMap.get(key); if (value == null || value.isEmpty()) { value = clusterTopologyConfig.getTopologyKeyDefaultValue().get(key); From e812c8d15392ac2409cb103be2d30695bf2ec64c Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Tue, 19 Aug 2025 10:49:46 +0530 Subject: [PATCH 6/7] Combined the virtual topology tests --- ...tCrushRebalanceMissingVirtualTopology.java | 346 ++++++++++++------ ...hRebalanceMissingVirtualTopologyFixed.java | 274 -------------- 2 files changed, 238 insertions(+), 382 deletions(-) delete mode 100644 helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java index 1425e3444b..c3a83bf4bd 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java @@ -21,84 +21,268 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; /** - * Test case reproducing the Venice production incident where missing virtual topology configuration + * Combined test case for missing virtual topology configuration * leads to disproportionate partition assignment in CRUSH rebalancer. - * - * Background: Venice HAR cluster with topology /mz_virtualZone/host/applicationInstanceId - * had one instance join with missing mz_virtualZone key, causing it to receive 3.26x more - * partitions than expected (2,166 vs 664) due to becoming a singleton fault zone. + *

+ * Background: cluster with topology /mz_virtualZone/host/applicationInstanceId + * had one instance join with missing mz_virtualZone key, causing it to receive significantly more + * partitions than expected due to becoming a singleton fault zone. + *

+ * This test contains both scenarios: + * 1. Original problem demonstration (showing over-allocation to bad instance) + * 2. Fixed behavior (showing bad instance exclusion from rebalancing) */ public class TestCrushRebalanceMissingVirtualTopology extends ZkTestBase { private final String className = getShortClassName(); - private final String clusterName = CLUSTER_PREFIX + "_" + className; + private String clusterName; - // Venice-like configuration private static final String TOPOLOGY = "/mz_virtualZone/host/applicationInstanceId"; private static final String FAULT_ZONE_TYPE = "mz_virtualZone"; private static final int PARTITIONS = 500; // Reduced for more dramatic effect private static final int REPLICAS = 3; - private static final String DB_NAME = "VeniceTest"; + private static final String DB_NAME = "TestDB"; private ClusterControllerManager controller; private List participants = new ArrayList<>(); private List nodes = new ArrayList<>(); private Set allDBs = new HashSet<>(); - @BeforeClass - @Override - public void beforeClass() throws Exception { - super.beforeClass(); - System.out.println("START " + className + " at " + new Date(System.currentTimeMillis())); + @BeforeMethod + public void beforeMethod() { + // Create unique cluster name for each test + clusterName = CLUSTER_PREFIX + "_" + className + "_" + System.currentTimeMillis(); + // Reset collections for each test + participants.clear(); + nodes.clear(); + allDBs.clear(); + + System.out.println("START test in " + className + " at " + new Date(System.currentTimeMillis())); + } + + @AfterMethod + public void afterMethod() { + if (controller != null && controller.isConnected()) { + controller.syncStop(); + controller = null; + } + for (MockParticipantManager participant : participants) { + if (participant != null && participant.isConnected()) { + participant.syncStop(); + } + } + if (clusterName != null) { + deleteCluster(clusterName); + } + System.out.println("END test in " + className + " at " + new Date(System.currentTimeMillis())); + } + + /** + * Test the original problem: missing virtual topology configuration leads to over-allocation. + * This demonstrates a bad instance received ~3x more partitions. + */ + @Test + public void testMissingVirtualConfig_whenRequiredInstanceTopologyKeysNotPresent_TopologyOverAllocationToBadInstance() { + setupClusterWithoutRequiredInstanceTopologyKeys(); + + // Add resource with CRUSH rebalancer strategy + _gSetupTool.addResourceToCluster(clusterName, DB_NAME, PARTITIONS, + BuiltInStateModelDefinitions.OnlineOffline.name(), + RebalanceMode.FULL_AUTO.name(), CrushRebalanceStrategy.class.getName()); + allDBs.add(DB_NAME); + + _gSetupTool.rebalanceStorageCluster(clusterName, DB_NAME, REPLICAS); + + // Wait for cluster to stabilize + ZkHelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(clusterName) + .setZkClient(_gZkClient).setResources(allDBs).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + // Analyze partition distribution + ExternalView externalView = _gSetupTool.getClusterManagementTool().getResourceExternalView(clusterName, DB_NAME); + Map partitionCounts = countPartitionsPerInstance(externalView); + + // Print distribution for debugging + System.out.println("Partition distribution (original problem):"); + for (Map.Entry entry : partitionCounts.entrySet()) { + System.out.println(" " + entry.getKey() + ": " + entry.getValue() + " partitions"); + } + + // Validate: bad instance should get disproportionate allocation + String badInstanceName = "bad_host_9999"; + int badInstancePartitions = partitionCounts.getOrDefault(badInstanceName, 0); + + // Calculate expected partitions per instance if distributed evenly + int totalInstances = nodes.size(); + int expectedPartitionsPerInstance = (PARTITIONS * REPLICAS) / totalInstances; + + System.out.println("Expected partitions per instance: " + expectedPartitionsPerInstance); + System.out.println("Bad instance got: " + badInstancePartitions); + + // The bad instance should receive significantly more than expected due to singleton fault zone + // With CRUSH and singleton fault zone, we expect at least 1.5x over-allocation + double overAllocationRatio = (double) badInstancePartitions / expectedPartitionsPerInstance; + System.out.println("Over-allocation ratio: " + overAllocationRatio + "x"); + + Assert.assertTrue(overAllocationRatio > 1.5, + "Bad instance should receive significantly more partitions due to singleton fault zone. " + + "Expected >1.5x, got " + overAllocationRatio + "x"); + + // Verify that good instances have more balanced distribution + for (String instance : nodes) { + if (!instance.equals(badInstanceName)) { + int goodInstancePartitions = partitionCounts.getOrDefault(instance, 0); + double goodRatio = (double) goodInstancePartitions / expectedPartitionsPerInstance; + Assert.assertTrue(goodRatio < 1.5, + "Good instances should have more balanced allocation. Instance " + instance + + " has ratio " + goodRatio + "x"); + } + } + + // Demonstrate the root cause: verify topology mapping shows default values + validateTopologyMapping(); + } + + /** + * Test the fixed behavior: missing virtual topology configuration leads to instance exclusion. + * This demonstrates the fix where bad instances are excluded from rebalancing. + */ + @Test + public void testMissingVirtualConfig_whenRequiredInstanceTopologyKeysIsPresent_BadInstanceExcludedFromRebalancing() { + setupClusterWithFaultZoneTypeAsRequiredInstanceTopologyKeys(); + + // Add resource with CRUSH rebalancer strategy + _gSetupTool.addResourceToCluster(clusterName, DB_NAME, PARTITIONS, + BuiltInStateModelDefinitions.OnlineOffline.name(), + RebalanceMode.FULL_AUTO.name(), CrushRebalanceStrategy.class.getName()); + allDBs.add(DB_NAME); + + _gSetupTool.rebalanceStorageCluster(clusterName, DB_NAME, REPLICAS); + + // Wait for cluster to stabilize + ZkHelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(clusterName) + .setZkClient(_gZkClient).setResources(allDBs).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + // Analyze partition distribution + ExternalView externalView = _gSetupTool.getClusterManagementTool().getResourceExternalView(clusterName, DB_NAME); + Map partitionCounts = countPartitionsPerInstance(externalView); + + // Print distribution for debugging + System.out.println("Partition distribution (with fix applied):"); + for (Map.Entry entry : partitionCounts.entrySet()) { + System.out.println(" " + entry.getKey() + ": " + entry.getValue() + " partitions"); + } + + // Validate the fix: bad instance should be excluded from rebalancing (get 0 partitions) + String badInstanceName = "bad_host_9999"; + int badInstancePartitions = partitionCounts.getOrDefault(badInstanceName, 0); + + // Calculate expected partitions per instance if distributed evenly among valid instances + int validInstances = nodes.size() - 1; // Exclude the bad instance + int expectedPartitionsPerInstance = (PARTITIONS * REPLICAS) / validInstances; + + System.out.println("Total instances: " + nodes.size()); + System.out.println("Valid instances: " + validInstances); + System.out.println("Expected partitions per valid instance: " + expectedPartitionsPerInstance); + System.out.println("Bad instance got: " + badInstancePartitions + " partitions"); + + // The bad instance should receive 0 partitions since it's excluded from rebalancing + Assert.assertEquals(badInstancePartitions, 0, + "Bad instance should be excluded from rebalancing and receive 0 partitions due to missing fault zone configuration"); + + // Verify that good instances have balanced distribution + int totalPartitionsAssigned = 0; + for (String instance : nodes) { + if (!instance.equals(badInstanceName)) { + int goodInstancePartitions = partitionCounts.getOrDefault(instance, 0); + totalPartitionsAssigned += goodInstancePartitions; + double goodRatio = (double) goodInstancePartitions / expectedPartitionsPerInstance; + System.out.println("Instance " + instance + ": " + goodInstancePartitions + " partitions (ratio: " + goodRatio + "x)"); + + Assert.assertTrue(goodRatio > 0.5 && goodRatio < 1.5, + "Good instances should have balanced allocation. Instance " + instance + + " has ratio " + goodRatio + "x, partitions: " + goodInstancePartitions + + ", expected: " + expectedPartitionsPerInstance); + } + } + + // Verify all partitions are still assigned (just distributed among valid instances) + int expectedTotalPartitions = PARTITIONS * REPLICAS; + Assert.assertEquals(totalPartitionsAssigned, expectedTotalPartitions, + "All partitions should be assigned to valid instances. Expected: " + expectedTotalPartitions + + ", Actual: " + totalPartitionsAssigned); + + // Demonstrate the root cause: verify topology mapping shows default values + validateTopologyMapping(); + } + + /** + * Setup cluster configuration without the fix (original problem scenario). + * Uses 2 fault zones which can cause over-allocation to singleton zone. + */ + private void setupClusterWithoutRequiredInstanceTopologyKeys() { _gSetupTool.addCluster(clusterName, true); - // Configure Venice-like virtual topology cluster + // Configure virtual topology cluster (without requiredInstanceTopologyKeys) ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); clusterConfig.setTopology(TOPOLOGY); clusterConfig.setFaultZoneType(FAULT_ZONE_TYPE); clusterConfig.setTopologyAwareEnabled(true); + // Note: NOT setting requiredInstanceTopologyKeys to reproduce original problem configAccessor.setClusterConfig(clusterName, clusterConfig); - // Add "good" instances with proper virtual topology configuration - addGoodInstances(); + // Add "good" instances with proper virtual topology configuration (2 zones) + addGoodInstances(2); // Add the "bad" instance with missing virtual topology key addBadInstance(); - // Start all participants - for (String node : nodes) { - MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, clusterName, node); - participant.syncStart(); - participants.add(participant); - } + startParticipantsAndController(); + } - // Start controller - String controllerName = CONTROLLER_PREFIX + "_0"; - controller = new ClusterControllerManager(ZK_ADDR, clusterName, controllerName); - controller.syncStart(); + /** + * Setup cluster configuration with the fix (exclusion scenario). + * Uses 3 fault zones and requiredInstanceTopologyKeys to exclude bad instances. + */ + private void setupClusterWithFaultZoneTypeAsRequiredInstanceTopologyKeys() { + _gSetupTool.addCluster(clusterName, true); - enablePersistBestPossibleAssignment(_gZkClient, clusterName, true); - enableTopologyAwareRebalance(_gZkClient, clusterName, true); + // Configure virtual topology cluster (with requiredInstanceTopologyKeys fix) + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setTopology(TOPOLOGY); + clusterConfig.setFaultZoneType(FAULT_ZONE_TYPE); + clusterConfig.setTopologyAwareEnabled(true); + clusterConfig.setRequiredInstanceTopologyKeys(List.of(FAULT_ZONE_TYPE)); // This is the fix + configAccessor.setClusterConfig(clusterName, clusterConfig); + + // Add "good" instances with proper virtual topology configuration (3 zones) + addGoodInstances(3); + + // Add the "bad" instance with missing virtual topology key + addBadInstance(); + + startParticipantsAndController(); } /** - * Add "good" instances that have proper virtual topology configuration. - * These instances will be distributed across multiple virtual zones as expected. + * Add "good" instances for fixed scenario. */ - private void addGoodInstances() { + private void addGoodInstances(int zoneCount) { ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); - // Add instances across 2 virtual zones with fewer instances to amplify the singleton effect + // Add instances across 3 virtual zones to support 3 replicas int portBase = 12000; int globalAppId = 0; // Global counter to ensure unique applicationInstanceId - for (int zoneId = 0; zoneId < 2; zoneId++) { + for (int zoneId = 0; zoneId < zoneCount; zoneId++) { for (int hostId = 0; hostId < 2; hostId++) { // Use hostname_port format so Helix can properly parse hostname and port int port = portBase + (zoneId * 100) + (hostId * 10); @@ -123,7 +307,7 @@ private void addGoodInstances() { /** * Add the "bad" instance that is missing the virtual topology key. - * This simulates the Venice production incident where one instance joined + * This simulates the incident where one instance joined * without proper mz_virtualZone configuration. */ private void addBadInstance() { @@ -138,73 +322,33 @@ private void addBadInstance() { InstanceConfig badInstanceConfig = configAccessor.getInstanceConfig(clusterName, badInstanceName); // This is the problem: missing mz_virtualZone in domain configuration - // When mz_virtualZone is missing, Topology.java will assign default value - // "Helix_default_mz_virtualZone", creating a singleton fault zone + // When mz_virtualZone is missing, Topology.java will either: + // 1. Assign default value "Helix_default_mz_virtualZone" (original behavior - creates singleton fault zone) + // 2. Exclude the instance from rebalancing (fixed behavior - with requiredInstanceTopologyKeys) // Use unique applicationInstanceId (leaf node must be globally unique) badInstanceConfig.setDomain("host=bad_host,applicationInstanceId=bad_app_999"); _gSetupTool.getClusterManagementTool().setInstanceConfig(clusterName, badInstanceName, badInstanceConfig); } - @Test - public void testMissingVirtualTopologyOverAllocation() { - // Add resource with CRUSH rebalancer strategy - _gSetupTool.addResourceToCluster(clusterName, DB_NAME, PARTITIONS, - BuiltInStateModelDefinitions.OnlineOffline.name(), - RebalanceMode.FULL_AUTO.name(), CrushRebalanceStrategy.class.getName()); - allDBs.add(DB_NAME); - - _gSetupTool.rebalanceStorageCluster(clusterName, DB_NAME, REPLICAS); - - // Wait for cluster to stabilize - ZkHelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(clusterName) - .setZkClient(_gZkClient).setResources(allDBs).build(); - Assert.assertTrue(verifier.verifyByPolling()); - - // Analyze partition distribution - ExternalView externalView = _gSetupTool.getClusterManagementTool().getResourceExternalView(clusterName, DB_NAME); - Map partitionCounts = countPartitionsPerInstance(externalView); - - // Print distribution for debugging - System.out.println("Partition distribution:"); - for (Map.Entry entry : partitionCounts.entrySet()) { - System.out.println(" " + entry.getKey() + ": " + entry.getValue() + " partitions"); + /** + * Start all participants and controller. + */ + private void startParticipantsAndController() { + // Start all participants + for (String node : nodes) { + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, clusterName, node); + participant.syncStart(); + participants.add(participant); } - // Validate the Venice incident: bad instance should get disproportionate allocation - String badInstanceName = "bad_host_9999"; - int badInstancePartitions = partitionCounts.getOrDefault(badInstanceName, 0); - - // Calculate expected partitions per instance if distributed evenly - int totalInstances = nodes.size(); - int expectedPartitionsPerInstance = (PARTITIONS * REPLICAS) / totalInstances; - - System.out.println("Expected partitions per instance: " + expectedPartitionsPerInstance); - System.out.println("Bad instance got: " + badInstancePartitions); - - // The bad instance should receive significantly more than expected due to singleton fault zone - // In Venice's case, it was 3.26x more (2,166 vs 664) - // With CRUSH and singleton fault zone, we expect at least 1.5x over-allocation - double overAllocationRatio = (double) badInstancePartitions / expectedPartitionsPerInstance; - System.out.println("Over-allocation ratio: " + overAllocationRatio + "x"); - - Assert.assertTrue(overAllocationRatio > 1.5, - "Bad instance should receive significantly more partitions due to singleton fault zone. " + - "Expected >1.5x, got " + overAllocationRatio + "x"); - - // Verify that good instances have more balanced distribution - for (String instance : nodes) { - if (!instance.equals(badInstanceName)) { - int goodInstancePartitions = partitionCounts.getOrDefault(instance, 0); - double goodRatio = (double) goodInstancePartitions / expectedPartitionsPerInstance; - Assert.assertTrue(goodRatio < 1.5, - "Good instances should have more balanced allocation. Instance " + instance + - " has ratio " + goodRatio + "x"); - } - } + // Start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + controller = new ClusterControllerManager(ZK_ADDR, clusterName, controllerName); + controller.syncStart(); - // Demonstrate the root cause: verify topology mapping shows default values - validateTopologyMapping(); + enablePersistBestPossibleAssignment(_gZkClient, clusterName, true); + enableTopologyAwareRebalance(_gZkClient, clusterName, true); } /** @@ -247,18 +391,4 @@ private void validateTopologyMapping() { } } } - - @AfterClass - public void afterClass() { - if (controller != null && controller.isConnected()) { - controller.syncStop(); - } - for (MockParticipantManager participant : participants) { - if (participant != null && participant.isConnected()) { - participant.syncStop(); - } - } - deleteCluster(clusterName); - System.out.println("END " + className + " at " + new Date(System.currentTimeMillis())); - } } \ No newline at end of file diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java deleted file mode 100644 index e2e23f7990..0000000000 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopologyFixed.java +++ /dev/null @@ -1,274 +0,0 @@ -package org.apache.helix.integration.rebalancer.CrushRebalancers; - -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.helix.ConfigAccessor; -import org.apache.helix.common.ZkTestBase; -import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; -import org.apache.helix.integration.manager.ClusterControllerManager; -import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.model.BuiltInStateModelDefinitions; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState.RebalanceMode; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; -import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -/** - * Test case verifying the fix for the Venice production incident where missing virtual topology - * configuration led to disproportionate partition assignment in CRUSH rebalancer. - * - * Background: Venice HAR cluster with topology /mz_virtualZone/host/applicationInstanceId - * had one instance join with missing mz_virtualZone key, causing it to receive 3.26x more - * partitions than expected (2,166 vs 664) due to becoming a singleton fault zone. - * - * Fix: Instances missing required fault zone configuration are now excluded from rebalancing - * instead of creating singleton fault zones, preventing partition imbalance. - */ -public class TestCrushRebalanceMissingVirtualTopologyFixed extends ZkTestBase { - - private final String className = getShortClassName(); - private final String clusterName = CLUSTER_PREFIX + "_" + className; - - private static final String TOPOLOGY = "/mz_virtualZone/host/applicationInstanceId"; - private static final String FAULT_ZONE_TYPE = "mz_virtualZone"; - private static final int PARTITIONS = 500; // Reduced for more dramatic effect - private static final int REPLICAS = 3; - private static final String DB_NAME = "VeniceTest"; - - private ClusterControllerManager controller; - private List participants = new ArrayList<>(); - private List nodes = new ArrayList<>(); - private Set allDBs = new HashSet<>(); - - @BeforeClass - @Override - public void beforeClass() throws Exception { - super.beforeClass(); - System.out.println("START " + className + " at " + new Date(System.currentTimeMillis())); - - _gSetupTool.addCluster(clusterName, true); - - // Configure Venice-like virtual topology cluster - ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); - ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); - clusterConfig.setTopology(TOPOLOGY); - clusterConfig.setFaultZoneType(FAULT_ZONE_TYPE); - clusterConfig.setTopologyAwareEnabled(true); - clusterConfig.setRequiredInstanceTopologyKeys(List.of(FAULT_ZONE_TYPE)); - configAccessor.setClusterConfig(clusterName, clusterConfig); - - // Add "good" instances with proper virtual topology configuration - addGoodInstances(); - - // Add the "bad" instance with missing virtual topology key - addBadInstance(); - - // Start all participants - for (String node : nodes) { - MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, clusterName, node); - participant.syncStart(); - participants.add(participant); - } - - // Start controller - String controllerName = CONTROLLER_PREFIX + "_0"; - controller = new ClusterControllerManager(ZK_ADDR, clusterName, controllerName); - controller.syncStart(); - - enablePersistBestPossibleAssignment(_gZkClient, clusterName, true); - enableTopologyAwareRebalance(_gZkClient, clusterName, true); - } - - /** - * Add "good" instances that have proper virtual topology configuration. - * These instances will be distributed across multiple virtual zones as expected. - * Creates 3 zones with 2 instances each to ensure we have enough fault zones for 3 replicas. - */ - private void addGoodInstances() { - ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); - - // Add instances across 3 virtual zones to support 3 replicas - int portBase = 12000; - int globalAppId = 0; // Global counter to ensure unique applicationInstanceId - for (int zoneId = 0; zoneId < 3; zoneId++) { - for (int hostId = 0; hostId < 2; hostId++) { - // Use hostname_port format so Helix can properly parse hostname and port - int port = portBase + (zoneId * 100) + (hostId * 10); - String instanceName = "host_" + zoneId + "_" + hostId + "_" + port; - nodes.add(instanceName); - - _gSetupTool.addInstanceToCluster(clusterName, instanceName); - - // Get the existing config and modify it - InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName); - - // Set the critical virtual topology configuration - // Use globalAppId to ensure unique applicationInstanceId (leaf node must be globally unique) - instanceConfig.setDomain("mz_virtualZone=zone_" + zoneId + ",host=host_" + zoneId + "_" + hostId - + ",applicationInstanceId=app_" + globalAppId); - - _gSetupTool.getClusterManagementTool().setInstanceConfig(clusterName, instanceName, instanceConfig); - globalAppId++; // Increment for next instance - } - } - } - - /** - * Add the "bad" instance that is missing the virtual topology key. - * This simulates the Venice production incident where one instance joined - * without proper mz_virtualZone configuration. - */ - private void addBadInstance() { - // Use hostname_port format so Helix can properly parse hostname and port - String badInstanceName = "bad_host_9999"; - nodes.add(badInstanceName); - - _gSetupTool.addInstanceToCluster(clusterName, badInstanceName); - - // Get the existing config and modify it - ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); - InstanceConfig badInstanceConfig = configAccessor.getInstanceConfig(clusterName, badInstanceName); - - // This is the problem: missing mz_virtualZone in domain configuration - // With the fix, this instance will be excluded from rebalancing - // Use unique applicationInstanceId (leaf node must be globally unique) - badInstanceConfig.setDomain("host=bad_host,applicationInstanceId=bad_app_999"); - - _gSetupTool.getClusterManagementTool().setInstanceConfig(clusterName, badInstanceName, badInstanceConfig); - } - - @Test - public void testMissingVirtualTopologyExcluded() { - // Add resource with CRUSH rebalancer strategy - _gSetupTool.addResourceToCluster(clusterName, DB_NAME, PARTITIONS, - BuiltInStateModelDefinitions.OnlineOffline.name(), - RebalanceMode.FULL_AUTO.name(), CrushRebalanceStrategy.class.getName()); - allDBs.add(DB_NAME); - - _gSetupTool.rebalanceStorageCluster(clusterName, DB_NAME, REPLICAS); - - // Wait for cluster to stabilize - ZkHelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(clusterName) - .setZkClient(_gZkClient).setResources(allDBs).build(); - Assert.assertTrue(verifier.verifyByPolling()); - - // Analyze partition distribution - ExternalView externalView = _gSetupTool.getClusterManagementTool().getResourceExternalView(clusterName, DB_NAME); - Map partitionCounts = countPartitionsPerInstance(externalView); - - // Print distribution for debugging - System.out.println("Partition distribution (with fix applied):"); - for (Map.Entry entry : partitionCounts.entrySet()) { - System.out.println(" " + entry.getKey() + ": " + entry.getValue() + " partitions"); - } - - // Validate the fix: bad instance should be excluded from rebalancing (get 0 partitions) - String badInstanceName = "bad_host_9999"; - int badInstancePartitions = partitionCounts.getOrDefault(badInstanceName, 0); - - // Calculate expected partitions per instance if distributed evenly among valid instances - int validInstances = nodes.size() - 1; // Exclude the bad instance - int expectedPartitionsPerInstance = (PARTITIONS * REPLICAS) / validInstances; - - System.out.println("Total instances: " + nodes.size()); - System.out.println("Valid instances: " + validInstances); - System.out.println("Expected partitions per valid instance: " + expectedPartitionsPerInstance); - System.out.println("Bad instance got: " + badInstancePartitions + " partitions"); - - // The bad instance should receive 0 partitions since it's excluded from rebalancing - Assert.assertEquals(badInstancePartitions, 0, - "Bad instance should be excluded from rebalancing and receive 0 partitions due to missing fault zone configuration"); - - // Verify that good instances have balanced distribution - int totalPartitionsAssigned = 0; - for (String instance : nodes) { - if (!instance.equals(badInstanceName)) { - int goodInstancePartitions = partitionCounts.getOrDefault(instance, 0); - totalPartitionsAssigned += goodInstancePartitions; - double goodRatio = (double) goodInstancePartitions / expectedPartitionsPerInstance; - System.out.println("Instance " + instance + ": " + goodInstancePartitions + " partitions (ratio: " + goodRatio + "x)"); - - Assert.assertTrue(goodRatio > 0.5 && goodRatio < 1.5, - "Good instances should have balanced allocation. Instance " + instance + - " has ratio " + goodRatio + "x, partitions: " + goodInstancePartitions + - ", expected: " + expectedPartitionsPerInstance); - } - } - - // Verify all partitions are still assigned (just distributed among valid instances) - int expectedTotalPartitions = PARTITIONS * REPLICAS; - Assert.assertEquals(totalPartitionsAssigned, expectedTotalPartitions, - "All partitions should be assigned to valid instances. Expected: " + expectedTotalPartitions + - ", Actual: " + totalPartitionsAssigned); - - // Demonstrate the root cause: verify topology mapping shows default values - validateTopologyMapping(); - } - - /** - * Count how many partitions each instance is assigned to. - */ - private Map countPartitionsPerInstance(ExternalView externalView) { - Map counts = new HashMap<>(); - - for (String partition : externalView.getPartitionSet()) { - Map stateMap = externalView.getStateMap(partition); - for (String instance : stateMap.keySet()) { - counts.put(instance, counts.getOrDefault(instance, 0) + 1); - } - } - - return counts; - } - - /** - * Validate that the topology mapping shows the root cause of the issue. - * The bad instance should have default values for missing topology keys. - */ - private void validateTopologyMapping() { - ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); - - // Check instance configurations to see the topology mapping - for (String instance : nodes) { - InstanceConfig config = configAccessor.getInstanceConfig(clusterName, instance); - String domain = config.getDomainAsString(); - System.out.println("Instance " + instance + " domain: " + domain); - - if (instance.contains("bad_host")) { - // The bad instance should be missing mz_virtualZone in its domain - Assert.assertFalse(domain.contains("mz_virtualZone"), - "Bad instance should not have mz_virtualZone configured"); - } else { - // Good instances should have proper mz_virtualZone - Assert.assertTrue(domain.contains("mz_virtualZone"), - "Good instances should have mz_virtualZone configured"); - } - } - } - - @AfterClass - public void afterClass() { - if (controller != null && controller.isConnected()) { - controller.syncStop(); - } - for (MockParticipantManager participant : participants) { - if (participant != null && participant.isConnected()) { - participant.syncStop(); - } - } - deleteCluster(clusterName); - System.out.println("END " + className + " at " + new Date(System.currentTimeMillis())); - } -} From 673b8fd9c8355e8d27611cd8ba2d41c9e426f534 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Tue, 19 Aug 2025 10:53:03 +0530 Subject: [PATCH 7/7] Combined the virtual topology tests --- .../TestCrushRebalanceMissingVirtualTopology.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java index c3a83bf4bd..f56ebfaab6 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushRebalanceMissingVirtualTopology.java @@ -382,11 +382,11 @@ private void validateTopologyMapping() { if (instance.contains("bad_host")) { // The bad instance should be missing mz_virtualZone in its domain - Assert.assertFalse(domain.contains("mz_virtualZone"), + Assert.assertFalse(domain.contains(FAULT_ZONE_TYPE), "Bad instance should not have mz_virtualZone configured"); } else { // Good instances should have proper mz_virtualZone - Assert.assertTrue(domain.contains("mz_virtualZone"), + Assert.assertTrue(domain.contains(FAULT_ZONE_TYPE), "Good instances should have mz_virtualZone configured"); } }