diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/evacuation/PartitionExclusionHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/evacuation/PartitionExclusionHelper.java index 3e084785ac..4e99462c96 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/evacuation/PartitionExclusionHelper.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/evacuation/PartitionExclusionHelper.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -159,16 +160,18 @@ private static boolean shouldExcludePartition(PartitionInfo partition, } /** - * Gets partitions from current states for customized resources that are still assigned - * to the instance (i.e., not yet reassigned to other instances). - * This is used for offline instances. + * Gets partitions that block evacuation for an offline instance with customized resources. + * Uses union semantics: a partition blocks evacuation if it exists in CurrentState (data still + * on the instance) OR if it is assigned to this instance in IdealState (assignment not moved). + * This prevents premature evacuation completion when partition names rotate in IdealState + * (e.g., segment generation changes) while the instance still holds data. * * @param currentStates List of current states * @param idealStates List of ideal states * @param instanceName The instance being checked * @param allowedResources Set of allowed resources (already filtered) * @param filters Exclusion filters to apply - * @return List of partitions that are still assigned to the instance (after exclusions) + * @return List of partitions blocking evacuation (union of CurrentState and IdealState) */ public static List getCustomizedPartitionsStillOnInstance( List currentStates, List idealStates, String instanceName, @@ -178,44 +181,59 @@ public static List getCustomizedPartitionsStillOnInstance( return Collections.emptyList(); } - // Create a map of resourceName -> CurrentState Map currentStateMap = currentStates.stream() .collect(Collectors.toMap(CurrentState::getResourceName, cs -> cs)); - // Create a map of resourceName -> IdealState for CUSTOMIZED resources Map customizedIdealStateMap = idealStates.stream() - .filter(is -> is.getRebalanceMode() == IdealState.RebalanceMode.CUSTOMIZED && - currentStateMap.containsKey(is.getResourceName()) && - allowedResources.contains(is.getResourceName())) + .filter(is -> is.getRebalanceMode() == IdealState.RebalanceMode.CUSTOMIZED + && allowedResources.contains(is.getResourceName())) .collect(Collectors.toMap(IdealState::getResourceName, is -> is)); List partitionsStillOnInstance = new ArrayList<>(); - // For each customized resource, check which partitions are still assigned to the instance for (Map.Entry entry : customizedIdealStateMap.entrySet()) { String resourceName = entry.getKey(); IdealState idealState = entry.getValue(); CurrentState cs = currentStateMap.get(resourceName); + Set seenPartitions = new HashSet<>(); - if (cs == null || cs.getPartitionStateMap() == null) { - continue; - } + // Any non-excluded partition in CurrentState blocks evacuation, regardless of whether + // it appears in IdealState. This handles partition name rotation (e.g., segment + // generation changes) where old-gen partitions in CS no longer match new-gen IS names. + if (cs != null && cs.getPartitionStateMap() != null) { + for (Map.Entry partitionEntry : cs.getPartitionStateMap().entrySet()) { + String partition = partitionEntry.getKey(); + String state = partitionEntry.getValue(); - for (Map.Entry partitionEntry : cs.getPartitionStateMap().entrySet()) { - String partition = partitionEntry.getKey(); - String state = partitionEntry.getValue(); + PartitionInfo partitionInfo = new PartitionInfo(partition, state, resourceName); - PartitionInfo partitionInfo = new PartitionInfo(partition, state, resourceName); + if (shouldExcludePartition(partitionInfo, filters)) { + continue; + } - // Apply exclusion filters - if (shouldExcludePartition(partitionInfo, filters)) { - continue; // Skip excluded partitions + if (seenPartitions.add(partition)) { + partitionsStillOnInstance.add(partitionInfo); + } } + } - // Check if this partition is still assigned to the instance in IdealState - Map instanceStateMap = idealState.getInstanceStateMap(partition); - if (instanceStateMap != null && instanceStateMap.containsKey(instanceName)) { - partitionsStillOnInstance.add(partitionInfo); + // Any partition assigned to this instance in IdealState also blocks evacuation, even if + // not present in CurrentState (e.g., new assignment the offline instance hasn't picked up). + Map> mapFields = idealState.getRecord().getMapFields(); + if (mapFields != null) { + for (Map.Entry> isEntry : mapFields.entrySet()) { + String partition = isEntry.getKey(); + if (seenPartitions.contains(partition)) { + continue; + } + Map instanceStateMap = isEntry.getValue(); + if (instanceStateMap != null && instanceStateMap.containsKey(instanceName)) { + String desiredState = instanceStateMap.get(instanceName); + if (seenPartitions.add(partition)) { + partitionsStillOnInstance.add( + new PartitionInfo(partition, desiredState, resourceName)); + } + } } } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestEvacuateWithExclusions.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestEvacuateWithExclusions.java index ded6fa4e64..e29ec84300 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestEvacuateWithExclusions.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestEvacuateWithExclusions.java @@ -309,7 +309,9 @@ public void testDisabledResourceExclusion() throws Exception { } /** - * Test offline instance with CUSTOMIZED resources + * Test offline instance with CUSTOMIZED resources uses union semantics. + * Evacuation remains in-progress if partitions still exist in CurrentState, + * even after the instance is removed from IdealState. */ @Test public void testOfflineInstanceWithCustomizedResource() throws Exception { @@ -362,9 +364,10 @@ public void testOfflineInstanceWithCustomizedResource() throws Exception { _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, customDB, newIdealState); Assert.assertTrue(_clusterVerifier.verifyByPolling()); - // Now evacuation SHOULD be finished - Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate, Collections.emptySet()), - "Evacuation should be finished after removing instance from CUSTOMIZED IdealState"); + // With union semantics, evacuation is still NOT finished because the offline instance + // still has CurrentState entries for this resource. + Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate, Collections.emptySet()), + "Evacuation should remain in progress while CurrentState still has partitions, even if IdealState no longer assigns the instance"); // Cleanup _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, customDB);