Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/engines/slinky/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,9 @@ func (eng *SlinkyEngine) GenerateOutput(ctx context.Context, graph *topology.Gra
GetPartitionNodes: eng.getPartitionNodes,
Params: []any{p.Namespace},
}
cfg, err := slurm.GetTranslateConfig(ctx, &p.BaseParams, resolvedTopologies, topologyNodeFinder)
if err != nil {
return nil, httperr.NewError(http.StatusInternalServerError, err.Error())
cfg, httpErr := slurm.GetTranslateConfig(ctx, &p.BaseParams, resolvedTopologies, topologyNodeFinder)
if httpErr != nil {
return nil, httpErr
}

nt, err := translate.NewNetworkTopology(graph, cfg)
Expand Down
44 changes: 38 additions & 6 deletions pkg/engines/slurm/slurm.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ func GenerateOutputParams(ctx context.Context, graph *topology.Graph, params *Pa
params.Plugin = topology.TopologyTree
}

cfg, err := GetTranslateConfig(ctx, &params.BaseParams, params.Topologies, &TopologyNodeFinder{GetPartitionNodes: getPartitionNodes})
if err != nil {
return nil, httperr.NewError(http.StatusInternalServerError, err.Error())
cfg, httpErr := GetTranslateConfig(ctx, &params.BaseParams, params.Topologies, &TopologyNodeFinder{GetPartitionNodes: getPartitionNodes})
if httpErr != nil {
return nil, httpErr
}

nt, err := translate.NewNetworkTopology(graph, cfg)
Expand All @@ -252,7 +252,7 @@ func GenerateOutputParams(ctx context.Context, graph *topology.Graph, params *Pa
}
}

if httpErr := nt.Generate(buf); httpErr != nil {
if httpErr = nt.Generate(buf); httpErr != nil {
return nil, httpErr
}

Expand All @@ -276,7 +276,11 @@ func GenerateOutputParams(ctx context.Context, graph *topology.Graph, params *Pa
return []byte("OK\n"), nil
}

func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[string]*Topology, f *TopologyNodeFinder) (*translate.Config, error) {
func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[string]*Topology, f *TopologyNodeFinder) (*translate.Config, *httperr.Error) {
if err := validateBlockSizes(params.BlockSizes); err != nil {
return nil, httperr.NewError(http.StatusBadRequest, err.Error())
}

cfg := &translate.Config{
Plugin: params.Plugin,
BlockSizes: params.BlockSizes,
Expand All @@ -286,6 +290,9 @@ func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[
if len(topologies) != 0 {
cfg.Topologies = make(map[string]*translate.TopologySpec)
for topo, sect := range topologies {
if err := validateBlockSizes(sect.BlockSizes); err != nil {
return nil, httperr.NewError(http.StatusBadRequest, fmt.Sprintf("topology %q: %v", topo, err))
}
spec := &translate.TopologySpec{
Plugin: sect.Plugin,
BlockSizes: sect.BlockSizes,
Expand All @@ -302,7 +309,7 @@ func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[
klog.V(4).Infof("%s %q discovered nodes %v", sect.Plugin, topo, nodes)
spec.Nodes = nodes
} else {
return nil, err
return nil, httperr.NewError(http.StatusInternalServerError, err.Error())
}
}
cfg.Topologies[topo] = spec
Expand All @@ -312,6 +319,31 @@ func GetTranslateConfig(ctx context.Context, params *BaseParams, topologies map[
return cfg, nil
}

// validateBlockSizes reports the first problem found in blockSizes.
//
// A nil or empty slice is accepted. Otherwise every entry must be positive,
// each entry must be strictly greater than the one before it and evenly
// divisible by it, and the quotient of consecutive entries must be a power
// of two. It returns nil when all entries satisfy these rules.
func validateBlockSizes(blockSizes []int) error {
	for idx, size := range blockSizes {
		if size <= 0 {
			return fmt.Errorf("blockSizes[%d]=%d must be positive", idx, size)
		}
		if idx == 0 {
			// The first entry has no predecessor to compare against.
			continue
		}

		// The predecessor was validated as positive on the previous iteration,
		// so the modulo and division below are safe.
		below := blockSizes[idx-1]
		if size <= below || size%below != 0 {
			return fmt.Errorf("blockSizes[%d]=%d must be greater than blockSizes[%d]=%d and be a multiple of power of two", idx, size, idx-1, below)
		}

		// A positive integer is a power of two iff it has exactly one bit set.
		if factor := size / below; factor&(factor-1) != 0 {
			return fmt.Errorf("blockSizes[%d]/blockSizes[%d]=%d must be a multiple of power of two", idx, idx-1, factor)
		}
	}
	return nil
}

func getParams(params map[string]any) (*Params, error) {
var p Params
err := config.Decode(params, &p)
Expand Down
49 changes: 42 additions & 7 deletions pkg/engines/slurm/slurm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,31 @@ func TestGetTranslateConfig(t *testing.T) {
name: "Case 2: valid blocksize",
params: &BaseParams{
Plugin: topology.TopologyBlock,
BlockSizes: []int{2, 4, 8},
BlockSizes: []int{2, 8, 32},
},
cfg: &translate.Config{
Plugin: topology.TopologyBlock,
BlockSizes: []int{2, 4, 8},
BlockSizes: []int{2, 8, 32},
},
},
{
name: "Case 3: with invalid partition topology",
name: "Case 3: invalid top-level blocksize ratio",
params: &BaseParams{
Plugin: topology.TopologyBlock,
BlockSizes: []int{2, 6},
},
err: "blockSizes[1]/blockSizes[0]=3 must be a multiple of power of two",
},
{
name: "Case 4: invalid top-level blocksize multiple",
params: &BaseParams{
Plugin: topology.TopologyBlock,
BlockSizes: []int{2, 5},
},
err: "blockSizes[1]=5 must be greater than blockSizes[0]=2 and be a multiple of power of two",
},
{
name: "Case 5: invalid partition topology",
params: &BaseParams{},
topologies: map[string]*Topology{
"topo1": {
Expand All @@ -266,7 +282,7 @@ func TestGetTranslateConfig(t *testing.T) {
err: "missing partition name",
},
{
name: "Case 4: with valid partition topology",
name: "Case 6: with valid partition topology",
params: &BaseParams{},
topologies: map[string]*Topology{
"default": {
Expand All @@ -292,7 +308,7 @@ func TestGetTranslateConfig(t *testing.T) {
},
},
{
name: "Case 5: explicit empty nodes do not use partition discovery",
name: "Case 7: explicit empty nodes do not use partition discovery",
params: &BaseParams{},
topologies: map[string]*Topology{
"topo": {
Expand All @@ -315,6 +331,18 @@ func TestGetTranslateConfig(t *testing.T) {
},
},
},
{
name: "Case 8: invalid partition blocksize ratio",
params: &BaseParams{},
topologies: map[string]*Topology{
"topo": {
Plugin: topology.TopologyBlock,
BlockSizes: []int{2, 6},
Nodes: []string{"node[001-100]"},
},
},
err: `topology "topo": blockSizes[1]/blockSizes[0]=3 must be a multiple of power of two`,
},
}

for _, tc := range testCases {
Expand All @@ -323,7 +351,7 @@ func TestGetTranslateConfig(t *testing.T) {
if len(tc.err) != 0 {
require.EqualError(t, err, tc.err)
} else {
require.NoError(t, err)
require.Nil(t, err)
require.Equal(t, tc.cfg, cfg)
}
})
Expand Down Expand Up @@ -360,7 +388,14 @@ SwitchName=S3 Nodes=Node[304-306]
code: http.StatusBadRequest,
},
{
name: "Case 3: valid input",
name: "Case 3: invalid semantic blocksize",
graph: graph,
params: map[string]any{"blockSizes": []int{2, 6}},
err: "blockSizes[1]/blockSizes[0]=3 must be a multiple of power of two",
code: http.StatusBadRequest,
},
{
name: "Case 4: valid input",
graph: graph,
cfg: cfg,
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/providers/infiniband/bm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ func TestPopulateDomainsFromPdshOutput(t *testing.T) {
node-09: ClusterUUID : 50000000-0000-0000-0000-000000000005
`
domainMap := topology.DomainMap{
"50000000-0000-0000-0000-000000000004.4000000005": map[string]string{"node-07": "node-07", "node-08": "node-08"},
"50000000-0000-0000-0000-000000000005.4000000004": map[string]string{"node-10": "node-10"},
"50000000-0000-0000-0000-000000000005.4000000005": map[string]string{"node-09": "node-09"},
"50000000-0000-0000-0000-000000000004.4000000005": map[string]*topology.HostInfo{"node-07": {Domain: "50000000-0000-0000-0000-000000000004.4000000005", HostName: "node-07", InstanceID: "node-07"}, "node-08": {Domain: "50000000-0000-0000-0000-000000000004.4000000005", HostName: "node-08", InstanceID: "node-08"}},
"50000000-0000-0000-0000-000000000005.4000000004": map[string]*topology.HostInfo{"node-10": {Domain: "50000000-0000-0000-0000-000000000005.4000000004", HostName: "node-10", InstanceID: "node-10"}},
"50000000-0000-0000-0000-000000000005.4000000005": map[string]*topology.HostInfo{"node-09": {Domain: "50000000-0000-0000-0000-000000000005.4000000005", HostName: "node-09", InstanceID: "node-09"}},
}

testCases := []struct {
Expand Down
38 changes: 25 additions & 13 deletions pkg/topology/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,21 @@ import (
"k8s.io/klog/v2"
)

// DomainMap maps domain name to a map of hostname:instance
type DomainMap map[string]map[string]string
// HostInfo is the per-host record stored in a DomainMap.
type HostInfo struct {
// Domain is the name of the domain the host belongs to. DomainMap uses it
// as the outer map key, and AddHostInfo skips records where it is empty.
Domain string
// InstanceID is the instance identifier supplied alongside the host name.
InstanceID string
// HostName is the host's name. DomainMap uses it as the key within a domain.
HostName string
}

// DomainMap maps accelerator domain name to host metadata.
type DomainMap map[string]map[string]*HostInfo

// NewDomainMap returns an empty, non-nil DomainMap that is ready for writes.
func NewDomainMap() DomainMap {
	return DomainMap{}
}

func (m DomainMap) AddHost(domain, instance, host string) {
if domain == "" {
klog.Warningf("skipping topology domain with empty name for host %q (instance %q)", host, instance)
return
}

if hosts, ok := m[domain]; ok {
hosts[host] = instance
return
}

m[domain] = map[string]string{host: instance}
m.AddHostInfo(&HostInfo{Domain: domain, InstanceID: instance, HostName: host})
}

func (m DomainMap) String() string {
Expand All @@ -52,3 +48,19 @@ func (m DomainMap) String() string {
}
return str.String()
}

// AddHostInfo stores hostInfo under its Domain, keyed by its HostName.
//
// A nil hostInfo is ignored. A record with an empty Domain is skipped after
// logging a warning. An existing entry for the same domain and host name is
// overwritten.
func (m DomainMap) AddHostInfo(hostInfo *HostInfo) {
	if hostInfo == nil {
		return
	}

	domain := hostInfo.Domain
	if domain == "" {
		klog.Warningf("skipping topology domain with empty name for host %q (instance %q)", hostInfo.HostName, hostInfo.InstanceID)
		return
	}

	// Create the per-domain host map the first time the domain is seen.
	hosts, found := m[domain]
	if !found {
		hosts = make(map[string]*HostInfo)
		m[domain] = hosts
	}
	hosts[hostInfo.HostName] = hostInfo
}
9 changes: 7 additions & 2 deletions pkg/topology/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ func TestDomainMapAddHost(t *testing.T) {
domainMap.AddHost("", "instance4", "host4")

require.Equal(t, DomainMap{
"domain1": map[string]string{"host1": "instance1", "host2": "instance2"},
"domain2": map[string]string{"host3": "instance3"},
"domain1": map[string]*HostInfo{
"host1": {Domain: "domain1", InstanceID: "instance1", HostName: "host1"},
"host2": {Domain: "domain1", InstanceID: "instance2", HostName: "host2"},
},
"domain2": map[string]*HostInfo{
"host3": {Domain: "domain2", InstanceID: "instance3", HostName: "host3"},
},
}, domainMap)
}
17 changes: 14 additions & 3 deletions pkg/translate/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,27 @@ func getBlockSizes(blocks []*blockInfo, requestedBlockSizes []int) []int {
}

func (nt *NetworkTopology) toBlockTopology(wr io.Writer, skeletonOnly bool) *httperr.Error {
blockSizes := getBlockSizes(nt.blocks, nt.config.BlockSizes)
blocks := nt.complementBlocks(nt.blocks, nt.config.BlockSizes)
// Refresh nodeInfo.blockID so GetNodeTopologySpec returns IDs that match the
// emitted topology file. complementBlocks may renumber blocks when it splits
// a domain across multiple base blocks, invalidating the IDs set by initBlocks.
for _, b := range blocks {
for _, node := range b.nodes {
if info, ok := nt.nodeInfo[node]; ok {
info.blockID = b.id
}
}
}
blockSizes := getBlockSizes(blocks, nt.config.BlockSizes)
Comment thread
ravisoundar marked this conversation as resolved.

for _, bInfo := range nt.blocks {
for _, bInfo := range blocks {
var comment string
if len(bInfo.name) != 0 {
comment = fmt.Sprintf("# %s=%s\n", bInfo.id, bInfo.name)
}

var err error
if skeletonOnly {
if skeletonOnly || len(bInfo.nodes) == 0 {
_, err = fmt.Fprintf(wr, "%sBlockName=%s\n", comment, bInfo.id)
} else {
outputNodeNames := strings.Join(cluset.Compact(bInfo.nodes), ",")
Expand Down
Loading
Loading