From 71efc64313a8c2bb1c2c10473549fb96b3a87b39 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Thu, 7 May 2026 12:25:33 -0400 Subject: [PATCH 01/14] batch queue: add config parameters to scheduler_configuration (#27816) --- api/operator.go | 33 ++++- command/operator_scheduler_get_config.go | 27 +++- command/operator_scheduler_get_config_test.go | 4 + nomad/structs/node_pool.go | 4 + nomad/structs/operator.go | 93 +++++++++++++ nomad/structs/operator_test.go | 126 ++++++++++++++++++ 6 files changed, 279 insertions(+), 8 deletions(-) diff --git a/api/operator.go b/api/operator.go index fc974d83fbf..ae517f6ba8a 100644 --- a/api/operator.go +++ b/api/operator.go @@ -174,6 +174,13 @@ type SchedulerConfiguration struct { // priority jobs to place higher priority jobs. PreemptionConfig PreemptionConfig + // BatchQueue specifies the configuration of the batch jobs queue + // use to control queueing and scheduling of batch jobs. + // + // "Scheduling" in this context refers to releasing evaluations + // to the eval broker for scheduling with a worker. + BatchQueue BatchQueue + // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool @@ -215,14 +222,23 @@ type SchedulerSetConfigurationResponse struct { WriteMeta } -// SchedulerAlgorithm is an enum string that encapsulates the valid options for a -// SchedulerConfiguration block's SchedulerAlgorithm. These modes will allow the -// scheduler to be user-selectable. -type SchedulerAlgorithm string +// Enum strings that encapsulate the valid options for a +// their respective scheduler configuration blocks. These modes +// allow the config to be user-selectable. +type ( + SchedulerAlgorithm string + BatchQueueTenant string + BatchQueueType string +) const ( SchedulerAlgorithmBinpack SchedulerAlgorithm = "binpack" SchedulerAlgorithmSpread SchedulerAlgorithm = "spread" + + BatchQueueTypeDynamic BatchQueueType = "dynamicPriority" + + TenantTypeMetadata BatchQueueTenant = "metadata" + TenantTypeNamespace BatchQueueTenant = "namespace" ) // PreemptionConfig specifies whether preemption is enabled based on scheduler type @@ -233,6 +249,15 @@ type PreemptionConfig struct { ServiceSchedulerEnabled bool } +// BatchQueue is the configuration for a batch job queue used to control scheduling +// of batch jobs. +type BatchQueue struct { + Type BatchQueueType + TenantType BatchQueueTenant + MetadataKey string + Config map[string]any +} + // SchedulerGetConfiguration is used to query the current Scheduler configuration. func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfigurationResponse, *QueryMeta, error) { var resp SchedulerConfigurationResponse diff --git a/command/operator_scheduler_get_config.go b/command/operator_scheduler_get_config.go index feb88297211..d1fa2447f5d 100644 --- a/command/operator_scheduler_get_config.go +++ b/command/operator_scheduler_get_config.go @@ -76,8 +76,7 @@ func (o *OperatorSchedulerGetConfig) Run(args []string) int { schedConfig := resp.SchedulerConfig - // Output the information. - o.Ui.Output(formatKV([]string{ + out := []string{ fmt.Sprintf("Scheduler Algorithm|%s", schedConfig.SchedulerAlgorithm), fmt.Sprintf("Memory Oversubscription|%v", schedConfig.MemoryOversubscriptionEnabled), fmt.Sprintf("Reject Job Registration|%v", schedConfig.RejectJobRegistration), @@ -87,8 +86,28 @@ func (o *OperatorSchedulerGetConfig) Run(args []string) int { fmt.Sprintf("Preemption Batch Scheduler|%v", schedConfig.PreemptionConfig.BatchSchedulerEnabled), fmt.Sprintf("Preemption SysBatch Scheduler|%v", schedConfig.PreemptionConfig.SysBatchSchedulerEnabled), fmt.Sprintf("Node Limit For Feasibility Checks|%v", schedConfig.NodeLimitForFeasibilityChecks), - fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex), - })) + fmt.Sprintf("Batch Queue Type|%v", schedConfig.BatchQueue.Type), + } + + if schedConfig.BatchQueue.Type != "" { + out = append(out, fmt.Sprintf("Batch Queue Tenant Type|%v", schedConfig.BatchQueue.TenantType)) + + // only append metadata key if it's set + if schedConfig.BatchQueue.TenantType == "metadata" { + out = append(out, fmt.Sprintf("Batch Queue Metadata Key|%v", schedConfig.BatchQueue.MetadataKey)) + } + + conf := "" + for k, v := range schedConfig.BatchQueue.Config { + conf = fmt.Sprintf("%s%s", conf, fmt.Sprintf("%v:%v ", k, v)) + } + out = append(out, fmt.Sprintf("Batch Queue Config|%v", conf)) + } + + out = append(out, fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex)) + + // Output the information. + o.Ui.Output(formatKV(out)) return 0 } diff --git a/command/operator_scheduler_get_config_test.go b/command/operator_scheduler_get_config_test.go index a0be94b3da5..2cd0bbc9143 100644 --- a/command/operator_scheduler_get_config_test.go +++ b/command/operator_scheduler_get_config_test.go @@ -27,6 +27,10 @@ func TestOperatorSchedulerGetConfig_Run(t *testing.T) { s := ui.OutputWriter.String() must.StrContains(t, s, "Scheduler Algorithm = binpack") must.StrContains(t, s, "Preemption SysBatch Scheduler = false") + must.StrContains(t, s, "Scheduler Algorithm = binpack") + must.StrContains(t, s, "Preemption SysBatch Scheduler = false") + must.StrContains(t, s, "Node Limit For Feasibility Checks = 0") + must.StrContains(t, s, "Batch Queue Type =") ui.ErrorWriter.Reset() ui.OutputWriter.Reset() diff --git a/nomad/structs/node_pool.go b/nomad/structs/node_pool.go index b4dfc4308d7..05aa643df0c 100644 --- a/nomad/structs/node_pool.go +++ b/nomad/structs/node_pool.go @@ -249,6 +249,10 @@ type NodePoolSchedulerConfiguration struct { // If not defined, the global cluster scheduling algorithm is used. SchedulerAlgorithm SchedulerAlgorithm `hcl:"scheduler_algorithm"` + // BatchQueue defines the batch job queue configuration used + // to control scheduling of batch jobs. + BatchQueue BatchQueue `hcl:"batch_queue"` + // MemoryOversubscriptionEnabled specifies whether memory oversubscription // is enabled. If not defined, the global cluster configuration is used. MemoryOversubscriptionEnabled *bool `hcl:"memory_oversubscription_enabled"` diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index 3006572ccde..c71825ddbc8 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -224,6 +224,10 @@ type SchedulerConfiguration struct { // priority jobs to place higher priority jobs. PreemptionConfig PreemptionConfig `hcl:"preemption_config"` + // BatchQueue specifies the batch queue for this scheduler configuration + // which defines the behavior for scheduling batch job evaluations. + BatchQueue BatchQueue `hcl:"batch_queue"` + // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool `hcl:"memory_oversubscription_enabled"` @@ -286,6 +290,11 @@ func (s *SchedulerConfiguration) WithNodePool(pool *NodePool) *SchedulerConfigur if poolConfig.SchedulerAlgorithm != "" { schedConfig.SchedulerAlgorithm = poolConfig.SchedulerAlgorithm } + + if poolConfig.BatchQueue.Type != "" { + schedConfig.BatchQueue = poolConfig.BatchQueue + } + if poolConfig.MemoryOversubscriptionEnabled != nil { schedConfig.MemoryOversubscriptionEnabled = *poolConfig.MemoryOversubscriptionEnabled } @@ -310,6 +319,10 @@ func (s *SchedulerConfiguration) Validate() error { return fmt.Errorf("invalid scheduler algorithm: %v", s.SchedulerAlgorithm) } + if err := s.BatchQueue.Validate(); err != nil { + return err + } + return nil } @@ -346,6 +359,86 @@ type PreemptionConfig struct { ServiceSchedulerEnabled bool `hcl:"service_scheduler_enabled"` } +type ( + BatchQueueType string + BatchQueueTenant string +) + +const ( + BatchQueueTypeDynamic BatchQueueType = "dynamicPriorty" + + TenantTypeMetadata BatchQueueTenant = "metadata" + TenantTypeNamespace BatchQueueTenant = "namespace" + + DynamicCalcInterval = "calc_interval" + DynamicMaxAge = "max_age" +) + +type BatchQueue struct { + Type BatchQueueType `hcl:"type"` + TenantType BatchQueueTenant `hcl:"tenant_type"` + MetadataKey string `hcl:"metadata_key"` + Config map[string]any `hcl:"config"` +} + +type DynamicQueueConfig struct { + CalcInterval time.Duration + MaxAge time.Duration + MaxSize int + AgeWeight int + UsageWeight int + SizeWeight int +} + +func validateDuration(val any) error { + switch t := val.(type) { + case string: + if _, err := time.ParseDuration(t); err != nil { + return err + } + case int, nil: + default: + return fmt.Errorf("value not a duration: %v", val) + } + + return nil +} + +func (b *BatchQueue) Validate() error { + if b.Type == "" { + switch { + case b.TenantType != "", b.MetadataKey != "", b.Config != nil: + return errors.New("batch queue configuration found but no type specified") + } + + return nil + } + + switch b.Type { + case BatchQueueTypeDynamic: + if err := validateDuration(b.Config[DynamicCalcInterval]); err != nil { + return fmt.Errorf("failed to parse calc_interval: %v", err) + } + if err := validateDuration(b.Config[DynamicMaxAge]); err != nil { + return fmt.Errorf("failed to parse max_age: %v", err) + } + default: + return fmt.Errorf("unsupported batch queue type: %q", b.Type) + } + + switch b.TenantType { + case TenantTypeNamespace: + case TenantTypeMetadata: + if b.MetadataKey == "" { + return fmt.Errorf("metadata key must be specified if using metadata tenency") + } + default: + return fmt.Errorf("unsupported tenant type: %q", b.TenantType) + } + + return nil +} + // SchedulerSetConfigRequest is used by the Operator endpoint to update the // current Scheduler configuration of the cluster. type SchedulerSetConfigRequest struct { diff --git a/nomad/structs/operator_test.go b/nomad/structs/operator_test.go index b9d013022fb..371a37bc8c2 100644 --- a/nomad/structs/operator_test.go +++ b/nomad/structs/operator_test.go @@ -86,10 +86,16 @@ func TestSchedulerConfiguration_WithNodePool(t *testing.T) { pool: &NodePool{ SchedulerConfiguration: &NodePoolSchedulerConfiguration{ MemoryOversubscriptionEnabled: pointer.Of(true), + BatchQueue: BatchQueue{ + Type: "test", + }, }, }, expected: &SchedulerConfiguration{ MemoryOversubscriptionEnabled: true, + BatchQueue: BatchQueue{ + Type: "test", + }, }, }, { @@ -140,3 +146,123 @@ func TestSchedulerConfiguration_WithNodePool(t *testing.T) { }) } } + +func TestSchedulerConfiguration_Validate(t *testing.T) { + + testCases := []struct { + name string + schedConfig *SchedulerConfiguration + err string + }{ + { + name: "invalid scheduler algorithm", + schedConfig: &SchedulerConfiguration{ + SchedulerAlgorithm: "not-good", + }, + err: "invalid scheduler algorithm: not-good", + }, + { + name: "valid scheduler algorithm", + schedConfig: &SchedulerConfiguration{ + SchedulerAlgorithm: SchedulerAlgorithmBinpack, + }, + err: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.schedConfig.Validate() + if tc.err != "" { + must.ErrorContains(t, err, tc.err) + } else { + must.NoError(t, err) + } + }) + } +} + +func TestBatchQueue_Validate(t *testing.T) { + + testCases := []struct { + name string + batchConfig BatchQueue + err string + }{ + { + name: "invalid queue type", + batchConfig: BatchQueue{ + Type: "foo", + }, + err: "unsupported batch queue type", + }, + { + name: "invalid metadata type", + batchConfig: BatchQueue{ + Type: BatchQueueTypeDynamic, + TenantType: "foo", + }, + err: "unsupported tenant type", + }, + { + name: "batch config with no type", + batchConfig: BatchQueue{ + Type: "", + TenantType: TenantTypeNamespace, + }, + err: "batch queue configuration found but no type specified", + }, + { + name: "empty metadata key errors", + batchConfig: BatchQueue{ + Type: BatchQueueTypeDynamic, + TenantType: TenantTypeMetadata, + }, + err: "metadata key must be specified", + }, + { + name: "dynamicPriority - invalid interval", + batchConfig: BatchQueue{ + Type: BatchQueueTypeDynamic, + TenantType: TenantTypeNamespace, + Config: map[string]any{ + "calc_interval": "hello", + }, + }, + err: "failed to parse", + }, + { + name: "dynamicPriority - valid string interval", + batchConfig: BatchQueue{ + Type: BatchQueueTypeDynamic, + TenantType: TenantTypeNamespace, + Config: map[string]any{ + "calc_interval": "1h", + }, + }, + err: "", + }, + { + name: "dynamicPriority - valid int interval", + batchConfig: BatchQueue{ + Type: BatchQueueTypeDynamic, + TenantType: TenantTypeNamespace, + Config: map[string]any{ + "calc_interval": 1000, + }, + }, + err: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.batchConfig.Validate() + if tc.err != "" { + must.ErrorContains(t, err, tc.err) + } else { + must.NoError(t, err) + } + }) + } +} From de132a941ca04dca4607f01230cec6513b728c10 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Thu, 7 May 2026 12:27:07 -0400 Subject: [PATCH 02/14] command: initial batch queue cli implementation (#27909) --- api/jobs_batch_queue.go | 26 ++++++++ command/commands.go | 5 ++ command/job_queue.go | 136 ++++++++++++++++++++++++++++++++++++++ command/job_queue_test.go | 64 ++++++++++++++++++ 4 files changed, 231 insertions(+) create mode 100644 api/jobs_batch_queue.go create mode 100644 command/job_queue.go create mode 100644 command/job_queue_test.go diff --git a/api/jobs_batch_queue.go b/api/jobs_batch_queue.go new file mode 100644 index 00000000000..bd205ff427e --- /dev/null +++ b/api/jobs_batch_queue.go @@ -0,0 +1,26 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package api + +type Workload struct { + JobID string + Tenant string + Priority int +} + +type BatchQueueStatusResponse struct { + Workloads []Workload +} + +type BatchQueueStatusOptions struct{} + +// BatchQueueStatus is used to query the current batch job queue. +func (j *Jobs) BatchQueueStatus(opts *BatchQueueStatusOptions, q *QueryOptions) (*BatchQueueStatusResponse, *QueryMeta, error) { + var resp BatchQueueStatusResponse + qm, err := j.client.query("/v1/jobs/queue/status", &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} diff --git a/command/commands.go b/command/commands.go index 344d898e86b..de786f55bb5 100644 --- a/command/commands.go +++ b/command/commands.go @@ -501,6 +501,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "job queue": func() (cli.Command, error) { + return &JobQueueCommand{ + Meta: meta, + }, nil + }, "job revert": func() (cli.Command, error) { return &JobRevertCommand{ Meta: meta, diff --git a/command/job_queue.go b/command/job_queue.go new file mode 100644 index 00000000000..c0d79da5fb5 --- /dev/null +++ b/command/job_queue.go @@ -0,0 +1,136 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package command + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/hashicorp/nomad/api" + "github.com/posener/complete" + "github.com/ryanuber/columnize" +) + +type JobQueueCommand struct { + Meta +} + +func (c *JobQueueCommand) Help() string { + helpText := ` +Usage: nomad job queue [options] + + View the current status of workloads queued in a batch job queue. + + When ACLs are enabled, this command requires a token with either TBD + capabilities. Probably at least 'list-jobs'. + +General Options: + + ` + generalOptionsUsage(usageOptsDefault) + ` + +Eval Options: + + -limit + The maximum number of workloads to return + + -verbose + Display full output + + -json + Display output as json + +` + return strings.TrimSpace(helpText) +} + +func (c *JobQueueCommand) Synopsis() string { + return "View the status of a batch job queue" +} + +func (c *JobQueueCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-verbose": complete.PredictNothing, + "-limit": complete.PredictNothing, + "-json": complete.PredictNothing, + }) +} + +func (c *JobQueueCommand) AutocompleteArgs() complete.Predictor { + return JobPredictor(c.Meta.Client) +} + +func (c *JobQueueCommand) Name() string { return "job queue" } + +func (c *JobQueueCommand) Run(args []string) int { + var verbose, json bool + var limit int + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&verbose, "verbose", false, "") + flags.BoolVar(&json, "json", false, "") + flags.IntVar(&limit, "limit", 0, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 255 + } + + // Setup the options + qo := &api.QueryOptions{} + + if limit > 0 { + qo.PerPage = int32(limit) + } + + // Submit the request + resp, _, err := client.Jobs().BatchQueueStatus(nil, qo) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error during batch queue request: %s", err)) + return 255 + } + if resp == nil { + c.Ui.Error("Empty batch queue response") + } + + if json { + if err := c.printJSON(resp); err != nil { + c.Ui.Error("Error unmarshaling json response") + return 255 + } + } else { + c.printFormatted(resp) + } + return 0 +} + +func (c *JobQueueCommand) printJSON(resp *api.BatchQueueStatusResponse) error { + out, err := json.Marshal(resp.Workloads) + if err != nil { + return err + } + + c.Ui.Output(string(out)) + return nil +} + +func (c *JobQueueCommand) printFormatted(resp *api.BatchQueueStatusResponse) { + + out := make([]string, len(resp.Workloads)+1) + out[0] = "JobID|Tenant|Priority" + + for i, v := range resp.Workloads { + out[i+1] = fmt.Sprintf("%s|%s|%d", v.JobID, v.Tenant, v.Priority) + } + + c.Ui.Output(c.Colorize().Color("[bold]Batch Queue Workloads[reset]")) + c.Ui.Output(columnize.SimpleFormat(out)) +} diff --git a/command/job_queue_test.go b/command/job_queue_test.go new file mode 100644 index 00000000000..474ec21e2b4 --- /dev/null +++ b/command/job_queue_test.go @@ -0,0 +1,64 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package command + +import ( + "fmt" + "testing" + + "github.com/hashicorp/cli" + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" +) + +func TestJobQueue_Implements(t *testing.T) { + ci.Parallel(t) + var _ cli.Command = &JobQueueCommand{} +} + +func TestJobQueue_printFormatted(t *testing.T) { + ci.Parallel(t) + ui := cli.NewMockUi() + cmd := &JobQueueCommand{Meta: Meta{Ui: ui}} + + testResp := &api.BatchQueueStatusResponse{ + Workloads: []api.Workload{ + { + JobID: "123", + Tenant: "testTenant1", + Priority: 5, + }, + }, + } + cmd.printFormatted(testResp) + + expect := "Batch Queue Workloads\n" + + "JobID Tenant Priority\n" + + "123 testTenant1 5\n" + + fmt.Println(ui.OutputWriter.String()) + must.Eq(t, expect, ui.OutputWriter.String()) +} + +func TestJobQueue_printJSON(t *testing.T) { + ci.Parallel(t) + ui := cli.NewMockUi() + cmd := &JobQueueCommand{Meta: Meta{Ui: ui}} + + testResp := &api.BatchQueueStatusResponse{ + Workloads: []api.Workload{ + { + JobID: "123", + Tenant: "testTenant1", + Priority: 5, + }, + }, + } + cmd.printJSON(testResp) + + expect := "[{\"JobID\":\"123\",\"Tenant\":\"testTenant1\",\"Priority\":5}]\n" + + must.Eq(t, expect, ui.OutputWriter.String()) +} From 2641634c1be3ac28f20f9b9cc4fa3d0458e2613f Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Thu, 7 May 2026 14:20:05 -0400 Subject: [PATCH 03/14] batch job queue: adds initial empty implementation (#27841) --- ci/test-core.json | 3 +- nomad/queues/batch_job_queue.go | 292 +++++++++++++++++++++++++++ nomad/queues/batch_job_queue_test.go | 133 ++++++++++++ nomad/queues/interface.go | 10 + nomad/queues/priority_queue.go | 45 +++++ 5 files changed, 482 insertions(+), 1 deletion(-) create mode 100644 nomad/queues/batch_job_queue.go create mode 100644 nomad/queues/batch_job_queue_test.go create mode 100644 nomad/queues/interface.go create mode 100644 nomad/queues/priority_queue.go diff --git a/ci/test-core.json b/ci/test-core.json index 4756a5ed233..be94013fffa 100644 --- a/ci/test-core.json +++ b/ci/test-core.json @@ -36,9 +36,10 @@ "nomad/auth/...", "nomad/deploymentwatcher/...", "nomad/drainer/...", - "nomad/reporting/...", "nomad/lock/...", "nomad/peers/...", + "nomad/queues/...", + "nomad/reporting/...", "nomad/state/...", "nomad/stream/...", "nomad/structs/...", diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go new file mode 100644 index 00000000000..62e2a2cb4a8 --- /dev/null +++ b/nomad/queues/batch_job_queue.go @@ -0,0 +1,292 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import ( + "container/heap" + "context" + "errors" + "sync" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +var ErrWatchedEvalNotFound = errors.New("watched evaluation not found") + +type TenantID string + +type DynamicPriorityQueue struct { + // tenants is used to keep track of cluster usage for this queue. + // When workloads are placed or the configured interval is passed, + // cluster usage is updated for the workloads of each tenant. + tenants map[TenantID]Tenant + + // queue is the main datastructure that contains all pending workloads + // + // TODO: at the moment, this is using the go stdlib container/heap package, + // but we may want to switch to treeset from Hashicorp's go-set. + // Why? Both have O(logn) push/pop. Heap has constant time peeking, but + // we don't use that. We do want to iterate over workloads quickly, which + // we can do with a red-black tree. + queue WorkloadQueue + + // qMux locks the queue during concurrent access + qMux sync.Mutex + + // qNotify allows for notifying the consumer that workloads + // have been added to the queue + qNotify chan struct{} + + // enqueueCh is used to buffer workloads before they + // are processed by the manager and pushed onto the queue + enqueueCh chan *Workload + + // totalUsage is the sum of all tenant usages + totalUsage int + + // conf contains user configurations for tuning the behavior of the queue + conf *DynamicPriorityConfig + + // evalBroker is the injected broker for passing an evaluation + // on to be scheduled by Nomad + evalBroker Queue + + // state is the in-memory state store used for both reconciling tenant + // workload usages, and polling submitted evaluations for placement + state *state.StateStore + logger hclog.Logger +} + +type DynamicPriorityConfig struct { + TenantType string + MetadataKey string + CalcInterval time.Duration +} + +type Tenant struct { + tid TenantID + workloads map[string]*Workload + usage int +} + +type Workload struct { + id string + tid TenantID + priority int + eval *structs.Evaluation + size int + index int +} + +func (w *Workload) calculatePriority(_ int64) { + // unimplemented +} + +func NewDynamicPriorityQueue(state *state.StateStore, broker Queue, conf *DynamicPriorityConfig, logger hclog.Logger) *DynamicPriorityQueue { + return &DynamicPriorityQueue{ + tenants: map[TenantID]Tenant{}, + queue: WorkloadQueue{}, + state: state, + enqueueCh: make(chan *Workload, 8096), + evalBroker: broker, + qMux: sync.Mutex{}, + qNotify: make(chan struct{}, 1), + conf: conf, + logger: logger.Named("Dynamic Priority Queue"), + } +} + +func (d *DynamicPriorityQueue) Start(ctx context.Context) { + // rebuild internal state from statestore, unimplemented + + go d.runProducer(ctx) + go d.runConsumer(ctx) +} + +// Enqueue is the method used to put evaluations on the queue. +// It generates a workload with an empty priority, appends it +// to an internal channel to be processed and added to the actual +// heap container. +func (d *DynamicPriorityQueue) Enqueue(e *structs.Evaluation) { + w := d.generateWorkload(e) + // in the event of an empty workload, just pass eval to eval broker + if w == nil { + d.evalBroker.Enqueue(e) + return + } + + d.enqueueCh <- w +} + +// runProducer pushes workloads onto the queue and notifies the consumer +// goroutine. It also updates priorities on the configured interval. +func (d *DynamicPriorityQueue) runProducer(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case w := <-d.enqueueCh: + w.calculatePriority(w.eval.CreateTime) + + d.qMux.Lock() + heap.Push(&d.queue, w) + d.qMux.Unlock() + + // Notify Workload consumer of new workload + select { + case d.qNotify <- struct{}{}: + default: + } + case <-time.After(d.conf.CalcInterval): + d.qMux.Lock() + d.calculatePriorities(time.Now().UnixNano()) + heap.Init(&d.queue) + d.qMux.Unlock() + } + } +} + +// runConsumer pops the highest priority workloads off the queue one +// at a time, enqueues them onto the Eval Broker, and waits for them +// to be placed before continuing. +func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-d.qNotify: + + // Pop a workload off the queue if available + d.qMux.Lock() + workload := heap.Pop(&d.queue).(*Workload) + d.qMux.Unlock() + + // Give the eval to the eval broker + d.evalBroker.Enqueue(workload.eval) + + // Wait for the eval to be placed + err := d.waitForPlacement(ctx, workload.eval, memdb.NewWatchSet()) + if err != nil { + d.logger.Error("failure waiting for workload placement", "evalID", workload.eval) + } + + d.qMux.Lock() + l := d.queue.Len() + d.qMux.Unlock() + + // If the queue still has work, notify self + // to continue. + if l > 0 { + select { + case d.qNotify <- struct{}{}: + default: + } + } + } + } +} + +// generateWorkload is used to create an initial workload from a given evaluation +func (d *DynamicPriorityQueue) generateWorkload(e *structs.Evaluation) *Workload { + job, err := d.state.JobByID(nil, e.Namespace, e.JobID) + if err != nil { + return nil + } + + tid := "" + switch d.conf.TenantType { + case "namespace": + tid = job.Namespace + case "metadata": + tenantID, ok := job.Meta[d.conf.MetadataKey] + if !ok { + return nil + } + tid = tenantID + default: + d.logger.Error("unknown tenant type, this is a bug.") + return nil + } + + return &Workload{ + tid: TenantID(tid), + priority: 0, + eval: e, + size: 0, + } +} + +func (d *DynamicPriorityQueue) calculatePriorities(time int64) { + // Decay tenant workload usages first, because a workload's + // priority relies on its tenant's usage. + for _, tenant := range d.tenants { + for range tenant.workloads { + // Unimplemented + d.totalUsage -= 0 + tenant.usage -= 0 + } + } + + // Now that we have accurate tenant usage, calculate + // each workloads new priority + for _, workload := range d.queue { + workload.calculatePriority(time) + } +} + +// waitForPlacement follows a given evalutation in the state store until it, or it's nexted/blocked evals +// have been marked terminal, indicating the workload has been scheduled. +// +// Note: If a job with an unsatisfiable contraint is given to the Eval Broker, this function will block +// until a Nomad operator manually intervenes and stops the job. In the future, we can add an optional +// configurable timeout for this blocking query. +func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *structs.Evaluation, ws memdb.WatchSet) error { + for !eval.TerminalStatus() || eval.BlockedEval != "" || eval.NextEval != "" { + id := eval.ID + + if eval.BlockedEval != "" { + id = eval.BlockedEval + } else if eval.NextEval != "" { + id = eval.NextEval + } + + snap, err := d.state.Snapshot() + if err != nil { + return err + } + + // TODO: handle snapshot restores + abandonCh := snap.AbandonCh() + ws.Add(abandonCh) + + eval, err = snap.EvalByID(ws, id) + if err != nil { + return err + } + if eval == nil { + return ErrWatchedEvalNotFound + } + + if eval.TerminalStatus() { + continue + } + + // If the latest version of the eval isn't terminal, wait for an update + if err = ws.WatchCtx(ctx); err != nil { + return err + } + + // The watch channel will be closed, we should delete it to + // prevent immediately firing on the next WatchCtx + for k := range ws { + delete(ws, k) + } + } + + return nil +} diff --git a/nomad/queues/batch_job_queue_test.go b/nomad/queues/batch_job_queue_test.go new file mode 100644 index 00000000000..6f139419be2 --- /dev/null +++ b/nomad/queues/batch_job_queue_test.go @@ -0,0 +1,133 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import ( + "fmt" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" +) + +func TestWaitForPlacement(t *testing.T) { + + t.Run("returns if eval complete", func(t *testing.T) { + ss := state.TestStateStore(t) + testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + + testEval := mock.Eval() + ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval}) + + ws := memdb.NewWatchSet() + doneCh := make(chan error) + go func() { + err := testQueue.waitForPlacement(t.Context(), testEval, ws) + doneCh <- err + }() + + testEval.Status = structs.EvalStatusComplete + ss.UpsertEvals(structs.MsgTypeTestSetup, 1, []*structs.Evaluation{testEval}) + + done := <-doneCh + + must.NoError(t, done) + }) + + t.Run("continues watching blocked evals", func(t *testing.T) { + ss := state.TestStateStore(t) + testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + + testEval := mock.Eval() + blocked := mock.Eval() + + testEval.Status = structs.EvalStatusComplete + testEval.BlockedEval = blocked.ID + + ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval, blocked}) + + ws := memdb.NewWatchSet() + doneCh := make(chan error) + go func() { + err := testQueue.waitForPlacement(t.Context(), testEval, ws) + doneCh <- err + }() + + // We want to make sure the testQueue has begun a watch on the blocked eval + // before continuing, which is indicated by the length of the watchset being >0. + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + if len(ws) == 0 { + return fmt.Errorf("blocking query not started yet") + } + return nil + }), + wait.Timeout(5*time.Second), + wait.Gap(100*time.Millisecond), + )) + + select { + case <-doneCh: + t.Fatal("should not have exited") + default: + } + + blocked.Status = structs.EvalStatusComplete + ss.UpsertEvals(structs.MsgTypeTestSetup, 1, []*structs.Evaluation{blocked}) + + done := <-doneCh + must.NoError(t, done) + }) + + t.Run("continues watching next evals after eval failure", func(t *testing.T) { + ss := state.TestStateStore(t) + testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + + testEval := mock.Eval() + next := mock.Eval() + + testEval.Status = structs.EvalStatusFailed + testEval.NextEval = next.ID + + ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval, next}) + + ws := memdb.NewWatchSet() + doneCh := make(chan error) + go func() { + err := testQueue.waitForPlacement(t.Context(), testEval, ws) + doneCh <- err + }() + + // We want to make sure the testQueue has begun a watch on the blocked eval + // before continuing, which is indicated by the length of the watchset being >0. + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + if len(ws) == 0 { + return fmt.Errorf("blocking query not started yet") + } + return nil + }), + wait.Timeout(5*time.Second), + wait.Gap(100*time.Millisecond), + )) + + select { + case <-doneCh: + t.Fatal("should not have exited") + default: + } + + next.Status = structs.EvalStatusComplete + ss.UpsertEvals(structs.MsgTypeTestSetup, 1, []*structs.Evaluation{next}) + + done := <-doneCh + must.NoError(t, done) + }) +} diff --git a/nomad/queues/interface.go b/nomad/queues/interface.go new file mode 100644 index 00000000000..0d2d160792e --- /dev/null +++ b/nomad/queues/interface.go @@ -0,0 +1,10 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import "github.com/hashicorp/nomad/nomad/structs" + +type Queue interface { + Enqueue(*structs.Evaluation) +} diff --git a/nomad/queues/priority_queue.go b/nomad/queues/priority_queue.go new file mode 100644 index 00000000000..fee398124af --- /dev/null +++ b/nomad/queues/priority_queue.go @@ -0,0 +1,45 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import "container/heap" + +// A WorkloadQueue implements heap.Interface and holds *Workload. +type WorkloadQueue []*Workload + +func (pq WorkloadQueue) Len() int { return len(pq) } + +func (pq WorkloadQueue) Less(i, j int) bool { + // We want Pop to give us the highest, not lowest, priority so we use greater than here. + return pq[i].priority > pq[j].priority +} + +func (pq WorkloadQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *WorkloadQueue) Push(x any) { + n := len(*pq) + item := x.(*Workload) + item.index = n + *pq = append(*pq, item) +} + +func (pq *WorkloadQueue) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // don't stop the GC from reclaiming the item eventually + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// update modifies the priority and value of an Item in the queue. +func (pq *WorkloadQueue) update(item *Workload, priority int) { + item.priority = priority + heap.Fix(pq, item.index) +} From ccef7c2fe874c27dbed5e2ca68349bf7d6d0eb4b Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Thu, 21 May 2026 17:07:31 -0400 Subject: [PATCH 04/14] batch queue: add queue initialization to server (#27953) --- nomad/fsm.go | 12 +++- nomad/leader.go | 4 ++ ...job_queue.go => dynamic_priority_queue.go} | 68 ++++++++++++------- ...test.go => dynamic_priority_queue_test.go} | 9 ++- nomad/queues/interface.go | 14 +++- nomad/queues/passthrough_queue.go | 28 ++++++++ nomad/queues/queue.go | 22 ++++++ nomad/server.go | 24 +++++++ nomad/structs/operator.go | 46 ++++++------- nomad/structs/operator_test.go | 2 +- 10 files changed, 176 insertions(+), 53 deletions(-) rename nomad/queues/{batch_job_queue.go => dynamic_priority_queue.go} (85%) rename nomad/queues/{batch_job_queue_test.go => dynamic_priority_queue_test.go} (86%) create mode 100644 nomad/queues/passthrough_queue.go create mode 100644 nomad/queues/queue.go diff --git a/nomad/fsm.go b/nomad/fsm.go index 9256a8c8868..9fff5e531d9 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/go-msgpack/v2/codec" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/queues" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" @@ -132,6 +133,7 @@ type SnapshotRestorers map[SnapshotType]SnapshotRestorer // this outside the Server to avoid exposing this outside the package. type nomadFSM struct { evalBroker *EvalBroker + batchQueue queues.Queue blockedEvals *BlockedEvals periodicDispatcher *PeriodicDispatch encrypter *Encrypter @@ -170,6 +172,9 @@ type FSMConfig struct { // EvalBroker is the evaluation broker evaluations should be added to EvalBroker *EvalBroker + // BatchQueue is the configured queue for batch job registrations + BatchQueue queues.Queue + // Periodic is the periodic job dispatcher that periodic jobs should be // added/removed from Periodic *PeriodicDispatch @@ -215,6 +220,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) { fsm := &nomadFSM{ evalBroker: config.EvalBroker, + batchQueue: config.BatchQueue, periodicDispatcher: config.Periodic, blockedEvals: config.Blocked, encrypter: config.Encrypter, @@ -972,7 +978,11 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) { } if eval.ShouldEnqueue() { - n.evalBroker.Enqueue(eval) + if eval.Type == structs.JobTypeBatch && eval.TriggeredBy == structs.EvalTriggerJobRegister { + n.batchQueue.Enqueue(eval) + } else { + n.evalBroker.Enqueue(eval) + } } else if eval.ShouldBlock() { n.blockedEvals.Block(eval) } else if eval.Status == structs.EvalStatusComplete && diff --git a/nomad/leader.go b/nomad/leader.go index 26a5f1d9521..57c8bf6684c 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -409,6 +409,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // the operator. restoreEvals := s.handleEvalBrokerStateChange(schedulerConfig) + s.batchJobQueue.SetEnabled(true, s.State()) + // Enable the deployment watcher, since we are now the leader s.deploymentWatcher.SetEnabled(true, s.State()) @@ -1447,6 +1449,8 @@ func (s *Server) revokeLeadership() error { // Disable the deployment watcher as it is only useful as a leader. s.deploymentWatcher.SetEnabled(false, nil) + s.batchJobQueue.SetEnabled(false, nil) + // Disable the node drainer s.nodeDrainer.SetEnabled(false, nil) diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/dynamic_priority_queue.go similarity index 85% rename from nomad/queues/batch_job_queue.go rename to nomad/queues/dynamic_priority_queue.go index 62e2a2cb4a8..76b67dcd40e 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/dynamic_priority_queue.go @@ -8,6 +8,7 @@ import ( "context" "errors" "sync" + "sync/atomic" "time" "github.com/hashicorp/go-hclog" @@ -24,7 +25,7 @@ type DynamicPriorityQueue struct { // tenants is used to keep track of cluster usage for this queue. // When workloads are placed or the configured interval is passed, // cluster usage is updated for the workloads of each tenant. - tenants map[TenantID]Tenant + tenants map[TenantID]*Tenant // queue is the main datastructure that contains all pending workloads // @@ -49,12 +50,20 @@ type DynamicPriorityQueue struct { // totalUsage is the sum of all tenant usages totalUsage int + tenantType structs.BatchQueueTenant + + metadataKey string + // conf contains user configurations for tuning the behavior of the queue - conf *DynamicPriorityConfig + conf *structs.DynamicQueueConfig // evalBroker is the injected broker for passing an evaluation // on to be scheduled by Nomad - evalBroker Queue + evalBroker Broker + + // enabled tracks whether the server running the batch job queue is the leader + // so should process evaluations + enabled atomic.Bool // state is the in-memory state store used for both reconciling tenant // workload usages, and polling submitted evaluations for placement @@ -62,12 +71,6 @@ type DynamicPriorityQueue struct { logger hclog.Logger } -type DynamicPriorityConfig struct { - TenantType string - MetadataKey string - CalcInterval time.Duration -} - type Tenant struct { tid TenantID workloads map[string]*Workload @@ -87,25 +90,33 @@ func (w *Workload) calculatePriority(_ int64) { // unimplemented } -func NewDynamicPriorityQueue(state *state.StateStore, broker Queue, conf *DynamicPriorityConfig, logger hclog.Logger) *DynamicPriorityQueue { +func NewDynamicPriorityQueue(broker Broker, qconf *structs.BatchQueue, conf *structs.DynamicQueueConfig, logger hclog.Logger) *DynamicPriorityQueue { return &DynamicPriorityQueue{ - tenants: map[TenantID]Tenant{}, - queue: WorkloadQueue{}, - state: state, - enqueueCh: make(chan *Workload, 8096), - evalBroker: broker, - qMux: sync.Mutex{}, - qNotify: make(chan struct{}, 1), - conf: conf, - logger: logger.Named("Dynamic Priority Queue"), + tenants: make(map[TenantID]*Tenant), + queue: WorkloadQueue{}, + enqueueCh: make(chan *Workload, 8192), + evalBroker: broker, + qMux: sync.Mutex{}, + qNotify: make(chan struct{}, 1), + tenantType: qconf.TenantType, + metadataKey: qconf.MetadataKey, + conf: conf, + logger: logger.Named("Dynamic Priority Queue"), + totalUsage: 0, } } -func (d *DynamicPriorityQueue) Start(ctx context.Context) { - // rebuild internal state from statestore, unimplemented - +func (d *DynamicPriorityQueue) Start(ctx context.Context) error { go d.runProducer(ctx) go d.runConsumer(ctx) + + return nil +} + +func (d *DynamicPriorityQueue) SetEnabled(val bool, state *state.StateStore) { + // rebuild internal state from statestore, unimplemented + d.state = state + d.enabled.Store(val) } // Enqueue is the method used to put evaluations on the queue. @@ -113,7 +124,12 @@ func (d *DynamicPriorityQueue) Start(ctx context.Context) { // to an internal channel to be processed and added to the actual // heap container. func (d *DynamicPriorityQueue) Enqueue(e *structs.Evaluation) { + if !d.enabled.Load() { + return + } + w := d.generateWorkload(e) + // in the event of an empty workload, just pass eval to eval broker if w == nil { d.evalBroker.Enqueue(e) @@ -143,6 +159,10 @@ func (d *DynamicPriorityQueue) runProducer(ctx context.Context) { default: } case <-time.After(d.conf.CalcInterval): + if !d.enabled.Load() { + continue + } + d.qMux.Lock() d.calculatePriorities(time.Now().UnixNano()) heap.Init(&d.queue) @@ -199,11 +219,11 @@ func (d *DynamicPriorityQueue) generateWorkload(e *structs.Evaluation) *Workload } tid := "" - switch d.conf.TenantType { + switch d.tenantType { case "namespace": tid = job.Namespace case "metadata": - tenantID, ok := job.Meta[d.conf.MetadataKey] + tenantID, ok := job.Meta[d.metadataKey] if !ok { return nil } diff --git a/nomad/queues/batch_job_queue_test.go b/nomad/queues/dynamic_priority_queue_test.go similarity index 86% rename from nomad/queues/batch_job_queue_test.go rename to nomad/queues/dynamic_priority_queue_test.go index 6f139419be2..a1e41d08fa9 100644 --- a/nomad/queues/batch_job_queue_test.go +++ b/nomad/queues/dynamic_priority_queue_test.go @@ -21,7 +21,8 @@ func TestWaitForPlacement(t *testing.T) { t.Run("returns if eval complete", func(t *testing.T) { ss := state.TestStateStore(t) - testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + testQueue := NewDynamicPriorityQueue(nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions)) + testQueue.SetEnabled(true, ss) testEval := mock.Eval() ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval}) @@ -43,7 +44,8 @@ func TestWaitForPlacement(t *testing.T) { t.Run("continues watching blocked evals", func(t *testing.T) { ss := state.TestStateStore(t) - testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + testQueue := NewDynamicPriorityQueue(nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions)) + testQueue.SetEnabled(true, ss) testEval := mock.Eval() blocked := mock.Eval() @@ -88,7 +90,8 @@ func TestWaitForPlacement(t *testing.T) { t.Run("continues watching next evals after eval failure", func(t *testing.T) { ss := state.TestStateStore(t) - testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + testQueue := NewDynamicPriorityQueue(nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions)) + testQueue.SetEnabled(true, ss) testEval := mock.Eval() next := mock.Eval() diff --git a/nomad/queues/interface.go b/nomad/queues/interface.go index 0d2d160792e..41da0dcb577 100644 --- a/nomad/queues/interface.go +++ b/nomad/queues/interface.go @@ -3,8 +3,20 @@ package queues -import "github.com/hashicorp/nomad/nomad/structs" +import ( + "context" + + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) type Queue interface { Enqueue(*structs.Evaluation) + Start(context.Context) error + SetEnabled(bool, *state.StateStore) +} + +// Broker is the interface for an evaluation broker +type Broker interface { + Enqueue(*structs.Evaluation) } diff --git a/nomad/queues/passthrough_queue.go b/nomad/queues/passthrough_queue.go new file mode 100644 index 00000000000..d754892bd16 --- /dev/null +++ b/nomad/queues/passthrough_queue.go @@ -0,0 +1,28 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import ( + "context" + + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +type PassthroughQueue struct { + broker Broker +} + +func NewPassthroughQueue(b Broker) *PassthroughQueue { + return &PassthroughQueue{ + broker: b, + } +} + +// Start is a noop for the passthrough implementation +func (p *PassthroughQueue) Start(context.Context) error { return nil } + +func (p *PassthroughQueue) Enqueue(e *structs.Evaluation) { p.broker.Enqueue(e) } + +func (p *PassthroughQueue) SetEnabled(bool, *state.StateStore) {} diff --git a/nomad/queues/queue.go b/nomad/queues/queue.go new file mode 100644 index 00000000000..2eeea6e3661 --- /dev/null +++ b/nomad/queues/queue.go @@ -0,0 +1,22 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import ( + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/nomad/structs" +) + +func NewQueue(sconf *structs.BatchQueue, broker Broker, logger hclog.Logger) (Queue, error) { + switch sconf.Type { + case structs.BatchQueueTypeDynamic: + qconf := &structs.DynamicQueueConfig{} + if err := structs.DecodeBatchQueueConf(sconf.Config, qconf); err != nil { + return nil, err + } + return NewDynamicPriorityQueue(broker, sconf, qconf, logger), nil + default: + return NewPassthroughQueue(broker), nil + } +} diff --git a/nomad/server.go b/nomad/server.go index 987efee78b6..d37e1d6c202 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -47,6 +47,7 @@ import ( "github.com/hashicorp/nomad/nomad/drainer" "github.com/hashicorp/nomad/nomad/lock" "github.com/hashicorp/nomad/nomad/peers" + "github.com/hashicorp/nomad/nomad/queues" "github.com/hashicorp/nomad/nomad/reporting" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -214,6 +215,10 @@ type Server struct { // that are waiting to be brokered to a sub-scheduler evalBroker *EvalBroker + // batchJobQueue is the interface for enqueuing job + // register evaluations on a queue implementation + batchJobQueue queues.Queue + // brokerLock is used to synchronise the alteration of the blockedEvals and // evalBroker enabled state. These two subsystems change state when // leadership changes or when the user modifies the setting via the @@ -482,6 +487,18 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc Encrypter: s.encrypter, }) + // Creates the batch job queue + s.batchJobQueue, err = queues.NewQueue( + &s.config.DefaultSchedulerConfig.BatchQueue, + evalBroker, + logger, + ) + if err != nil { + s.Shutdown() + s.logger.Error("failed to create batch job queue", "error", err) + return nil, fmt.Errorf("Failed to create batch jo queue: %v", err) + } + // Initialize the Raft server if err := s.setupRaft(); err != nil { s.Shutdown() @@ -489,6 +506,12 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc return nil, fmt.Errorf("Failed to start Raft: %v", err) } + if err := s.batchJobQueue.Start(s.shutdownCtx); err != nil { + s.Shutdown() + s.logger.Error("failed to start batch job queue", "error", err) + return nil, fmt.Errorf("Failed to start batcj job queue: %v", err) + } + // Initialize the wan Serf s.serf, err = s.setupSerf(config.SerfConfig, s.eventCh, serfSnapshot) if err != nil { @@ -1356,6 +1379,7 @@ func (s *Server) setupRaft() error { // Create the FSM fsmConfig := &FSMConfig{ EvalBroker: s.evalBroker, + BatchQueue: s.batchJobQueue, Periodic: s.periodicDispatcher, Blocked: s.blockedEvals, Encrypter: s.encrypter, diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index c71825ddbc8..7ea077e2764 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -9,6 +9,7 @@ import ( "net/netip" "time" + "github.com/go-viper/mapstructure/v2" "github.com/hashicorp/go-uuid" "github.com/hashicorp/raft" ) @@ -365,7 +366,7 @@ type ( ) const ( - BatchQueueTypeDynamic BatchQueueType = "dynamicPriorty" + BatchQueueTypeDynamic BatchQueueType = "dynamicPriority" TenantTypeMetadata BatchQueueTenant = "metadata" TenantTypeNamespace BatchQueueTenant = "namespace" @@ -382,23 +383,25 @@ type BatchQueue struct { } type DynamicQueueConfig struct { - CalcInterval time.Duration - MaxAge time.Duration - MaxSize int - AgeWeight int - UsageWeight int - SizeWeight int -} - -func validateDuration(val any) error { - switch t := val.(type) { - case string: - if _, err := time.ParseDuration(t); err != nil { - return err - } - case int, nil: - default: - return fmt.Errorf("value not a duration: %v", val) + CalcInterval time.Duration `mapstructure:"calc_interval"` + MaxAge time.Duration `mapstructure:"max_age"` + MaxSize int `mapstructure:"max_size"` + AgeWeight int `mapstructure:"age_weight"` + UsageWeight int `mapstructure:"usage_weight"` + SizeWeight int `mapstructure:"size_weight"` +} + +func DecodeBatchQueueConf[T any](in map[string]any, out *T) error { + decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + Result: out, + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + }) + if err != nil { + return fmt.Errorf("unable to create config decoder, %w", err) + } + + if err := decoder.Decode(in); err != nil { + return fmt.Errorf("unable to decode config, %w", err) } return nil @@ -416,11 +419,8 @@ func (b *BatchQueue) Validate() error { switch b.Type { case BatchQueueTypeDynamic: - if err := validateDuration(b.Config[DynamicCalcInterval]); err != nil { - return fmt.Errorf("failed to parse calc_interval: %v", err) - } - if err := validateDuration(b.Config[DynamicMaxAge]); err != nil { - return fmt.Errorf("failed to parse max_age: %v", err) + if err := DecodeBatchQueueConf(b.Config, &DynamicQueueConfig{}); err != nil { + return err } default: return fmt.Errorf("unsupported batch queue type: %q", b.Type) diff --git a/nomad/structs/operator_test.go b/nomad/structs/operator_test.go index 371a37bc8c2..c52b9f8dbe6 100644 --- a/nomad/structs/operator_test.go +++ b/nomad/structs/operator_test.go @@ -229,7 +229,7 @@ func TestBatchQueue_Validate(t *testing.T) { "calc_interval": "hello", }, }, - err: "failed to parse", + err: "unable to decode conf", }, { name: "dynamicPriority - valid string interval", From 823445bd8c42ba594d00c81b6e9b9d8ca962b3b4 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 2 Jun 2026 12:59:52 -0400 Subject: [PATCH 05/14] bjq: add operator set-config ability and ability to manage multiple queues --- command/agent/operator_endpoint.go | 6 + command/operator_scheduler_set_config.go | 23 +++ nomad/fsm.go | 10 +- nomad/leader.go | 4 +- nomad/operator_endpoint.go | 2 + nomad/queues/batch_queue_manager.go | 151 +++++++++++++++++ nomad/queues/batch_queue_manager_test.go | 170 ++++++++++++++++++++ nomad/queues/dynamic_priority_queue.go | 32 ++-- nomad/queues/dynamic_priority_queue_test.go | 9 +- nomad/queues/interface.go | 3 +- nomad/queues/passthrough_queue.go | 2 + nomad/queues/queue.go | 5 +- nomad/server.go | 23 +-- 13 files changed, 386 insertions(+), 54 deletions(-) create mode 100644 nomad/queues/batch_queue_manager.go create mode 100644 nomad/queues/batch_queue_manager_test.go diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index eb0ca537919..c3908447b78 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -318,6 +318,12 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled, ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled, }, + BatchQueue: structs.BatchQueue{ + Type: structs.BatchQueueType(conf.BatchQueue.Type), + TenantType: structs.BatchQueueTenant(conf.BatchQueue.TenantType), + MetadataKey: conf.BatchQueue.MetadataKey, + Config: conf.BatchQueue.Config, + }, } if err := args.Config.Validate(); err != nil { diff --git a/command/operator_scheduler_set_config.go b/command/operator_scheduler_set_config.go index d7e3665356d..b0385b58956 100644 --- a/command/operator_scheduler_set_config.go +++ b/command/operator_scheduler_set_config.go @@ -4,7 +4,9 @@ package command import ( + "encoding/json" "fmt" + "maps" "strings" "github.com/hashicorp/cli" @@ -32,6 +34,11 @@ type OperatorSchedulerSetConfig struct { preemptSysBatchScheduler flagHelper.BoolValue preemptSystemScheduler flagHelper.BoolValue nodeLimitForFeasibilityChecks flagHelper.UintValue + + batchQueueType string + batchQueueTenantType string + batchQueueMetadataKey string + batchQueueConfig string } func (o *OperatorSchedulerSetConfig) AutocompleteFlags() complete.Flags { @@ -50,6 +57,10 @@ func (o *OperatorSchedulerSetConfig) AutocompleteFlags() complete.Flags { "-preempt-sysbatch-scheduler": complete.PredictSet("true", "false"), "-preempt-system-scheduler": complete.PredictSet("true", "false"), "-node-limit-for-feasibility-checks": complete.PredictAnything, + "-batch-queue-type": complete.PredictSet("dynamicPriority"), + "-batch-queue-tenant-type": complete.PredictSet("metadata", "namespace"), + "-batch-queue-metadata-key": complete.PredictAnything, + "-batch-queue-config": complete.PredictAnything, }, ) } @@ -76,6 +87,11 @@ func (o *OperatorSchedulerSetConfig) Run(args []string) int { flags.Var(&o.preemptSystemScheduler, "preempt-system-scheduler", "") flags.Var(&o.nodeLimitForFeasibilityChecks, "node-limit-for-feasibility-checks", "") + flags.StringVar(&o.batchQueueType, "batch-queue-type", "", "") + flags.StringVar(&o.batchQueueTenantType, "batch-queue-tenant-type", "", "") + flags.StringVar(&o.batchQueueMetadataKey, "batch-queue-metdata-key", "", "") + flags.StringVar(&o.batchQueueConfig, "batch-queue-config", "", "") + if err := flags.Parse(args); err != nil { return 1 } @@ -139,6 +155,13 @@ func (o *OperatorSchedulerSetConfig) Run(args []string) int { o.preemptSystemScheduler.Merge(&schedulerConfig.PreemptionConfig.SystemSchedulerEnabled) o.nodeLimitForFeasibilityChecks.Merge(&schedulerConfig.NodeLimitForFeasibilityChecks) + if o.batchQueueConfig != "" { + conf := make(map[string]any) + _ = json.Unmarshal([]byte(o.batchQueueConfig), &conf) + maps.Copy(schedulerConfig.BatchQueue.Config, conf) + } + fmt.Println(schedulerConfig.BatchQueue.Config) + // Check-and-set the new configuration. result, _, err := client.Operator().SchedulerCASConfiguration(schedulerConfig, nil) if err != nil { diff --git a/nomad/fsm.go b/nomad/fsm.go index 9fff5e531d9..649c2e763d1 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -133,7 +133,7 @@ type SnapshotRestorers map[SnapshotType]SnapshotRestorer // this outside the Server to avoid exposing this outside the package. type nomadFSM struct { evalBroker *EvalBroker - batchQueue queues.Queue + batchQueueMgr *queues.BatchQueueManager blockedEvals *BlockedEvals periodicDispatcher *PeriodicDispatch encrypter *Encrypter @@ -173,7 +173,7 @@ type FSMConfig struct { EvalBroker *EvalBroker // BatchQueue is the configured queue for batch job registrations - BatchQueue queues.Queue + BatchQueueMgr *queues.BatchQueueManager // Periodic is the periodic job dispatcher that periodic jobs should be // added/removed from @@ -220,7 +220,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) { fsm := &nomadFSM{ evalBroker: config.EvalBroker, - batchQueue: config.BatchQueue, + batchQueueMgr: config.BatchQueueMgr, periodicDispatcher: config.Periodic, blockedEvals: config.Blocked, encrypter: config.Encrypter, @@ -979,7 +979,9 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) { if eval.ShouldEnqueue() { if eval.Type == structs.JobTypeBatch && eval.TriggeredBy == structs.EvalTriggerJobRegister { - n.batchQueue.Enqueue(eval) + n.batchQueueMgr.Enqueue(eval) // batchQueueMgr + + // if eval.NodePool -> pass to correct queue } else { n.evalBroker.Enqueue(eval) } diff --git a/nomad/leader.go b/nomad/leader.go index 57c8bf6684c..75c85ad232c 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -409,7 +409,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // the operator. restoreEvals := s.handleEvalBrokerStateChange(schedulerConfig) - s.batchJobQueue.SetEnabled(true, s.State()) + s.batchQueueMgr.SetEnabled(true, s.State()) // Enable the deployment watcher, since we are now the leader s.deploymentWatcher.SetEnabled(true, s.State()) @@ -1449,7 +1449,7 @@ func (s *Server) revokeLeadership() error { // Disable the deployment watcher as it is only useful as a leader. s.deploymentWatcher.SetEnabled(false, nil) - s.batchJobQueue.SetEnabled(false, nil) + s.batchQueueMgr.SetEnabled(false, nil) // Disable the node drainer s.nodeDrainer.SetEnabled(false, nil) diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index af00ca769b2..3644081c8b8 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -467,6 +467,8 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe // restore functions have protections around leadership transitions and // restoring into non-running brokers. if reply.Updated { + op.srv.batchQueueMgr.Update("", &args.Config.BatchQueue) + if op.srv.handleEvalBrokerStateChange(&args.Config) { return op.srv.restoreEvals() } diff --git a/nomad/queues/batch_queue_manager.go b/nomad/queues/batch_queue_manager.go new file mode 100644 index 00000000000..455b80e0a79 --- /dev/null +++ b/nomad/queues/batch_queue_manager.go @@ -0,0 +1,151 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import ( + "context" + "sync/atomic" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +type BatchQueueManager struct { + defaultQueue Queue + defaultConf structs.BatchQueue + nodePoolQueues map[string]Queue + broker Broker + state *state.StateStore + enabled atomic.Bool + shutdownCtx context.Context + logger hclog.Logger +} + +func NewBatchQueueMgr(ctx context.Context, defaultConf structs.BatchQueue, broker Broker, logger hclog.Logger) *BatchQueueManager { + return &BatchQueueManager{ + nodePoolQueues: make(map[string]Queue), + defaultConf: defaultConf, + broker: broker, + shutdownCtx: ctx, + logger: logger, + } +} + +// Enqueue takes an evaluation and passes it to the respective queue. +func (b *BatchQueueManager) Enqueue(e *structs.Evaluation) { + if !b.enabled.Load() { + return + } + + // This shouldn't happen, but in the event we enqueue + // an eval before setting state, just pass it to the broker + if b.state == nil { + b.broker.Enqueue(e) + return + } + + job, err := b.state.JobByID(nil, e.Namespace, e.JobID) + if err != nil { + b.logger.Error("batch queue failed to lookup job for eval", "evalID", e.ID, "err", err) + b.broker.Enqueue(e) + return + } + + // if a node pool has a specific batch queue configuration, use that, + // otherwise use the scheduler config queue. + if queue, ok := b.nodePoolQueues[job.NodePool]; !ok { + b.defaultQueue.Enqueue(e) + } else { + queue.Enqueue(e) + } +} + +func (b *BatchQueueManager) SetEnabled(enabled bool, state *state.StateStore) { + if enabled { + if b.state == nil { + b.state = state + } + b.createQueues() + } else { + // stop default queue and any node pool queues + b.defaultQueue.Stop() + for _, q := range b.nodePoolQueues { + q.Stop() + } + } + + b.enabled.Store(enabled) +} + +// createQueues should create all queues for the server. It should always create a queue +// for the default scheduler config, but only create queues for node pool scheduler configs +// that have a valid batch queue configuration. This allows easy fallback to the default queue. +func (b *BatchQueueManager) createQueues() { + defaultConf := b.defaultConf + _, conf, err := b.state.SchedulerConfig() + + if conf != nil { + defaultConf = conf.BatchQueue + } + + b.defaultQueue, err = NewQueue(b.state, &defaultConf, b.broker, b.logger) + if err != nil { + b.logger.Error("failed to create default batch queue", "err", err) + return + } + b.defaultQueue.Start(b.shutdownCtx) + + nodePoolIter, err := b.state.NodePools(nil, state.SortDefault) + + for { + raw := nodePoolIter.Next() + if raw == nil { + break + } + np := raw.(*structs.NodePool) + + conf := structs.BatchQueue{} + if np.SchedulerConfiguration != nil { + conf = np.SchedulerConfiguration.BatchQueue + } + + // Do not create queue for empty node pool scheduler configs + if conf.Type == "" { + continue + } + + queue, err := NewQueue(b.state, &conf, b.broker, b.logger) + if err != nil { + b.logger.Error("failed to create node pool queue", "err", err) + return + } + + b.nodePoolQueues[np.Name] = queue + queue.Start(b.shutdownCtx) + } +} + +// Update is used to either update the default queue or a specific node pools queue. +func (b *BatchQueueManager) Update(nodePool string, conf *structs.BatchQueue) { + if !b.enabled.Load() { + return + } + + queue, err := NewQueue(b.state, conf, b.broker, b.logger) + if err != nil { + b.logger.Error("failed to update batch queue", "err", err) + return + } + + if nodePool == "" { + b.defaultQueue.Stop() + b.defaultQueue = queue + } else { + b.nodePoolQueues[nodePool].Stop() + b.nodePoolQueues[nodePool] = queue + } + + queue.Start(b.shutdownCtx) +} diff --git a/nomad/queues/batch_queue_manager_test.go b/nomad/queues/batch_queue_manager_test.go new file mode 100644 index 00000000000..66dd702295d --- /dev/null +++ b/nomad/queues/batch_queue_manager_test.go @@ -0,0 +1,170 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import ( + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" + tmock "github.com/stretchr/testify/mock" +) + +type MockBroker struct { + tmock.Mock +} + +func (m *MockBroker) Enqueue(e *structs.Evaluation) { + m.Called(e) +} + +func TestBatchQueueManager_Enqueue(t *testing.T) { + t.Run("does not enqueue if not enabled", func(t *testing.T) { + // Test will fail if an eval is given to the mock broker + mgr := NewBatchQueueMgr(t.Context(), structs.BatchQueue{}, &MockBroker{}, hclog.Default()) + mgr.enabled.Store(false) + mgr.Enqueue(&structs.Evaluation{}) + }) + + t.Run("enqueues on node pool queue if available", func(t *testing.T) { + mockBroker := &MockBroker{} + mockBroker.On("Enqueue", tmock.Anything).Return() + mgr := NewBatchQueueMgr(t.Context(), structs.BatchQueue{}, mockBroker, hclog.Default()) + + mgr.enabled.Store(true) + ss := state.TestStateStore(t) + mockJob := mock.Job() + must.NoError(t, ss.UpsertJob(structs.MsgTypeTestSetup, 1, nil, mockJob)) + mgr.state = ss + + mgr.nodePoolQueues[mockJob.NodePool] = NewPassthroughQueue(mockBroker) + // give a not setup mock to the defaultQueue so test will fail if an eval + // is given to this queue + mgr.defaultQueue = NewPassthroughQueue(&MockBroker{}) + + mockEval := &structs.Evaluation{ + JobID: mockJob.ID, + Namespace: mockJob.Namespace, + } + + mgr.Enqueue(mockEval) + must.Eq(t, len(mockBroker.Calls), 1) + }) + + t.Run("enqueues on default queue if no node pool queue", func(t *testing.T) { + mockBroker := &MockBroker{} + mockBroker.On("Enqueue", tmock.Anything).Return() + mgr := NewBatchQueueMgr(t.Context(), structs.BatchQueue{}, mockBroker, hclog.Default()) + + mgr.enabled.Store(true) + ss := state.TestStateStore(t) + mockJob := mock.Job() + must.NoError(t, ss.UpsertJob(structs.MsgTypeTestSetup, 1, nil, mockJob)) + mgr.state = ss + + // only setup the default queue, test will fail if it tries to add + // to a specific nodePool queue + mgr.defaultQueue = NewPassthroughQueue(mockBroker) + + mockEval := &structs.Evaluation{ + JobID: mockJob.ID, + Namespace: mockJob.Namespace, + } + + mgr.Enqueue(mockEval) + must.Eq(t, 1, len(mockBroker.Calls)) + }) +} + +func TestBatchQueueManager_SetEnabled(t *testing.T) { + t.Run("creates queues when enabled", func(t *testing.T) { + mockBroker := &MockBroker{} + mgr := NewBatchQueueMgr(t.Context(), structs.BatchQueue{}, mockBroker, hclog.Default()) + ss := state.TestStateStore(t) + ss.UpsertNodePools(structs.MsgTypeTestSetup, 1, []*structs.NodePool{ + { + Name: "test", + SchedulerConfiguration: &structs.NodePoolSchedulerConfiguration{ + BatchQueue: structs.BatchQueue{ + Type: "test", + }, + }, + }, + }) + + mgr.SetEnabled(true, ss) + + must.NotNil(t, mgr.defaultQueue) + must.Eq(t, 1, len(mgr.nodePoolQueues)) + }) + + t.Run("stops queues when disabled", func(t *testing.T) { + mockBroker := &MockBroker{} + mgr := NewBatchQueueMgr(t.Context(), structs.BatchQueue{}, mockBroker, hclog.Default()) + stopCh1 := make(chan struct{}) + stopCh2 := make(chan struct{}) + // DynamicPriorityQueue closes the stop chan on Stop(), so we can use it to assert stop is called + mgr.defaultQueue = &DynamicPriorityQueue{stopCh: stopCh1} + mgr.nodePoolQueues["test"] = &DynamicPriorityQueue{stopCh: stopCh2} + + mgr.SetEnabled(false, nil) + + select { + case <-stopCh1: + case <-time.After(50 * time.Millisecond): + t.FailNow() + } + + select { + case <-stopCh2: + case <-time.After(50 * time.Millisecond): + t.FailNow() + } + }) +} + +func TestBatchQueueManager_Update(t *testing.T) { + t.Run("updates default queue given empty node pool", func(t *testing.T) { + mockBroker := &MockBroker{} + mgr := NewBatchQueueMgr(t.Context(), structs.BatchQueue{}, mockBroker, hclog.Default()) + stopCh1 := make(chan struct{}) + mgr.defaultQueue = &DynamicPriorityQueue{stopCh: stopCh1} + before := mgr.defaultQueue + mgr.Update("", &structs.BatchQueue{}) + after := mgr.defaultQueue + + must.EqOp(t, before, after) // not enabled so should skip update + + mgr.enabled.Store(true) // set enabled so update happens + before = mgr.defaultQueue + mgr.Update("", &structs.BatchQueue{}) + after = mgr.defaultQueue + + must.NotEqOp(t, before, after) + }) + + t.Run("updates specific queue given node pool", func(t *testing.T) { + mockBroker := &MockBroker{} + mgr := NewBatchQueueMgr(t.Context(), structs.BatchQueue{}, mockBroker, hclog.Default()) + stopCh1 := make(chan struct{}) + mgr.nodePoolQueues["test"] = &DynamicPriorityQueue{stopCh: stopCh1} + + before := mgr.nodePoolQueues["test"] + mgr.Update("test", &structs.BatchQueue{}) + after := mgr.nodePoolQueues["test"] + + must.EqOp(t, before, after) // not enabled so should skip update + + mgr.enabled.Store(true) // set enabled so update happens + before = mgr.nodePoolQueues["test"] + mgr.Update("test", &structs.BatchQueue{}) + after = mgr.nodePoolQueues["test"] + + must.NotEqOp(t, before, after) + }) +} diff --git a/nomad/queues/dynamic_priority_queue.go b/nomad/queues/dynamic_priority_queue.go index 76b67dcd40e..9e18cdb83ab 100644 --- a/nomad/queues/dynamic_priority_queue.go +++ b/nomad/queues/dynamic_priority_queue.go @@ -8,7 +8,6 @@ import ( "context" "errors" "sync" - "sync/atomic" "time" "github.com/hashicorp/go-hclog" @@ -61,14 +60,13 @@ type DynamicPriorityQueue struct { // on to be scheduled by Nomad evalBroker Broker - // enabled tracks whether the server running the batch job queue is the leader - // so should process evaluations - enabled atomic.Bool - // state is the in-memory state store used for both reconciling tenant // workload usages, and polling submitted evaluations for placement - state *state.StateStore + state *state.StateStore + logger hclog.Logger + + stopCh chan struct{} } type Tenant struct { @@ -90,7 +88,7 @@ func (w *Workload) calculatePriority(_ int64) { // unimplemented } -func NewDynamicPriorityQueue(broker Broker, qconf *structs.BatchQueue, conf *structs.DynamicQueueConfig, logger hclog.Logger) *DynamicPriorityQueue { +func NewDynamicPriorityQueue(state *state.StateStore, broker Broker, qconf *structs.BatchQueue, conf *structs.DynamicQueueConfig, logger hclog.Logger) *DynamicPriorityQueue { return &DynamicPriorityQueue{ tenants: make(map[TenantID]*Tenant), queue: WorkloadQueue{}, @@ -101,8 +99,10 @@ func NewDynamicPriorityQueue(broker Broker, qconf *structs.BatchQueue, conf *str tenantType: qconf.TenantType, metadataKey: qconf.MetadataKey, conf: conf, + state: state, logger: logger.Named("Dynamic Priority Queue"), totalUsage: 0, + stopCh: make(chan struct{}), } } @@ -113,10 +113,8 @@ func (d *DynamicPriorityQueue) Start(ctx context.Context) error { return nil } -func (d *DynamicPriorityQueue) SetEnabled(val bool, state *state.StateStore) { - // rebuild internal state from statestore, unimplemented - d.state = state - d.enabled.Store(val) +func (d *DynamicPriorityQueue) Stop() { + close(d.stopCh) } // Enqueue is the method used to put evaluations on the queue. @@ -124,10 +122,6 @@ func (d *DynamicPriorityQueue) SetEnabled(val bool, state *state.StateStore) { // to an internal channel to be processed and added to the actual // heap container. func (d *DynamicPriorityQueue) Enqueue(e *structs.Evaluation) { - if !d.enabled.Load() { - return - } - w := d.generateWorkload(e) // in the event of an empty workload, just pass eval to eval broker @@ -146,6 +140,8 @@ func (d *DynamicPriorityQueue) runProducer(ctx context.Context) { select { case <-ctx.Done(): return + case <-d.stopCh: + return case w := <-d.enqueueCh: w.calculatePriority(w.eval.CreateTime) @@ -159,10 +155,6 @@ func (d *DynamicPriorityQueue) runProducer(ctx context.Context) { default: } case <-time.After(d.conf.CalcInterval): - if !d.enabled.Load() { - continue - } - d.qMux.Lock() d.calculatePriorities(time.Now().UnixNano()) heap.Init(&d.queue) @@ -179,6 +171,8 @@ func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) { select { case <-ctx.Done(): return + case <-d.stopCh: + return case <-d.qNotify: // Pop a workload off the queue if available diff --git a/nomad/queues/dynamic_priority_queue_test.go b/nomad/queues/dynamic_priority_queue_test.go index a1e41d08fa9..885de40f887 100644 --- a/nomad/queues/dynamic_priority_queue_test.go +++ b/nomad/queues/dynamic_priority_queue_test.go @@ -21,8 +21,7 @@ func TestWaitForPlacement(t *testing.T) { t.Run("returns if eval complete", func(t *testing.T) { ss := state.TestStateStore(t) - testQueue := NewDynamicPriorityQueue(nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions)) - testQueue.SetEnabled(true, ss) + testQueue := NewDynamicPriorityQueue(ss, nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions)) testEval := mock.Eval() ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval}) @@ -44,8 +43,7 @@ func TestWaitForPlacement(t *testing.T) { t.Run("continues watching blocked evals", func(t *testing.T) { ss := state.TestStateStore(t) - testQueue := NewDynamicPriorityQueue(nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions)) - testQueue.SetEnabled(true, ss) + testQueue := NewDynamicPriorityQueue(ss, nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions)) testEval := mock.Eval() blocked := mock.Eval() @@ -90,8 +88,7 @@ func TestWaitForPlacement(t *testing.T) { t.Run("continues watching next evals after eval failure", func(t *testing.T) { ss := state.TestStateStore(t) - testQueue := NewDynamicPriorityQueue(nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions)) - testQueue.SetEnabled(true, ss) + testQueue := NewDynamicPriorityQueue(ss, nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions)) testEval := mock.Eval() next := mock.Eval() diff --git a/nomad/queues/interface.go b/nomad/queues/interface.go index 41da0dcb577..db63a0d397a 100644 --- a/nomad/queues/interface.go +++ b/nomad/queues/interface.go @@ -6,14 +6,13 @@ package queues import ( "context" - "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) type Queue interface { Enqueue(*structs.Evaluation) Start(context.Context) error - SetEnabled(bool, *state.StateStore) + Stop() } // Broker is the interface for an evaluation broker diff --git a/nomad/queues/passthrough_queue.go b/nomad/queues/passthrough_queue.go index d754892bd16..aca149b8f18 100644 --- a/nomad/queues/passthrough_queue.go +++ b/nomad/queues/passthrough_queue.go @@ -23,6 +23,8 @@ func NewPassthroughQueue(b Broker) *PassthroughQueue { // Start is a noop for the passthrough implementation func (p *PassthroughQueue) Start(context.Context) error { return nil } +func (p *PassthroughQueue) Stop() {} + func (p *PassthroughQueue) Enqueue(e *structs.Evaluation) { p.broker.Enqueue(e) } func (p *PassthroughQueue) SetEnabled(bool, *state.StateStore) {} diff --git a/nomad/queues/queue.go b/nomad/queues/queue.go index 2eeea6e3661..be0be9b9d3b 100644 --- a/nomad/queues/queue.go +++ b/nomad/queues/queue.go @@ -5,17 +5,18 @@ package queues import ( "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) -func NewQueue(sconf *structs.BatchQueue, broker Broker, logger hclog.Logger) (Queue, error) { +func NewQueue(ss *state.StateStore, sconf *structs.BatchQueue, broker Broker, logger hclog.Logger) (Queue, error) { switch sconf.Type { case structs.BatchQueueTypeDynamic: qconf := &structs.DynamicQueueConfig{} if err := structs.DecodeBatchQueueConf(sconf.Config, qconf); err != nil { return nil, err } - return NewDynamicPriorityQueue(broker, sconf, qconf, logger), nil + return NewDynamicPriorityQueue(ss, broker, sconf, qconf, logger), nil default: return NewPassthroughQueue(broker), nil } diff --git a/nomad/server.go b/nomad/server.go index d37e1d6c202..17a230b9369 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -217,7 +217,7 @@ type Server struct { // batchJobQueue is the interface for enqueuing job // register evaluations on a queue implementation - batchJobQueue queues.Queue + batchQueueMgr *queues.BatchQueueManager // brokerLock is used to synchronise the alteration of the blockedEvals and // evalBroker enabled state. These two subsystems change state when @@ -487,17 +487,8 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc Encrypter: s.encrypter, }) - // Creates the batch job queue - s.batchJobQueue, err = queues.NewQueue( - &s.config.DefaultSchedulerConfig.BatchQueue, - evalBroker, - logger, - ) - if err != nil { - s.Shutdown() - s.logger.Error("failed to create batch job queue", "error", err) - return nil, fmt.Errorf("Failed to create batch jo queue: %v", err) - } + // Creates the batch job queue manager + s.batchQueueMgr = queues.NewBatchQueueMgr(s.shutdownCtx, s.config.DefaultSchedulerConfig.BatchQueue, evalBroker, logger) // Initialize the Raft server if err := s.setupRaft(); err != nil { @@ -506,12 +497,6 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc return nil, fmt.Errorf("Failed to start Raft: %v", err) } - if err := s.batchJobQueue.Start(s.shutdownCtx); err != nil { - s.Shutdown() - s.logger.Error("failed to start batch job queue", "error", err) - return nil, fmt.Errorf("Failed to start batcj job queue: %v", err) - } - // Initialize the wan Serf s.serf, err = s.setupSerf(config.SerfConfig, s.eventCh, serfSnapshot) if err != nil { @@ -1379,7 +1364,7 @@ func (s *Server) setupRaft() error { // Create the FSM fsmConfig := &FSMConfig{ EvalBroker: s.evalBroker, - BatchQueue: s.batchJobQueue, + BatchQueueMgr: s.batchQueueMgr, Periodic: s.periodicDispatcher, Blocked: s.blockedEvals, Encrypter: s.encrypter, From 1791cd566d415f1d89105d45b10dfc580c53953e Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 2 Jun 2026 13:15:09 -0400 Subject: [PATCH 06/14] fix ineffectual assignment --- nomad/queues/batch_queue_manager.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/nomad/queues/batch_queue_manager.go b/nomad/queues/batch_queue_manager.go index 455b80e0a79..c5fd0365cbc 100644 --- a/nomad/queues/batch_queue_manager.go +++ b/nomad/queues/batch_queue_manager.go @@ -41,7 +41,7 @@ func (b *BatchQueueManager) Enqueue(e *structs.Evaluation) { // This shouldn't happen, but in the event we enqueue // an eval before setting state, just pass it to the broker - if b.state == nil { + if b.state == nil || b.defaultQueue == nil { b.broker.Enqueue(e) return } @@ -85,6 +85,10 @@ func (b *BatchQueueManager) SetEnabled(enabled bool, state *state.StateStore) { func (b *BatchQueueManager) createQueues() { defaultConf := b.defaultConf _, conf, err := b.state.SchedulerConfig() + if err != nil { + b.logger.Error("failed to get scheduler config from state, skipping queue creation", "err", err) + return + } if conf != nil { defaultConf = conf.BatchQueue @@ -98,6 +102,10 @@ func (b *BatchQueueManager) createQueues() { b.defaultQueue.Start(b.shutdownCtx) nodePoolIter, err := b.state.NodePools(nil, state.SortDefault) + if err != nil { + b.logger.Error("failed to get node pools from state, skipping node pool queue creation", "err", err) + return + } for { raw := nodePoolIter.Next() From 1ba3c4c222bc935a8021b619759cc5e5cab0c1e5 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 2 Jun 2026 14:15:42 -0400 Subject: [PATCH 07/14] remove leftover comment --- nomad/fsm.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 649c2e763d1..e1fac9c1f94 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -979,9 +979,7 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) { if eval.ShouldEnqueue() { if eval.Type == structs.JobTypeBatch && eval.TriggeredBy == structs.EvalTriggerJobRegister { - n.batchQueueMgr.Enqueue(eval) // batchQueueMgr - - // if eval.NodePool -> pass to correct queue + n.batchQueueMgr.Enqueue(eval) } else { n.evalBroker.Enqueue(eval) } From aa399a272085fbcd69d16145056286f71d8f702a Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Wed, 3 Jun 2026 11:58:07 -0400 Subject: [PATCH 08/14] more set-config updates and adds helper text --- command/operator_scheduler_set_config.go | 49 ++++++++++++++++++++---- nomad/queues/dynamic_priority_queue.go | 2 +- nomad/structs/operator.go | 1 + 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/command/operator_scheduler_set_config.go b/command/operator_scheduler_set_config.go index b0385b58956..f53f220e933 100644 --- a/command/operator_scheduler_set_config.go +++ b/command/operator_scheduler_set_config.go @@ -89,7 +89,7 @@ func (o *OperatorSchedulerSetConfig) Run(args []string) int { flags.StringVar(&o.batchQueueType, "batch-queue-type", "", "") flags.StringVar(&o.batchQueueTenantType, "batch-queue-tenant-type", "", "") - flags.StringVar(&o.batchQueueMetadataKey, "batch-queue-metdata-key", "", "") + flags.StringVar(&o.batchQueueMetadataKey, "batch-queue-metadata-key", "", "") flags.StringVar(&o.batchQueueConfig, "batch-queue-config", "", "") if err := flags.Parse(args); err != nil { @@ -155,12 +155,28 @@ func (o *OperatorSchedulerSetConfig) Run(args []string) int { o.preemptSystemScheduler.Merge(&schedulerConfig.PreemptionConfig.SystemSchedulerEnabled) o.nodeLimitForFeasibilityChecks.Merge(&schedulerConfig.NodeLimitForFeasibilityChecks) + if o.batchQueueType != "" { + schedulerConfig.BatchQueue.Type = api.BatchQueueType(o.batchQueueType) + } + if o.batchQueueTenantType != "" { + schedulerConfig.BatchQueue.TenantType = api.BatchQueueTenant(o.batchQueueTenantType) + } + if o.batchQueueMetadataKey != "" { + schedulerConfig.BatchQueue.MetadataKey = o.batchQueueMetadataKey + } if o.batchQueueConfig != "" { conf := make(map[string]any) - _ = json.Unmarshal([]byte(o.batchQueueConfig), &conf) - maps.Copy(schedulerConfig.BatchQueue.Config, conf) + err = json.Unmarshal([]byte(o.batchQueueConfig), &conf) + if err != nil { + o.Ui.Error(fmt.Sprintf("Error parsing batch queue config: %s", err)) + return 1 + } + if schedulerConfig.BatchQueue.Config != nil { + maps.Copy(schedulerConfig.BatchQueue.Config, conf) + } else { + schedulerConfig.BatchQueue.Config = conf + } } - fmt.Println(schedulerConfig.BatchQueue.Config) // Check-and-set the new configuration. result, _, err := client.Operator().SchedulerCASConfiguration(schedulerConfig, nil) @@ -238,10 +254,27 @@ Scheduler Set Config Options: -node-limit-for-feasibility-checks= Limits the number of feasible nodes to consider when scheduling a job that - specifies spread and/or affinity. Defaults to 100 nodes if unset. Lower - numbers result in better scheduler performance and more randomization of jobs - across nodes. Higher numbers result in more deterministic application of - feasibility checks. + specifies spread and/or affinity. Defaults to 100 nodes if unset. Lower + numbers result in better scheduler performance and more randomization of jobs + across nodes. Higher numbers result in more deterministic application of + feasibility checks. + + -batch-queue-type=["dynamic_priority"] + Specifies the type of batch queue to configure. Currently only the dynamic_priority + queue type is supported. + + -batch-queue-tenant-type=["namespace"|"metadata"] + Specifies the tenant type to use for the batch queue. The two types of + supported tenancy options are namespace or job metadata. + + -batch-queue-metadata-key + If metadata tenancy is specified, this option configures what key to use + in the job's metadata for the tenants unique identifier. + + -batch-queue-config + Specifies any custom configuration for the configured batch queue type. + This should be given as a JSON string which will be parsed and validated + by the Nomad Server. ` return strings.TrimSpace(helpText) } diff --git a/nomad/queues/dynamic_priority_queue.go b/nomad/queues/dynamic_priority_queue.go index 9e18cdb83ab..0e9bf381a88 100644 --- a/nomad/queues/dynamic_priority_queue.go +++ b/nomad/queues/dynamic_priority_queue.go @@ -100,7 +100,7 @@ func NewDynamicPriorityQueue(state *state.StateStore, broker Broker, qconf *stru metadataKey: qconf.MetadataKey, conf: conf, state: state, - logger: logger.Named("Dynamic Priority Queue"), + logger: logger.Named("dynamic_priority_queue"), totalUsage: 0, stopCh: make(chan struct{}), } diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index 7ea077e2764..c05c79b027a 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -395,6 +395,7 @@ func DecodeBatchQueueConf[T any](in map[string]any, out *T) error { decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ Result: out, DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + ErrorUnset: true, }) if err != nil { return fmt.Errorf("unable to create config decoder, %w", err) From e8dbf27aa346b9cc0d59ee7bd12158a423853470 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Wed, 3 Jun 2026 13:18:22 -0400 Subject: [PATCH 09/14] fix batch queue validate test --- nomad/structs/operator_test.go | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/nomad/structs/operator_test.go b/nomad/structs/operator_test.go index c52b9f8dbe6..1f8c7553b15 100644 --- a/nomad/structs/operator_test.go +++ b/nomad/structs/operator_test.go @@ -201,6 +201,14 @@ func TestBatchQueue_Validate(t *testing.T) { batchConfig: BatchQueue{ Type: BatchQueueTypeDynamic, TenantType: "foo", + Config: map[string]any{ + "calc_interval": "5m", + "max_age": "12h", + "max_size": 1000, + "age_weight": 10, + "size_weight": 10, + "usage_weight": 10, + }, }, err: "unsupported tenant type", }, @@ -217,6 +225,14 @@ func TestBatchQueue_Validate(t *testing.T) { batchConfig: BatchQueue{ Type: BatchQueueTypeDynamic, TenantType: TenantTypeMetadata, + Config: map[string]any{ + "calc_interval": "5m", + "max_age": "12h", + "max_size": 1000, + "age_weight": 10, + "size_weight": 10, + "usage_weight": 10, + }, }, err: "metadata key must be specified", }, @@ -226,7 +242,12 @@ func TestBatchQueue_Validate(t *testing.T) { Type: BatchQueueTypeDynamic, TenantType: TenantTypeNamespace, Config: map[string]any{ - "calc_interval": "hello", + "calc_interval": "hello", //invalid + "max_age": "12h", + "max_size": 1000, + "age_weight": 10, + "size_weight": 10, + "usage_weight": 10, }, }, err: "unable to decode conf", @@ -238,6 +259,11 @@ func TestBatchQueue_Validate(t *testing.T) { TenantType: TenantTypeNamespace, Config: map[string]any{ "calc_interval": "1h", + "max_age": "12h", + "max_size": 1000, + "age_weight": 10, + "size_weight": 10, + "usage_weight": 10, }, }, err: "", @@ -249,6 +275,11 @@ func TestBatchQueue_Validate(t *testing.T) { TenantType: TenantTypeNamespace, Config: map[string]any{ "calc_interval": 1000, + "max_age": "12h", + "max_size": 1000, + "age_weight": 10, + "size_weight": 10, + "usage_weight": 10, }, }, err: "", From c00de12e8697e6d14f07acf9c9f217a4563d03aa Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Thu, 4 Jun 2026 16:14:43 -0400 Subject: [PATCH 10/14] update interfacet to add job to enqueue --- nomad/leader.go | 6 +++++- nomad/queues/batch_queue_manager.go | 4 ++-- nomad/queues/dynamic_priority_queue.go | 13 ++++--------- nomad/queues/interface.go | 2 +- nomad/queues/passthrough_queue.go | 2 +- 5 files changed, 13 insertions(+), 14 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index 75c85ad232c..1ac065ca76c 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -832,7 +832,11 @@ func (s *Server) restoreEvals() error { eval := raw.(*structs.Evaluation) if eval.ShouldEnqueue() { - s.evalBroker.Restore(eval) + if eval.Type == structs.JobTypeBatch && eval.TriggeredBy == structs.EvalTriggerJobRegister { + s.batchQueueMgr.Enqueue(eval) + } else { + s.evalBroker.Enqueue(eval) + } } else if eval.ShouldBlock() { s.blockedEvals.Block(eval) } diff --git a/nomad/queues/batch_queue_manager.go b/nomad/queues/batch_queue_manager.go index c5fd0365cbc..854ee000b83 100644 --- a/nomad/queues/batch_queue_manager.go +++ b/nomad/queues/batch_queue_manager.go @@ -56,9 +56,9 @@ func (b *BatchQueueManager) Enqueue(e *structs.Evaluation) { // if a node pool has a specific batch queue configuration, use that, // otherwise use the scheduler config queue. if queue, ok := b.nodePoolQueues[job.NodePool]; !ok { - b.defaultQueue.Enqueue(e) + b.defaultQueue.Enqueue(e, job) } else { - queue.Enqueue(e) + queue.Enqueue(e, job) } } diff --git a/nomad/queues/dynamic_priority_queue.go b/nomad/queues/dynamic_priority_queue.go index 0e9bf381a88..c52744ed34c 100644 --- a/nomad/queues/dynamic_priority_queue.go +++ b/nomad/queues/dynamic_priority_queue.go @@ -121,8 +121,8 @@ func (d *DynamicPriorityQueue) Stop() { // It generates a workload with an empty priority, appends it // to an internal channel to be processed and added to the actual // heap container. -func (d *DynamicPriorityQueue) Enqueue(e *structs.Evaluation) { - w := d.generateWorkload(e) +func (d *DynamicPriorityQueue) Enqueue(e *structs.Evaluation, j *structs.Job) { + w := d.generateWorkload(e, j) // in the event of an empty workload, just pass eval to eval broker if w == nil { @@ -206,12 +206,7 @@ func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) { } // generateWorkload is used to create an initial workload from a given evaluation -func (d *DynamicPriorityQueue) generateWorkload(e *structs.Evaluation) *Workload { - job, err := d.state.JobByID(nil, e.Namespace, e.JobID) - if err != nil { - return nil - } - +func (d *DynamicPriorityQueue) generateWorkload(eval *structs.Evaluation, job *structs.Job) *Workload { tid := "" switch d.tenantType { case "namespace": @@ -230,7 +225,7 @@ func (d *DynamicPriorityQueue) generateWorkload(e *structs.Evaluation) *Workload return &Workload{ tid: TenantID(tid), priority: 0, - eval: e, + eval: eval, size: 0, } } diff --git a/nomad/queues/interface.go b/nomad/queues/interface.go index db63a0d397a..ac0733c397e 100644 --- a/nomad/queues/interface.go +++ b/nomad/queues/interface.go @@ -10,7 +10,7 @@ import ( ) type Queue interface { - Enqueue(*structs.Evaluation) + Enqueue(*structs.Evaluation, *structs.Job) Start(context.Context) error Stop() } diff --git a/nomad/queues/passthrough_queue.go b/nomad/queues/passthrough_queue.go index aca149b8f18..75808634358 100644 --- a/nomad/queues/passthrough_queue.go +++ b/nomad/queues/passthrough_queue.go @@ -25,6 +25,6 @@ func (p *PassthroughQueue) Start(context.Context) error { return nil } func (p *PassthroughQueue) Stop() {} -func (p *PassthroughQueue) Enqueue(e *structs.Evaluation) { p.broker.Enqueue(e) } +func (p *PassthroughQueue) Enqueue(e *structs.Evaluation, j *structs.Job) { p.broker.Enqueue(e) } func (p *PassthroughQueue) SetEnabled(bool, *state.StateStore) {} From 76c4d34bd77f567ab41243d46bde29beb4ab2b6c Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Fri, 5 Jun 2026 12:16:13 -0400 Subject: [PATCH 11/14] use proper restore for eval broker --- nomad/leader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/leader.go b/nomad/leader.go index 1ac065ca76c..32adafc4ea0 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -835,7 +835,7 @@ func (s *Server) restoreEvals() error { if eval.Type == structs.JobTypeBatch && eval.TriggeredBy == structs.EvalTriggerJobRegister { s.batchQueueMgr.Enqueue(eval) } else { - s.evalBroker.Enqueue(eval) + s.evalBroker.Restore(eval) } } else if eval.ShouldBlock() { s.blockedEvals.Block(eval) From 98ec67e3b454fe3d7b499fe3dcddb4116c592fa9 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 9 Jun 2026 10:06:32 -0400 Subject: [PATCH 12/14] use context cancel to stop queue --- nomad/queues/dynamic_priority_queue.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/nomad/queues/dynamic_priority_queue.go b/nomad/queues/dynamic_priority_queue.go index c52744ed34c..384c8f29066 100644 --- a/nomad/queues/dynamic_priority_queue.go +++ b/nomad/queues/dynamic_priority_queue.go @@ -66,7 +66,8 @@ type DynamicPriorityQueue struct { logger hclog.Logger - stopCh chan struct{} + // cancelCtx is used to stop the queue + cancelCtx context.CancelFunc } type Tenant struct { @@ -102,19 +103,22 @@ func NewDynamicPriorityQueue(state *state.StateStore, broker Broker, qconf *stru state: state, logger: logger.Named("dynamic_priority_queue"), totalUsage: 0, - stopCh: make(chan struct{}), } } func (d *DynamicPriorityQueue) Start(ctx context.Context) error { - go d.runProducer(ctx) - go d.runConsumer(ctx) + runCtx, cancel := context.WithCancel(ctx) + d.cancelCtx = cancel + + go d.runProducer(runCtx) + go d.runConsumer(runCtx) return nil } func (d *DynamicPriorityQueue) Stop() { - close(d.stopCh) + // close(d.stopCh) + d.cancelCtx() } // Enqueue is the method used to put evaluations on the queue. @@ -140,8 +144,6 @@ func (d *DynamicPriorityQueue) runProducer(ctx context.Context) { select { case <-ctx.Done(): return - case <-d.stopCh: - return case w := <-d.enqueueCh: w.calculatePriority(w.eval.CreateTime) @@ -171,8 +173,6 @@ func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) { select { case <-ctx.Done(): return - case <-d.stopCh: - return case <-d.qNotify: // Pop a workload off the queue if available From 090bd9730d9221aa11b6c8b1abc6dabb619b7317 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 9 Jun 2026 10:18:07 -0400 Subject: [PATCH 13/14] fix tests --- nomad/queues/batch_queue_manager_test.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/nomad/queues/batch_queue_manager_test.go b/nomad/queues/batch_queue_manager_test.go index 66dd702295d..8f6cb4a1749 100644 --- a/nomad/queues/batch_queue_manager_test.go +++ b/nomad/queues/batch_queue_manager_test.go @@ -4,6 +4,7 @@ package queues import ( + "context" "testing" "time" @@ -109,8 +110,9 @@ func TestBatchQueueManager_SetEnabled(t *testing.T) { stopCh1 := make(chan struct{}) stopCh2 := make(chan struct{}) // DynamicPriorityQueue closes the stop chan on Stop(), so we can use it to assert stop is called - mgr.defaultQueue = &DynamicPriorityQueue{stopCh: stopCh1} - mgr.nodePoolQueues["test"] = &DynamicPriorityQueue{stopCh: stopCh2} + _, cancel := context.WithCancel(t.Context()) + mgr.defaultQueue = &DynamicPriorityQueue{cancelCtx: cancel} + mgr.nodePoolQueues["test"] = &DynamicPriorityQueue{} mgr.SetEnabled(false, nil) @@ -132,8 +134,8 @@ func TestBatchQueueManager_Update(t *testing.T) { t.Run("updates default queue given empty node pool", func(t *testing.T) { mockBroker := &MockBroker{} mgr := NewBatchQueueMgr(t.Context(), structs.BatchQueue{}, mockBroker, hclog.Default()) - stopCh1 := make(chan struct{}) - mgr.defaultQueue = &DynamicPriorityQueue{stopCh: stopCh1} + _, cancel := context.WithCancel(t.Context()) + mgr.defaultQueue = &DynamicPriorityQueue{cancelCtx: cancel} before := mgr.defaultQueue mgr.Update("", &structs.BatchQueue{}) after := mgr.defaultQueue @@ -151,8 +153,8 @@ func TestBatchQueueManager_Update(t *testing.T) { t.Run("updates specific queue given node pool", func(t *testing.T) { mockBroker := &MockBroker{} mgr := NewBatchQueueMgr(t.Context(), structs.BatchQueue{}, mockBroker, hclog.Default()) - stopCh1 := make(chan struct{}) - mgr.nodePoolQueues["test"] = &DynamicPriorityQueue{stopCh: stopCh1} + _, cancel := context.WithCancel(t.Context()) + mgr.nodePoolQueues["test"] = &DynamicPriorityQueue{cancelCtx: cancel} before := mgr.nodePoolQueues["test"] mgr.Update("test", &structs.BatchQueue{}) From e1647b9b9ba19add18d84358401af5bdf4c813a1 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 9 Jun 2026 11:00:17 -0400 Subject: [PATCH 14/14] fix stop test --- nomad/queues/batch_queue_manager_test.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/nomad/queues/batch_queue_manager_test.go b/nomad/queues/batch_queue_manager_test.go index 8f6cb4a1749..0211487359d 100644 --- a/nomad/queues/batch_queue_manager_test.go +++ b/nomad/queues/batch_queue_manager_test.go @@ -107,23 +107,22 @@ func TestBatchQueueManager_SetEnabled(t *testing.T) { t.Run("stops queues when disabled", func(t *testing.T) { mockBroker := &MockBroker{} mgr := NewBatchQueueMgr(t.Context(), structs.BatchQueue{}, mockBroker, hclog.Default()) - stopCh1 := make(chan struct{}) - stopCh2 := make(chan struct{}) - // DynamicPriorityQueue closes the stop chan on Stop(), so we can use it to assert stop is called - _, cancel := context.WithCancel(t.Context()) - mgr.defaultQueue = &DynamicPriorityQueue{cancelCtx: cancel} - mgr.nodePoolQueues["test"] = &DynamicPriorityQueue{} + ctx1, cancel1 := context.WithCancel(t.Context()) + ctx2, cancel2 := context.WithCancel(t.Context()) + // DynamicPriorityQueue called the cancel func when stopped, so we can use it to assert stop is called + mgr.defaultQueue = &DynamicPriorityQueue{cancelCtx: cancel1} + mgr.nodePoolQueues["test"] = &DynamicPriorityQueue{cancelCtx: cancel2} mgr.SetEnabled(false, nil) select { - case <-stopCh1: + case <-ctx1.Done(): case <-time.After(50 * time.Millisecond): t.FailNow() } select { - case <-stopCh2: + case <-ctx2.Done(): case <-time.After(50 * time.Millisecond): t.FailNow() }