diff --git a/pkg/engines/slinky/engine.go b/pkg/engines/slinky/engine.go index 0e80c5bf..30f18f83 100644 --- a/pkg/engines/slinky/engine.go +++ b/pkg/engines/slinky/engine.go @@ -278,9 +278,9 @@ func (eng *SlinkyEngine) GenerateOutput(ctx context.Context, graph *topology.Gra GetPartitionNodes: eng.getPartitionNodes, Params: []any{p.Namespace}, } - cfg, err := slurm.GetTranslateConfig(ctx, &p.BaseParams, resolvedTopologies, topologyNodeFinder) - if err != nil { - return nil, httperr.NewError(http.StatusInternalServerError, err.Error()) + cfg, httpErr := slurm.GetTranslateConfig(ctx, &p.BaseParams, resolvedTopologies, topologyNodeFinder) + if httpErr != nil { + return nil, httpErr } nt, err := translate.NewNetworkTopology(graph, cfg) diff --git a/pkg/engines/slurm/slurm.go b/pkg/engines/slurm/slurm.go index c5e19c6d..3bc64049 100644 --- a/pkg/engines/slurm/slurm.go +++ b/pkg/engines/slurm/slurm.go @@ -234,9 +234,9 @@ func GenerateOutputParams(ctx context.Context, graph *topology.Graph, params *Pa params.Plugin = topology.TopologyTree } - cfg, err := GetTranslateConfig(ctx, ¶ms.BaseParams, params.Topologies, &TopologyNodeFinder{GetPartitionNodes: getPartitionNodes}) - if err != nil { - return nil, httperr.NewError(http.StatusInternalServerError, err.Error()) + cfg, httpErr := GetTranslateConfig(ctx, ¶ms.BaseParams, params.Topologies, &TopologyNodeFinder{GetPartitionNodes: getPartitionNodes}) + if httpErr != nil { + return nil, httpErr } nt, err := translate.NewNetworkTopology(graph, cfg) @@ -252,7 +252,7 @@ func GenerateOutputParams(ctx context.Context, graph *topology.Graph, params *Pa } } - if httpErr := nt.Generate(buf); httpErr != nil { + if httpErr = nt.Generate(buf); httpErr != nil { return nil, httpErr } @@ -276,7 +276,11 @@ func GenerateOutputParams(ctx context.Context, graph *topology.Graph, params *Pa return []byte("OK\n"), nil } -func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[string]*Topology, f *TopologyNodeFinder) 
(*translate.Config, error) { +func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[string]*Topology, f *TopologyNodeFinder) (*translate.Config, *httperr.Error) { + if err := validateBlockSizes(params.BlockSizes); err != nil { + return nil, httperr.NewError(http.StatusBadRequest, err.Error()) + } + cfg := &translate.Config{ Plugin: params.Plugin, BlockSizes: params.BlockSizes, @@ -286,6 +290,9 @@ func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[ if len(topologies) != 0 { cfg.Topologies = make(map[string]*translate.TopologySpec) for topo, sect := range topologies { + if err := validateBlockSizes(sect.BlockSizes); err != nil { + return nil, httperr.NewError(http.StatusBadRequest, fmt.Sprintf("topology %q: %v", topo, err)) + } spec := &translate.TopologySpec{ Plugin: sect.Plugin, BlockSizes: sect.BlockSizes, @@ -302,7 +309,7 @@ func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[ klog.V(4).Infof("%s %q discovered nodes %v", sect.Plugin, topo, nodes) spec.Nodes = nodes } else { - return nil, err + return nil, httperr.NewError(http.StatusInternalServerError, err.Error()) } } cfg.Topologies[topo] = spec @@ -312,6 +319,31 @@ func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[ return cfg, nil } +func validateBlockSizes(blockSizes []int) error { + if len(blockSizes) == 0 { + return nil + } + prev := blockSizes[0] + if prev <= 0 { + return fmt.Errorf("blockSizes[0]=%d must be positive", prev) + } + for i := 1; i < len(blockSizes); i++ { + cur := blockSizes[i] + if cur <= 0 { + return fmt.Errorf("blockSizes[%d]=%d must be positive", i, cur) + } + if cur <= prev || cur%prev != 0 { + return fmt.Errorf("blockSizes[%d]=%d must be greater than blockSizes[%d]=%d and be a multiple of power of two", i, cur, i-1, prev) + } + ratio := cur / prev + if ratio&(ratio-1) != 0 { + return fmt.Errorf("blockSizes[%d]/blockSizes[%d]=%d must be a multiple of power of two", i, i-1, 
ratio) + } + prev = cur + } + return nil +} + func getParams(params map[string]any) (*Params, error) { var p Params err := config.Decode(params, &p) diff --git a/pkg/engines/slurm/slurm_test.go b/pkg/engines/slurm/slurm_test.go index 6009acec..bebf4513 100644 --- a/pkg/engines/slurm/slurm_test.go +++ b/pkg/engines/slurm/slurm_test.go @@ -244,15 +244,31 @@ func TestGetTranslateConfig(t *testing.T) { name: "Case 2: valid blocksize", params: &BaseParams{ Plugin: topology.TopologyBlock, - BlockSizes: []int{2, 4, 8}, + BlockSizes: []int{2, 8, 32}, }, cfg: &translate.Config{ Plugin: topology.TopologyBlock, - BlockSizes: []int{2, 4, 8}, + BlockSizes: []int{2, 8, 32}, }, }, { - name: "Case 3: with invalid partition topology", + name: "Case 3: invalid top-level blocksize ratio", + params: &BaseParams{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{2, 6}, + }, + err: "blockSizes[1]/blockSizes[0]=3 must be a multiple of power of two", + }, + { + name: "Case 4: invalid top-level blocksize multiple", + params: &BaseParams{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{2, 5}, + }, + err: "blockSizes[1]=5 must be greater than blockSizes[0]=2 and be a multiple of power of two", + }, + { + name: "Case 5: invalid partition topology", params: &BaseParams{}, topologies: map[string]*Topology{ "topo1": { @@ -266,7 +282,7 @@ func TestGetTranslateConfig(t *testing.T) { err: "missing partition name", }, { - name: "Case 4: with valid partition topology", + name: "Case 6: with valid partition topology", params: &BaseParams{}, topologies: map[string]*Topology{ "default": { @@ -292,7 +308,7 @@ func TestGetTranslateConfig(t *testing.T) { }, }, { - name: "Case 5: explicit empty nodes do not use partition discovery", + name: "Case 7: explicit empty nodes do not use partition discovery", params: &BaseParams{}, topologies: map[string]*Topology{ "topo": { @@ -315,6 +331,18 @@ func TestGetTranslateConfig(t *testing.T) { }, }, }, + { + name: "Case 8: invalid partition blocksize ratio", 
+ params: &BaseParams{}, + topologies: map[string]*Topology{ + "topo": { + Plugin: topology.TopologyBlock, + BlockSizes: []int{2, 6}, + Nodes: []string{"node[001-100]"}, + }, + }, + err: `topology "topo": blockSizes[1]/blockSizes[0]=3 must be a multiple of power of two`, + }, } for _, tc := range testCases { @@ -323,7 +351,7 @@ func TestGetTranslateConfig(t *testing.T) { if len(tc.err) != 0 { require.EqualError(t, err, tc.err) } else { - require.NoError(t, err) + require.Nil(t, err) require.Equal(t, tc.cfg, cfg) } }) @@ -360,7 +388,14 @@ SwitchName=S3 Nodes=Node[304-306] code: http.StatusBadRequest, }, { - name: "Case 3: valid input", + name: "Case 3: invalid semantic blocksize", + graph: graph, + params: map[string]any{"blockSizes": []int{2, 6}}, + err: "blockSizes[1]/blockSizes[0]=3 must be a multiple of power of two", + code: http.StatusBadRequest, + }, + { + name: "Case 4: valid input", graph: graph, cfg: cfg, }, diff --git a/pkg/providers/infiniband/bm_test.go b/pkg/providers/infiniband/bm_test.go index df63ff84..b700372c 100644 --- a/pkg/providers/infiniband/bm_test.go +++ b/pkg/providers/infiniband/bm_test.go @@ -29,9 +29,9 @@ func TestPopulateDomainsFromPdshOutput(t *testing.T) { node-09: ClusterUUID : 50000000-0000-0000-0000-000000000005 ` domainMap := topology.DomainMap{ - "50000000-0000-0000-0000-000000000004.4000000005": map[string]string{"node-07": "node-07", "node-08": "node-08"}, - "50000000-0000-0000-0000-000000000005.4000000004": map[string]string{"node-10": "node-10"}, - "50000000-0000-0000-0000-000000000005.4000000005": map[string]string{"node-09": "node-09"}, + "50000000-0000-0000-0000-000000000004.4000000005": map[string]*topology.HostInfo{"node-07": {Domain: "50000000-0000-0000-0000-000000000004.4000000005", HostName: "node-07", InstanceID: "node-07"}, "node-08": {Domain: "50000000-0000-0000-0000-000000000004.4000000005", HostName: "node-08", InstanceID: "node-08"}}, + "50000000-0000-0000-0000-000000000005.4000000004": 
map[string]*topology.HostInfo{"node-10": {Domain: "50000000-0000-0000-0000-000000000005.4000000004", HostName: "node-10", InstanceID: "node-10"}}, + "50000000-0000-0000-0000-000000000005.4000000005": map[string]*topology.HostInfo{"node-09": {Domain: "50000000-0000-0000-0000-000000000005.4000000005", HostName: "node-09", InstanceID: "node-09"}}, } testCases := []struct { diff --git a/pkg/topology/domain.go b/pkg/topology/domain.go index 36e4c1a9..6cbc9aaa 100644 --- a/pkg/topology/domain.go +++ b/pkg/topology/domain.go @@ -23,25 +23,21 @@ import ( "k8s.io/klog/v2" ) -// DomainMap maps domain name to a map of hostname:instance -type DomainMap map[string]map[string]string +type HostInfo struct { + Domain string + InstanceID string + HostName string +} + +// DomainMap maps accelerator domain name to host metadata. +type DomainMap map[string]map[string]*HostInfo func NewDomainMap() DomainMap { return make(DomainMap) } func (m DomainMap) AddHost(domain, instance, host string) { - if domain == "" { - klog.Warningf("skipping topology domain with empty name for host %q (instance %q)", host, instance) - return - } - - if hosts, ok := m[domain]; ok { - hosts[host] = instance - return - } - - m[domain] = map[string]string{host: instance} + m.AddHostInfo(&HostInfo{Domain: domain, InstanceID: instance, HostName: host}) } func (m DomainMap) String() string { @@ -52,3 +48,19 @@ func (m DomainMap) String() string { } return str.String() } + +func (m DomainMap) AddHostInfo(hostInfo *HostInfo) { + if hostInfo == nil { + return + } + if hostInfo.Domain == "" { + klog.Warningf("skipping topology domain with empty name for host %q (instance %q)", hostInfo.HostName, hostInfo.InstanceID) + return + } + + if hosts, ok := m[hostInfo.Domain]; ok { + hosts[hostInfo.HostName] = hostInfo + } else { + m[hostInfo.Domain] = map[string]*HostInfo{hostInfo.HostName: hostInfo} + } +} diff --git a/pkg/topology/domain_test.go b/pkg/topology/domain_test.go index 055f3505..a764f43e 100644 --- 
a/pkg/topology/domain_test.go +++ b/pkg/topology/domain_test.go @@ -31,7 +31,12 @@ func TestDomainMapAddHost(t *testing.T) { domainMap.AddHost("", "instance4", "host4") require.Equal(t, DomainMap{ - "domain1": map[string]string{"host1": "instance1", "host2": "instance2"}, - "domain2": map[string]string{"host3": "instance3"}, + "domain1": map[string]*HostInfo{ + "host1": {Domain: "domain1", InstanceID: "instance1", HostName: "host1"}, + "host2": {Domain: "domain1", InstanceID: "instance2", HostName: "host2"}, + }, + "domain2": map[string]*HostInfo{ + "host3": {Domain: "domain2", InstanceID: "instance3", HostName: "host3"}, + }, }, domainMap) } diff --git a/pkg/translate/block.go b/pkg/translate/block.go index 66688e28..254073f5 100644 --- a/pkg/translate/block.go +++ b/pkg/translate/block.go @@ -51,16 +51,27 @@ func getBlockSizes(blocks []*blockInfo, requestedBlockSizes []int) []int { } func (nt *NetworkTopology) toBlockTopology(wr io.Writer, skeletonOnly bool) *httperr.Error { - blockSizes := getBlockSizes(nt.blocks, nt.config.BlockSizes) + blocks := nt.complementBlocks(nt.blocks, nt.config.BlockSizes) + // Refresh nodeInfo.blockID so GetNodeTopologySpec returns IDs that match the + // emitted topology file. complementBlocks may renumber blocks when it splits + // a domain across multiple base blocks, invalidating the IDs set by initBlocks. 
+ for _, b := range blocks { + for _, node := range b.nodes { + if info, ok := nt.nodeInfo[node]; ok { + info.blockID = b.id + } + } + } + blockSizes := getBlockSizes(blocks, nt.config.BlockSizes) - for _, bInfo := range nt.blocks { + for _, bInfo := range blocks { var comment string if len(bInfo.name) != 0 { comment = fmt.Sprintf("# %s=%s\n", bInfo.id, bInfo.name) } var err error - if skeletonOnly { + if skeletonOnly || len(bInfo.nodes) == 0 { _, err = fmt.Fprintf(wr, "%sBlockName=%s\n", comment, bInfo.id) } else { outputNodeNames := strings.Join(cluset.Compact(bInfo.nodes), ",") diff --git a/pkg/translate/block_complement.go b/pkg/translate/block_complement.go new file mode 100644 index 00000000..2b915d4a --- /dev/null +++ b/pkg/translate/block_complement.go @@ -0,0 +1,115 @@ +/* + * Copyright 2026 NVIDIA CORPORATION + * SPDX-License-Identifier: Apache-2.0 + */ + +package translate + +// complementBlocks packs domain hosts into BlockSizes-shaped base-block slots and +// returns the resulting flat block list. +// +// Only domains for accelerators present in blocks are used so per-partition YAML +// complementing is not masked by domains owned by other partitions in nt.domains. +// +// The group size is derived from the maximum accelerator host count: it is the smallest +// 2^n such that 2^n * baseBlockSize >= maxAcceleratorSize. Each accelerator's base +// block count is then padded to a multiple of that groupSize so every accelerator +// occupies complete aggregate groups. 
+func (nt *NetworkTopology) complementBlocks(blocks []*blockInfo, blockSizes []int) []*blockInfo { + fanouts, ok := fanoutsPerLevel(blockSizes) + if !ok || nt.domains == nil { + return blocks + } + + baseBlockSize := blockSizes[0] + domains, ok := orderedDomainsForBlocks(nt.domains, blocks) + if !ok { + return blocks + } + + groupSize := groupSizeFromOrderedDomains(domains, baseBlockSize) + out := packOrderedDomainsIntoBlocks(domains, baseBlockSize, groupSize) + + // When no padding was added (packed count equals input count), stop at the exact + // packed count so trailing tree-capacity slots do not falsely trigger complement usage. + if len(out) != len(blocks) { + required := expandedBaseBlockSlots(fanouts, len(out)) + for len(out) < required { + out = append(out, &blockInfo{}) + } + } + assignSequentialBlockIDs(out) + + if !shouldUseComplementedBlocks(blocks, out) { + return blocks + } + return out +} + +// shouldUseComplementedBlocks reports whether the tree-derived list should replace the +// input. Two cases warrant replacement: +// - Interior empty slots: the tree has gaps where no accelerator was placed, meaning +// the shaped output carries structural information the flat input list cannot express. +// - Count increase: an accelerator had more hosts than baseBlockSize and was split +// into multiple base blocks, so the output is longer than the input. +// +// A shorter output is never used: domain packing may skip blocks whose domain is +// absent from the global map, producing fewer packed blocks than the input. Replacing +// the input in that case would silently drop blocks. +func shouldUseComplementedBlocks(input, out []*blockInfo) bool { + if len(out) < len(input) { + return false + } + if hasEmptyBlockSlots(out) { + return true + } + return len(out) > len(input) +} + +// hasEmptyBlockSlots reports whether any interior slot in blocks is empty. 
+// Trailing empty blocks are not considered complement slots because they arise from +// tree capacity rounding and carry no structural meaning for the scheduler. +// "Trailing" means everything after the last non-empty block; only empty slots +// that appear before that boundary are treated as structural gaps. +func hasEmptyBlockSlots(blocks []*blockInfo) bool { + lastNonEmpty := -1 + for i := len(blocks) - 1; i >= 0; i-- { + if !isEmptyBlock(blocks[i]) { + lastNonEmpty = i + break + } + } + for i := 0; i < lastNonEmpty; i++ { + if isEmptyBlock(blocks[i]) { + return true + } + } + return false +} + +// fanoutsPerLevel derives child segment counts from BlockSizes. +// Each ratio BlockSizes[i]/BlockSizes[i-1] must be a power of two greater than 1. +func fanoutsPerLevel(blockSizes []int) ([]int, bool) { + if len(blockSizes) < 2 { + return nil, false + } + prev := blockSizes[0] + if prev <= 0 { + return nil, false + } + out := make([]int, 0, len(blockSizes)-1) + for i := 1; i < len(blockSizes); i++ { + cur := blockSizes[i] + if cur <= prev || cur%prev != 0 { + return nil, false + } + ratio := cur / prev + // Power-of-two check: ratio & (ratio-1) is zero only for powers of two. + if ratio&(ratio-1) != 0 { + return nil, false + } + out = append(out, ratio) + prev = cur + } + return out, true +} diff --git a/pkg/translate/block_complement_test.go b/pkg/translate/block_complement_test.go new file mode 100644 index 00000000..6a73a869 --- /dev/null +++ b/pkg/translate/block_complement_test.go @@ -0,0 +1,432 @@ +/* + * Copyright 2026 NVIDIA CORPORATION + * SPDX-License-Identifier: Apache-2.0 + */ + +package translate + +import ( + "bytes" + "fmt" + "strings" + "testing" + + "github.com/NVIDIA/topograph/pkg/topology" + "github.com/stretchr/testify/require" +) + +// TestComplementMissingBaseBlock verifies that when an accelerator domain is absent +// from the graph, the remaining blocks renumber sequentially with no empty placeholder. 
+// maxAcceleratorSize=3 ≤ baseBlockSize=4, so groupSize=1 (no padding) and complement +// is a no-op; the original 3-block list is returned unchanged. +func TestComplementMissingBaseBlock(t *testing.T) { + root, _ := getBlockWithIBTestSet() + delete(root.Domains, "B2") + + cfg := &Config{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{4, 8, 16}, + } + nt, err := NewNetworkTopology(root, cfg) + require.NoError(t, err) + + var buf bytes.Buffer + require.Nil(t, nt.toBlockTopology(&buf, false)) + + expected := strings.Join([]string{ + "# block001=B1", + "BlockName=block001 Nodes=Node[104-106]", + "# block002=B3", + "BlockName=block002 Nodes=Node[304-306]", + "# block003=B4", + "BlockName=block003 Nodes=Node[401-403]", + "BlockSizes=4,8,16", + "", + }, "\n") + require.Equal(t, expected, buf.String()) +} + +// TestComplementMissingLeafSegment verifies the asymmetric-spine case: one spine has +// 4 leaf switches and the other has 3. maxAcceleratorSize=3 ≤ baseBlockSize=4, so +// groupSize=1 (no padding) and complement is a no-op; all 7 accelerators appear as +// 7 sequential blocks with no trailing placeholders. 
+func TestComplementMissingLeafSegment(t *testing.T) { + root, _ := getBlockWithIBAsymmetricSpineTestSet() + + cfg := &Config{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{4, 16, 32}, + } + nt, err := NewNetworkTopology(root, cfg) + require.NoError(t, err) + + var buf bytes.Buffer + require.Nil(t, nt.toBlockTopology(&buf, false)) + + expected := strings.Join([]string{ + "# block001=B1", + "BlockName=block001 Nodes=Node[101-103]", + "# block002=B2", + "BlockName=block002 Nodes=Node[201-202,205]", + "# block003=B3", + "BlockName=block003 Nodes=Node[301-303]", + "# block004=B4", + "BlockName=block004 Nodes=Node[401-403]", + "# block005=B5", + "BlockName=block005 Nodes=Node[501-503]", + "# block006=B6", + "BlockName=block006 Nodes=Node[601-603]", + "# block007=B7", + "BlockName=block007 Nodes=Node[701-703]", + "BlockSizes=4,16,32", + "", + }, "\n") + require.Equal(t, expected, buf.String()) +} + +// TestNoComplementWithoutTree verifies that complementBlocks is a no-op when the graph +// has no Tiers (no switch tree). maxAcceleratorSize=3 ≤ baseBlockSize=4, so groupSize=1 +// and no structural padding is applied; the output contains no empty block slots. +func TestNoComplementWithoutTree(t *testing.T) { + root, _ := getBlockTestSet() + cfg := &Config{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{4, 8, 16}, + } + nt, err := NewNetworkTopology(root, cfg) + require.NoError(t, err) + + var buf bytes.Buffer + require.Nil(t, nt.toBlockTopology(&buf, false)) + require.NotContains(t, buf.String(), "BlockName=block002\n") + require.Contains(t, buf.String(), "BlockSizes=4,8,16") +} + +// TestNoComplementSingleBlockSize verifies that a single BlockSizes entry (no tiers) +// disables the complement path entirely; fanoutsPerLevel requires at least two sizes. 
+func TestNoComplementSingleBlockSize(t *testing.T) { + root, _ := getBlockWithIBTestSet() + cfg := &Config{ + Plugin: topology.TopologyBlock, + BlockSizes: []int{3}, + } + nt, err := NewNetworkTopology(root, cfg) + require.NoError(t, err) + + var buf bytes.Buffer + require.Nil(t, nt.toBlockTopology(&buf, false)) + require.Equal(t, testBlockConfig1_2, buf.String()) +} + +func TestFanoutsPerLevel(t *testing.T) { + fanouts, ok := fanoutsPerLevel([]int{4, 8, 16}) + require.True(t, ok) + require.Equal(t, []int{2, 2}, fanouts) + + _, ok = fanoutsPerLevel([]int{4, 8, 12}) + require.False(t, ok) + + _, ok = fanoutsPerLevel([]int{4, 6, 12}) + require.False(t, ok) + + _, ok = fanoutsPerLevel([]int{4}) + require.False(t, ok) +} + +// TestHasEmptyBlockSlots verifies the interior-only rule: trailing empty blocks are +// not counted as complement slots (they arise from tree-capacity rounding), but an +// empty block that appears before the last non-empty block is a structural gap. +func TestHasEmptyBlockSlots(t *testing.T) { + require.False(t, hasEmptyBlockSlots([]*blockInfo{{name: "B1", nodes: []string{"n1"}}})) + // Single trailing empty: not a structural gap. + require.False(t, hasEmptyBlockSlots([]*blockInfo{{name: "B1", nodes: []string{"n1"}}, {}})) + // Multiple trailing empties: all are trailing, none are structural gaps. + require.False(t, hasEmptyBlockSlots([]*blockInfo{{name: "B1"}, {}, {}})) + // Interior empty between B1 and B3: structural gap. + require.True(t, hasEmptyBlockSlots([]*blockInfo{{name: "B1"}, {}, {name: "B3"}})) + // Interior empty followed by further trailing empties: gap is detected. + require.True(t, hasEmptyBlockSlots([]*blockInfo{{name: "B1"}, {}, {name: "B3"}, {}, {}})) + // Trailing empties after 7 filled blocks: tree-capacity artifact, not a gap. 
+ require.False(t, hasEmptyBlockSlots([]*blockInfo{ + {name: "B1"}, {name: "B2"}, {name: "B3"}, {name: "B4"}, + {name: "B5"}, {name: "B6"}, {name: "B7"}, {}, {}, + })) +} + +// TestComplementKeepsSeparateAccelerators verifies that two undersized accelerators are +// never merged into a single base block. maxAcceleratorSize=3 ≤ baseBlockSize=8, so +// groupSize=1 and complement is a no-op; the original 2-block list is returned with +// each accelerator in its own separate block. +func TestComplementKeepsSeparateAccelerators(t *testing.T) { + domains := topology.NewDomainMap() + nodesB1 := []string{"Node101", "Node102", "Node103"} + nodesB2 := []string{"Node201", "Node202", "Node205"} + for _, n := range nodesB1 { + domains.AddHostInfo(&topology.HostInfo{Domain: "B1", HostName: n, InstanceID: n}) + } + for _, n := range nodesB2 { + domains.AddHostInfo(&topology.HostInfo{Domain: "B2", HostName: n, InstanceID: n}) + } + + nt := &NetworkTopology{ + domains: domains, + blocks: []*blockInfo{ + {name: "B1", nodes: nodesB1}, + {name: "B2", nodes: nodesB2}, + }, + } + + out := nt.complementBlocks(nt.blocks, []int{8, 16}) + require.Len(t, out, 2) + require.Equal(t, "B1", out[0].name) + require.Len(t, out[0].nodes, 3) + require.Equal(t, "B2", out[1].name) + require.Len(t, out[1].nodes, 3) +} + +// TestComplementExcessHostsPerAccelerator verifies the split path: when a single +// accelerator has more hosts than baseBlockSize it is split into multiple base blocks, +// each carrying the same accelerator name, and every host appears exactly once. +// maxAcceleratorSize=12, baseBlockSize=4 → groupSize=4 (2^2*4=16 ≥ 12); 3 real blocks +// padded to 4 (ceil(3/4)*4). 
+func TestComplementExcessHostsPerAccelerator(t *testing.T) { + domains := topology.NewDomainMap() + nodes := make([]string, 0, 12) + for i := 0; i < 12; i++ { + name := fmt.Sprintf("Node%03d", 100+i) + nodes = append(nodes, name) + domains.AddHostInfo(&topology.HostInfo{ + Domain: "B1", + HostName: name, + InstanceID: name, + }) + } + + nt := &NetworkTopology{ + domains: domains, + blocks: []*blockInfo{{ + id: "block001", + name: "B1", + nodes: nodes, + }}, + } + + out := nt.complementBlocks(nt.blocks, []int{4, 8, 16}) + // 3 base blocks (ceil(12/4)) padded to 4 (groupSize=4, ceil(3/4)*4=4). + require.Len(t, out, 4) + require.True(t, isEmptyBlock(out[3]), "out[3] should be the group-alignment padding block") + + seen := make(map[string]bool) + for _, b := range out[:3] { + require.Equal(t, "B1", b.name) + for _, n := range b.nodes { + seen[n] = true + } + } + require.Len(t, seen, 12) + for _, n := range nodes { + require.True(t, seen[n]) + } +} + +// TestComplementPartitionLocalDomainsOnly verifies that complementBlocks scopes domain +// lookup to the partition's own blocks. B2 exists in nt.domains but is excluded from +// partitionBlocks, so the complement result contains only B1, B3, and B4. With +// maxAcceleratorSize=3 ≤ baseBlockSize=4, groupSize=1 and no padding is applied. +func TestComplementPartitionLocalDomainsOnly(t *testing.T) { + root, _ := getBlockWithIBTestSet() + nt, err := NewNetworkTopology(root, &Config{Plugin: topology.TopologyBlock}) + require.NoError(t, err) + + // Partition owns B1, B3, B4 but not B2 (B2 remains in global nt.domains). 
+ partitionBlocks := make([]*blockInfo, 0, 3) + for _, b := range nt.blocks { + if b.name == "B2" { + continue + } + partitionBlocks = append(partitionBlocks, b) + } + + out := nt.complementBlocks(partitionBlocks, []int{4, 8, 16}) + require.Len(t, out, 3) + require.Equal(t, "B1", out[0].name) + require.Equal(t, "B3", out[1].name) + require.Equal(t, "B4", out[2].name) +} + +// TestDomainsForBlocksFilteredToPartitionNodes is a regression test for cross-partition +// node contamination. Domain B1 holds 4 nodes globally (n1–n4), but the partition-local +// blockInfo only lists n1, n2, n3. Without filtering, orderedDomainsForBlocks would copy all 4 +// hosts and n4 would appear in the complemented output. With the fix, only n1–n3 are +// used: the split produces two base blocks ([n1,n2] and [n3]) and n4 is absent. +func TestDomainsForBlocksFilteredToPartitionNodes(t *testing.T) { + domains := topology.NewDomainMap() + for _, n := range []string{"n1", "n2", "n3", "n4"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "B1", HostName: n, InstanceID: n}) + } + + // Partition only owns n1, n2, n3 — n4 belongs to another partition. + nt := &NetworkTopology{ + domains: domains, + blocks: []*blockInfo{}, + } + partitionBlocks := []*blockInfo{ + {name: "B1", nodes: []string{"n1", "n2", "n3"}}, + } + + out := nt.complementBlocks(partitionBlocks, []int{2, 4}) + // groupSize=2 (maxAccelSize=3, 2^1×2=4≥3); B1 splits into 2 base blocks. + // len(packed)=2 ≠ len(input)=1 → complement applied. 
+ require.Len(t, out, 2) + + seen := make(map[string]bool) + for _, b := range out { + require.Equal(t, "B1", b.name) + for _, n := range b.nodes { + seen[n] = true + } + } + require.True(t, seen["n1"]) + require.True(t, seen["n2"]) + require.True(t, seen["n3"]) + require.False(t, seen["n4"], "n4 belongs to another partition and must not appear") +} + +// TestComplementPreservesInputWhenDomainsMissing verifies that complementBlocks +// returns the original block list unchanged when an input block has no matching +// entry in the global domain map (e.g. the block name was never registered). +func TestComplementPreservesInputWhenDomainsMissing(t *testing.T) { + domains := topology.NewDomainMap() + // Only B1 is in the domain map; B2 is not. + for _, n := range []string{"n1", "n2"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "B1", HostName: n, InstanceID: n}) + } + nt := &NetworkTopology{ + domains: domains, + } + input := []*blockInfo{ + {id: "block001", name: "B1", nodes: []string{"n1", "n2"}}, + {id: "block002", name: "B2", nodes: []string{"n3", "n4"}}, // no domain entry + } + out := nt.complementBlocks(input, []int{2, 4}) + require.Equal(t, input, out) +} + +// TestComplementPreservesTopologyBlockOrder verifies that complementing follows the +// already established block order rather than sorting domain IDs alphabetically. 
+func TestComplementPreservesTopologyBlockOrder(t *testing.T) { + domains := topology.NewDomainMap() + for _, n := range []string{"b1n1", "b1n2", "b1n3"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "B1", HostName: n, InstanceID: n}) + } + for _, n := range []string{"b2n1", "b2n2", "b2n3"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "B2", HostName: n, InstanceID: n}) + } + nt := &NetworkTopology{domains: domains} + input := []*blockInfo{ + {id: "block001", name: "B2", nodes: []string{"b2n1", "b2n2", "b2n3"}}, + {id: "block002", name: "B1", nodes: []string{"b1n1", "b1n2", "b1n3"}}, + } + + out := nt.complementBlocks(input, []int{2, 4}) + require.Len(t, out, 4) + require.Equal(t, "B2", out[0].name) + require.Equal(t, "B2", out[1].name) + require.Equal(t, "B1", out[2].name) + require.Equal(t, "B1", out[3].name) +} + +// TestGetBlockTopologyUnitWithMultiAcceleratorDomains verifies the YAML per-partition +// complement path end-to-end: three accelerator domains (a1, a2, a3), block +// sizes [2,4]. a2 fills only one base block (2 nodes) while groupSize=2, so it gets +// an empty padding slot; tree-capacity expansion adds two more trailing empty slots. 
+func TestGetBlockTopologyUnitWithMultiAcceleratorDomains(t *testing.T) { + domains := topology.NewDomainMap() + for _, n := range []string{"n10", "n11", "n12"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "a1", HostName: n, InstanceID: n}) + } + for _, n := range []string{"n20", "n21"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "a2", HostName: n, InstanceID: n}) + } + for _, n := range []string{"n31", "n32", "n33"} { + domains.AddHostInfo(&topology.HostInfo{Domain: "a3", HostName: n, InstanceID: n}) + } + + cfg := &Config{ + Topologies: map[string]*TopologySpec{ + "topo1": { + Plugin: topology.TopologyBlock, + Nodes: []string{"n[10-12]", "n[20-21]", "n[31-33]"}, + BlockSizes: []int{2, 4}, + }, + }, + } + + graph := &topology.Graph{Domains: domains} + nt, err := NewNetworkTopology(graph, cfg) + require.NoError(t, err) + + var buf bytes.Buffer + require.Nil(t, nt.Generate(&buf)) + + expected := strings.Join([]string{ + "- topology: topo1", + " cluster_default: false", + " block:", + " block_sizes:", + " - 2", + " - 4", + " blocks:", + " - block: block1", + " nodes: n[10-11]", + " - block: block2", + " nodes: n12", + " - block: block3", + " nodes: n[20-21]", + " - block: block4", + " - block: block5", + " nodes: n[31-32]", + " - block: block6", + " nodes: n33", + " - block: block7", + " - block: block8", + "", + }, "\n") + require.Equal(t, expected, buf.String()) +} + +// getBlockWithIBAsymmetricSpineTestSet models two spines with four leaf switches on the +// left spine and three on the right, each leaf switch hosting one accelerator domain. 
+func getBlockWithIBAsymmetricSpineTestSet() (*topology.Graph, map[string]string) {
+	n := func(id, name string) *topology.Vertex {
+		return &topology.Vertex{ID: id, Name: name}
+	}
+
+	leaf := func(id string, nodes map[string]*topology.Vertex) *topology.Vertex {
+		return &topology.Vertex{ID: id, Vertices: nodes}
+	}
+
+	l11 := leaf("L11", map[string]*topology.Vertex{"I11a": n("I11a", "Node101"), "I11b": n("I11b", "Node102"), "I11c": n("I11c", "Node103")})
+	l12 := leaf("L12", map[string]*topology.Vertex{"I12a": n("I12a", "Node201"), "I12b": n("I12b", "Node202"), "I12c": n("I12c", "Node205")})
+	l13 := leaf("L13", map[string]*topology.Vertex{"I13a": n("I13a", "Node301"), "I13b": n("I13b", "Node302"), "I13c": n("I13c", "Node303")})
+	l14 := leaf("L14", map[string]*topology.Vertex{"I14a": n("I14a", "Node401"), "I14b": n("I14b", "Node402"), "I14c": n("I14c", "Node403")})
+	l21 := leaf("L21", map[string]*topology.Vertex{"I21a": n("I21a", "Node501"), "I21b": n("I21b", "Node502"), "I21c": n("I21c", "Node503")})
+	l22 := leaf("L22", map[string]*topology.Vertex{"I22a": n("I22a", "Node601"), "I22b": n("I22b", "Node602"), "I22c": n("I22c", "Node603")})
+	l23 := leaf("L23", map[string]*topology.Vertex{"I23a": n("I23a", "Node701"), "I23b": n("I23b", "Node702"), "I23c": n("I23c", "Node703")})
+
+	spine1 := &topology.Vertex{ID: "SP1", Vertices: map[string]*topology.Vertex{"L11": l11, "L12": l12, "L13": l13, "L14": l14}}
+	spine2 := &topology.Vertex{ID: "SP2", Vertices: map[string]*topology.Vertex{"L21": l21, "L22": l22, "L23": l23}}
+	core := &topology.Vertex{Vertices: map[string]*topology.Vertex{"SP1": spine1, "SP2": spine2}}
+
+	domains := testDomainMap(map[string]map[string]string{
+		"B1": {"Node101": "I11a", "Node102": "I11b", "Node103": "I11c"},
+		"B2": {"Node201": "I12a", "Node202": "I12b", "Node205": "I12c"},
+		"B3": {"Node301": "I13a", "Node302": "I13b", "Node303": "I13c"},
+		"B4": {"Node401": "I14a", "Node402": "I14b", "Node403": "I14c"},
+		"B5": {"Node501": "I21a", "Node502": "I21b", "Node503": "I21c"},
+		"B6": {"Node601": "I22a", "Node602": "I22b", "Node603": "I22c"},
+		"B7": {"Node701": "I23a", "Node702": "I23b", "Node703": "I23c"},
+	})
+
+	return &topology.Graph{Tiers: core, Domains: domains}, nil
+}
diff --git a/pkg/translate/block_pack.go b/pkg/translate/block_pack.go
new file mode 100644
index 00000000..9edd036b
--- /dev/null
+++ b/pkg/translate/block_pack.go
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2026 NVIDIA CORPORATION
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package translate
+
+import (
+	"fmt"
+	"sort"
+
+	"github.com/NVIDIA/topograph/pkg/topology"
+)
+
+// orderedDomain pairs an accelerator domain name with the hosts selected from it.
+type orderedDomain struct {
+	name  string
+	hosts []*topology.HostInfo
+}
+
+// orderedDomainsForBlocks returns partition-local domain hosts in the same order as
+// blocks. Each domain is filtered to the block's node list so partitions sharing an
+// accelerator domain cannot leak nodes into one another. The boolean is false when
+// all is nil, when any block cannot be resolved, or when blocks is empty.
+func orderedDomainsForBlocks(all topology.DomainMap, blocks []*blockInfo) ([]orderedDomain, bool) {
+	if all == nil {
+		return nil, false
+	}
+
+	domains := make([]orderedDomain, 0, len(blocks))
+	for _, b := range blocks {
+		hosts, ok := hostsForBlock(all, b)
+		if !ok {
+			return nil, false
+		}
+		domains = append(domains, orderedDomain{name: b.name, hosts: hosts})
+	}
+	return domains, len(domains) != 0
+}
+
+// hostsForBlock looks up the nodes of block b in the domain named b.name, skipping
+// repeated node names, and returns the matching hosts sorted by host name. It reports
+// false when b is nil, unnamed or has no nodes, when the domain is unknown, or when a
+// node has no host entry in that domain.
+func hostsForBlock(all topology.DomainMap, b *blockInfo) ([]*topology.HostInfo, bool) {
+	if b == nil || b.name == "" || len(b.nodes) == 0 {
+		return nil, false
+	}
+	hostsByName, ok := all[b.name]
+	if !ok {
+		return nil, false
+	}
+
+	hosts := make([]*topology.HostInfo, 0, len(b.nodes))
+	seen := make(map[string]struct{}, len(b.nodes))
+	for _, node := range b.nodes {
+		if _, ok := seen[node]; ok {
+			continue
+		}
+		seen[node] = struct{}{}
+
+		host := hostsByName[node]
+		if host == nil {
+			return nil, false
+		}
+		hosts = append(hosts, host)
+	}
+	if len(hosts) == 0 {
+		return nil, false
+	}
+	sortHostsByName(hosts)
+	return hosts, true
+}
+
+// groupSizeFromOrderedDomains computes how many base blocks a fully populated
+// accelerator occupies, rounded up to the nearest power of two. A non-positive
+// baseBlockSize yields 1: doubling it could never reach the largest domain, so the
+// search below would not terminate.
+func groupSizeFromOrderedDomains(domains []orderedDomain, baseBlockSize int) int {
+	if baseBlockSize <= 0 {
+		return 1
+	}
+
+	maxNodes := 0
+	for _, domain := range domains {
+		maxNodes = max(maxNodes, len(domain.hosts))
+	}
+
+	groupSize := 1
+	for capacity := baseBlockSize; capacity < maxNodes; capacity *= 2 {
+		groupSize *= 2
+	}
+	return groupSize
+}
+
+// packOrderedDomainsIntoBlocks splits every domain into base blocks of at most
+// baseBlockSize hosts, keeping domains in their given order. When groupSize is
+// greater than 1, each domain's run of blocks is padded with empty blocks up to a
+// multiple of groupSize. It returns nil when baseBlockSize is not positive.
+func packOrderedDomainsIntoBlocks(domains []orderedDomain, baseBlockSize, groupSize int) []*blockInfo {
+	if baseBlockSize <= 0 {
+		return nil
+	}
+
+	var blocks []*blockInfo
+	for _, domain := range domains {
+		before := len(blocks)
+		blocks = append(blocks, splitHostsIntoBlocks(domain.name, domain.hosts, baseBlockSize)...)
+		if groupSize > 1 {
+			blocks = padDomainGroup(blocks, len(blocks)-before, groupSize)
+		}
+	}
+	return blocks
+}
+
+// splitHostsIntoBlocks chunks hosts, in order, into blocks named domainName that hold
+// at most baseBlockSize hosts each; only the last block may be short. It returns nil
+// when baseBlockSize is not positive, which would otherwise divide by zero or never
+// advance the chunk start.
+func splitHostsIntoBlocks(domainName string, hosts []*topology.HostInfo, baseBlockSize int) []*blockInfo {
+	if baseBlockSize <= 0 {
+		return nil
+	}
+
+	blocks := make([]*blockInfo, 0, (len(hosts)+baseBlockSize-1)/baseBlockSize)
+	for start := 0; start < len(hosts); start += baseBlockSize {
+		end := min(start+baseBlockSize, len(hosts))
+		blocks = append(blocks, &blockInfo{
+			name:  domainName,
+			nodes: hostNames(hosts[start:end]),
+		})
+	}
+	return blocks
+}
+
+// padDomainGroup appends empty blocks to blocks until realBlockCount, the number of
+// blocks just added for one domain, is rounded up to a multiple of groupSize.
+func padDomainGroup(blocks []*blockInfo, realBlockCount, groupSize int) []*blockInfo {
+	padded := roundUpToMultiple(realBlockCount, groupSize)
+	for i := realBlockCount; i < padded; i++ {
+		blocks = append(blocks, &blockInfo{})
+	}
+	return blocks
+}
+
+// roundUpToMultiple rounds n, assumed non-negative, up to the nearest multiple of
+// multiple. A multiple of 1 or less leaves n unchanged.
+func roundUpToMultiple(n, multiple int) int {
+	if multiple <= 1 {
+		return n
+	}
+	return ((n + multiple - 1) / multiple) * multiple
+}
+
+// expandedBaseBlockSlots doubles the slot count implied by fanouts until it is at
+// least required. A non-positive product (a zero or negative fanout) is treated as
+// one slot, because doubling it would never reach a positive requirement.
+func expandedBaseBlockSlots(fanouts []int, required int) int {
+	capacity := max(totalBaseBlockSlots(fanouts), 1)
+	for capacity < required {
+		capacity *= 2
+	}
+	return capacity
+}
+
+// totalBaseBlockSlots returns the product of fanouts; an empty list yields 1.
+func totalBaseBlockSlots(fanouts []int) int {
+	n := 1
+	for _, f := range fanouts {
+		n *= f
+	}
+	return n
+}
+
+// assignSequentialBlockIDs numbers blocks in slice order as block001, block002, ...
+// Nil entries keep their position in the numbering but are otherwise skipped.
+func assignSequentialBlockIDs(blocks []*blockInfo) {
+	for i, block := range blocks {
+		if block == nil {
+			continue
+		}
+		block.id = fmt.Sprintf("block%03d", i+1)
+	}
+}
+
+// isEmptyBlock reports whether b is nil or has neither a name nor nodes.
+func isEmptyBlock(b *blockInfo) bool {
+	return b == nil || (len(b.name) == 0 && len(b.nodes) == 0)
+}
+
+// hostNames returns the HostName of every host, skipping nil entries and hosts with
+// an empty name.
+func hostNames(hosts []*topology.HostInfo) []string {
+	nodes := make([]string, 0, len(hosts))
+	for _, host := range hosts {
+		if host == nil || host.HostName == "" {
+			continue
+		}
+		nodes = append(nodes, host.HostName)
+	}
+	return nodes
+}
+
+// sortHostsByName sorts hosts in place by ascending HostName.
+func sortHostsByName(hosts []*topology.HostInfo) {
+	sort.Slice(hosts, func(i, j int) bool {
+		return hosts[i].HostName < hosts[j].HostName
+	})
+}
diff --git a/pkg/translate/block_pack_test.go b/pkg/translate/block_pack_test.go
new file mode 100644
index 00000000..37151002
--- /dev/null
+++ b/pkg/translate/block_pack_test.go
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2026 NVIDIA CORPORATION
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package translate
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/NVIDIA/topograph/pkg/topology"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSortHostsByName(t *testing.T) {
+	hosts := []*topology.HostInfo{
+		{HostName: "z"},
+		{HostName: "a"},
+		{HostName: "m"},
+	}
+	sortHostsByName(hosts)
+	require.Equal(t, []string{"a", "m", "z"}, []string{hosts[0].HostName, hosts[1].HostName, hosts[2].HostName})
+}
+
+func TestOrderedDomainsForBlocksPreservesOrderAndFiltersPartitionNodes(t *testing.T) {
+	domains := topology.NewDomainMap()
+	for _, node := range []string{"n20", "n21", "n22"} {
+		domains.AddHostInfo(&topology.HostInfo{Domain: "B2", HostName: node})
+	}
+	for _, node := range []string{"n10", "n11", "n12"} {
+		domains.AddHostInfo(&topology.HostInfo{Domain: "B1", HostName: node})
+	}
+
+	ordered, ok := orderedDomainsForBlocks(domains, []*blockInfo{
+		{name: "B2", nodes: []string{"n22", "n20"}},
+		{name: "B1", nodes: []string{"n11"}},
+	})
+	require.True(t, ok)
+	require.Len(t, ordered, 2)
+	require.Equal(t, "B2", ordered[0].name)
+	require.Equal(t, []string{"n20", "n22"}, hostNames(ordered[0].hosts))
+	require.Equal(t, "B1", ordered[1].name)
+	require.Equal(t, []string{"n11"}, hostNames(ordered[1].hosts))
+}
+
+func TestPackKeepsDomainsIndependent(t *testing.T) {
+	domains := []orderedDomain{
+		testOrderedDomain("B1", "B1-n0", "B1-n1", "B1-n2"),
+		testOrderedDomain("B2", "B2-n0", "B2-n1", "B2-n2"),
+		testOrderedDomain("B3", "B3-n0", "B3-n1", "B3-n2"),
+	}
+
+	packed := packOrderedDomainsIntoBlocks(domains, 8, 1)
+	require.Len(t, packed, 3)
+	require.Equal(t, "B1", packed[0].name)
+	require.Len(t, packed[0].nodes, 3)
+	require.Equal(t, "B2", packed[1].name)
+	require.Len(t, packed[1].nodes, 3)
+	require.Equal(t, "B3", packed[2].name)
+	require.Len(t, packed[2].nodes, 3)
+}
+
+func TestPackSplitsWhenHostsExceedBlockSize(t *testing.T) {
+	hosts := make([]string, 0, 10)
+	for i := range 10 {
+		hosts = append(hosts, fmt.Sprintf("n%02d", i))
+	}
+
+	packed := packOrderedDomainsIntoBlocks([]orderedDomain{testOrderedDomain("B1", hosts...)}, 4, 1)
+	require.Len(t, packed, 3)
+	require.Equal(t, []string{"n00", "n01", "n02", "n03"}, packed[0].nodes)
+	require.Equal(t, []string{"n04", "n05", "n06", "n07"}, packed[1].nodes)
+	require.Equal(t, []string{"n08", "n09"}, packed[2].nodes)
+}
+
+func TestPackPadsEachDomainToGroupSize(t *testing.T) {
+	domains := []orderedDomain{
+		testOrderedDomain("a1", "n10", "n11", "n12"),
+		testOrderedDomain("a2", "n20", "n21"),
+		testOrderedDomain("a3", "n31", "n32", "n33"),
+	}
+
+	packed := packOrderedDomainsIntoBlocks(domains, 2, 2)
+	require.Len(t, packed, 6)
+	require.Equal(t, "a1", packed[0].name)
+	require.Equal(t, "a1", packed[1].name)
+	require.Equal(t, "a2", packed[2].name)
+	require.True(t, isEmptyBlock(packed[3]))
+	require.Equal(t, "a3", packed[4].name)
+	require.Equal(t, "a3", packed[5].name)
+}
+
+func TestExpandedBaseBlockSlots(t *testing.T) {
+	require.Equal(t, 4, totalBaseBlockSlots([]int{2, 2}))
+	require.Equal(t, 16, expandedBaseBlockSlots([]int{2, 2}, 12))
+	require.Equal(t, 4, expandedBaseBlockSlots(nil, 4))
+	require.Equal(t, 4, expandedBaseBlockSlots([]int{}, 4))
+	require.Equal(t, 4, expandedBaseBlockSlots([]int{2, 2}, 0))
+}
+
+func TestGroupSizeFromOrderedDomains(t *testing.T) {
+	domains := []orderedDomain{
+		testOrderedDomain("B1", "n0", "n1", "n2"),
+		testOrderedDomain("B2", "n3"),
+	}
+	require.Equal(t, 1, groupSizeFromOrderedDomains(domains, 4))
+	require.Equal(t, 2, groupSizeFromOrderedDomains(domains, 2))
+	require.Equal(t, 4, groupSizeFromOrderedDomains(domains, 1))
+	require.Equal(t, 1, groupSizeFromOrderedDomains(nil, 2))
+}
+
+func TestNonPositiveSizesTerminate(t *testing.T) {
+	domain := testOrderedDomain("B1", "n0", "n1", "n2")
+	require.Equal(t, 1, groupSizeFromOrderedDomains([]orderedDomain{domain}, 0))
+	require.Equal(t, 1, groupSizeFromOrderedDomains([]orderedDomain{domain}, -2))
+	require.Empty(t, splitHostsIntoBlocks(domain.name, domain.hosts, 0))
+	require.Equal(t, 4, expandedBaseBlockSlots([]int{2, 0}, 3))
+}
+
+func TestAssignSequentialBlockIDs(t *testing.T) {
+	blocks := []*blockInfo{{name: "B1"}, {}, {name: "B2"}}
+	assignSequentialBlockIDs(blocks)
+	require.Equal(t, "block001", blocks[0].id)
+	require.Equal(t, "block002", blocks[1].id)
+	require.Equal(t, "block003", blocks[2].id)
+}
+
+func testOrderedDomain(name string, nodes ...string) orderedDomain {
+	hosts := make([]*topology.HostInfo, 0, len(nodes))
+	for _, node := range nodes {
+		hosts = append(hosts, &topology.HostInfo{Domain: name, HostName: node})
+	}
+	return orderedDomain{name: name, hosts: hosts}
+}
diff --git a/pkg/translate/block_test.go b/pkg/translate/block_test.go
index e4fe7840..21ac780d 100644
--- a/pkg/translate/block_test.go
+++ b/pkg/translate/block_test.go
@@ -11,6 +11,7 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/NVIDIA/topograph/pkg/topology"
 	"github.com/stretchr/testify/require"
 )
 
@@ -202,6 +203,132 @@ func TestGetBlockSizes(t *testing.T) {
 	}
 }
 
+// TestGenerateTopologyConfig exercises GenerateTopologyConfig directly for a
+// cluster-wide block topology. For cluster-wide mode (no per-partition Topologies
+// map), the returned []*TopologyUnit slice is always empty — the topology is written
+// to the writer rather than returned.
+//
+// BlockSizes={2,4,8} triggers node complementing: baseBlockSize=2,
+// maxAcceleratorSize=3, so groupSize=2 (2^1×2=4≥3). Each 3-node accelerator is
+// split into 2 base blocks (no empty padding needed since 2 is already a multiple of
+// groupSize). The 4 accelerators × 2 blocks each = 8 output blocks total.
+//
+// The test covers both full output (with node lists compacted to ranges) and
+// skeleton-only output (block structure without node lists).
+func TestGenerateTopologyConfig(t *testing.T) {
+	root, _ := getBlockWithIBTestSet()
+	cfg := &Config{
+		Plugin:     topology.TopologyBlock,
+		BlockSizes: []int{2, 4, 8},
+	}
+	nt, err := NewNetworkTopology(root, cfg)
+	require.NoError(t, err)
+
+	t.Run("full output", func(t *testing.T) {
+		var buf bytes.Buffer
+		topologies, httpErr := nt.GenerateTopologyConfig(&buf, false)
+		require.Nil(t, httpErr)
+		// Cluster-wide block topology: the config is written to the writer and
+		// nothing is returned in the TopologyUnit slice.
+		require.Empty(t, topologies)
+		// Each accelerator is split into 2 base blocks (nodes sorted alphabetically,
+		// then chunked at baseBlockSize=2): the first chunk gets the lower 2 nodes,
+		// the second gets the remaining 1.
+		expected := strings.Join([]string{
+			"# block001=B1",
+			"BlockName=block001 Nodes=Node[104-105]",
+			"# block002=B1",
+			"BlockName=block002 Nodes=Node106",
+			"# block003=B2",
+			"BlockName=block003 Nodes=Node[201-202]",
+			"# block004=B2",
+			"BlockName=block004 Nodes=Node205",
+			"# block005=B3",
+			"BlockName=block005 Nodes=Node[304-305]",
+			"# block006=B3",
+			"BlockName=block006 Nodes=Node306",
+			"# block007=B4",
+			"BlockName=block007 Nodes=Node[401-402]",
+			"# block008=B4",
+			"BlockName=block008 Nodes=Node403",
+			"BlockSizes=2,4,8",
+			"",
+		}, "\n")
+		require.Equal(t, expected, buf.String())
+	})
+
+	t.Run("skeleton only", func(t *testing.T) {
+		var buf bytes.Buffer
+		topologies, httpErr := nt.GenerateTopologyConfig(&buf, true)
+		require.Nil(t, httpErr)
+		require.Empty(t, topologies)
+		// Same 8 blocks in the same order, but every BlockName line omits Nodes=.
+		expected := strings.Join([]string{
+			"# block001=B1",
+			"BlockName=block001",
+			"# block002=B1",
+			"BlockName=block002",
+			"# block003=B2",
+			"BlockName=block003",
+			"# block004=B2",
+			"BlockName=block004",
+			"# block005=B3",
+			"BlockName=block005",
+			"# block006=B3",
+			"BlockName=block006",
+			"# block007=B4",
+			"BlockName=block007",
+			"# block008=B4",
+			"BlockName=block008",
+			"BlockSizes=2,4,8",
+			"",
+		}, "\n")
+		require.Equal(t, expected, buf.String())
+	})
+}
+
+// TestGetNodeTopologySpecAfterComplement verifies that GetNodeTopologySpec returns
+// block IDs that match the emitted topology file after complement splits domains
+// across multiple base blocks.
+func TestGetNodeTopologySpecAfterComplement(t *testing.T) {
+	root, _ := getBlockWithIBTestSet()
+	cfg := &Config{
+		Plugin:     topology.TopologyBlock,
+		BlockSizes: []int{2, 4, 8},
+	}
+	nt, err := NewNetworkTopology(root, cfg)
+	require.NoError(t, err)
+
+	var buf bytes.Buffer
+	_, httpErr := nt.GenerateTopologyConfig(&buf, false)
+	require.Nil(t, httpErr)
+
+	// Expected mapping: the first two nodes of each domain go to its first base block
+	// and the third node goes to its second (sorted alphabetically within the domain).
+	cases := []struct {
+		node    string
+		blockID string
+	}{
+		{"Node104", "block001"}, // B1 first chunk
+		{"Node105", "block001"}, // B1 first chunk
+		{"Node106", "block002"}, // B1 second chunk — ID was stale before the fix
+		{"Node201", "block003"}, // B2 first chunk — ID was stale before the fix
+		{"Node202", "block003"}, // B2 first chunk — ID was stale before the fix
+		{"Node205", "block004"}, // B2 second chunk — ID was stale before the fix
+		{"Node304", "block005"}, // B3 first chunk
+		{"Node305", "block005"}, // B3 first chunk
+		{"Node306", "block006"}, // B3 second chunk
+		{"Node401", "block007"}, // B4 first chunk
+		{"Node402", "block007"}, // B4 first chunk
+		{"Node403", "block008"}, // B4 second chunk
+	}
+	for _, tc := range cases {
+		spec, specErr := nt.GetNodeTopologySpec(tc.node, nil)
+		require.Nil(t, specErr, "node %s", tc.node)
+		require.Equal(t, "default:"+tc.blockID, spec, "node %s", tc.node)
+	}
+}
+
 func populateBlockInfo(blocks map[string]int) []*blockInfo {
 	result := make([]*blockInfo, 0, len(blocks))
 
diff --git a/pkg/translate/topology.go b/pkg/translate/topology.go
index fa73d23e..b1e09173 100644
--- a/pkg/translate/topology.go
+++ b/pkg/translate/topology.go
@@ -35,6 +35,7 @@ type TopologySpec struct {
 type NetworkTopology struct {
 	config   *Config
 	tree     map[string][]string         // adjacency list
+	domains  topology.DomainMap          // accelerator domains copied from graph.Domains in initBlocks
 	blocks   []*blockInfo                // blocks
 	vertices map[string]*topology.Vertex // object ID to Vertex map
 	nodeInfo map[string]*nodeInfo        // node name to nodeInfo map
@@ -184,6 +185,7 @@ func (nt *NetworkTopology) initBlocks(graph *topology.Graph) {
 		return
 	}
 
+	nt.domains = graph.Domains
 	domainBlocks := toBlockInfos(graph.Domains)
 	nt.blocks = make([]*blockInfo, 0, len(domainBlocks))
 	indx := 0
@@ -192,8 +194,13 @@ func (nt *NetworkTopology) initBlocks(graph *topology.Graph) {
 	for _, bInfo := range domainBlocks {
 		bInfo.indx = indx
 		for _, node := range bInfo.nodes {
+			hostInfo := graph.Domains[bInfo.name][node]
+			if hostInfo == nil {
+				klog.Warningf("initBlocks: missing host info for node %q in domain %q", node, bInfo.name)
+				continue
+			}
 			nt.nodeInfo[node] = &nodeInfo{
-				instanceID: graph.Domains[bInfo.name][node],
+				instanceID: hostInfo.InstanceID,
 				blockID:    bInfo.id,
 				blockIndx:  ptr.Int(indx),
 			}
diff --git a/pkg/translate/yaml.go b/pkg/translate/yaml.go
index cea1bc58..d8c62a23 100644
--- a/pkg/translate/yaml.go
+++ b/pkg/translate/yaml.go
@@ -149,8 +149,13 @@ func (nt *NetworkTopology) getBlockTopologyUnit(topoName string, topoSpec *Topol
 		indx := *info.blockIndx
 		bInfo, ok := blockMap[indx]
 		if !ok {
+			name := ""
+			if indx < len(nt.blocks) {
+				name = nt.blocks[indx].name
+			}
 			blockMap[indx] = &blockInfo{
 				indx:  indx,
+				name:  name,
 				nodes: []string{nodeName},
 			}
 		} else {
@@ -175,15 +180,18 @@ func (nt *NetworkTopology) getBlockTopologyUnit(topoName string, topoSpec *Topol
 		return bInfos[i].indx < bInfos[j].indx
 	})
 
+	bInfos = nt.complementBlocks(bInfos, topoSpec.BlockSizes)
+
 	// populate block topology units ordered by block indices
 	blocks := make([]*Block, 0, len(bInfos))
 	parents := make(map[string]string)
 	for indx, bInfo := range bInfos {
 		blockName := fmt.Sprintf("block%d", indx+1)
-		blocks = append(blocks, &Block{
-			Name:  blockName,
-			Nodes: strings.Join(cluset.Compact(bInfo.nodes), ","),
-		})
+		block := &Block{Name: blockName}
+		if len(bInfo.nodes) != 0 {
+			block.Nodes = strings.Join(cluset.Compact(bInfo.nodes), ",")
+		}
+		blocks = append(blocks, block)
 
 		for _, nodeName := range bInfo.nodes {
 			parents[nodeName] = blockName
diff --git a/pkg/translate/yaml_test.go b/pkg/translate/yaml_test.go
index a5f623fd..9d8edbe3 100644
--- a/pkg/translate/yaml_test.go
+++ b/pkg/translate/yaml_test.go
@@ -349,3 +349,59 @@ func TestEmptyPartitionTopology(t *testing.T) {
 	require.Nil(t, err)
 	require.Equal(t, expectedSkeleton, buf.String())
 }
+
+// TestGetBlockTopologyUnitComplementEmptyNodes verifies that per-partition block
+// topology pads each accelerator's base blocks to a complete aggregate group. a2 has
+// only 2 nodes (1 base block) but groupSize=2 requires 2 slots, so an empty block4 is
+// inserted. Tree-capacity expansion rounds 3 groups up to 4, adding empty block7 and block8.
+func TestGetBlockTopologyUnitComplementEmptyNodes(t *testing.T) {
+	expected := `- topology: topo1
+ cluster_default: false
+ block:
+ block_sizes:
+ - 2
+ - 4
+ blocks:
+ - block: block1
+ nodes: n[10-11]
+ - block: block2
+ nodes: n12
+ - block: block3
+ nodes: n[20-21]
+ - block: block4
+ - block: block5
+ nodes: n[31-32]
+ - block: block6
+ nodes: n33
+ - block: block7
+ - block: block8
+`
+	domains := topology.NewDomainMap()
+	for _, n := range []string{"n10", "n11", "n12"} {
+		domains.AddHostInfo(&topology.HostInfo{Domain: "a1", HostName: n, InstanceID: n})
+	}
+	for _, n := range []string{"n20", "n21"} {
+		domains.AddHostInfo(&topology.HostInfo{Domain: "a2", HostName: n, InstanceID: n})
+	}
+	for _, n := range []string{"n31", "n32", "n33"} {
+		domains.AddHostInfo(&topology.HostInfo{Domain: "a3", HostName: n, InstanceID: n})
+	}
+
+	cfg := &Config{
+		Topologies: map[string]*TopologySpec{
+			"topo1": {
+				Plugin:     topology.TopologyBlock,
+				Nodes:      []string{"n[10-12]", "n[20-21]", "n[31-33]"},
+				BlockSizes: []int{2, 4},
+			},
+		},
+	}
+
+	graph := &topology.Graph{Domains: domains}
+	nt, err := NewNetworkTopology(graph, cfg)
+	require.NoError(t, err)
+
+	buf := &bytes.Buffer{}
+	require.Nil(t, nt.Generate(buf))
+	require.Equal(t, expected, buf.String())
+}