From aed7056bc1c8409b4696faadc634ef49edca959b Mon Sep 17 00:00:00 2001 From: Ravi Shankar Date: Wed, 3 Jun 2026 22:58:23 -0700 Subject: [PATCH 1/2] feat: Empty Block Complementing Signed-off-by: Ravi Shankar --- pkg/providers/infiniband/bm_test.go | 6 +- pkg/topology/domain.go | 38 ++- pkg/topology/domain_test.go | 9 +- pkg/translate/block.go | 17 +- pkg/translate/block_complement.go | 188 +++++++++++ pkg/translate/block_complement_test.go | 413 +++++++++++++++++++++++++ pkg/translate/block_test.go | 127 ++++++++ pkg/translate/block_tree.go | 337 ++++++++++++++++++++ pkg/translate/block_tree_test.go | 220 +++++++++++++ pkg/translate/topology.go | 9 +- pkg/translate/yaml.go | 16 +- pkg/translate/yaml_test.go | 56 ++++ 12 files changed, 1410 insertions(+), 26 deletions(-) create mode 100644 pkg/translate/block_complement.go create mode 100644 pkg/translate/block_complement_test.go create mode 100644 pkg/translate/block_tree.go create mode 100644 pkg/translate/block_tree_test.go diff --git a/pkg/providers/infiniband/bm_test.go b/pkg/providers/infiniband/bm_test.go index df63ff84..b700372c 100644 --- a/pkg/providers/infiniband/bm_test.go +++ b/pkg/providers/infiniband/bm_test.go @@ -29,9 +29,9 @@ func TestPopulateDomainsFromPdshOutput(t *testing.T) { node-09: ClusterUUID : 50000000-0000-0000-0000-000000000005 ` domainMap := topology.DomainMap{ - "50000000-0000-0000-0000-000000000004.4000000005": map[string]string{"node-07": "node-07", "node-08": "node-08"}, - "50000000-0000-0000-0000-000000000005.4000000004": map[string]string{"node-10": "node-10"}, - "50000000-0000-0000-0000-000000000005.4000000005": map[string]string{"node-09": "node-09"}, + "50000000-0000-0000-0000-000000000004.4000000005": map[string]*topology.HostInfo{"node-07": {Domain: "50000000-0000-0000-0000-000000000004.4000000005", HostName: "node-07", InstanceID: "node-07"}, "node-08": {Domain: "50000000-0000-0000-0000-000000000004.4000000005", HostName: "node-08", InstanceID: "node-08"}}, + 
"50000000-0000-0000-0000-000000000005.4000000004": map[string]*topology.HostInfo{"node-10": {Domain: "50000000-0000-0000-0000-000000000005.4000000004", HostName: "node-10", InstanceID: "node-10"}}, + "50000000-0000-0000-0000-000000000005.4000000005": map[string]*topology.HostInfo{"node-09": {Domain: "50000000-0000-0000-0000-000000000005.4000000005", HostName: "node-09", InstanceID: "node-09"}}, } testCases := []struct { diff --git a/pkg/topology/domain.go b/pkg/topology/domain.go index 36e4c1a9..6cbc9aaa 100644 --- a/pkg/topology/domain.go +++ b/pkg/topology/domain.go @@ -23,25 +23,21 @@ import ( "k8s.io/klog/v2" ) -// DomainMap maps domain name to a map of hostname:instance -type DomainMap map[string]map[string]string +type HostInfo struct { + Domain string + InstanceID string + HostName string +} + +// DomainMap maps accelerator domain name to host metadata. +type DomainMap map[string]map[string]*HostInfo func NewDomainMap() DomainMap { return make(DomainMap) } func (m DomainMap) AddHost(domain, instance, host string) { - if domain == "" { - klog.Warningf("skipping topology domain with empty name for host %q (instance %q)", host, instance) - return - } - - if hosts, ok := m[domain]; ok { - hosts[host] = instance - return - } - - m[domain] = map[string]string{host: instance} + m.AddHostInfo(&HostInfo{Domain: domain, InstanceID: instance, HostName: host}) } func (m DomainMap) String() string { @@ -52,3 +48,19 @@ func (m DomainMap) String() string { } return str.String() } + +func (m DomainMap) AddHostInfo(hostInfo *HostInfo) { + if hostInfo == nil { + return + } + if hostInfo.Domain == "" { + klog.Warningf("skipping topology domain with empty name for host %q (instance %q)", hostInfo.HostName, hostInfo.InstanceID) + return + } + + if hosts, ok := m[hostInfo.Domain]; ok { + hosts[hostInfo.HostName] = hostInfo + } else { + m[hostInfo.Domain] = map[string]*HostInfo{hostInfo.HostName: hostInfo} + } +} diff --git a/pkg/topology/domain_test.go 
b/pkg/topology/domain_test.go index 055f3505..a764f43e 100644 --- a/pkg/topology/domain_test.go +++ b/pkg/topology/domain_test.go @@ -31,7 +31,12 @@ func TestDomainMapAddHost(t *testing.T) { domainMap.AddHost("", "instance4", "host4") require.Equal(t, DomainMap{ - "domain1": map[string]string{"host1": "instance1", "host2": "instance2"}, - "domain2": map[string]string{"host3": "instance3"}, + "domain1": map[string]*HostInfo{ + "host1": {Domain: "domain1", InstanceID: "instance1", HostName: "host1"}, + "host2": {Domain: "domain1", InstanceID: "instance2", HostName: "host2"}, + }, + "domain2": map[string]*HostInfo{ + "host3": {Domain: "domain2", InstanceID: "instance3", HostName: "host3"}, + }, }, domainMap) } diff --git a/pkg/translate/block.go b/pkg/translate/block.go index 66688e28..254073f5 100644 --- a/pkg/translate/block.go +++ b/pkg/translate/block.go @@ -51,16 +51,27 @@ func getBlockSizes(blocks []*blockInfo, requestedBlockSizes []int) []int { } func (nt *NetworkTopology) toBlockTopology(wr io.Writer, skeletonOnly bool) *httperr.Error { - blockSizes := getBlockSizes(nt.blocks, nt.config.BlockSizes) + blocks := nt.complementBlocks(nt.blocks, nt.config.BlockSizes) + // Refresh nodeInfo.blockID so GetNodeTopologySpec returns IDs that match the + // emitted topology file. complementBlocks may renumber blocks when it splits + // a domain across multiple base blocks, invalidating the IDs set by initBlocks. 
+ for _, b := range blocks { + for _, node := range b.nodes { + if info, ok := nt.nodeInfo[node]; ok { + info.blockID = b.id + } + } + } + blockSizes := getBlockSizes(blocks, nt.config.BlockSizes) - for _, bInfo := range nt.blocks { + for _, bInfo := range blocks { var comment string if len(bInfo.name) != 0 { comment = fmt.Sprintf("# %s=%s\n", bInfo.id, bInfo.name) } var err error - if skeletonOnly { + if skeletonOnly || len(bInfo.nodes) == 0 { _, err = fmt.Fprintf(wr, "%sBlockName=%s\n", comment, bInfo.id) } else { outputNodeNames := strings.Join(cluset.Compact(bInfo.nodes), ",") diff --git a/pkg/translate/block_complement.go b/pkg/translate/block_complement.go new file mode 100644 index 00000000..00be2073 --- /dev/null +++ b/pkg/translate/block_complement.go @@ -0,0 +1,188 @@ +/* + * Copyright 2026 NVIDIA CORPORATION + * SPDX-License-Identifier: Apache-2.0 + */ + +package translate + +import "github.com/NVIDIA/topograph/pkg/topology" + +// groupSizeFromDomains computes how many base blocks a fully-populated accelerator +// occupies, rounded up to the nearest power of two. It finds the maximum accelerator +// host count across all domains, then returns 2^n where 2^n * baseBlockSize is the +// smallest power-of-two multiple of baseBlockSize that is >= maxAcceleratorSize. +// Returns 1 when every accelerator fits within a single base block (no padding needed). +func groupSizeFromDomains(domains topology.DomainMap, baseBlockSize int) int { + maxNodes := 0 + for _, hosts := range domains { + if len(hosts) > maxNodes { + maxNodes = len(hosts) + } + } + groupSize := 1 + capacity := baseBlockSize + for capacity < maxNodes { + groupSize *= 2 + capacity *= 2 + } + return groupSize +} + +// complementBlocks builds a block tree shaped by BlockSizes, packs domain hosts into +// it, and returns the flat block list derived from low-level tree nodes. 
+// +// Only domains for accelerators present in blocks are used so per-partition YAML +// complementing is not masked by domains owned by other partitions in nt.domains. +// +// The group size is derived from the maximum accelerator host count: it is the smallest +// 2^n such that 2^n * baseBlockSize >= maxAcceleratorSize. Each accelerator's base +// block count is then padded to a multiple of that groupSize so every accelerator +// occupies complete aggregate groups within the tree. +func (nt *NetworkTopology) complementBlocks(blocks []*blockInfo, blockSizes []int) []*blockInfo { + fanouts, ok := fanoutsPerLevel(blockSizes) + if !ok || nt.domains == nil { + return blocks + } + + domains := domainsForBlocks(nt.domains, blocks) + if len(domains) == 0 { + return blocks + } + + byName := blocksByName(blocks) + baseBlockSize := blockSizes[0] + groupSize := groupSizeFromDomains(domains, baseBlockSize) + packed := packDomainsIntoBaseBlocks(domains, baseBlockSize, groupSize) + expandedFanouts := expandFanoutsForCapacity(fanouts, len(packed)) + tree := buildAggregateShape(expandedFanouts, baseBlockSize) + mergeBaseBlocksIntoTree(tree, packed) + + // When no padding was added (packed count equals input count), stop at the exact + // packed count so trailing tree-capacity slots do not falsely trigger complement usage. + required := len(packed) + if len(packed) != len(blocks) { + required = totalBaseBlockSlots(expandedFanouts) + } + + out := blocksFromShapedTree(tree, byName, required) + if !shouldUseComplementedBlocks(blocks, out) { + return blocks + } + return out +} + +// shouldUseComplementedBlocks reports whether the tree-derived list should replace the +// input. Two cases warrant replacement: +// - Interior empty slots: the tree has gaps where no accelerator was placed, meaning +// the shaped output carries structural information the flat input list cannot express. 
+// - Count increase: an accelerator had more hosts than baseBlockSize and was split +// into multiple base blocks, so the output is longer than the input. +// +// A shorter output is never used: domainsForBlocks may skip blocks whose domain is +// absent from the global map, producing fewer packed blocks than the input. Replacing +// the input in that case would silently drop blocks. +func shouldUseComplementedBlocks(input, out []*blockInfo) bool { + if len(out) < len(input) { + return false + } + if hasEmptyBlockSlots(out) { + return true + } + return len(out) > len(input) +} + +// domainsForBlocks returns a subset of the cluster domain map containing only the +// hosts that belong to the given partition-local blocks. For each block it intersects +// the global domain with the block's own node list, so that nodes owned by another +// partition in the same accelerator domain are never included. +func domainsForBlocks(all topology.DomainMap, blocks []*blockInfo) topology.DomainMap { + if all == nil { + return nil + } + local := topology.NewDomainMap() + for _, b := range blocks { + if b == nil || b.name == "" { + continue + } + hosts, ok := all[b.name] + if !ok { + continue + } + // Restrict to nodes the partition actually owns; a domain may span multiple + // partitions and the global map holds all of them. + partitionNodes := make(map[string]struct{}, len(b.nodes)) + for _, n := range b.nodes { + partitionNodes[n] = struct{}{} + } + for _, hi := range hosts { + if _, owned := partitionNodes[hi.HostName]; !owned { + continue + } + copy := *hi + local.AddHostInfo(&copy) + } + } + return local +} + +// blocksByName builds an accelerator-ID → blockInfo index used by baseBlockToBlockInfo +// to look up node lists when a block's leaves carry only an accelerator reference. +// Nil entries and blocks without a name are skipped, matching the guard in domainsForBlocks.
+func blocksByName(blocks []*blockInfo) map[string]*blockInfo { + m := make(map[string]*blockInfo, len(blocks)) + for _, b := range blocks { + if b == nil || b.name == "" { + continue + } + m[b.name] = b + } + return m +} + +// hasEmptyBlockSlots reports whether any interior slot in blocks is empty. +// Trailing empty blocks are not considered complement slots because they arise from +// tree capacity rounding and carry no structural meaning for the scheduler. +// "Trailing" means everything after the last non-empty block; only empty slots +// that appear before that boundary are treated as structural gaps. +func hasEmptyBlockSlots(blocks []*blockInfo) bool { + lastNonEmpty := -1 + for i := len(blocks) - 1; i >= 0; i-- { + if !isEmptyBlock(blocks[i]) { + lastNonEmpty = i + break + } + } + for i := 0; i < lastNonEmpty; i++ { + if isEmptyBlock(blocks[i]) { + return true + } + } + return false +} + +// fanoutsPerLevel derives child segment counts from BlockSizes. +// Each ratio BlockSizes[i]/BlockSizes[i-1] must be a power of two greater than 1. +func fanoutsPerLevel(blockSizes []int) ([]int, bool) { + if len(blockSizes) < 2 { + return nil, false + } + prev := blockSizes[0] + if prev <= 0 { + return nil, false + } + out := make([]int, 0, len(blockSizes)-1) + for i := 1; i < len(blockSizes); i++ { + cur := blockSizes[i] + if cur <= prev || cur%prev != 0 { + return nil, false + } + ratio := cur / prev + // Power-of-two check: ratio & (ratio-1) is zero only for powers of two. 
+ if ratio&(ratio-1) != 0 { + return nil, false + } + out = append(out, ratio) + prev = cur + } + return out, true +} diff --git a/pkg/translate/block_complement_test.go b/pkg/translate/block_complement_test.go new file mode 100644 index 00000000..b25c776a --- /dev/null +++ b/pkg/translate/block_complement_test.go @@ -0,0 +1,413 @@ +/* + * Copyright 2026 NVIDIA CORPORATION + * SPDX-License-Identifier: Apache-2.0 + */ + +package translate + +import ( + "bytes" + "fmt" + "strings" + "testing" + + "github.com/NVIDIA/topograph/pkg/topology" + "github.com/stretchr/testify/require" +) + +// TestComplementMissingBaseBlock verifies that when an accelerator domain is absent +// from the graph, the remaining blocks renumber sequentially with no empty placeholder. +// maxAcceleratorSize=3 ≤ baseBlockSize=4, so groupSize=1 (no padding) and complement +// is a no-op; the original 3-block list is returned unchanged. +func TestComplementMissingBaseBlock(t *testing.T) { + root, _ := getBlockWithIBTestSet() + delete(root.Domains, "B2") + + cfg := &Config{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{4, 8, 16}, + } + nt, err := NewNetworkTopology(root, cfg) + require.NoError(t, err) + + var buf bytes.Buffer + require.Nil(t, nt.toBlockTopology(&buf, false)) + + expected := strings.Join([]string{ + "# block001=B1", + "BlockName=block001 Nodes=Node[104-106]", + "# block002=B3", + "BlockName=block002 Nodes=Node[304-306]", + "# block003=B4", + "BlockName=block003 Nodes=Node[401-403]", + "BlockSizes=4,8,16", + "", + }, "\n") + require.Equal(t, expected, buf.String()) +} + +// TestComplementMissingLeafSegment verifies the asymmetric-spine case: one spine has +// 4 leaf switches and the other has 3. maxAcceleratorSize=3 ≤ baseBlockSize=4, so +// groupSize=1 (no padding) and complement is a no-op; all 7 accelerators appear as +// 7 sequential blocks with no trailing placeholders. 
+func TestComplementMissingLeafSegment(t *testing.T) { + root, _ := getBlockWithIBAsymmetricSpineTestSet() + + cfg := &Config{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{4, 16, 32}, + } + nt, err := NewNetworkTopology(root, cfg) + require.NoError(t, err) + + var buf bytes.Buffer + require.Nil(t, nt.toBlockTopology(&buf, false)) + + expected := strings.Join([]string{ + "# block001=B1", + "BlockName=block001 Nodes=Node[101-103]", + "# block002=B2", + "BlockName=block002 Nodes=Node[201-202,205]", + "# block003=B3", + "BlockName=block003 Nodes=Node[301-303]", + "# block004=B4", + "BlockName=block004 Nodes=Node[401-403]", + "# block005=B5", + "BlockName=block005 Nodes=Node[501-503]", + "# block006=B6", + "BlockName=block006 Nodes=Node[601-603]", + "# block007=B7", + "BlockName=block007 Nodes=Node[701-703]", + "BlockSizes=4,16,32", + "", + }, "\n") + require.Equal(t, expected, buf.String()) +} + +// TestNoComplementWithoutTree verifies that complementBlocks is a no-op when the graph +// has no Tiers (no switch tree). maxAcceleratorSize=3 ≤ baseBlockSize=4, so groupSize=1 +// and no structural padding is applied; the output contains no empty block slots. +func TestNoComplementWithoutTree(t *testing.T) { + root, _ := getBlockTestSet() + cfg := &Config{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{4, 8, 16}, + } + nt, err := NewNetworkTopology(root, cfg) + require.NoError(t, err) + + var buf bytes.Buffer + require.Nil(t, nt.toBlockTopology(&buf, false)) + require.NotContains(t, buf.String(), "BlockName=block002\n") + require.Contains(t, buf.String(), "BlockSizes=4,8,16") +} + +// TestNoComplementSingleBlockSize verifies that a single BlockSizes entry (no tiers) +// disables the complement path entirely; fanoutsPerLevel requires at least two sizes. 
+func TestNoComplementSingleBlockSize(t *testing.T) { + root, _ := getBlockWithIBTestSet() + cfg := &Config{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{3}, + } + nt, err := NewNetworkTopology(root, cfg) + require.NoError(t, err) + + var buf bytes.Buffer + require.Nil(t, nt.toBlockTopology(&buf, false)) + require.Equal(t, testBlockConfig1_2, buf.String()) +} + +func TestFanoutsPerLevel(t *testing.T) { + fanouts, ok := fanoutsPerLevel([]int{4, 8, 16}) + require.True(t, ok) + require.Equal(t, []int{2, 2}, fanouts) + + _, ok = fanoutsPerLevel([]int{4, 8, 12}) + require.False(t, ok) + + _, ok = fanoutsPerLevel([]int{4, 6, 12}) + require.False(t, ok) + + _, ok = fanoutsPerLevel([]int{4}) + require.False(t, ok) +} + +// TestHasEmptyBlockSlots verifies the interior-only rule: trailing empty blocks are +// not counted as complement slots (they arise from tree-capacity rounding), but an +// empty block that appears before the last non-empty block is a structural gap. +func TestHasEmptyBlockSlots(t *testing.T) { + require.False(t, hasEmptyBlockSlots([]*blockInfo{{name: "B1", nodes: []string{"n1"}}})) + // Single trailing empty: not a structural gap. + require.False(t, hasEmptyBlockSlots([]*blockInfo{{name: "B1", nodes: []string{"n1"}}, {}})) + // Multiple trailing empties: all are trailing, none are structural gaps. + require.False(t, hasEmptyBlockSlots([]*blockInfo{{name: "B1"}, {}, {}})) + // Interior empty between B1 and B3: structural gap. + require.True(t, hasEmptyBlockSlots([]*blockInfo{{name: "B1"}, {}, {name: "B3"}})) + // Interior empty followed by further trailing empties: gap is detected. + require.True(t, hasEmptyBlockSlots([]*blockInfo{{name: "B1"}, {}, {name: "B3"}, {}, {}})) + // Trailing empties after 7 filled blocks: tree-capacity artifact, not a gap. 
+ require.False(t, hasEmptyBlockSlots([]*blockInfo{ + {name: "B1"}, {name: "B2"}, {name: "B3"}, {name: "B4"}, + {name: "B5"}, {name: "B6"}, {name: "B7"}, {}, {}, + })) +} + +// TestComplementKeepsSeparateAccelerators verifies that two undersized accelerators are +// never merged into a single base block. maxAcceleratorSize=3 ≤ baseBlockSize=8, so +// groupSize=1 and complement is a no-op; the original 2-block list is returned with +// each accelerator in its own separate block. +func TestComplementKeepsSeparateAccelerators(t *testing.T) { + domains := topology.NewDomainMap() + nodesB1 := []string{"Node101", "Node102", "Node103"} + nodesB2 := []string{"Node201", "Node202", "Node205"} + for _, n := range nodesB1 { + domains.AddHostInfo(&topology.HostInfo{Domain: "B1", HostName: n, InstanceID: n}) + } + for _, n := range nodesB2 { + domains.AddHostInfo(&topology.HostInfo{Domain: "B2", HostName: n, InstanceID: n}) + } + + nt := &NetworkTopology{ + domains: domains, + blocks: []*blockInfo{ + {name: "B1", nodes: nodesB1}, + {name: "B2", nodes: nodesB2}, + }, + } + + out := nt.complementBlocks(nt.blocks, []int{8, 16}) + require.Len(t, out, 2) + require.Equal(t, "B1", out[0].name) + require.Len(t, out[0].nodes, 3) + require.Equal(t, "B2", out[1].name) + require.Len(t, out[1].nodes, 3) +} + +// TestComplementExcessHostsPerAccelerator verifies the split path: when a single +// accelerator has more hosts than baseBlockSize it is split into multiple base blocks, +// each carrying the same accelerator name, and every host appears exactly once. +// maxAcceleratorSize=12, baseBlockSize=4 → groupSize=4 (2^2*4=16 ≥ 12); 3 real blocks +// padded to 4 (ceil(3/4)*4). 
+func TestComplementExcessHostsPerAccelerator(t *testing.T) { + domains := topology.NewDomainMap() + nodes := make([]string, 0, 12) + for i := 0; i < 12; i++ { + name := fmt.Sprintf("Node%03d", 100+i) + nodes = append(nodes, name) + domains.AddHostInfo(&topology.HostInfo{ + Domain: "B1", + HostName: name, + InstanceID: name, + }) + } + + nt := &NetworkTopology{ + domains: domains, + blocks: []*blockInfo{{ + id: "block001", + name: "B1", + nodes: nodes, + }}, + } + + out := nt.complementBlocks(nt.blocks, []int{4, 8, 16}) + // 3 base blocks (ceil(12/4)) padded to 4 (groupSize=4, ceil(3/4)*4=4). + require.Len(t, out, 4) + require.True(t, isEmptyBlock(out[3]), "out[3] should be the group-alignment padding block") + + seen := make(map[string]bool) + for _, b := range out[:3] { + require.Equal(t, "B1", b.name) + for _, n := range b.nodes { + seen[n] = true + } + } + require.Len(t, seen, 12) + for _, n := range nodes { + require.True(t, seen[n]) + } +} + +// TestComplementPartitionLocalDomainsOnly verifies that complementBlocks scopes domain +// lookup to the partition's own blocks. B2 exists in nt.domains but is excluded from +// partitionBlocks, so the complement result contains only B1, B3, and B4. With +// maxAcceleratorSize=3 ≤ baseBlockSize=4, groupSize=1 and no padding is applied. +func TestComplementPartitionLocalDomainsOnly(t *testing.T) { + root, _ := getBlockWithIBTestSet() + nt, err := NewNetworkTopology(root, &Config{Plugin: topology.TopologyBlock}) + require.NoError(t, err) + + // Partition owns B1, B3, B4 but not B2 (B2 remains in global nt.domains).
+ partitionBlocks := make([]*blockInfo, 0, 3) + for _, b := range nt.blocks { + if b.name == "B2" { + continue + } + partitionBlocks = append(partitionBlocks, b) + } + + out := nt.complementBlocks(partitionBlocks, []int{4, 8, 16}) + require.Len(t, out, 3) + require.Equal(t, "B1", out[0].name) + require.Equal(t, "B3", out[1].name) + require.Equal(t, "B4", out[2].name) +} + +// TestDomainsForBlocksFilteredToPartitionNodes is a regression test for cross-partition +// node contamination. Domain B1 holds 4 nodes globally (n1–n4), but the partition-local +// blockInfo only lists n1, n2, n3. Without filtering, domainsForBlocks would copy all 4 +// hosts and n4 would appear in the complemented output. With the fix, only n1–n3 are +// used: the split produces two base blocks ([n1,n2] and [n3]) and n4 is absent. +func TestDomainsForBlocksFilteredToPartitionNodes(t *testing.T) { + domains := topology.NewDomainMap() + for _, n := range []string{"n1", "n2", "n3", "n4"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "B1", HostName: n, InstanceID: n}) + } + + // Partition only owns n1, n2, n3 — n4 belongs to another partition. + nt := &NetworkTopology{ + domains: domains, + blocks: []*blockInfo{}, + } + partitionBlocks := []*blockInfo{ + {name: "B1", nodes: []string{"n1", "n2", "n3"}}, + } + + out := nt.complementBlocks(partitionBlocks, []int{2, 4}) + // groupSize=2 (maxAccelSize=3, 2^1×2=4≥3); B1 splits into 2 base blocks. + // len(packed)=2 ≠ len(input)=1 → complement applied. 
+ require.Len(t, out, 2) + + seen := make(map[string]bool) + for _, b := range out { + require.Equal(t, "B1", b.name) + for _, n := range b.nodes { + seen[n] = true + } + } + require.True(t, seen["n1"]) + require.True(t, seen["n2"]) + require.True(t, seen["n3"]) + require.False(t, seen["n4"], "n4 belongs to another partition and must not appear") +} + +// TestComplementPreservesInputWhenDomainsMissing verifies that complementBlocks +// returns the original block list unchanged when domainsForBlocks produces fewer +// packed blocks than the input. This happens when some blocks have no matching +// entry in the global domain map (e.g. the block name was never registered). +// Before the fix, shouldUseComplementedBlocks treated len(out) < len(input) as a +// count mismatch warranting replacement, silently dropping the unmatched blocks. +func TestComplementPreservesInputWhenDomainsMissing(t *testing.T) { + domains := topology.NewDomainMap() + // Only B1 is in the domain map; B2 is not. + for _, n := range []string{"n1", "n2"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "B1", HostName: n, InstanceID: n}) + } + nt := &NetworkTopology{ + domains: domains, + } + input := []*blockInfo{ + {id: "block001", name: "B1", nodes: []string{"n1", "n2"}}, + {id: "block002", name: "B2", nodes: []string{"n3", "n4"}}, // no domain entry + } + out := nt.complementBlocks(input, []int{2, 4}) + // B2 is absent from the global domain map, so packed has only 1 block vs 2 input + // blocks. The complement output would be shorter — must fall back to original input. + require.Equal(t, input, out) +} + +// TestGetBlockTopologyUnitWithMultiAcceleratorDomains verifies the YAML per-partition +// complement path end-to-end: three accelerator domains (a1, a2, a3), block +// sizes [2,4]. a2 is undersized (fewer nodes than groupSize=2 requires), so it gets +// an empty padding slot; tree-capacity expansion adds two more trailing empty slots.
+func TestGetBlockTopologyUnitWithMultiAcceleratorDomains(t *testing.T) { + domains := topology.NewDomainMap() + for _, n := range []string{"n10", "n11", "n12"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "a1", HostName: n, InstanceID: n}) + } + for _, n := range []string{"n20", "n21"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "a2", HostName: n, InstanceID: n}) + } + for _, n := range []string{"n31", "n32", "n33"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "a3", HostName: n, InstanceID: n}) + } + + cfg := &Config{ + Topologies: map[string]*TopologySpec{ + "topo1": { + Plugin: topology.TopologyBlock, + Nodes: []string{"n[10-12]", "n[20-21]", "n[31-33]"}, + BlockSizes: []int{2, 4}, + }, + }, + } + + graph := &topology.Graph{Domains: domains} + nt, err := NewNetworkTopology(graph, cfg) + require.NoError(t, err) + + var buf bytes.Buffer + require.Nil(t, nt.Generate(&buf)) + + expected := strings.Join([]string{ + "- topology: topo1", + " cluster_default: false", + " block:", + " block_sizes:", + " - 2", + " - 4", + " blocks:", + " - block: block1", + " nodes: n[10-11]", + " - block: block2", + " nodes: n12", + " - block: block3", + " nodes: n[20-21]", + " - block: block4", + " - block: block5", + " nodes: n[31-32]", + " - block: block6", + " nodes: n33", + " - block: block7", + " - block: block8", + "", + }, "\n") + require.Equal(t, expected, buf.String()) +} + +// getBlockWithIBAsymmetricSpineTestSet models two spines with four leaf switches on the +// left spine and three on the right, each leaf switch hosting one accelerator domain. 
+func getBlockWithIBAsymmetricSpineTestSet() (*topology.Graph, map[string]string) { + n := func(id, name string) *topology.Vertex { + return &topology.Vertex{ID: id, Name: name} + } + + leaf := func(id string, nodes map[string]*topology.Vertex) *topology.Vertex { + return &topology.Vertex{ID: id, Vertices: nodes} + } + + l11 := leaf("L11", map[string]*topology.Vertex{"I11a": n("I11a", "Node101"), "I11b": n("I11b", "Node102"), "I11c": n("I11c", "Node103")}) + l12 := leaf("L12", map[string]*topology.Vertex{"I12a": n("I12a", "Node201"), "I12b": n("I12b", "Node202"), "I12c": n("I12c", "Node205")}) + l13 := leaf("L13", map[string]*topology.Vertex{"I13a": n("I13a", "Node301"), "I13b": n("I13b", "Node302"), "I13c": n("I13c", "Node303")}) + l14 := leaf("L14", map[string]*topology.Vertex{"I14a": n("I14a", "Node401"), "I14b": n("I14b", "Node402"), "I14c": n("I14c", "Node403")}) + l21 := leaf("L21", map[string]*topology.Vertex{"I21a": n("I21a", "Node501"), "I21b": n("I21b", "Node502"), "I21c": n("I21c", "Node503")}) + l22 := leaf("L22", map[string]*topology.Vertex{"I22a": n("I22a", "Node601"), "I22b": n("I22b", "Node602"), "I22c": n("I22c", "Node603")}) + l23 := leaf("L23", map[string]*topology.Vertex{"I23a": n("I23a", "Node701"), "I23b": n("I23b", "Node702"), "I23c": n("I23c", "Node703")}) + + spine1 := &topology.Vertex{ID: "SP1", Vertices: map[string]*topology.Vertex{"L11": l11, "L12": l12, "L13": l13, "L14": l14}} + spine2 := &topology.Vertex{ID: "SP2", Vertices: map[string]*topology.Vertex{"L21": l21, "L22": l22, "L23": l23}} + core := &topology.Vertex{Vertices: map[string]*topology.Vertex{"SP1": spine1, "SP2": spine2}} + + domains := testDomainMap(map[string]map[string]string{ + "B1": {"Node101": "I11a", "Node102": "I11b", "Node103": "I11c"}, + "B2": {"Node201": "I12a", "Node202": "I12b", "Node205": "I12c"}, + "B3": {"Node301": "I13a", "Node302": "I13b", "Node303": "I13c"}, + "B4": {"Node401": "I14a", "Node402": "I14b", "Node403": "I14c"}, + "B5": {"Node501": "I21a", 
"Node502": "I21b", "Node503": "I21c"}, + "B6": {"Node601": "I22a", "Node602": "I22b", "Node603": "I22c"}, + "B7": {"Node701": "I23a", "Node702": "I23b", "Node703": "I23c"}, + }) + + return &topology.Graph{Tiers: core, Domains: domains}, nil +} diff --git a/pkg/translate/block_test.go b/pkg/translate/block_test.go index e4fe7840..21ac780d 100644 --- a/pkg/translate/block_test.go +++ b/pkg/translate/block_test.go @@ -11,6 +11,7 @@ import ( "strings" "testing" + "github.com/NVIDIA/topograph/pkg/topology" "github.com/stretchr/testify/require" ) @@ -202,6 +203,132 @@ func TestGetBlockSizes(t *testing.T) { } } +// TestGenerateTopologyConfig exercises GenerateTopologyConfig directly for a +// cluster-wide block topology. For cluster-wide mode (no per-partition Topologies +// map), the returned []*TopologyUnit slice is always empty — the topology is written +// to the writer rather than returned. +// +// BlockSizes={2,4,8} triggers node complementing: baseBlockSize=2, +// maxAcceleratorSize=3, so groupSize=2 (2^1×2=4≥3). Each 3-node accelerator is +// split into 2 base blocks (no empty padding needed since 2 is already a multiple of +// groupSize). The 4 accelerators × 2 blocks each = 8 output blocks total. +// +// The test covers both full output (with node lists compacted to ranges) and +// skeleton-only output (block structure without node lists). +func TestGenerateTopologyConfig(t *testing.T) { + root, _ := getBlockWithIBTestSet() + cfg := &Config{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{2, 4, 8}, + } + nt, err := NewNetworkTopology(root, cfg) + require.NoError(t, err) + + t.Run("full output", func(t *testing.T) { + var buf bytes.Buffer + topologies, httpErr := nt.GenerateTopologyConfig(&buf, false) + require.Nil(t, httpErr) + // Cluster-wide block topology: topology is written to the writer, not + // returned in the TopologyUnit slice. 
+ require.Empty(t, topologies) + // Each accelerator is split into 2 base blocks (nodes sorted alphabetically, + // then chunked at baseBlockSize=2): first chunk gets the lower 2, second + // gets the remaining 1. + expected := strings.Join([]string{ + "# block001=B1", + "BlockName=block001 Nodes=Node[104-105]", + "# block002=B1", + "BlockName=block002 Nodes=Node106", + "# block003=B2", + "BlockName=block003 Nodes=Node[201-202]", + "# block004=B2", + "BlockName=block004 Nodes=Node205", + "# block005=B3", + "BlockName=block005 Nodes=Node[304-305]", + "# block006=B3", + "BlockName=block006 Nodes=Node306", + "# block007=B4", + "BlockName=block007 Nodes=Node[401-402]", + "# block008=B4", + "BlockName=block008 Nodes=Node403", + "BlockSizes=2,4,8", + "", + }, "\n") + require.Equal(t, expected, buf.String()) + }) + + t.Run("skeleton only", func(t *testing.T) { + var buf bytes.Buffer + topologies, httpErr := nt.GenerateTopologyConfig(&buf, true) + require.Nil(t, httpErr) + require.Empty(t, topologies) + // Same 8-block structure but Nodes= is omitted from every line. + expected := strings.Join([]string{ + "# block001=B1", + "BlockName=block001", + "# block002=B1", + "BlockName=block002", + "# block003=B2", + "BlockName=block003", + "# block004=B2", + "BlockName=block004", + "# block005=B3", + "BlockName=block005", + "# block006=B3", + "BlockName=block006", + "# block007=B4", + "BlockName=block007", + "# block008=B4", + "BlockName=block008", + "BlockSizes=2,4,8", + "", + }, "\n") + require.Equal(t, expected, buf.String()) + }) +} + +// TestGetNodeTopologySpecAfterComplement verifies that GetNodeTopologySpec returns +// block IDs that match the emitted topology file after complement splits domains +// across multiple base blocks. 
+func TestGetNodeTopologySpecAfterComplement(t *testing.T) { + root, _ := getBlockWithIBTestSet() + cfg := &Config{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{2, 4, 8}, + } + nt, err := NewNetworkTopology(root, cfg) + require.NoError(t, err) + + var buf bytes.Buffer + _, httpErr := nt.GenerateTopologyConfig(&buf, false) + require.Nil(t, httpErr) + + // Expected mapping: first two nodes of each domain go to the first base block, + // the third node goes to the second (sorted alphabetically within the domain). + cases := []struct { + node string + blockID string + }{ + {"Node104", "block001"}, // B1 first chunk + {"Node105", "block001"}, // B1 first chunk + {"Node106", "block002"}, // B1 second chunk — stale before fix + {"Node201", "block003"}, // B2 first chunk — stale before fix + {"Node202", "block003"}, // B2 first chunk — stale before fix + {"Node205", "block004"}, // B2 second chunk — stale before fix + {"Node304", "block005"}, // B3 first chunk + {"Node305", "block005"}, // B3 first chunk + {"Node306", "block006"}, // B3 second chunk + {"Node401", "block007"}, // B4 first chunk + {"Node402", "block007"}, // B4 first chunk + {"Node403", "block008"}, // B4 second chunk + } + for _, tc := range cases { + spec, specErr := nt.GetNodeTopologySpec(tc.node, nil) + require.Nil(t, specErr, "node %s", tc.node) + require.Equal(t, "default:"+tc.blockID, spec, "node %s", tc.node) + } +} + func populateBlockInfo(blocks map[string]int) []*blockInfo { result := make([]*blockInfo, 0, len(blocks)) diff --git a/pkg/translate/block_tree.go b/pkg/translate/block_tree.go new file mode 100644 index 00000000..29560c6c --- /dev/null +++ b/pkg/translate/block_tree.go @@ -0,0 +1,337 @@ +/* + * Copyright 2026 NVIDIA CORPORATION + * SPDX-License-Identifier: Apache-2.0 + */ + +package translate + +import ( + "fmt" + "maps" + "slices" + "sort" + + "github.com/NVIDIA/topograph/pkg/topology" +) + +// blockTreeNode is implemented by host, base, and aggregate block nodes. 
+type blockTreeNode interface { + blockTreeNode() +} + +// hostNode is the lowermost tree level: a host slot or an empty placeholder (host == nil). +type hostNode struct { + host *topology.HostInfo +} + +func (*hostNode) blockTreeNode() {} + +// baseBlockNode is the Slurm base block level. It always holds exactly baseBlockSize +// host nodes; missing positions or hosts are nil-host placeholders. +type baseBlockNode struct { + id string + domain string // primary domain ID, pre-computed from id at construction + leaves []*hostNode +} + +func (*baseBlockNode) blockTreeNode() {} + +func (n *baseBlockNode) domainIdentifier() string { return n.domain } + +// aggregateBlockNode groups base blocks or other aggregates. An domain with +// multiple base blocks is represented as an aggregate of baseBlockNode children. +type aggregateBlockNode struct { + id string + children []blockTreeNode +} + +func (*aggregateBlockNode) blockTreeNode() {} + +// buildAggregateShape wraps the recursive shape builder so callers always receive an +// *aggregateBlockNode. When fanouts produces a single base block (no intermediate +// tiers), the base block is wrapped in a one-child aggregate. +func buildAggregateShape(fanouts []int, baseBlockSize int) *aggregateBlockNode { + node := buildShapeLevel(fanouts, 0, baseBlockSize) + agg, ok := node.(*aggregateBlockNode) + if !ok { + return &aggregateBlockNode{children: []blockTreeNode{node}} + } + return agg +} + +// buildShapeLevel recurses top-down through the fanout tiers. level == len(fanouts) +// is the base case; it emits an empty base block. Returns blockTreeNode because the +// base case produces *baseBlockNode while all other levels produce *aggregateBlockNode. 
+func buildShapeLevel(fanouts []int, level int, baseBlockSize int) blockTreeNode { + if level == len(fanouts) { + return newEmptyBaseBlock(baseBlockSize) + } + count := fanoutAtLevel(level, fanouts) + children := make([]blockTreeNode, count) + for i := range children { + children[i] = buildShapeLevel(fanouts, level+1, baseBlockSize) + } + return &aggregateBlockNode{children: children} +} + +// totalBaseBlockSlots returns the product of all fanout tiers, which equals the +// total number of base-block leaves in the shaped tree. +func totalBaseBlockSlots(fanouts []int) int { + n := 1 + for _, f := range fanouts { + n *= f + } + return n +} + +// expandFanoutsForCapacity grows the last fanout tier (power-of-two steps) until the +// tree has at least required base-block leaves. Returns fanouts unchanged when it is +// empty or required is non-positive — an empty slice would panic on the index write. +func expandFanoutsForCapacity(fanouts []int, required int) []int { + if required <= 0 || len(fanouts) == 0 { + return fanouts + } + out := append([]int(nil), fanouts...) + for totalBaseBlockSlots(out) < required { + out[len(out)-1] *= 2 + } + return out +} + +// mergeBaseBlocksIntoTree fills tree leaf slots left-to-right from the ordered packed list. +func mergeBaseBlocksIntoTree(tree *aggregateBlockNode, packed []*baseBlockNode) { + slots := collectBaseBlockSlots(tree) + for i, bb := range packed { + if i >= len(slots) { + break + } + *slots[i] = *bb + } +} + +// packDomainsIntoBaseBlocks packs all domain hosts into baseBlockSize-sized blocks. +// Each domain's hosts are split into base blocks independently; no merging across +// domains is performed. When groupSize > 1, each domain's base block count is +// rounded up to the nearest multiple of groupSize by appending empty base blocks, so +// that each domain occupies complete aggregate groups within the tree. 
+func packDomainsIntoBaseBlocks(domains topology.DomainMap, baseBlockSize, groupSize int) []*baseBlockNode { + if baseBlockSize <= 0 { + return nil + } + domainIDs := slices.Sorted(maps.Keys(domains)) + if len(domainIDs) == 0 { + return nil + } + + var blocks []*baseBlockNode + for _, domainID := range domainIDs { + hosts := hostsSorted(domains[domainID]) + baseBlocks := splitIntoBaseBlocks(domainID, hosts, baseBlockSize) + blocks = append(blocks, baseBlocks...) + // Pad to a multiple of groupSize so the domain fills complete aggregate groups. + if groupSize > 1 { + n := len(baseBlocks) + padded := ((n + groupSize - 1) / groupSize) * groupSize + for i := n; i < padded; i++ { + blocks = append(blocks, newEmptyBaseBlock(baseBlockSize)) + } + } + } + return blocks +} + +// splitIntoBaseBlocks splits a sorted host list into one or more base blocks of at +// most baseBlockSize leaves each. Overflow blocks get a "#N" suffix on the ID. +func splitIntoBaseBlocks(id string, hosts []*topology.HostInfo, baseBlockSize int) []*baseBlockNode { + blocks := make([]*baseBlockNode, 0, (len(hosts)+baseBlockSize-1)/baseBlockSize) + for start := 0; start < len(hosts); start += baseBlockSize { + end := start + baseBlockSize + if end > len(hosts) { + end = len(hosts) + } + blockID := id + if len(blocks) > 0 { + blockID = fmt.Sprintf("%s#%d", id, len(blocks)+1) + } + blocks = append(blocks, newBaseBlock(blockID, hosts[start:end], baseBlockSize)) + } + return blocks +} + +// hostsSorted returns hosts in deterministic alphabetical order so that block +// packing is reproducible across runs. +func hostsSorted(hosts map[string]*topology.HostInfo) []*topology.HostInfo { + list := make([]*topology.HostInfo, 0, len(hosts)) + for _, h := range hosts { + list = append(list, h) + } + sortHostsByName(list) + return list +} + +// collectBaseBlockSlots returns all base blocks in the tree via a left-to-right DFS. 
+// The returned order is identical to the slot order used by mergeBaseBlocksIntoTree, +// so the slice can be indexed by the same position numbers. +func collectBaseBlockSlots(tree *aggregateBlockNode) []*baseBlockNode { + var slots []*baseBlockNode + var walk func(blockTreeNode) + walk = func(n blockTreeNode) { + switch c := n.(type) { + case *baseBlockNode: + slots = append(slots, c) + case *aggregateBlockNode: + for _, ch := range c.children { + walk(ch) + } + } + } + walk(tree) + return slots +} + +// blocksFromShapedTree converts the tree's filled base-block slots to named blockInfo +// records, stopping at required slots so unused trailing capacity is not emitted. +func blocksFromShapedTree(tree *aggregateBlockNode, byName map[string]*blockInfo, required int) []*blockInfo { + slots := collectBaseBlockSlots(tree) + out := make([]*blockInfo, 0, required) + for i, bb := range slots { + if i >= required { + break + } + out = append(out, baseBlockToBlockInfo(bb, byName, i+1)) + } + return out +} + +// isEmptyBlock reports whether a block carries neither a name nor any nodes. +// A block with a name but no nodes is a valid placeholder — the domain is +// identified but no live hosts were assigned — and is not considered empty. +func isEmptyBlock(b *blockInfo) bool { + return b == nil || (len(b.name) == 0 && len(b.nodes) == 0) +} + +// baseBlockToBlockInfo resolves a base block to a blockInfo using a priority fallback +// chain, because not all blocks have live hosts attached to their leaves: +// 1. Host names directly in leaves (live hosts — normal case) +// 2. Domain IDs from leaves → byName lookup (placeholder hosts: Domain set, HostName empty) +// 3. Domain ID as display name with no nodes (domain known, host list missing entirely) +// 4. 
Empty blockInfo (tree slot was never filled) +func baseBlockToBlockInfo(bb *baseBlockNode, byName map[string]*blockInfo, seq int) *blockInfo { + id := fmt.Sprintf("block%03d", seq) + domainID := bb.domainIdentifier() + nodes := hostNamesFromLeaves(bb.leaves) + if len(nodes) > 0 { + return &blockInfo{id: id, name: blockDisplayName(bb.id, domainID), nodes: nodes} + } + for _, domain := range domainIDsFromLeaves(bb.leaves) { + if b := byName[domain]; b != nil { + return &blockInfo{ + id: id, + name: blockDisplayName(bb.id, domain), + nodes: append([]string(nil), b.nodes...), + } + } + } + if domainID != "" { + return &blockInfo{id: id, name: blockDisplayName(bb.id, domainID)} + } + return &blockInfo{id: id} +} + +func blockDisplayName(blockID, primarydomain string) string { + if primarydomain != "" { + return primarydomain + } + return blockID +} + +// domainIDsFromLeaves collects unique domainID values from leaf hosts. +// Sorted for determinism; used as a fallback key set in baseBlockToBlockInfo. +func domainIDsFromLeaves(leaves []*hostNode) []string { + seen := make(map[string]struct{}) + var ids []string + for _, leaf := range leaves { + if leaf.host == nil || leaf.host.Domain == "" { + continue + } + if _, ok := seen[leaf.host.Domain]; ok { + continue + } + seen[leaf.host.Domain] = struct{}{} + ids = append(ids, leaf.host.Domain) + } + sort.Strings(ids) + return ids +} + + +func hostNamesFromLeaves(leaves []*hostNode) []string { + nodes := make([]string, 0, len(leaves)) + for _, leaf := range leaves { + if leaf.host == nil || leaf.host.HostName == "" { + continue + } + nodes = append(nodes, leaf.host.HostName) + } + return nodes +} + +// extractDomainID returns the primary domain ID from a possibly compound block ID. 
+// It strips everything from the first compound separator onward: +// +// "acc-a+acc-b" → "acc-a" (merged block; separator produced by combinedBlockID) +// "acc/d0" → "acc" (domain-qualified path) +// "acc#2" → "acc" (overflow block produced by splitIntoBaseBlocks) +func extractDomainID(id string) string { + for i, r := range id { + if r == '/' || r == '#' || r == '+' { + return id[:i] + } + } + return id +} + +// newBaseBlock builds a baseBlockNode from a pre-sorted host list, filling slots +// left-to-right. Slots beyond the provided hosts remain empty placeholders. +func newBaseBlock(id string, hosts []*topology.HostInfo, baseBlockSize int) *baseBlockNode { + leaves := make([]*hostNode, baseBlockSize) + for i := range leaves { + leaves[i] = &hostNode{} + } + for i, h := range hosts { + if i >= baseBlockSize { + break + } + leaves[i] = &hostNode{host: h} + } + return &baseBlockNode{id: id, domain: extractDomainID(id), leaves: leaves} +} + +func newEmptyBaseBlock(baseBlockSize int) *baseBlockNode { + if baseBlockSize <= 0 { + return &baseBlockNode{} + } + leaves := make([]*hostNode, baseBlockSize) + for i := range leaves { + leaves[i] = &hostNode{} + } + return &baseBlockNode{leaves: leaves} +} + +// sortHostsByName sorts hosts alphabetically by HostName for deterministic packing. +func sortHostsByName(hosts []*topology.HostInfo) { + sort.Slice(hosts, func(i, j int) bool { + return hosts[i].HostName < hosts[j].HostName + }) +} + +// fanoutAtLevel returns the child count for the given tree level. fanouts is ordered +// leaf-to-root (bottom-up), but the tree is built top-down, so level 0 reads from the +// end of the slice (the outermost tier) and deeper levels read toward the front. 
+func fanoutAtLevel(level int, fanouts []int) int { + idx := len(fanouts) - 1 - level + if idx < 0 { + idx = 0 + } + return fanouts[idx] +} diff --git a/pkg/translate/block_tree_test.go b/pkg/translate/block_tree_test.go new file mode 100644 index 00000000..24baba31 --- /dev/null +++ b/pkg/translate/block_tree_test.go @@ -0,0 +1,220 @@ +/* + * Copyright 2026 NVIDIA CORPORATION + * SPDX-License-Identifier: Apache-2.0 + */ + +package translate + +import ( + "fmt" + "testing" + + "github.com/NVIDIA/topograph/pkg/topology" + "github.com/stretchr/testify/require" +) + +func TestSortHostsByName(t *testing.T) { + hosts := []*topology.HostInfo{ + {HostName: "z"}, + {HostName: "a"}, + {HostName: "m"}, + } + sortHostsByName(hosts) + require.Equal(t, []string{"a", "m", "z"}, []string{hosts[0].HostName, hosts[1].HostName, hosts[2].HostName}) +} + +// TestBaseBlockFillsSlotLeftToRight verifies that hosts fill base block slots left to +// right and that slots beyond the provided hosts remain empty placeholders. +func TestBaseBlockFillsSlotLeftToRight(t *testing.T) { + bb := newBaseBlock("B1", []*topology.HostInfo{ + {HostName: "n0", Domain: "B1"}, + {HostName: "n1", Domain: "B1"}, + }, 4) + + require.Len(t, bb.leaves, 4) + require.NotNil(t, bb.leaves[0].host) + require.Equal(t, "n0", bb.leaves[0].host.HostName) + require.NotNil(t, bb.leaves[1].host) + require.Equal(t, "n1", bb.leaves[1].host.HostName) + require.Nil(t, bb.leaves[2].host) + require.Nil(t, bb.leaves[3].host) +} + +// TestBaseBlocksFromDomainsMultiplePerdomain verifies that multiple hosts under +// the same domainID are packed into a single base block. 
+func TestBaseBlocksFromDomainsMultiplePerdomain(t *testing.T) { + domains := topology.NewDomainMap() + domains.AddHostInfo(&topology.HostInfo{ + Domain: "B1", + HostName: "n0", + }) + domains.AddHostInfo(&topology.HostInfo{ + Domain: "B1", + HostName: "n1", + }) + + packed := packDomainsIntoBaseBlocks(domains, 2, 0) + require.Len(t, packed, 1) + require.Equal(t, "B1", packed[0].id) + require.Len(t, hostNamesFromLeaves(packed[0].leaves), 2) +} + +// TestPackKeepsdomainsIndependent verifies that domains are never merged +// together even when each has fewer hosts than baseBlockSize. Each domain is +// packed independently into its own base block(s), with no cross-domain combining. +func TestPackKeepsdomainsIndependent(t *testing.T) { + domains := topology.NewDomainMap() + for _, accel := range []string{"B1", "B2", "B3"} { + for j := range 3 { + domains.AddHostInfo(&topology.HostInfo{ + Domain: accel, + HostName: fmt.Sprintf("%s-n%d", accel, j), + }) + } + } + + packed := packDomainsIntoBaseBlocks(domains, 8, 0) + require.Len(t, packed, 3) + require.Equal(t, "B1", packed[0].id) + require.Len(t, hostNamesFromLeaves(packed[0].leaves), 3) + require.Equal(t, "B2", packed[1].id) + require.Len(t, hostNamesFromLeaves(packed[1].leaves), 3) + require.Equal(t, "B3", packed[2].id) + require.Len(t, hostNamesFromLeaves(packed[2].leaves), 3) +} + +// TestPackSplitsWhenHostsExceedBlockSize verifies that a single domain with more +// hosts than baseBlockSize is split into multiple base blocks with "#N" ID suffixes. 
+func TestPackSplitsWhenHostsExceedBlockSize(t *testing.T) { + domains := topology.NewDomainMap() + for i := range 10 { + domains.AddHostInfo(&topology.HostInfo{ + Domain: "B1", + HostName: fmt.Sprintf("n%d", i), + }) + } + packed := packDomainsIntoBaseBlocks(domains, 4, 0) + require.Len(t, packed, 3) + require.Len(t, hostNamesFromLeaves(packed[0].leaves), 4) + require.Len(t, hostNamesFromLeaves(packed[2].leaves), 2) +} + +// TestShapedBlockTreeSlots verifies that two domains fill the two available tree +// slots in sorted order when the tree has exactly the needed capacity. +func TestShapedBlockTreeSlots(t *testing.T) { + domains := topology.NewDomainMap() + domains.AddHostInfo(&topology.HostInfo{ + Domain: "B1", + HostName: "n1", + InstanceID: "i1", + }) + domains.AddHostInfo(&topology.HostInfo{ + Domain: "B3", + HostName: "n3", + InstanceID: "i3", + }) + + fanouts, ok := fanoutsPerLevel([]int{4, 8}) + require.True(t, ok) + const baseBlockSize = 4 + packed := packDomainsIntoBaseBlocks(domains, baseBlockSize, 0) + expandedFanouts := expandFanoutsForCapacity(fanouts, len(packed)) + tree := buildAggregateShape(expandedFanouts, baseBlockSize) + mergeBaseBlocksIntoTree(tree, packed) + slots := collectBaseBlockSlots(tree) + require.Len(t, slots, 2) + require.Equal(t, "B1", slots[0].domainIdentifier()) + require.Equal(t, "B3", slots[1].domainIdentifier()) +} + +// TestBlocksFromShapedTreeFillsSequentially verifies that blocks fill left-to-right +// in sorted domain order regardless of domain ID format. Each domain +// has baseBlockSize hosts so they pack independently (no merging). 
+func TestBlocksFromShapedTreeFillsSequentially(t *testing.T) { + fanouts, ok := fanoutsPerLevel([]int{4, 8, 16}) + require.True(t, ok) + domains := topology.NewDomainMap() + accels := []string{"gpu-clique-a", "gpu-clique-b", "gpu-clique-c"} + for _, accel := range accels { + for i := range 4 { + domains.AddHostInfo(&topology.HostInfo{ + Domain: accel, + HostName: fmt.Sprintf("%s-n%d", accel, i), + }) + } + } + + const baseBlockSize = 4 + packed := packDomainsIntoBaseBlocks(domains, baseBlockSize, 0) + expandedFanouts := expandFanoutsForCapacity(fanouts, len(packed)) + tree := buildAggregateShape(expandedFanouts, baseBlockSize) + mergeBaseBlocksIntoTree(tree, packed) + byName := map[string]*blockInfo{ + "gpu-clique-a": {name: "gpu-clique-a", nodes: []string{"gpu-clique-a-n0"}}, + "gpu-clique-b": {name: "gpu-clique-b", nodes: []string{"gpu-clique-b-n0"}}, + "gpu-clique-c": {name: "gpu-clique-c", nodes: []string{"gpu-clique-c-n0"}}, + } + out := blocksFromShapedTree(tree, byName, 3) + require.Len(t, out, 3) + require.Equal(t, "gpu-clique-a", out[0].name) + require.Equal(t, "gpu-clique-b", out[1].name) + require.Equal(t, "gpu-clique-c", out[2].name) +} + + +// TestSplitIntoBaseBlocksChunksExcessHosts verifies that 12 hosts with a blockSize of 4 +// produce exactly 3 blocks, each fully populated, filling slots left-to-right. 
+func TestSplitIntoBaseBlocksChunksExcessHosts(t *testing.T) { + hosts := make([]*topology.HostInfo, 12) + for i := range 12 { + hosts[i] = &topology.HostInfo{ + HostName: fmt.Sprintf("n%d", i), + Domain: "B1", + } + } + sortHostsByName(hosts) + blocks := splitIntoBaseBlocks("B1", hosts, 4) + require.Len(t, blocks, 3) + require.Len(t, blocks[0].leaves, 4) + require.Len(t, hostNamesFromLeaves(blocks[0].leaves), 4) + require.Len(t, hostNamesFromLeaves(blocks[1].leaves), 4) + require.Len(t, hostNamesFromLeaves(blocks[2].leaves), 4) +} + +func TestExpandFanoutsForCapacity(t *testing.T) { + require.Equal(t, 4, totalBaseBlockSlots([]int{2, 2})) + require.Equal(t, []int{2, 8}, expandFanoutsForCapacity([]int{2, 2}, 12)) + require.Equal(t, 16, totalBaseBlockSlots(expandFanoutsForCapacity([]int{2, 2}, 12))) + // Empty fanout slice must not panic. + require.Equal(t, []int(nil), expandFanoutsForCapacity(nil, 4)) + require.Equal(t, []int{}, expandFanoutsForCapacity([]int{}, 4)) + // Non-positive required must return fanouts unchanged. + require.Equal(t, []int{2, 2}, expandFanoutsForCapacity([]int{2, 2}, 0)) + require.Equal(t, []int{2, 2}, expandFanoutsForCapacity([]int{2, 2}, -1)) +} + +// TestShapedTreeExpandsForExcessHosts verifies that when required base blocks exceed +// the initial tree capacity, the last fanout tier is doubled until all hosts fit. 
+func TestShapedTreeExpandsForExcessHosts(t *testing.T) { + domains := topology.NewDomainMap() + for i := range 12 { + domains.AddHostInfo(&topology.HostInfo{ + Domain: "B1", + HostName: fmt.Sprintf("n%d", i), + }) + } + fanouts, ok := fanoutsPerLevel([]int{4, 8, 16}) + require.True(t, ok) + const baseBlockSize = 4 + packed := packDomainsIntoBaseBlocks(domains, baseBlockSize, 0) + expandedFanouts := expandFanoutsForCapacity(fanouts, len(packed)) + tree := buildAggregateShape(expandedFanouts, baseBlockSize) + mergeBaseBlocksIntoTree(tree, packed) + slots := collectBaseBlockSlots(tree) + require.GreaterOrEqual(t, len(slots), 3) + var allNodes []string + for _, s := range slots { + allNodes = append(allNodes, hostNamesFromLeaves(s.leaves)...) + } + require.Len(t, allNodes, 12) +} diff --git a/pkg/translate/topology.go b/pkg/translate/topology.go index fa73d23e..b1e09173 100644 --- a/pkg/translate/topology.go +++ b/pkg/translate/topology.go @@ -35,6 +35,7 @@ type TopologySpec struct { type NetworkTopology struct { config *Config tree map[string][]string // adjacency list + domains topology.DomainMap // accelerator domains from provider graph blocks []*blockInfo // blocks vertices map[string]*topology.Vertex // object ID to Vertex map nodeInfo map[string]*nodeInfo // node name to nodeInfo map @@ -184,6 +185,7 @@ func (nt *NetworkTopology) initBlocks(graph *topology.Graph) { return } + nt.domains = graph.Domains domainBlocks := toBlockInfos(graph.Domains) nt.blocks = make([]*blockInfo, 0, len(domainBlocks)) indx := 0 @@ -192,8 +194,13 @@ func (nt *NetworkTopology) initBlocks(graph *topology.Graph) { for _, bInfo := range domainBlocks { bInfo.indx = indx for _, node := range bInfo.nodes { + hostInfo := graph.Domains[bInfo.name][node] + if hostInfo == nil { + klog.Warningf("initBlocks: missing host info for node %q in domain %q", node, bInfo.name) + continue + } nt.nodeInfo[node] = &nodeInfo{ - instanceID: graph.Domains[bInfo.name][node], + instanceID: hostInfo.InstanceID, 
blockID: bInfo.id, blockIndx: ptr.Int(indx), } diff --git a/pkg/translate/yaml.go b/pkg/translate/yaml.go index cea1bc58..d8c62a23 100644 --- a/pkg/translate/yaml.go +++ b/pkg/translate/yaml.go @@ -149,8 +149,13 @@ func (nt *NetworkTopology) getBlockTopologyUnit(topoName string, topoSpec *Topol indx := *info.blockIndx bInfo, ok := blockMap[indx] if !ok { + name := "" + if indx < len(nt.blocks) { + name = nt.blocks[indx].name + } blockMap[indx] = &blockInfo{ indx: indx, + name: name, nodes: []string{nodeName}, } } else { @@ -175,15 +180,18 @@ func (nt *NetworkTopology) getBlockTopologyUnit(topoName string, topoSpec *Topol return bInfos[i].indx < bInfos[j].indx }) + bInfos = nt.complementBlocks(bInfos, topoSpec.BlockSizes) + // populate block topology units ordered by block indices blocks := make([]*Block, 0, len(bInfos)) parents := make(map[string]string) for indx, bInfo := range bInfos { blockName := fmt.Sprintf("block%d", indx+1) - blocks = append(blocks, &Block{ - Name: blockName, - Nodes: strings.Join(cluset.Compact(bInfo.nodes), ","), - }) + block := &Block{Name: blockName} + if len(bInfo.nodes) != 0 { + block.Nodes = strings.Join(cluset.Compact(bInfo.nodes), ",") + } + blocks = append(blocks, block) for _, nodeName := range bInfo.nodes { parents[nodeName] = blockName diff --git a/pkg/translate/yaml_test.go b/pkg/translate/yaml_test.go index a5f623fd..9d8edbe3 100644 --- a/pkg/translate/yaml_test.go +++ b/pkg/translate/yaml_test.go @@ -349,3 +349,59 @@ func TestEmptyPartitionTopology(t *testing.T) { require.Nil(t, err) require.Equal(t, expectedSkeleton, buf.String()) } + +// TestGetBlockTopologyUnitComplementEmptyNodes verifies that per-partition block +// topology pads each accelerator's base blocks to a complete aggregate group. a2 has +// only 2 nodes (1 base block) but groupSize=2 requires 2 slots, so an empty block4 is +// inserted. Tree-capacity expansion rounds 3 groups to 4, adding block7 and block8. 
+func TestGetBlockTopologyUnitComplementEmptyNodes(t *testing.T) { + expected := `- topology: topo1 + cluster_default: false + block: + block_sizes: + - 2 + - 4 + blocks: + - block: block1 + nodes: n[10-11] + - block: block2 + nodes: n12 + - block: block3 + nodes: n[20-21] + - block: block4 + - block: block5 + nodes: n[31-32] + - block: block6 + nodes: n33 + - block: block7 + - block: block8 +` + domains := topology.NewDomainMap() + for _, n := range []string{"n10", "n11", "n12"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "a1", HostName: n, InstanceID: n}) + } + for _, n := range []string{"n20", "n21"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "a2", HostName: n, InstanceID: n}) + } + for _, n := range []string{"n31", "n32", "n33"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "a3", HostName: n, InstanceID: n}) + } + + cfg := &Config{ + Topologies: map[string]*TopologySpec{ + "topo1": { + Plugin: topology.TopologyBlock, + Nodes: []string{"n[10-12]", "n[20-21]", "n[31-33]"}, + BlockSizes: []int{2, 4}, + }, + }, + } + + graph := &topology.Graph{Domains: domains} + nt, err := NewNetworkTopology(graph, cfg) + require.NoError(t, err) + + buf := &bytes.Buffer{} + require.Nil(t, nt.Generate(buf)) + require.Equal(t, expected, buf.String()) +} From f59beefee8f64b8158770c4e889bd32af095bfdf Mon Sep 17 00:00:00 2001 From: Dmitry Shmulevich Date: Thu, 11 Jun 2026 08:33:27 -0700 Subject: [PATCH 2/2] replace the temporary block tree used for complementing with direct flat slot packing Signed-off-by: Dmitry Shmulevich --- pkg/engines/slinky/engine.go | 6 +- pkg/engines/slurm/slurm.go | 44 +++- pkg/engines/slurm/slurm_test.go | 49 +++- pkg/translate/block_complement.go | 103 ++------ pkg/translate/block_complement_test.go | 39 ++- pkg/translate/block_pack.go | 174 +++++++++++++ pkg/translate/block_pack_test.go | 116 +++++++++ pkg/translate/block_tree.go | 337 ------------------------- pkg/translate/block_tree_test.go | 220 ---------------- 9 files changed, 417 
insertions(+), 671 deletions(-) create mode 100644 pkg/translate/block_pack.go create mode 100644 pkg/translate/block_pack_test.go delete mode 100644 pkg/translate/block_tree.go delete mode 100644 pkg/translate/block_tree_test.go diff --git a/pkg/engines/slinky/engine.go b/pkg/engines/slinky/engine.go index 0e80c5bf..30f18f83 100644 --- a/pkg/engines/slinky/engine.go +++ b/pkg/engines/slinky/engine.go @@ -278,9 +278,9 @@ func (eng *SlinkyEngine) GenerateOutput(ctx context.Context, graph *topology.Gra GetPartitionNodes: eng.getPartitionNodes, Params: []any{p.Namespace}, } - cfg, err := slurm.GetTranslateConfig(ctx, &p.BaseParams, resolvedTopologies, topologyNodeFinder) - if err != nil { - return nil, httperr.NewError(http.StatusInternalServerError, err.Error()) + cfg, httpErr := slurm.GetTranslateConfig(ctx, &p.BaseParams, resolvedTopologies, topologyNodeFinder) + if httpErr != nil { + return nil, httpErr } nt, err := translate.NewNetworkTopology(graph, cfg) diff --git a/pkg/engines/slurm/slurm.go b/pkg/engines/slurm/slurm.go index c5e19c6d..3bc64049 100644 --- a/pkg/engines/slurm/slurm.go +++ b/pkg/engines/slurm/slurm.go @@ -234,9 +234,9 @@ func GenerateOutputParams(ctx context.Context, graph *topology.Graph, params *Pa params.Plugin = topology.TopologyTree } - cfg, err := GetTranslateConfig(ctx, ¶ms.BaseParams, params.Topologies, &TopologyNodeFinder{GetPartitionNodes: getPartitionNodes}) - if err != nil { - return nil, httperr.NewError(http.StatusInternalServerError, err.Error()) + cfg, httpErr := GetTranslateConfig(ctx, ¶ms.BaseParams, params.Topologies, &TopologyNodeFinder{GetPartitionNodes: getPartitionNodes}) + if httpErr != nil { + return nil, httpErr } nt, err := translate.NewNetworkTopology(graph, cfg) @@ -252,7 +252,7 @@ func GenerateOutputParams(ctx context.Context, graph *topology.Graph, params *Pa } } - if httpErr := nt.Generate(buf); httpErr != nil { + if httpErr = nt.Generate(buf); httpErr != nil { return nil, httpErr } @@ -276,7 +276,11 @@ func 
GenerateOutputParams(ctx context.Context, graph *topology.Graph, params *Pa return []byte("OK\n"), nil } -func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[string]*Topology, f *TopologyNodeFinder) (*translate.Config, error) { +func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[string]*Topology, f *TopologyNodeFinder) (*translate.Config, *httperr.Error) { + if err := validateBlockSizes(params.BlockSizes); err != nil { + return nil, httperr.NewError(http.StatusBadRequest, err.Error()) + } + cfg := &translate.Config{ Plugin: params.Plugin, BlockSizes: params.BlockSizes, @@ -286,6 +290,9 @@ func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[ if len(topologies) != 0 { cfg.Topologies = make(map[string]*translate.TopologySpec) for topo, sect := range topologies { + if err := validateBlockSizes(sect.BlockSizes); err != nil { + return nil, httperr.NewError(http.StatusBadRequest, fmt.Sprintf("topology %q: %v", topo, err)) + } spec := &translate.TopologySpec{ Plugin: sect.Plugin, BlockSizes: sect.BlockSizes, @@ -302,7 +309,7 @@ func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[ klog.V(4).Infof("%s %q discovered nodes %v", sect.Plugin, topo, nodes) spec.Nodes = nodes } else { - return nil, err + return nil, httperr.NewError(http.StatusInternalServerError, err.Error()) } } cfg.Topologies[topo] = spec @@ -312,6 +319,31 @@ func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[ return cfg, nil } +func validateBlockSizes(blockSizes []int) error { + if len(blockSizes) == 0 { + return nil + } + prev := blockSizes[0] + if prev <= 0 { + return fmt.Errorf("blockSizes[0]=%d must be positive", prev) + } + for i := 1; i < len(blockSizes); i++ { + cur := blockSizes[i] + if cur <= 0 { + return fmt.Errorf("blockSizes[%d]=%d must be positive", i, cur) + } + if cur <= prev || cur%prev != 0 { + return fmt.Errorf("blockSizes[%d]=%d must be greater than 
blockSizes[%d]=%d and be a multiple of power of two", i, cur, i-1, prev) + } + ratio := cur / prev + if ratio&(ratio-1) != 0 { + return fmt.Errorf("blockSizes[%d]/blockSizes[%d]=%d must be a multiple of power of two", i, i-1, ratio) + } + prev = cur + } + return nil +} + func getParams(params map[string]any) (*Params, error) { var p Params err := config.Decode(params, &p) diff --git a/pkg/engines/slurm/slurm_test.go b/pkg/engines/slurm/slurm_test.go index 6009acec..bebf4513 100644 --- a/pkg/engines/slurm/slurm_test.go +++ b/pkg/engines/slurm/slurm_test.go @@ -244,15 +244,31 @@ func TestGetTranslateConfig(t *testing.T) { name: "Case 2: valid blocksize", params: &BaseParams{ Plugin: topology.TopologyBlock, - BlockSizes: []int{2, 4, 8}, + BlockSizes: []int{2, 8, 32}, }, cfg: &translate.Config{ Plugin: topology.TopologyBlock, - BlockSizes: []int{2, 4, 8}, + BlockSizes: []int{2, 8, 32}, }, }, { - name: "Case 3: with invalid partition topology", + name: "Case 3: invalid top-level blocksize ratio", + params: &BaseParams{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{2, 6}, + }, + err: "blockSizes[1]/blockSizes[0]=3 must be a multiple of power of two", + }, + { + name: "Case 4: invalid top-level blocksize multiple", + params: &BaseParams{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{2, 5}, + }, + err: "blockSizes[1]=5 must be greater than blockSizes[0]=2 and be a multiple of power of two", + }, + { + name: "Case 5: invalid partition topology", params: &BaseParams{}, topologies: map[string]*Topology{ "topo1": { @@ -266,7 +282,7 @@ func TestGetTranslateConfig(t *testing.T) { err: "missing partition name", }, { - name: "Case 4: with valid partition topology", + name: "Case 6: with valid partition topology", params: &BaseParams{}, topologies: map[string]*Topology{ "default": { @@ -292,7 +308,7 @@ func TestGetTranslateConfig(t *testing.T) { }, }, { - name: "Case 5: explicit empty nodes do not use partition discovery", + name: "Case 7: explicit empty nodes do 
not use partition discovery", params: &BaseParams{}, topologies: map[string]*Topology{ "topo": { @@ -315,6 +331,18 @@ func TestGetTranslateConfig(t *testing.T) { }, }, }, + { + name: "Case 8: invalid partition blocksize ratio", + params: &BaseParams{}, + topologies: map[string]*Topology{ + "topo": { + Plugin: topology.TopologyBlock, + BlockSizes: []int{2, 6}, + Nodes: []string{"node[001-100]"}, + }, + }, + err: `topology "topo": blockSizes[1]/blockSizes[0]=3 must be a multiple of power of two`, + }, } for _, tc := range testCases { @@ -323,7 +351,7 @@ func TestGetTranslateConfig(t *testing.T) { if len(tc.err) != 0 { require.EqualError(t, err, tc.err) } else { - require.NoError(t, err) + require.Nil(t, err) require.Equal(t, tc.cfg, cfg) } }) @@ -360,7 +388,14 @@ SwitchName=S3 Nodes=Node[304-306] code: http.StatusBadRequest, }, { - name: "Case 3: valid input", + name: "Case 3: invalid semantic blocksize", + graph: graph, + params: map[string]any{"blockSizes": []int{2, 6}}, + err: "blockSizes[1]/blockSizes[0]=3 must be a multiple of power of two", + code: http.StatusBadRequest, + }, + { + name: "Case 4: valid input", graph: graph, cfg: cfg, }, diff --git a/pkg/translate/block_complement.go b/pkg/translate/block_complement.go index 00be2073..2b915d4a 100644 --- a/pkg/translate/block_complement.go +++ b/pkg/translate/block_complement.go @@ -5,31 +5,8 @@ package translate -import "github.com/NVIDIA/topograph/pkg/topology" - -// groupSizeFromDomains computes how many base blocks a fully-populated accelerator -// occupies, rounded up to the nearest power of two. It finds the maximum accelerator -// host count across all domains, then returns 2^n where 2^n * baseBlockSize is the -// smallest power-of-two multiple of baseBlockSize that is >= maxAcceleratorSize. -// Returns 1 when every accelerator fits within a single base block (no padding needed). 
-func groupSizeFromDomains(domains topology.DomainMap, baseBlockSize int) int { - maxNodes := 0 - for _, hosts := range domains { - if len(hosts) > maxNodes { - maxNodes = len(hosts) - } - } - groupSize := 1 - capacity := baseBlockSize - for capacity < maxNodes { - groupSize *= 2 - capacity *= 2 - } - return groupSize -} - -// complementBlocks builds a block tree shaped by BlockSizes, packs domain hosts into -// it, and returns the flat block list derived from low-level tree nodes. +// complementBlocks packs domain hosts into BlockSizes-shaped base-block slots and +// returns the resulting flat block list. // // Only domains for accelerators present in blocks are used so per-partition YAML // complementing is not masked by domains owned by other partitions in nt.domains. @@ -37,34 +14,32 @@ func groupSizeFromDomains(domains topology.DomainMap, baseBlockSize int) int { // The group size is derived from the maximum accelerator host count: it is the smallest // 2^n such that 2^n * baseBlockSize >= maxAcceleratorSize. Each accelerator's base // block count is then padded to a multiple of that groupSize so every accelerator -// occupies complete aggregate groups within the tree. +// occupies complete aggregate groups. 
func (nt *NetworkTopology) complementBlocks(blocks []*blockInfo, blockSizes []int) []*blockInfo { fanouts, ok := fanoutsPerLevel(blockSizes) if !ok || nt.domains == nil { return blocks } - domains := domainsForBlocks(nt.domains, blocks) - if len(domains) == 0 { + baseBlockSize := blockSizes[0] + domains, ok := orderedDomainsForBlocks(nt.domains, blocks) + if !ok { return blocks } - byName := blocksByName(blocks) - baseBlockSize := blockSizes[0] - groupSize := groupSizeFromDomains(domains, baseBlockSize) - packed := packDomainsIntoBaseBlocks(domains, baseBlockSize, groupSize) - expandedFanouts := expandFanoutsForCapacity(fanouts, len(packed)) - tree := buildAggregateShape(expandedFanouts, baseBlockSize) - mergeBaseBlocksIntoTree(tree, packed) + groupSize := groupSizeFromOrderedDomains(domains, baseBlockSize) + out := packOrderedDomainsIntoBlocks(domains, baseBlockSize, groupSize) // When no padding was added (packed count equals input count), stop at the exact // packed count so trailing tree-capacity slots do not falsely trigger complement usage. - required := len(packed) - if len(packed) != len(blocks) { - required = totalBaseBlockSlots(expandedFanouts) + if len(out) != len(blocks) { + required := expandedBaseBlockSlots(fanouts, len(out)) + for len(out) < required { + out = append(out, &blockInfo{}) + } } + assignSequentialBlockIDs(out) - out := blocksFromShapedTree(tree, byName, required) if !shouldUseComplementedBlocks(blocks, out) { return blocks } @@ -78,7 +53,7 @@ func (nt *NetworkTopology) complementBlocks(blocks []*blockInfo, blockSizes []in // - Count increase: an accelerator had more hosts than baseBlockSize and was split // into multiple base blocks, so the output is longer than the input. // -// A shorter output is never used: domainsForBlocks may skip blocks whose domain is +// A shorter output is never used: domain packing may skip blocks whose domain is // absent from the global map, producing fewer packed blocks than the input. 
Replacing // the input in that case would silently drop blocks. func shouldUseComplementedBlocks(input, out []*blockInfo) bool { @@ -91,54 +66,6 @@ func shouldUseComplementedBlocks(input, out []*blockInfo) bool { return len(out) > len(input) } -// domainsForBlocks returns a subset of the cluster domain map containing only the -// hosts that belong to the given partition-local blocks. For each block it intersects -// the global domain with the block's own node list, so that nodes owned by another -// partition in the same accelerator domain are never included. -func domainsForBlocks(all topology.DomainMap, blocks []*blockInfo) topology.DomainMap { - if all == nil { - return nil - } - local := topology.NewDomainMap() - for _, b := range blocks { - if b == nil || b.name == "" { - continue - } - hosts, ok := all[b.name] - if !ok { - continue - } - // Restrict to nodes the partition actually owns; a domain may span multiple - // partitions and the global map holds all of them. - partitionNodes := make(map[string]struct{}, len(b.nodes)) - for _, n := range b.nodes { - partitionNodes[n] = struct{}{} - } - for _, hi := range hosts { - if _, owned := partitionNodes[hi.HostName]; !owned { - continue - } - copy := *hi - local.AddHostInfo(©) - } - } - return local -} - -// blocksByName builds an accelerator-ID → blockInfo index used by baseBlockToBlockInfo -// to look up node lists when a block's leaves carry only an accelerator reference. -// Nil entries and blocks without a name are skipped, matching the guard in domainsForBlocks. -func blocksByName(blocks []*blockInfo) map[string]*blockInfo { - m := make(map[string]*blockInfo, len(blocks)) - for _, b := range blocks { - if b == nil || b.name == "" { - continue - } - m[b.name] = b - } - return m -} - // hasEmptyBlockSlots reports whether any interior slot in blocks is empty. 
// Trailing empty blocks are not considered complement slots because they arise from // tree capacity rounding and carry no structural meaning for the scheduler. diff --git a/pkg/translate/block_complement_test.go b/pkg/translate/block_complement_test.go index b25c776a..6a73a869 100644 --- a/pkg/translate/block_complement_test.go +++ b/pkg/translate/block_complement_test.go @@ -196,9 +196,9 @@ func TestComplementExcessHostsPerAccelerator(t *testing.T) { name := fmt.Sprintf("Node%03d", 100+i) nodes = append(nodes, name) domains.AddHostInfo(&topology.HostInfo{ - Domain: "B1", - HostName: name, - InstanceID: name, + Domain: "B1", + HostName: name, + InstanceID: name, }) } @@ -256,7 +256,7 @@ func TestComplementPartitionLocalDomainsOnly(t *testing.T) { // TestDomainsForBlocksFilteredToPartitionNodes is a regression test for cross-partition // node contamination. Domain B1 holds 4 nodes globally (n1–n4), but the partition-local -// blockInfo only lists n1, n2, n3. Without filtering, domainsForBlocks would copy all 4 +// blockInfo only lists n1, n2, n3. Without filtering, orderedDomainsForBlocks would copy all 4 // hosts and n4 would appear in the complemented output. With the fix, only n1–n3 are // used: the split produces two base blocks ([n1,n2] and [n3]) and n4 is absent. func TestDomainsForBlocksFilteredToPartitionNodes(t *testing.T) { @@ -293,11 +293,8 @@ func TestDomainsForBlocksFilteredToPartitionNodes(t *testing.T) { } // TestComplementPreservesInputWhenDomainsMissing verifies that complementBlocks -// returns the original block list unchanged when domainsForBlocks produces fewer -// packed blocks than the input. This happens when some blocks have no matching +// returns the original block list unchanged when an input block has no matching // entry in the global domain map (e.g. the block name was never registered). 
-// Before the fix, shouldUseComplementedBlocks treated len(out) < len(input) as a -// count mismatch warranting replacement, silently dropping the unmatched blocks. func TestComplementPreservesInputWhenDomainsMissing(t *testing.T) { domains := topology.NewDomainMap() // Only B1 is in the domain map; B2 is not. @@ -312,11 +309,33 @@ func TestComplementPreservesInputWhenDomainsMissing(t *testing.T) { {id: "block002", name: "B2", nodes: []string{"n3", "n4"}}, // no domain entry } out := nt.complementBlocks(input, []int{2, 4}) - // B2 is absent from the global domain map, so packed has only 1 block vs 2 input - // blocks. The complement output would be shorter — must fall back to original input. require.Equal(t, input, out) } +// TestComplementPreservesTopologyBlockOrder verifies that complementing follows the +// already established block order rather than sorting domain IDs alphabetically. +func TestComplementPreservesTopologyBlockOrder(t *testing.T) { + domains := topology.NewDomainMap() + for _, n := range []string{"b1n1", "b1n2", "b1n3"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "B1", HostName: n, InstanceID: n}) + } + for _, n := range []string{"b2n1", "b2n2", "b2n3"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "B2", HostName: n, InstanceID: n}) + } + nt := &NetworkTopology{domains: domains} + input := []*blockInfo{ + {id: "block001", name: "B2", nodes: []string{"b2n1", "b2n2", "b2n3"}}, + {id: "block002", name: "B1", nodes: []string{"b1n1", "b1n2", "b1n3"}}, + } + + out := nt.complementBlocks(input, []int{2, 4}) + require.Len(t, out, 4) + require.Equal(t, "B2", out[0].name) + require.Equal(t, "B2", out[1].name) + require.Equal(t, "B1", out[2].name) + require.Equal(t, "B1", out[3].name) +} + // TestGetBlockTopologyUnitWithMultiAcceleratorDomains verifies the YAML per-partition // complement path end-to-end: two domains, three accelerators (a1, a2, a3), block // sizes [2,4]. 
a2 is undersized (fewer nodes than groupSize=2 requires), so it gets diff --git a/pkg/translate/block_pack.go b/pkg/translate/block_pack.go new file mode 100644 index 00000000..9edd036b --- /dev/null +++ b/pkg/translate/block_pack.go @@ -0,0 +1,174 @@ +/* + * Copyright 2026 NVIDIA CORPORATION + * SPDX-License-Identifier: Apache-2.0 + */ + +package translate + +import ( + "fmt" + "sort" + + "github.com/NVIDIA/topograph/pkg/topology" +) + +type orderedDomain struct { + name string + hosts []*topology.HostInfo +} + +// orderedDomainsForBlocks returns partition-local domain hosts in the same order as +// blocks. Each domain is filtered to the block's node list so partitions sharing an +// accelerator domain cannot leak nodes into one another. +func orderedDomainsForBlocks(all topology.DomainMap, blocks []*blockInfo) ([]orderedDomain, bool) { + if all == nil { + return nil, false + } + + domains := make([]orderedDomain, 0, len(blocks)) + for _, b := range blocks { + hosts, ok := hostsForBlock(all, b) + if !ok { + return nil, false + } + domains = append(domains, orderedDomain{name: b.name, hosts: hosts}) + } + return domains, len(domains) != 0 +} + +func hostsForBlock(all topology.DomainMap, b *blockInfo) ([]*topology.HostInfo, bool) { + if b == nil || b.name == "" || len(b.nodes) == 0 { + return nil, false + } + hostsByName, ok := all[b.name] + if !ok { + return nil, false + } + + hosts := make([]*topology.HostInfo, 0, len(b.nodes)) + seen := make(map[string]struct{}, len(b.nodes)) + for _, node := range b.nodes { + if _, ok := seen[node]; ok { + continue + } + seen[node] = struct{}{} + + host := hostsByName[node] + if host == nil { + return nil, false + } + hosts = append(hosts, host) + } + if len(hosts) == 0 { + return nil, false + } + sortHostsByName(hosts) + return hosts, true +} + +// groupSizeFromOrderedDomains computes how many base blocks a fully populated +// accelerator occupies, rounded up to the nearest power of two. 
+func groupSizeFromOrderedDomains(domains []orderedDomain, baseBlockSize int) int { + maxNodes := 0 + for _, domain := range domains { + if len(domain.hosts) > maxNodes { + maxNodes = len(domain.hosts) + } + } + groupSize := 1 + capacity := baseBlockSize + for capacity < maxNodes { + groupSize *= 2 + capacity *= 2 + } + return groupSize +} + +func packOrderedDomainsIntoBlocks(domains []orderedDomain, baseBlockSize, groupSize int) []*blockInfo { + if baseBlockSize <= 0 { + return nil + } + + var blocks []*blockInfo + for _, domain := range domains { + before := len(blocks) + blocks = append(blocks, splitHostsIntoBlocks(domain.name, domain.hosts, baseBlockSize)...) + if groupSize > 1 { + blocks = padDomainGroup(blocks, len(blocks)-before, groupSize) + } + } + return blocks +} + +func splitHostsIntoBlocks(domainName string, hosts []*topology.HostInfo, baseBlockSize int) []*blockInfo { + blocks := make([]*blockInfo, 0, (len(hosts)+baseBlockSize-1)/baseBlockSize) + for start := 0; start < len(hosts); start += baseBlockSize { + end := start + baseBlockSize + if end > len(hosts) { + end = len(hosts) + } + blocks = append(blocks, &blockInfo{ + name: domainName, + nodes: hostNames(hosts[start:end]), + }) + } + return blocks +} + +func padDomainGroup(blocks []*blockInfo, realBlockCount, groupSize int) []*blockInfo { + padded := roundUpToMultiple(realBlockCount, groupSize) + for i := realBlockCount; i < padded; i++ { + blocks = append(blocks, &blockInfo{}) + } + return blocks +} + +func roundUpToMultiple(n, multiple int) int { + if multiple <= 1 { + return n + } + return ((n + multiple - 1) / multiple) * multiple +} + +func expandedBaseBlockSlots(fanouts []int, required int) int { + capacity := totalBaseBlockSlots(fanouts) + for capacity < required { + capacity *= 2 + } + return capacity +} + +func totalBaseBlockSlots(fanouts []int) int { + n := 1 + for _, f := range fanouts { + n *= f + } + return n +} + +func assignSequentialBlockIDs(blocks []*blockInfo) { + for i, block := 
range blocks { + block.id = fmt.Sprintf("block%03d", i+1) + } +} + +func isEmptyBlock(b *blockInfo) bool { + return b == nil || (len(b.name) == 0 && len(b.nodes) == 0) +} + +func hostNames(hosts []*topology.HostInfo) []string { + nodes := make([]string, 0, len(hosts)) + for _, host := range hosts { + if host == nil || host.HostName == "" { + continue + } + nodes = append(nodes, host.HostName) + } + return nodes +} + +func sortHostsByName(hosts []*topology.HostInfo) { + sort.Slice(hosts, func(i, j int) bool { + return hosts[i].HostName < hosts[j].HostName + }) +} diff --git a/pkg/translate/block_pack_test.go b/pkg/translate/block_pack_test.go new file mode 100644 index 00000000..37151002 --- /dev/null +++ b/pkg/translate/block_pack_test.go @@ -0,0 +1,116 @@ +/* + * Copyright 2026 NVIDIA CORPORATION + * SPDX-License-Identifier: Apache-2.0 + */ + +package translate + +import ( + "fmt" + "testing" + + "github.com/NVIDIA/topograph/pkg/topology" + "github.com/stretchr/testify/require" +) + +func TestSortHostsByName(t *testing.T) { + hosts := []*topology.HostInfo{ + {HostName: "z"}, + {HostName: "a"}, + {HostName: "m"}, + } + sortHostsByName(hosts) + require.Equal(t, []string{"a", "m", "z"}, []string{hosts[0].HostName, hosts[1].HostName, hosts[2].HostName}) +} + +func TestOrderedDomainsForBlocksPreservesOrderAndFiltersPartitionNodes(t *testing.T) { + domains := topology.NewDomainMap() + for _, node := range []string{"n20", "n21", "n22"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "B2", HostName: node}) + } + for _, node := range []string{"n10", "n11", "n12"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "B1", HostName: node}) + } + + ordered, ok := orderedDomainsForBlocks(domains, []*blockInfo{ + {name: "B2", nodes: []string{"n22", "n20"}}, + {name: "B1", nodes: []string{"n11"}}, + }) + require.True(t, ok) + require.Len(t, ordered, 2) + require.Equal(t, "B2", ordered[0].name) + require.Equal(t, []string{"n20", "n22"}, hostNames(ordered[0].hosts)) + 
require.Equal(t, "B1", ordered[1].name) + require.Equal(t, []string{"n11"}, hostNames(ordered[1].hosts)) +} + +func TestPackKeepsDomainsIndependent(t *testing.T) { + domains := []orderedDomain{ + testOrderedDomain("B1", "B1-n0", "B1-n1", "B1-n2"), + testOrderedDomain("B2", "B2-n0", "B2-n1", "B2-n2"), + testOrderedDomain("B3", "B3-n0", "B3-n1", "B3-n2"), + } + + packed := packOrderedDomainsIntoBlocks(domains, 8, 1) + require.Len(t, packed, 3) + require.Equal(t, "B1", packed[0].name) + require.Len(t, packed[0].nodes, 3) + require.Equal(t, "B2", packed[1].name) + require.Len(t, packed[1].nodes, 3) + require.Equal(t, "B3", packed[2].name) + require.Len(t, packed[2].nodes, 3) +} + +func TestPackSplitsWhenHostsExceedBlockSize(t *testing.T) { + hosts := make([]string, 0, 10) + for i := range 10 { + hosts = append(hosts, fmt.Sprintf("n%02d", i)) + } + + packed := packOrderedDomainsIntoBlocks([]orderedDomain{testOrderedDomain("B1", hosts...)}, 4, 1) + require.Len(t, packed, 3) + require.Equal(t, []string{"n00", "n01", "n02", "n03"}, packed[0].nodes) + require.Equal(t, []string{"n04", "n05", "n06", "n07"}, packed[1].nodes) + require.Equal(t, []string{"n08", "n09"}, packed[2].nodes) +} + +func TestPackPadsEachDomainToGroupSize(t *testing.T) { + domains := []orderedDomain{ + testOrderedDomain("a1", "n10", "n11", "n12"), + testOrderedDomain("a2", "n20", "n21"), + testOrderedDomain("a3", "n31", "n32", "n33"), + } + + packed := packOrderedDomainsIntoBlocks(domains, 2, 2) + require.Len(t, packed, 6) + require.Equal(t, "a1", packed[0].name) + require.Equal(t, "a1", packed[1].name) + require.Equal(t, "a2", packed[2].name) + require.True(t, isEmptyBlock(packed[3])) + require.Equal(t, "a3", packed[4].name) + require.Equal(t, "a3", packed[5].name) +} + +func TestExpandedBaseBlockSlots(t *testing.T) { + require.Equal(t, 4, totalBaseBlockSlots([]int{2, 2})) + require.Equal(t, 16, expandedBaseBlockSlots([]int{2, 2}, 12)) + require.Equal(t, 4, expandedBaseBlockSlots(nil, 4)) + 
require.Equal(t, 4, expandedBaseBlockSlots([]int{}, 4)) + require.Equal(t, 4, expandedBaseBlockSlots([]int{2, 2}, 0)) +} + +func TestAssignSequentialBlockIDs(t *testing.T) { + blocks := []*blockInfo{{name: "B1"}, {}, {name: "B2"}} + assignSequentialBlockIDs(blocks) + require.Equal(t, "block001", blocks[0].id) + require.Equal(t, "block002", blocks[1].id) + require.Equal(t, "block003", blocks[2].id) +} + +func testOrderedDomain(name string, nodes ...string) orderedDomain { + hosts := make([]*topology.HostInfo, 0, len(nodes)) + for _, node := range nodes { + hosts = append(hosts, &topology.HostInfo{Domain: name, HostName: node}) + } + return orderedDomain{name: name, hosts: hosts} +} diff --git a/pkg/translate/block_tree.go b/pkg/translate/block_tree.go deleted file mode 100644 index 29560c6c..00000000 --- a/pkg/translate/block_tree.go +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Copyright 2026 NVIDIA CORPORATION - * SPDX-License-Identifier: Apache-2.0 - */ - -package translate - -import ( - "fmt" - "maps" - "slices" - "sort" - - "github.com/NVIDIA/topograph/pkg/topology" -) - -// blockTreeNode is implemented by host, base, and aggregate block nodes. -type blockTreeNode interface { - blockTreeNode() -} - -// hostNode is the lowermost tree level: a host slot or an empty placeholder (host == nil). -type hostNode struct { - host *topology.HostInfo -} - -func (*hostNode) blockTreeNode() {} - -// baseBlockNode is the Slurm base block level. It always holds exactly baseBlockSize -// host nodes; missing positions or hosts are nil-host placeholders. -type baseBlockNode struct { - id string - domain string // primary domain ID, pre-computed from id at construction - leaves []*hostNode -} - -func (*baseBlockNode) blockTreeNode() {} - -func (n *baseBlockNode) domainIdentifier() string { return n.domain } - -// aggregateBlockNode groups base blocks or other aggregates. An domain with -// multiple base blocks is represented as an aggregate of baseBlockNode children. 
-type aggregateBlockNode struct { - id string - children []blockTreeNode -} - -func (*aggregateBlockNode) blockTreeNode() {} - -// buildAggregateShape wraps the recursive shape builder so callers always receive an -// *aggregateBlockNode. When fanouts produces a single base block (no intermediate -// tiers), the base block is wrapped in a one-child aggregate. -func buildAggregateShape(fanouts []int, baseBlockSize int) *aggregateBlockNode { - node := buildShapeLevel(fanouts, 0, baseBlockSize) - agg, ok := node.(*aggregateBlockNode) - if !ok { - return &aggregateBlockNode{children: []blockTreeNode{node}} - } - return agg -} - -// buildShapeLevel recurses top-down through the fanout tiers. level == len(fanouts) -// is the base case; it emits an empty base block. Returns blockTreeNode because the -// base case produces *baseBlockNode while all other levels produce *aggregateBlockNode. -func buildShapeLevel(fanouts []int, level int, baseBlockSize int) blockTreeNode { - if level == len(fanouts) { - return newEmptyBaseBlock(baseBlockSize) - } - count := fanoutAtLevel(level, fanouts) - children := make([]blockTreeNode, count) - for i := range children { - children[i] = buildShapeLevel(fanouts, level+1, baseBlockSize) - } - return &aggregateBlockNode{children: children} -} - -// totalBaseBlockSlots returns the product of all fanout tiers, which equals the -// total number of base-block leaves in the shaped tree. -func totalBaseBlockSlots(fanouts []int) int { - n := 1 - for _, f := range fanouts { - n *= f - } - return n -} - -// expandFanoutsForCapacity grows the last fanout tier (power-of-two steps) until the -// tree has at least required base-block leaves. Returns fanouts unchanged when it is -// empty or required is non-positive — an empty slice would panic on the index write. -func expandFanoutsForCapacity(fanouts []int, required int) []int { - if required <= 0 || len(fanouts) == 0 { - return fanouts - } - out := append([]int(nil), fanouts...) 
- for totalBaseBlockSlots(out) < required { - out[len(out)-1] *= 2 - } - return out -} - -// mergeBaseBlocksIntoTree fills tree leaf slots left-to-right from the ordered packed list. -func mergeBaseBlocksIntoTree(tree *aggregateBlockNode, packed []*baseBlockNode) { - slots := collectBaseBlockSlots(tree) - for i, bb := range packed { - if i >= len(slots) { - break - } - *slots[i] = *bb - } -} - -// packDomainsIntoBaseBlocks packs all domain hosts into baseBlockSize-sized blocks. -// Each domain's hosts are split into base blocks independently; no merging across -// domains is performed. When groupSize > 1, each domain's base block count is -// rounded up to the nearest multiple of groupSize by appending empty base blocks, so -// that each domain occupies complete aggregate groups within the tree. -func packDomainsIntoBaseBlocks(domains topology.DomainMap, baseBlockSize, groupSize int) []*baseBlockNode { - if baseBlockSize <= 0 { - return nil - } - domainIDs := slices.Sorted(maps.Keys(domains)) - if len(domainIDs) == 0 { - return nil - } - - var blocks []*baseBlockNode - for _, domainID := range domainIDs { - hosts := hostsSorted(domains[domainID]) - baseBlocks := splitIntoBaseBlocks(domainID, hosts, baseBlockSize) - blocks = append(blocks, baseBlocks...) - // Pad to a multiple of groupSize so the domain fills complete aggregate groups. - if groupSize > 1 { - n := len(baseBlocks) - padded := ((n + groupSize - 1) / groupSize) * groupSize - for i := n; i < padded; i++ { - blocks = append(blocks, newEmptyBaseBlock(baseBlockSize)) - } - } - } - return blocks -} - -// splitIntoBaseBlocks splits a sorted host list into one or more base blocks of at -// most baseBlockSize leaves each. Overflow blocks get a "#N" suffix on the ID. 
-func splitIntoBaseBlocks(id string, hosts []*topology.HostInfo, baseBlockSize int) []*baseBlockNode { - blocks := make([]*baseBlockNode, 0, (len(hosts)+baseBlockSize-1)/baseBlockSize) - for start := 0; start < len(hosts); start += baseBlockSize { - end := start + baseBlockSize - if end > len(hosts) { - end = len(hosts) - } - blockID := id - if len(blocks) > 0 { - blockID = fmt.Sprintf("%s#%d", id, len(blocks)+1) - } - blocks = append(blocks, newBaseBlock(blockID, hosts[start:end], baseBlockSize)) - } - return blocks -} - -// hostsSorted returns hosts in deterministic alphabetical order so that block -// packing is reproducible across runs. -func hostsSorted(hosts map[string]*topology.HostInfo) []*topology.HostInfo { - list := make([]*topology.HostInfo, 0, len(hosts)) - for _, h := range hosts { - list = append(list, h) - } - sortHostsByName(list) - return list -} - -// collectBaseBlockSlots returns all base blocks in the tree via a left-to-right DFS. -// The returned order is identical to the slot order used by mergeBaseBlocksIntoTree, -// so the slice can be indexed by the same position numbers. -func collectBaseBlockSlots(tree *aggregateBlockNode) []*baseBlockNode { - var slots []*baseBlockNode - var walk func(blockTreeNode) - walk = func(n blockTreeNode) { - switch c := n.(type) { - case *baseBlockNode: - slots = append(slots, c) - case *aggregateBlockNode: - for _, ch := range c.children { - walk(ch) - } - } - } - walk(tree) - return slots -} - -// blocksFromShapedTree converts the tree's filled base-block slots to named blockInfo -// records, stopping at required slots so unused trailing capacity is not emitted. 
-func blocksFromShapedTree(tree *aggregateBlockNode, byName map[string]*blockInfo, required int) []*blockInfo { - slots := collectBaseBlockSlots(tree) - out := make([]*blockInfo, 0, required) - for i, bb := range slots { - if i >= required { - break - } - out = append(out, baseBlockToBlockInfo(bb, byName, i+1)) - } - return out -} - -// isEmptyBlock reports whether a block carries neither a name nor any nodes. -// A block with a name but no nodes is a valid placeholder — the domain is -// identified but no live hosts were assigned — and is not considered empty. -func isEmptyBlock(b *blockInfo) bool { - return b == nil || (len(b.name) == 0 && len(b.nodes) == 0) -} - -// baseBlockToBlockInfo resolves a base block to a blockInfo using a priority fallback -// chain, because not all blocks have live hosts attached to their leaves: -// 1. Host names directly in leaves (live hosts — normal case) -// 2. Domain IDs from leaves → byName lookup (placeholder hosts: Domain set, HostName empty) -// 3. Domain ID as display name with no nodes (domain known, host list missing entirely) -// 4. 
Empty blockInfo (tree slot was never filled) -func baseBlockToBlockInfo(bb *baseBlockNode, byName map[string]*blockInfo, seq int) *blockInfo { - id := fmt.Sprintf("block%03d", seq) - domainID := bb.domainIdentifier() - nodes := hostNamesFromLeaves(bb.leaves) - if len(nodes) > 0 { - return &blockInfo{id: id, name: blockDisplayName(bb.id, domainID), nodes: nodes} - } - for _, domain := range domainIDsFromLeaves(bb.leaves) { - if b := byName[domain]; b != nil { - return &blockInfo{ - id: id, - name: blockDisplayName(bb.id, domain), - nodes: append([]string(nil), b.nodes...), - } - } - } - if domainID != "" { - return &blockInfo{id: id, name: blockDisplayName(bb.id, domainID)} - } - return &blockInfo{id: id} -} - -func blockDisplayName(blockID, primarydomain string) string { - if primarydomain != "" { - return primarydomain - } - return blockID -} - -// domainIDsFromLeaves collects unique domainID values from leaf hosts. -// Sorted for determinism; used as a fallback key set in baseBlockToBlockInfo. -func domainIDsFromLeaves(leaves []*hostNode) []string { - seen := make(map[string]struct{}) - var ids []string - for _, leaf := range leaves { - if leaf.host == nil || leaf.host.Domain == "" { - continue - } - if _, ok := seen[leaf.host.Domain]; ok { - continue - } - seen[leaf.host.Domain] = struct{}{} - ids = append(ids, leaf.host.Domain) - } - sort.Strings(ids) - return ids -} - - -func hostNamesFromLeaves(leaves []*hostNode) []string { - nodes := make([]string, 0, len(leaves)) - for _, leaf := range leaves { - if leaf.host == nil || leaf.host.HostName == "" { - continue - } - nodes = append(nodes, leaf.host.HostName) - } - return nodes -} - -// extractDomainID returns the primary domain ID from a possibly compound block ID. 
-// It strips everything from the first compound separator onward: -// -// "acc-a+acc-b" → "acc-a" (merged block; separator produced by combinedBlockID) -// "acc/d0" → "acc" (domain-qualified path) -// "acc#2" → "acc" (overflow block produced by splitIntoBaseBlocks) -func extractDomainID(id string) string { - for i, r := range id { - if r == '/' || r == '#' || r == '+' { - return id[:i] - } - } - return id -} - -// newBaseBlock builds a baseBlockNode from a pre-sorted host list, filling slots -// left-to-right. Slots beyond the provided hosts remain empty placeholders. -func newBaseBlock(id string, hosts []*topology.HostInfo, baseBlockSize int) *baseBlockNode { - leaves := make([]*hostNode, baseBlockSize) - for i := range leaves { - leaves[i] = &hostNode{} - } - for i, h := range hosts { - if i >= baseBlockSize { - break - } - leaves[i] = &hostNode{host: h} - } - return &baseBlockNode{id: id, domain: extractDomainID(id), leaves: leaves} -} - -func newEmptyBaseBlock(baseBlockSize int) *baseBlockNode { - if baseBlockSize <= 0 { - return &baseBlockNode{} - } - leaves := make([]*hostNode, baseBlockSize) - for i := range leaves { - leaves[i] = &hostNode{} - } - return &baseBlockNode{leaves: leaves} -} - -// sortHostsByName sorts hosts alphabetically by HostName for deterministic packing. -func sortHostsByName(hosts []*topology.HostInfo) { - sort.Slice(hosts, func(i, j int) bool { - return hosts[i].HostName < hosts[j].HostName - }) -} - -// fanoutAtLevel returns the child count for the given tree level. fanouts is ordered -// leaf-to-root (bottom-up), but the tree is built top-down, so level 0 reads from the -// end of the slice (the outermost tier) and deeper levels read toward the front. 
-func fanoutAtLevel(level int, fanouts []int) int { - idx := len(fanouts) - 1 - level - if idx < 0 { - idx = 0 - } - return fanouts[idx] -} diff --git a/pkg/translate/block_tree_test.go b/pkg/translate/block_tree_test.go deleted file mode 100644 index 24baba31..00000000 --- a/pkg/translate/block_tree_test.go +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Copyright 2026 NVIDIA CORPORATION - * SPDX-License-Identifier: Apache-2.0 - */ - -package translate - -import ( - "fmt" - "testing" - - "github.com/NVIDIA/topograph/pkg/topology" - "github.com/stretchr/testify/require" -) - -func TestSortHostsByName(t *testing.T) { - hosts := []*topology.HostInfo{ - {HostName: "z"}, - {HostName: "a"}, - {HostName: "m"}, - } - sortHostsByName(hosts) - require.Equal(t, []string{"a", "m", "z"}, []string{hosts[0].HostName, hosts[1].HostName, hosts[2].HostName}) -} - -// TestBaseBlockFillsSlotLeftToRight verifies that hosts fill base block slots left to -// right and that slots beyond the provided hosts remain empty placeholders. -func TestBaseBlockFillsSlotLeftToRight(t *testing.T) { - bb := newBaseBlock("B1", []*topology.HostInfo{ - {HostName: "n0", Domain: "B1"}, - {HostName: "n1", Domain: "B1"}, - }, 4) - - require.Len(t, bb.leaves, 4) - require.NotNil(t, bb.leaves[0].host) - require.Equal(t, "n0", bb.leaves[0].host.HostName) - require.NotNil(t, bb.leaves[1].host) - require.Equal(t, "n1", bb.leaves[1].host.HostName) - require.Nil(t, bb.leaves[2].host) - require.Nil(t, bb.leaves[3].host) -} - -// TestBaseBlocksFromDomainsMultiplePerdomain verifies that multiple hosts under -// the same domainID are packed into a single base block. 
-func TestBaseBlocksFromDomainsMultiplePerdomain(t *testing.T) { - domains := topology.NewDomainMap() - domains.AddHostInfo(&topology.HostInfo{ - Domain: "B1", - HostName: "n0", - }) - domains.AddHostInfo(&topology.HostInfo{ - Domain: "B1", - HostName: "n1", - }) - - packed := packDomainsIntoBaseBlocks(domains, 2, 0) - require.Len(t, packed, 1) - require.Equal(t, "B1", packed[0].id) - require.Len(t, hostNamesFromLeaves(packed[0].leaves), 2) -} - -// TestPackKeepsdomainsIndependent verifies that domains are never merged -// together even when each has fewer hosts than baseBlockSize. Each domain is -// packed independently into its own base block(s), with no cross-domain combining. -func TestPackKeepsdomainsIndependent(t *testing.T) { - domains := topology.NewDomainMap() - for _, accel := range []string{"B1", "B2", "B3"} { - for j := range 3 { - domains.AddHostInfo(&topology.HostInfo{ - Domain: accel, - HostName: fmt.Sprintf("%s-n%d", accel, j), - }) - } - } - - packed := packDomainsIntoBaseBlocks(domains, 8, 0) - require.Len(t, packed, 3) - require.Equal(t, "B1", packed[0].id) - require.Len(t, hostNamesFromLeaves(packed[0].leaves), 3) - require.Equal(t, "B2", packed[1].id) - require.Len(t, hostNamesFromLeaves(packed[1].leaves), 3) - require.Equal(t, "B3", packed[2].id) - require.Len(t, hostNamesFromLeaves(packed[2].leaves), 3) -} - -// TestPackSplitsWhenHostsExceedBlockSize verifies that a single domain with more -// hosts than baseBlockSize is split into multiple base blocks with "#N" ID suffixes. 
-func TestPackSplitsWhenHostsExceedBlockSize(t *testing.T) { - domains := topology.NewDomainMap() - for i := range 10 { - domains.AddHostInfo(&topology.HostInfo{ - Domain: "B1", - HostName: fmt.Sprintf("n%d", i), - }) - } - packed := packDomainsIntoBaseBlocks(domains, 4, 0) - require.Len(t, packed, 3) - require.Len(t, hostNamesFromLeaves(packed[0].leaves), 4) - require.Len(t, hostNamesFromLeaves(packed[2].leaves), 2) -} - -// TestShapedBlockTreeSlots verifies that two domains fill the two available tree -// slots in sorted order when the tree has exactly the needed capacity. -func TestShapedBlockTreeSlots(t *testing.T) { - domains := topology.NewDomainMap() - domains.AddHostInfo(&topology.HostInfo{ - Domain: "B1", - HostName: "n1", - InstanceID: "i1", - }) - domains.AddHostInfo(&topology.HostInfo{ - Domain: "B3", - HostName: "n3", - InstanceID: "i3", - }) - - fanouts, ok := fanoutsPerLevel([]int{4, 8}) - require.True(t, ok) - const baseBlockSize = 4 - packed := packDomainsIntoBaseBlocks(domains, baseBlockSize, 0) - expandedFanouts := expandFanoutsForCapacity(fanouts, len(packed)) - tree := buildAggregateShape(expandedFanouts, baseBlockSize) - mergeBaseBlocksIntoTree(tree, packed) - slots := collectBaseBlockSlots(tree) - require.Len(t, slots, 2) - require.Equal(t, "B1", slots[0].domainIdentifier()) - require.Equal(t, "B3", slots[1].domainIdentifier()) -} - -// TestBlocksFromShapedTreeFillsSequentially verifies that blocks fill left-to-right -// in sorted domain order regardless of domain ID format. Each domain -// has baseBlockSize hosts so they pack independently (no merging). 
-func TestBlocksFromShapedTreeFillsSequentially(t *testing.T) { - fanouts, ok := fanoutsPerLevel([]int{4, 8, 16}) - require.True(t, ok) - domains := topology.NewDomainMap() - accels := []string{"gpu-clique-a", "gpu-clique-b", "gpu-clique-c"} - for _, accel := range accels { - for i := range 4 { - domains.AddHostInfo(&topology.HostInfo{ - Domain: accel, - HostName: fmt.Sprintf("%s-n%d", accel, i), - }) - } - } - - const baseBlockSize = 4 - packed := packDomainsIntoBaseBlocks(domains, baseBlockSize, 0) - expandedFanouts := expandFanoutsForCapacity(fanouts, len(packed)) - tree := buildAggregateShape(expandedFanouts, baseBlockSize) - mergeBaseBlocksIntoTree(tree, packed) - byName := map[string]*blockInfo{ - "gpu-clique-a": {name: "gpu-clique-a", nodes: []string{"gpu-clique-a-n0"}}, - "gpu-clique-b": {name: "gpu-clique-b", nodes: []string{"gpu-clique-b-n0"}}, - "gpu-clique-c": {name: "gpu-clique-c", nodes: []string{"gpu-clique-c-n0"}}, - } - out := blocksFromShapedTree(tree, byName, 3) - require.Len(t, out, 3) - require.Equal(t, "gpu-clique-a", out[0].name) - require.Equal(t, "gpu-clique-b", out[1].name) - require.Equal(t, "gpu-clique-c", out[2].name) -} - - -// TestSplitIntoBaseBlocksChunksExcessHosts verifies that 12 hosts with a blockSize of 4 -// produce exactly 3 blocks, each fully populated, filling slots left-to-right. 
-func TestSplitIntoBaseBlocksChunksExcessHosts(t *testing.T) { - hosts := make([]*topology.HostInfo, 12) - for i := range 12 { - hosts[i] = &topology.HostInfo{ - HostName: fmt.Sprintf("n%d", i), - Domain: "B1", - } - } - sortHostsByName(hosts) - blocks := splitIntoBaseBlocks("B1", hosts, 4) - require.Len(t, blocks, 3) - require.Len(t, blocks[0].leaves, 4) - require.Len(t, hostNamesFromLeaves(blocks[0].leaves), 4) - require.Len(t, hostNamesFromLeaves(blocks[1].leaves), 4) - require.Len(t, hostNamesFromLeaves(blocks[2].leaves), 4) -} - -func TestExpandFanoutsForCapacity(t *testing.T) { - require.Equal(t, 4, totalBaseBlockSlots([]int{2, 2})) - require.Equal(t, []int{2, 8}, expandFanoutsForCapacity([]int{2, 2}, 12)) - require.Equal(t, 16, totalBaseBlockSlots(expandFanoutsForCapacity([]int{2, 2}, 12))) - // Empty fanout slice must not panic. - require.Equal(t, []int(nil), expandFanoutsForCapacity(nil, 4)) - require.Equal(t, []int{}, expandFanoutsForCapacity([]int{}, 4)) - // Non-positive required must return fanouts unchanged. - require.Equal(t, []int{2, 2}, expandFanoutsForCapacity([]int{2, 2}, 0)) - require.Equal(t, []int{2, 2}, expandFanoutsForCapacity([]int{2, 2}, -1)) -} - -// TestShapedTreeExpandsForExcessHosts verifies that when required base blocks exceed -// the initial tree capacity, the last fanout tier is doubled until all hosts fit. 
-func TestShapedTreeExpandsForExcessHosts(t *testing.T) { - domains := topology.NewDomainMap() - for i := range 12 { - domains.AddHostInfo(&topology.HostInfo{ - Domain: "B1", - HostName: fmt.Sprintf("n%d", i), - }) - } - fanouts, ok := fanoutsPerLevel([]int{4, 8, 16}) - require.True(t, ok) - const baseBlockSize = 4 - packed := packDomainsIntoBaseBlocks(domains, baseBlockSize, 0) - expandedFanouts := expandFanoutsForCapacity(fanouts, len(packed)) - tree := buildAggregateShape(expandedFanouts, baseBlockSize) - mergeBaseBlocksIntoTree(tree, packed) - slots := collectBaseBlockSlots(tree) - require.GreaterOrEqual(t, len(slots), 3) - var allNodes []string - for _, s := range slots { - allNodes = append(allNodes, hostNamesFromLeaves(s.leaves)...) - } - require.Len(t, allNodes, 12) -}