Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions api/jobs_batch_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright IBM Corp. 2015, 2026
// SPDX-License-Identifier: BUSL-1.1

package api

type Workload struct {
JobID string
Tenant string
Priority int
}

type BatchQueueStatusResponse struct {
Workloads []Workload
}

type BatchQueueStatusOptions struct{}

// BatchQueueStatus is used to query the current batch job queue.
func (j *Jobs) BatchQueueStatus(opts *BatchQueueStatusOptions, q *QueryOptions) (*BatchQueueStatusResponse, *QueryMeta, error) {
var resp BatchQueueStatusResponse
qm, err := j.client.query("/v1/jobs/queue/status", &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}
33 changes: 29 additions & 4 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ type SchedulerConfiguration struct {
// priority jobs to place higher priority jobs.
PreemptionConfig PreemptionConfig

// BatchQueue specifies the configuration of the batch jobs queue
// use to control queueing and scheduling of batch jobs.
//
// "Scheduling" in this context refers to releasing evaluations
// to the eval broker for scheduling with a worker.
BatchQueue BatchQueue

// MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled
MemoryOversubscriptionEnabled bool

Expand Down Expand Up @@ -215,14 +222,23 @@ type SchedulerSetConfigurationResponse struct {
WriteMeta
}

// SchedulerAlgorithm is an enum string that encapsulates the valid options for a
// SchedulerConfiguration block's SchedulerAlgorithm. These modes will allow the
// scheduler to be user-selectable.
type SchedulerAlgorithm string
// Enum strings that encapsulate the valid options for a
// their respective scheduler configuration blocks. These modes
// allow the config to be user-selectable.
type (
SchedulerAlgorithm string
BatchQueueTenant string
BatchQueueType string
)

const (
SchedulerAlgorithmBinpack SchedulerAlgorithm = "binpack"
SchedulerAlgorithmSpread SchedulerAlgorithm = "spread"

BatchQueueTypeDynamic BatchQueueType = "dynamicPriority"

TenantTypeMetadata BatchQueueTenant = "metadata"
TenantTypeNamespace BatchQueueTenant = "namespace"
)

// PreemptionConfig specifies whether preemption is enabled based on scheduler type
Expand All @@ -233,6 +249,15 @@ type PreemptionConfig struct {
ServiceSchedulerEnabled bool
}

// BatchQueue is the configuration for a batch job queue used to control scheduling
// of batch jobs.
type BatchQueue struct {
Type BatchQueueType
TenantType BatchQueueTenant
MetadataKey string
Config map[string]any
}

// SchedulerGetConfiguration is used to query the current Scheduler configuration.
func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfigurationResponse, *QueryMeta, error) {
var resp SchedulerConfigurationResponse
Expand Down
3 changes: 2 additions & 1 deletion ci/test-core.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@
"nomad/auth/...",
"nomad/deploymentwatcher/...",
"nomad/drainer/...",
"nomad/reporting/...",
"nomad/lock/...",
"nomad/peers/...",
"nomad/queues/...",
"nomad/reporting/...",
"nomad/state/...",
"nomad/stream/...",
"nomad/structs/...",
Expand Down
6 changes: 6 additions & 0 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,12 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled,
},
BatchQueue: structs.BatchQueue{
Type: structs.BatchQueueType(conf.BatchQueue.Type),
TenantType: structs.BatchQueueTenant(conf.BatchQueue.TenantType),
MetadataKey: conf.BatchQueue.MetadataKey,
Config: conf.BatchQueue.Config,
},
}

if err := args.Config.Validate(); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"job queue": func() (cli.Command, error) {
return &JobQueueCommand{
Meta: meta,
}, nil
},
"job revert": func() (cli.Command, error) {
return &JobRevertCommand{
Meta: meta,
Expand Down
136 changes: 136 additions & 0 deletions command/job_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright IBM Corp. 2015, 2026
// SPDX-License-Identifier: BUSL-1.1

package command

import (
"encoding/json"
"fmt"
"strings"

"github.com/hashicorp/nomad/api"
"github.com/posener/complete"
"github.com/ryanuber/columnize"
)

type JobQueueCommand struct {
Meta
}

func (c *JobQueueCommand) Help() string {
helpText := `
Usage: nomad job queue [options]

View the current status of workloads queued in a batch job queue.

When ACLs are enabled, this command requires a token with either TBD
capabilities. Probably at least 'list-jobs'.

General Options:

` + generalOptionsUsage(usageOptsDefault) + `

Eval Options:

-limit
The maximum number of workloads to return

-verbose
Display full output

-json
Display output as json

`
return strings.TrimSpace(helpText)
}

func (c *JobQueueCommand) Synopsis() string {
return "View the status of a batch job queue"
}

func (c *JobQueueCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-verbose": complete.PredictNothing,
"-limit": complete.PredictNothing,
"-json": complete.PredictNothing,
})
}

func (c *JobQueueCommand) AutocompleteArgs() complete.Predictor {
return JobPredictor(c.Meta.Client)
}

func (c *JobQueueCommand) Name() string { return "job queue" }

func (c *JobQueueCommand) Run(args []string) int {
var verbose, json bool
var limit int
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&json, "json", false, "")
flags.IntVar(&limit, "limit", 0, "")

if err := flags.Parse(args); err != nil {
return 1
}

// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 255
}

// Setup the options
qo := &api.QueryOptions{}

if limit > 0 {
qo.PerPage = int32(limit)
}

// Submit the request
resp, _, err := client.Jobs().BatchQueueStatus(nil, qo)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error during batch queue request: %s", err))
return 255
}
if resp == nil {
c.Ui.Error("Empty batch queue response")
}

if json {
if err := c.printJSON(resp); err != nil {
c.Ui.Error("Error unmarshaling json response")
return 255
}
} else {
c.printFormatted(resp)
}
return 0
}

func (c *JobQueueCommand) printJSON(resp *api.BatchQueueStatusResponse) error {
out, err := json.Marshal(resp.Workloads)
if err != nil {
return err
}

c.Ui.Output(string(out))
return nil
}

func (c *JobQueueCommand) printFormatted(resp *api.BatchQueueStatusResponse) {

out := make([]string, len(resp.Workloads)+1)
out[0] = "JobID|Tenant|Priority"

for i, v := range resp.Workloads {
out[i+1] = fmt.Sprintf("%s|%s|%d", v.JobID, v.Tenant, v.Priority)
}

c.Ui.Output(c.Colorize().Color("[bold]Batch Queue Workloads[reset]"))
c.Ui.Output(columnize.SimpleFormat(out))
}
64 changes: 64 additions & 0 deletions command/job_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright IBM Corp. 2015, 2026
// SPDX-License-Identifier: BUSL-1.1

package command

import (
"fmt"
"testing"

"github.com/hashicorp/cli"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/shoenig/test/must"
)

func TestJobQueue_Implements(t *testing.T) {
ci.Parallel(t)
var _ cli.Command = &JobQueueCommand{}
}

func TestJobQueue_printFormatted(t *testing.T) {
ci.Parallel(t)
ui := cli.NewMockUi()
cmd := &JobQueueCommand{Meta: Meta{Ui: ui}}

testResp := &api.BatchQueueStatusResponse{
Workloads: []api.Workload{
{
JobID: "123",
Tenant: "testTenant1",
Priority: 5,
},
},
}
cmd.printFormatted(testResp)

expect := "Batch Queue Workloads\n" +
"JobID Tenant Priority\n" +
"123 testTenant1 5\n"

fmt.Println(ui.OutputWriter.String())
must.Eq(t, expect, ui.OutputWriter.String())
}

func TestJobQueue_printJSON(t *testing.T) {
ci.Parallel(t)
ui := cli.NewMockUi()
cmd := &JobQueueCommand{Meta: Meta{Ui: ui}}

testResp := &api.BatchQueueStatusResponse{
Workloads: []api.Workload{
{
JobID: "123",
Tenant: "testTenant1",
Priority: 5,
},
},
}
cmd.printJSON(testResp)

expect := "[{\"JobID\":\"123\",\"Tenant\":\"testTenant1\",\"Priority\":5}]\n"

must.Eq(t, expect, ui.OutputWriter.String())
}
27 changes: 23 additions & 4 deletions command/operator_scheduler_get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ func (o *OperatorSchedulerGetConfig) Run(args []string) int {

schedConfig := resp.SchedulerConfig

// Output the information.
o.Ui.Output(formatKV([]string{
out := []string{
fmt.Sprintf("Scheduler Algorithm|%s", schedConfig.SchedulerAlgorithm),
fmt.Sprintf("Memory Oversubscription|%v", schedConfig.MemoryOversubscriptionEnabled),
fmt.Sprintf("Reject Job Registration|%v", schedConfig.RejectJobRegistration),
Expand All @@ -87,8 +86,28 @@ func (o *OperatorSchedulerGetConfig) Run(args []string) int {
fmt.Sprintf("Preemption Batch Scheduler|%v", schedConfig.PreemptionConfig.BatchSchedulerEnabled),
fmt.Sprintf("Preemption SysBatch Scheduler|%v", schedConfig.PreemptionConfig.SysBatchSchedulerEnabled),
fmt.Sprintf("Node Limit For Feasibility Checks|%v", schedConfig.NodeLimitForFeasibilityChecks),
fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex),
}))
fmt.Sprintf("Batch Queue Type|%v", schedConfig.BatchQueue.Type),
}

if schedConfig.BatchQueue.Type != "" {
out = append(out, fmt.Sprintf("Batch Queue Tenant Type|%v", schedConfig.BatchQueue.TenantType))

// only append metadata key if it's set
if schedConfig.BatchQueue.TenantType == "metadata" {
out = append(out, fmt.Sprintf("Batch Queue Metadata Key|%v", schedConfig.BatchQueue.MetadataKey))
}

conf := ""
for k, v := range schedConfig.BatchQueue.Config {
conf = fmt.Sprintf("%s%s", conf, fmt.Sprintf("%v:%v ", k, v))
}
out = append(out, fmt.Sprintf("Batch Queue Config|%v", conf))
}

out = append(out, fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex))

// Output the information.
o.Ui.Output(formatKV(out))
return 0
}

Expand Down
4 changes: 4 additions & 0 deletions command/operator_scheduler_get_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func TestOperatorSchedulerGetConfig_Run(t *testing.T) {
s := ui.OutputWriter.String()
must.StrContains(t, s, "Scheduler Algorithm = binpack")
must.StrContains(t, s, "Preemption SysBatch Scheduler = false")
must.StrContains(t, s, "Scheduler Algorithm = binpack")
must.StrContains(t, s, "Preemption SysBatch Scheduler = false")
must.StrContains(t, s, "Node Limit For Feasibility Checks = 0")
must.StrContains(t, s, "Batch Queue Type =")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()

Expand Down
Loading
Loading