Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -159,16 +160,18 @@ private static boolean shouldExcludePartition(PartitionInfo partition,
}

/**
* Gets partitions from current states for customized resources that are still assigned
* to the instance (i.e., not yet reassigned to other instances).
* This is used for offline instances.
* Gets partitions that block evacuation for an offline instance with customized resources.
* Uses union semantics: a partition blocks evacuation if it exists in CurrentState (data still
* on the instance) OR if it is assigned to this instance in IdealState (assignment not moved).
* This prevents premature evacuation completion when partition names rotate in IdealState
* (e.g., segment generation changes) while the instance still holds data.
*
* @param currentStates List of current states
* @param idealStates List of ideal states
* @param instanceName The instance being checked
* @param allowedResources Set of allowed resources (already filtered)
* @param filters Exclusion filters to apply
* @return List of partitions that are still assigned to the instance (after exclusions)
* @return List of partitions blocking evacuation (union of CurrentState and IdealState)
*/
public static List<PartitionInfo> getCustomizedPartitionsStillOnInstance(
List<CurrentState> currentStates, List<IdealState> idealStates, String instanceName,
Expand All @@ -178,44 +181,59 @@ public static List<PartitionInfo> getCustomizedPartitionsStillOnInstance(
return Collections.emptyList();
}

// Create a map of resourceName -> CurrentState
Map<String, CurrentState> currentStateMap = currentStates.stream()
.collect(Collectors.toMap(CurrentState::getResourceName, cs -> cs));

// Create a map of resourceName -> IdealState for CUSTOMIZED resources
Map<String, IdealState> customizedIdealStateMap = idealStates.stream()
.filter(is -> is.getRebalanceMode() == IdealState.RebalanceMode.CUSTOMIZED &&
currentStateMap.containsKey(is.getResourceName()) &&
allowedResources.contains(is.getResourceName()))
.filter(is -> is.getRebalanceMode() == IdealState.RebalanceMode.CUSTOMIZED
&& allowedResources.contains(is.getResourceName()))
.collect(Collectors.toMap(IdealState::getResourceName, is -> is));

List<PartitionInfo> partitionsStillOnInstance = new ArrayList<>();

// For each customized resource, check which partitions are still assigned to the instance
for (Map.Entry<String, IdealState> entry : customizedIdealStateMap.entrySet()) {
String resourceName = entry.getKey();
IdealState idealState = entry.getValue();
CurrentState cs = currentStateMap.get(resourceName);
Set<String> seenPartitions = new HashSet<>();

if (cs == null || cs.getPartitionStateMap() == null) {
continue;
}
// Any non-excluded partition in CurrentState blocks evacuation, regardless of whether
// it appears in IdealState. This handles partition name rotation (e.g., segment
// generation changes) where old-gen partitions in CS no longer match new-gen IS names.
if (cs != null && cs.getPartitionStateMap() != null) {
for (Map.Entry<String, String> partitionEntry : cs.getPartitionStateMap().entrySet()) {
String partition = partitionEntry.getKey();
String state = partitionEntry.getValue();

for (Map.Entry<String, String> partitionEntry : cs.getPartitionStateMap().entrySet()) {
String partition = partitionEntry.getKey();
String state = partitionEntry.getValue();
PartitionInfo partitionInfo = new PartitionInfo(partition, state, resourceName);

PartitionInfo partitionInfo = new PartitionInfo(partition, state, resourceName);
if (shouldExcludePartition(partitionInfo, filters)) {
continue;
}

// Apply exclusion filters
if (shouldExcludePartition(partitionInfo, filters)) {
continue; // Skip excluded partitions
if (seenPartitions.add(partition)) {
partitionsStillOnInstance.add(partitionInfo);
}
}
}

// Check if this partition is still assigned to the instance in IdealState
Map<String, String> instanceStateMap = idealState.getInstanceStateMap(partition);
if (instanceStateMap != null && instanceStateMap.containsKey(instanceName)) {
partitionsStillOnInstance.add(partitionInfo);
// Any partition assigned to this instance in IdealState also blocks evacuation, even if
// not present in CurrentState (e.g., new assignment the offline instance hasn't picked up).
Map<String, Map<String, String>> mapFields = idealState.getRecord().getMapFields();
if (mapFields != null) {
for (Map.Entry<String, Map<String, String>> isEntry : mapFields.entrySet()) {
String partition = isEntry.getKey();
if (seenPartitions.contains(partition)) {
continue;
}
Map<String, String> instanceStateMap = isEntry.getValue();
if (instanceStateMap != null && instanceStateMap.containsKey(instanceName)) {
String desiredState = instanceStateMap.get(instanceName);
if (seenPartitions.add(partition)) {
partitionsStillOnInstance.add(
new PartitionInfo(partition, desiredState, resourceName));
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,9 @@ public void testDisabledResourceExclusion() throws Exception {
}

/**
* Test offline instance with CUSTOMIZED resources
* Test offline instance with CUSTOMIZED resources uses union semantics.
* Evacuation remains in-progress if partitions still exist in CurrentState,
* even after the instance is removed from IdealState.
*/
@Test
public void testOfflineInstanceWithCustomizedResource() throws Exception {
Expand Down Expand Up @@ -362,9 +364,10 @@ public void testOfflineInstanceWithCustomizedResource() throws Exception {
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, customDB, newIdealState);
Assert.assertTrue(_clusterVerifier.verifyByPolling());

// Now evacuation SHOULD be finished
Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate, Collections.emptySet()),
"Evacuation should be finished after removing instance from CUSTOMIZED IdealState");
// With union semantics, evacuation is still NOT finished because the offline instance
// still has CurrentState entries for this resource.
Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate, Collections.emptySet()),
"Evacuation should remain in progress while CurrentState still has partitions, even if IdealState no longer assigns the instance");

// Cleanup
_gSetupTool.dropResourceFromCluster(CLUSTER_NAME, customDB);
Expand Down
Loading