diff --git a/api/constraint.go b/api/constraint.go index ac85b682fc5..081b1757f75 100644 --- a/api/constraint.go +++ b/api/constraint.go @@ -3,6 +3,11 @@ package api +import ( + "errors" + "fmt" +) + const ( ConstraintDistinctProperty = "distinct_property" ConstraintDistinctHosts = "distinct_hosts" @@ -31,3 +36,123 @@ func NewConstraint(left, operand, right string) *Constraint { Operand: operand, } } + +type JobDependency struct { + Name string `hcl:"name,optional"` + Status string `hcl:"status,optional"` +} + +// JobDepdendency is kept as an alias for compatibility with callers using the +// legacy misspelled type name. +type JobDepdendency = JobDependency + +func NewJobDependency(name, status string) *JobDependency { + return &JobDependency{ + Name: name, + Status: status, + } +} + +func (d *JobDependency) Canonicalize() { + if d.Status == "" { + d.Status = "completed" + } +} + +func (d *JobDependency) Copy() *JobDependency { + if d == nil { + return nil + } + + copy := *d + return &copy +} + +func (d *JobDependency) Validate() error { + if d.Name == "" { + return errors.New("dependency job name is required") + } + + if d.Status == "" { + return errors.New("dependency job status is required") + } + + return nil +} + +// Dependency is used to serialize a job placement dependency. 
+type Dependency struct { + Timeout string `hcl:"timeout,optional"` + ActionOnTimeout string `hcl:"action_on_timeout,optional"` + Jobs []*JobDependency `hcl:"job,block"` +} + +func NewDependency(timeout, actionOnTimeout string, jobs ...JobDepdendency) *Dependency { + copyJobs := make([]*JobDependency, 0, len(jobs)) + for _, job := range jobs { + copyJobs = append(copyJobs, (&job).Copy()) + } + + return &Dependency{ + Timeout: timeout, + Jobs: copyJobs, + ActionOnTimeout: actionOnTimeout, + } +} + +func (d *Dependency) Canonicalize() { + if d.ActionOnTimeout == "" { + d.ActionOnTimeout = "reject" + } + + for _, job := range d.Jobs { + job.Canonicalize() + } +} + +func (d *Dependency) Copy() *Dependency { + if d == nil { + return nil + } + + jobs := make([]*JobDependency, 0, len(d.Jobs)) + for _, job := range d.Jobs { + jobs = append(jobs, job.Copy()) + } + + return &Dependency{ + Timeout: d.Timeout, + ActionOnTimeout: d.ActionOnTimeout, + Jobs: jobs, + } +} + +func (d *Dependency) Validate() error { + if d == nil { + return nil + } + + if d.Timeout == "" { + return errors.New("dependency timeout is required") + } + + if d.ActionOnTimeout == "" { + return errors.New("dependency action_on_timeout is required") + } + + if d.ActionOnTimeout != "reject" { + return fmt.Errorf("invalid dependency action_on_timeout %q", d.ActionOnTimeout) + } + + if len(d.Jobs) == 0 { + return errors.New("dependency requires at least one job block") + } + + for _, job := range d.Jobs { + if err := job.Validate(); err != nil { + return err + } + } + + return nil +} diff --git a/api/constraint_test.go b/api/constraint_test.go index b408374b6d5..5688d9cc879 100644 --- a/api/constraint_test.go +++ b/api/constraint_test.go @@ -21,3 +21,44 @@ func TestCompose_Constraints(t *testing.T) { } must.Eq(t, expect, c) } + +func TestCompose_Dependencies(t *testing.T) { + testutil.Parallel(t) + + d := NewDependency("10m", "reject", JobDepdendency{Name: "service-123", Status: "completed"}) + 
d.Canonicalize() + + must.Eq(t, "10m", d.Timeout) + must.Eq(t, "reject", d.ActionOnTimeout) + must.Len(t, 1, d.Jobs) + must.Eq(t, "service-123", d.Jobs[0].Name) + must.Eq(t, "completed", d.Jobs[0].Status) + must.NoError(t, d.Validate()) + + copy := d.Copy() + must.Eq(t, d, copy) + must.True(t, d.Jobs[0] != copy.Jobs[0]) +} + +func TestCompose_Dependencies_DefaultsAndValidation(t *testing.T) { + testutil.Parallel(t) + + d := &Dependency{ + Timeout: "10m", + Jobs: []*JobDependency{{ + Name: "service-123", + }}, + } + d.Canonicalize() + + must.Eq(t, "reject", d.ActionOnTimeout) + must.Eq(t, "completed", d.Jobs[0].Status) + must.NoError(t, d.Validate()) + + bad := &Dependency{ + Timeout: "10m", + ActionOnTimeout: "continue", + Jobs: []*JobDependency{{Name: "service-123", Status: "completed"}}, + } + must.Error(t, bad.Validate()) +} diff --git a/api/jobs.go b/api/jobs.go index 3ccc272c498..4b8e640b2f5 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -46,15 +46,11 @@ const ( // For Client configuration, if no region information is given, // the client node will default to be part of the GlobalRegion. GlobalRegion = "global" -) -const ( // RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by // enforcing the job modify index during registers. RegisterEnforceIndexErrPrefix = "Enforcing job modify index" -) -const ( // JobPeriodicLaunchSuffix is the string appended to the periodic jobs ID // when launching derived instances of it. JobPeriodicLaunchSuffix = "/periodic-" @@ -62,6 +58,10 @@ const ( // JobDispatchLaunchSuffix is the string appended to the parameterized job's ID // when dispatching instances of it. JobDispatchLaunchSuffix = "/dispatch-" + + JobStatusPending = "pending" // Pending means the job is waiting on scheduling + JobStatusRunning = "running" // Running means the job has non-terminal allocations + JobStatusDead = "dead" // Dead means all evaluations and allocations are terminal ) // Jobs is used to access the job-specific endpoints. 
@@ -1115,6 +1115,7 @@ type Job struct { Datacenters []string `hcl:"datacenters,optional"` NodePool *string `mapstructure:"node_pool" hcl:"node_pool,optional"` Constraints []*Constraint `hcl:"constraint,block"` + Dependencies []*Dependency `hcl:"dependency,block"` Affinities []*Affinity `hcl:"affinity,block"` TaskGroups []*TaskGroup `hcl:"group,block"` Update *UpdateStrategy `hcl:"update,block"` @@ -1247,6 +1248,10 @@ func (j *Job) Canonicalize() { a.Canonicalize() } + for _, d := range j.Dependencies { + d.Canonicalize() + } + if j.UI != nil { j.UI.Canonicalize() } @@ -1399,6 +1404,12 @@ func (j *Job) Constrain(c *Constraint) *Job { return j } +// Depend is used to add a dependency to a job. +func (j *Job) Depend(d *Dependency) *Job { + j.Dependencies = append(j.Dependencies, d) + return j +} + // AddAffinity is used to add an affinity to a job. func (j *Job) AddAffinity(a *Affinity) *Job { j.Affinities = append(j.Affinities, a) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 79206781695..93337d22f1b 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -12,6 +12,7 @@ import ( "slices" "strconv" "strings" + "time" "github.com/golang/snappy" "github.com/hashicorp/nomad/acl" @@ -1145,6 +1146,7 @@ func ApiJobToStructJob(job *api.Job) *structs.Job { Version: *job.Version, Constraints: ApiConstraintsToStructs(job.Constraints), Affinities: ApiAffinitiesToStructs(job.Affinities), + Dependencies: ApiDependenciesToStructs(job.Dependencies), UI: ApiJobUIConfigToStructs(job.UI), VersionTag: ApiJobVersionTagToStructs(job.VersionTag), } @@ -2262,6 +2264,46 @@ func ApiAffinitiesToStructs(in []*api.Affinity) []*structs.Affinity { return out } +func ApiDependenciesToStructs(in []*api.Dependency) *structs.Dependency { + if len(in) == 0 { + return nil + } + + for _, dep := range in { + if dep != nil { + return ApiDependencyToStructs(dep) + } + } + + return nil +} + +func ApiDependencyToStructs(in *api.Dependency) 
*structs.Dependency { + if in == nil { + return nil + } + + jobs := make([]*structs.JobDependency, 0, len(in.Jobs)) + for _, j := range in.Jobs { + if j == nil { + continue + } + + jobs = append(jobs, &structs.JobDependency{ + Name: j.Name, + Status: j.Status, + }) + } + + timeout, _ := time.ParseDuration(in.Timeout) + + return &structs.Dependency{ + Timeout: timeout, + ActionOnTimeout: in.ActionOnTimeout, + Jobs: jobs, + } +} + func ApiJobUIConfigToStructs(jobUI *api.JobUIConfig) *structs.JobUIConfig { if jobUI == nil { return nil diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 9ad273d7b86..dbfd5fcfd91 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -4676,3 +4676,24 @@ func TestConversion_ApiJobVersionTagToStructs(t *testing.T) { must.Eq(t, expected, result) }) } + +func TestConversion_ApiDependencyToStructs(t *testing.T) { + t.Run("nil dependency", func(t *testing.T) { + must.Nil(t, ApiDependencyToStructs(nil)) + }) + + t.Run("maps timeout, action and nested jobs", func(t *testing.T) { + in := &api.Dependency{ + Timeout: "10m", + ActionOnTimeout: "reject", + Jobs: []*api.JobDependency{ + {Name: "service-123", Status: "completed"}, + }, + } + + out := ApiDependencyToStructs(in) + must.Eq(t, 10*time.Minute, out.Timeout) + must.Eq(t, "reject", out.ActionOnTimeout) + must.Eq(t, []*structs.JobDependency{{Name: "service-123", Status: "completed"}}, out.Jobs) + }) +} diff --git a/jobspec2/hcl_conversions.go b/jobspec2/hcl_conversions.go index 214c53ebd1d..434276b0f4d 100644 --- a/jobspec2/hcl_conversions.go +++ b/jobspec2/hcl_conversions.go @@ -36,6 +36,7 @@ func newHCLDecoder() *gohcl.Decoder { // custom nomad types decoder.RegisterBlockDecoder(reflect.TypeOf(api.Affinity{}), decodeAffinity) decoder.RegisterBlockDecoder(reflect.TypeOf(api.Constraint{}), decodeConstraint) + decoder.RegisterBlockDecoder(reflect.TypeOf(api.Dependency{}), decodeDependency) return decoder } @@ -261,6 
+262,11 @@ func decodeConstraint(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl. return diags } +func decodeDependency(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics { + d := val.(*api.Dependency) + return gohcl.DecodeBody(body, ctx, d) +} + func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics { tg := val.(*api.TaskGroup) diff --git a/jobspec2/parse_job.go b/jobspec2/parse_job.go index 72eebfc447a..f57dbcd3a0a 100644 --- a/jobspec2/parse_job.go +++ b/jobspec2/parse_job.go @@ -24,8 +24,16 @@ func normalizeJob(jc *jobConfig) { j.Periodic.SpecType = &v } + if len(j.Dependencies) == 0 && len(jc.Dependencies) != 0 { + j.Dependencies = jc.Dependencies + } + normalizeVault(jc.Vault) + for _, d := range j.Dependencies { + normalizeDependency(d) + } + if len(jc.Tasks) != 0 { alone := make([]*api.TaskGroup, 0, len(jc.Tasks)) for _, t := range jc.Tasks { @@ -96,6 +104,26 @@ func normalizeVault(v *api.Vault) { } } +func normalizeDependency(d *api.Dependency) { + if d == nil { + return + } + + if d.ActionOnTimeout == "" { + d.ActionOnTimeout = "reject" + } + + for _, depJob := range d.Jobs { + if depJob == nil { + continue + } + + if depJob.Status == "" { + depJob.Status = "completed" + } + } +} + func normalizeNetworkPorts(networks []*api.NetworkResource) { if networks == nil { return diff --git a/jobspec2/parse_test.go b/jobspec2/parse_test.go index 1a16d5fa43e..4ee5dca5644 100644 --- a/jobspec2/parse_test.go +++ b/jobspec2/parse_test.go @@ -220,7 +220,6 @@ func TestParse_Locals(t *testing.T) { variables { region_var = "default_region" } - locals { # literal local dc = "local_dc" @@ -262,6 +261,86 @@ job "example" { }) } +func TestParse_Dependencies(t *testing.T) { + t.Parallel() + + hcl := ` +job "example" { + type = "batch" + + dependency { + timeout = "10m" + job { + name = "upstream" + } + } + + group "g" { + task "t" { + driver = "raw_exec" + config { + command = "echo" + } + } + } +} +` + + out, err 
:= ParseWithConfig(&ParseConfig{ + Path: "input.hcl", + Body: []byte(hcl), + AllowFS: true, + }) + require.NoError(t, err) + require.Len(t, out.Dependencies, 1) + require.Equal(t, "10m", out.Dependencies[0].Timeout) + require.Equal(t, "reject", out.Dependencies[0].ActionOnTimeout) + require.Len(t, out.Dependencies[0].Jobs, 1) + require.Equal(t, "upstream", out.Dependencies[0].Jobs[0].Name) + require.Equal(t, "completed", out.Dependencies[0].Jobs[0].Status) +} + +func TestParse_Dependencies_OnlyOneBlockAllowed(t *testing.T) { + t.Parallel() + + hcl := ` +job "example" { + type = "batch" + + dependency { + timeout = "10m" + job { + name = "upstream-a" + } + } + + dependency { + timeout = "10m" + job { + name = "upstream-b" + } + } + + group "g" { + task "t" { + driver = "raw_exec" + config { + command = "echo" + } + } + } +} +` + + _, err := ParseWithConfig(&ParseConfig{ + Path: "input.hcl", + Body: []byte(hcl), + AllowFS: true, + }) + require.Error(t, err) + require.Contains(t, err.Error(), "Duplicate dependency block") +} + func TestParse_FileOperators(t *testing.T) { t.Parallel() diff --git a/jobspec2/types.config.go b/jobspec2/types.config.go index ea6134c23b2..91a23ca424b 100644 --- a/jobspec2/types.config.go +++ b/jobspec2/types.config.go @@ -15,12 +15,13 @@ import ( ) const ( - variablesLabel = "variables" - variableLabel = "variable" - localsLabel = "locals" - vaultLabel = "vault" - taskLabel = "task" - secretLabel = "secret" + variablesLabel = "variables" + variableLabel = "variable" + localsLabel = "locals" + vaultLabel = "vault" + taskLabel = "task" + secretLabel = "secret" + dependencyLabel = "dependency" inputVariablesAccessor = "var" localsAccessor = "local" @@ -32,9 +33,10 @@ type jobConfig struct { ParseConfig *ParseConfig - Vault *api.Vault `hcl:"vault,block"` - Secrets []*api.Secret `hcl:"secret,block"` - Tasks []*api.Task `hcl:"task,block"` + Vault *api.Vault `hcl:"vault,block"` + Secrets []*api.Secret `hcl:"secret,block"` + Tasks []*api.Task 
`hcl:"task,block"` + Dependencies []*api.Dependency `hcl:"dependency,block"` InputVariables Variables LocalVariables Variables @@ -149,8 +151,10 @@ func (c *jobConfig) decodeTopLevelExtras(content *hcl.BodyContent, ctx *hcl.Eval var diags hcl.Diagnostics var foundVault *hcl.Block + var foundDependency *hcl.Block for _, b := range content.Blocks { - if b.Type == vaultLabel { + switch b.Type { + case vaultLabel: if foundVault != nil { diags = append(diags, &hcl.Diagnostic{ Severity: hcl.DiagError, @@ -169,20 +173,40 @@ func (c *jobConfig) decodeTopLevelExtras(content *hcl.BodyContent, ctx *hcl.Eval diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, v)...) c.Vault = v - } else if b.Type == taskLabel { + case taskLabel: t := &api.Task{} diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, t)...) if len(b.Labels) == 1 { t.Name = b.Labels[0] c.Tasks = append(c.Tasks, t) } - } else if b.Type == secretLabel { + + case secretLabel: t := &api.Secret{} diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, t)...) if len(b.Labels) == 1 { t.Name = b.Labels[0] c.Secrets = append(c.Secrets, t) } + + case dependencyLabel: + if foundDependency != nil { + diags = append(diags, &hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: fmt.Sprintf("Duplicate %s block", b.Type), + Detail: fmt.Sprintf( + "Only one block of type %q is allowed. Previous definition was at %s.", + b.Type, foundDependency.DefRange.String(), + ), + Subject: &b.DefRange, + }) + continue + } + foundDependency = b + + d := &api.Dependency{} + diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, d)...) 
+ c.Dependencies = append(c.Dependencies, d) } } @@ -288,6 +312,7 @@ func (c *jobConfig) decodeJob(content *hcl.BodyContent, ctx *hcl.EvalContext) hc {Type: "vault"}, {Type: "secret", LabelNames: []string{"name"}}, {Type: "task", LabelNames: []string{"name"}}, + {Type: "dependency"}, }, }) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 6cfeae4b38a..ef0155cb74f 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -21,6 +21,10 @@ import ( "golang.org/x/time/rate" ) +type dependencyChecker interface { + HasDependencies(j *structs.Job) (bool, error) +} + // CoreScheduler is a special "scheduler" that is registered // as "_core". It is used to run various administrative work // across the cluster. @@ -39,17 +43,24 @@ type CoreScheduler struct { // (e.g., structs.CoreJobEvalGC) and time.Duration that will be used as GC // threshold value. customThresholdForObject map[string]*time.Duration + dependecyChecker dependencyChecker } // NewCoreScheduler is used to return a new system scheduler instance -func NewCoreScheduler(srv *Server, snap *state.StateSnapshot, planner sstructs.Planner) sstructs.Scheduler { +func NewCoreScheduler(srv *Server, snap *state.StateSnapshot, planner sstructs.Planner, opts ...sstructs.SchedulerOption) sstructs.Scheduler { s := &CoreScheduler{ srv: srv, snap: snap, logger: srv.logger.ResetNamed("core.sched"), planner: planner, customThresholdForObject: make(map[string]*time.Duration), + dependecyChecker: srv.dependencyCoordinator, + } + + for _, opt := range opts { + opt(s) } + return s } @@ -161,6 +172,16 @@ OUTER: continue } + free, err := c.dependecyChecker.HasDependencies(job) + if err != nil { + c.logger.Error("job GC failed to get dependencies for job", "job", job.ID, "error", err) + continue + } + + if free { + continue + } + ws := memdb.NewWatchSet() evals, err := c.snap.EvalsByJob(ws, job.Namespace, job.ID) if err != nil { diff --git a/nomad/server.go b/nomad/server.go index 04e833bf801..1314152cff0 100644 --- 
a/nomad/server.go +++ b/nomad/server.go @@ -53,6 +53,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/nomad/volumewatcher" "github.com/hashicorp/nomad/scheduler" + "github.com/hashicorp/nomad/scheduler/dependency" + "github.com/hashicorp/nomad/scheduler/loop_detection" sstructs "github.com/hashicorp/nomad/scheduler/structs" ) @@ -210,6 +212,8 @@ type Server struct { // capacity changes. blockedEvals *BlockedEvals + dependencyCoordinator *dependency.Coordinator + // evalBroker is used to manage the in-progress evaluations // that are waiting to be brokered to a sub-scheduler evalBroker *EvalBroker @@ -403,6 +407,11 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc // Create the blocked evals s.blockedEvals = NewBlockedEvals(s.evalBroker, s.logger) + // Create the dependency Coordinator + depCoordinator := dependency.NewCoordinator(s.logger, + loop_detection.New(s.logger), s.blockedEvals) + s.dependencyCoordinator = depCoordinator + // Create the RPC handler s.rpcHandler = newRpcHandler(s) diff --git a/nomad/structs/dependency_test.go b/nomad/structs/dependency_test.go new file mode 100644 index 00000000000..14debc7136e --- /dev/null +++ b/nomad/structs/dependency_test.go @@ -0,0 +1,76 @@ +// Copyright IBM Corp. 
2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package structs + +import ( + "testing" + "time" + + "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" +) + +func TestDependency_CanonicalizeAndValidate(t *testing.T) { + ci.Parallel(t) + + d := &Dependency{ + Timeout: 10 * time.Minute, + Jobs: []*JobDependency{{ + Name: "service-123", + }}, + } + d.Canonicalize() + + must.Eq(t, "reject", d.ActionOnTimeout) + must.Eq(t, "completed", d.Jobs[0].Status) + must.NoError(t, d.Validate()) +} + +func TestDependency_CopyDeep(t *testing.T) { + ci.Parallel(t) + + d := &Dependency{ + Timeout: 10 * time.Minute, + ActionOnTimeout: "reject", + Jobs: []*JobDependency{{ + Name: "service-123", + Status: "completed", + }}, + } + + copy := d.Copy() + must.Eq(t, d, copy) + must.True(t, d.Jobs[0] != copy.Jobs[0]) + + copy.Jobs[0].Status = "running" + must.Eq(t, "completed", d.Jobs[0].Status) +} + +func TestJob_CopyIncludesDependencies(t *testing.T) { + ci.Parallel(t) + + j := &Job{ + ID: "job-id", + Name: "job-name", + Namespace: DefaultNamespace, + Type: JobTypeService, + TaskGroups: []*TaskGroup{{ + Name: "group", + Tasks: []*Task{{Name: "task", Driver: "raw_exec", Config: map[string]interface{}{"command": "/bin/date"}}}, + }}, + Dependencies: &Dependency{ + Timeout: 10 * time.Minute, + ActionOnTimeout: "reject", + Jobs: []*JobDependency{{ + Name: "service-123", + Status: "completed", + }}, + }, + } + + copy := j.Copy() + must.Eq(t, j.Dependencies, copy.Dependencies) + must.True(t, j.Dependencies != copy.Dependencies) + must.True(t, j.Dependencies.Jobs[0] != copy.Dependencies.Jobs[0]) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 722a3befd73..2a615272375 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4443,6 +4443,12 @@ type Job struct { // scheduling preferences that apply to all groups and tasks Affinities []*Affinity + // Dependencies can be specified at the job level to express + // inter-job dependencies, 
such as "job A cannot start until job B is + // running". This can be used to express complex workflows with multiple + // jobs. + Dependencies *Dependency + // Spread can be specified at the job level to express spreading // allocations across a desired attribute, such as datacenter Spreads []*Spread @@ -4673,6 +4679,10 @@ func (j *Job) Canonicalize() { j.Spreads = nil } + if j.Dependencies != nil { + j.Dependencies.Canonicalize() + } + // Ensure the job is in a namespace. if j.Namespace == "" { j.Namespace = DefaultNamespace @@ -4697,6 +4707,7 @@ func (j *Job) Canonicalize() { if j.Periodic != nil { j.Periodic.Canonicalize() } + } // Copy returns a deep copy of the Job. It is expected that callers use recover. @@ -4710,6 +4721,7 @@ func (j *Job) Copy() *Job { nj.Datacenters = slices.Clone(j.Datacenters) nj.Constraints = CopySliceConstraints(j.Constraints) nj.Affinities = CopySliceAffinities(j.Affinities) + nj.Dependencies = j.Dependencies.Copy() nj.Multiregion = j.Multiregion.Copy() nj.UI = j.UI.Copy() nj.VersionTag = j.VersionTag.Copy() @@ -4777,6 +4789,13 @@ func (j *Job) Validate() error { mErr.Errors = append(mErr.Errors, outer) } } + + if j.Dependencies != nil { + if err := j.Dependencies.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Dependency validation failed: %s", err)) + } + } + if j.Type == JobTypeSystem { if j.Affinities != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have an affinity block")) @@ -10936,3 +10955,160 @@ func NewRpcError(err error, code *int64) *RpcError { func (r *RpcError) Error() string { return r.Message } + +type JobDependency struct { + Name string + Status string +} + +func (d *JobDependency) Equal(o *JobDependency) bool { + if d == nil || o == nil { + return d == o + } + + return d == o || + d.Name == o.Name && + d.Status == o.Status +} + +func (d *JobDependency) Validate() error { + if d == nil { + return errors.New("dependency job block is required") + } + + if d.Name == "" 
{ + return errors.New("dependency job name is mandatory") + } + + if d.Status == "" { + return errors.New("dependency job status is mandatory") + } + + return nil +} + +func (d *JobDependency) Canonicalize() { + if d == nil { + return + } + + if d.Status == "" { + d.Status = "completed" + } +} + +func (d *JobDependency) String() string { + if d == nil { + return "" + } + + return fmt.Sprintf("%s: %s", d.Name, d.Status) +} + +// Dependency describes a job's inter-job dependencies: the jobs it waits on, the wait timeout, and the action taken on timeout. +type Dependency struct { + Timeout time.Duration + ActionOnTimeout string + Jobs []*JobDependency +} + +// Equal checks if two dependencies are equal. +func (d *Dependency) Equal(o *Dependency) bool { + if d == nil || o == nil { + return d == o + } + + if len(d.Jobs) != len(o.Jobs) { + return false + } + + jEqual := true + for i := range d.Jobs { + jEqual = jEqual && d.Jobs[i].Equal(o.Jobs[i]) + } + + return d == o || + d.Timeout == o.Timeout && + d.ActionOnTimeout == o.ActionOnTimeout && + jEqual +} + +func (d *Dependency) Copy() *Dependency { + if d == nil { + return nil + } + + jobs := make([]*JobDependency, 0, len(d.Jobs)) + for _, job := range d.Jobs { + if job == nil { + jobs = append(jobs, nil) + continue + } + + copy := *job + jobs = append(jobs, &copy) + } + + return &Dependency{ + Timeout: d.Timeout, + ActionOnTimeout: d.ActionOnTimeout, + Jobs: jobs, + } +} + +func (d *Dependency) String() string { + jobs := make([]string, 0, len(d.Jobs)) + for _, j := range d.Jobs { + jobs = append(jobs, j.String()) + } + + return fmt.Sprintf("%s %s: %s", d.Timeout, d.ActionOnTimeout, strings.Join(jobs, ", ")) +} + +func (d *Dependency) Validate() error { + var mErr multierror.Error + if d == nil { + return nil + } + + if d.Timeout <= 0 { + mErr.Errors = append(mErr.Errors, errors.New("Missing or invalid timeout in dependency")) + } + + if d.ActionOnTimeout == "" { + mErr.Errors = append(mErr.Errors, errors.New("Missing action_on_timeout in dependency")) + } else if d.ActionOnTimeout != "reject" { 
+ mErr.Errors = append(mErr.Errors, fmt.Errorf("Invalid action_on_timeout in dependency: %q", d.ActionOnTimeout)) + } + + if len(d.Jobs) == 0 { + mErr.Errors = append(mErr.Errors, errors.New("Missing job in dependency")) + } + + for idx, job := range d.Jobs { + if err := job.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Dependency job %d validation failed: %v", idx+1, err)) + } + } + + return mErr.ErrorOrNil() +} + +func (d *Dependency) Canonicalize() { + if d == nil { + return + } + + if d.ActionOnTimeout == "" { + d.ActionOnTimeout = "reject" + } + + for _, job := range d.Jobs { + job.Canonicalize() + } +} + +// DiffID fulfills the DiffableWithID interface. +func (d *Dependency) DiffID() string { + return d.String() +} diff --git a/nomad/worker.go b/nomad/worker.go index 8b316b51d43..415dd5b73b9 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -623,9 +623,10 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua // Create the scheduler, or use the special core scheduler var sched sstructs.Scheduler if eval.Type == structs.JobTypeCore { - sched = NewCoreScheduler(w.srv, snap, w) + sched = NewCoreScheduler(w.srv, snap, w, scheduler.WithDependencyChecker(w.srv.dependencyCoordinator)) } else { - sched, err = scheduler.NewScheduler(eval.Type, w.logger, w.srv.workersEventCh, snap, w) + sched, err = scheduler.NewScheduler(eval.Type, w.logger, w.srv.workersEventCh, + snap, w, scheduler.WithDependencyChecker(w.srv.dependencyCoordinator)) if err != nil { return fmt.Errorf("failed to instantiate scheduler: %v", err) } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index b825638eadc..0b623f44f2a 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -50,11 +50,11 @@ func (n *NoopScheduler) Process(eval *structs.Evaluation) error { func init() { scheduler.BuiltinSchedulers["noop"] = func( - logger log.Logger, eventsCh chan<- any, s sstructs.State, p sstructs.Planner, - ) sstructs.Scheduler { + 
logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, + planner sstructs.Planner, opts ...sstructs.SchedulerOption) sstructs.Scheduler { n := &NoopScheduler{ - state: s, - planner: p, + state: state, + planner: planner, } return n } diff --git a/scheduler/dependency/coordinator.go b/scheduler/dependency/coordinator.go new file mode 100644 index 00000000000..48c706fe0c9 --- /dev/null +++ b/scheduler/dependency/coordinator.go @@ -0,0 +1,255 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package dependency + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/scheduler/loop_detection" + sstructs "github.com/hashicorp/nomad/scheduler/structs" +) + +var DefaultTimeout = 10 * time.Minute + +type evalID = string + +type evalUnblocker interface { + Unblock(computedClass string, index uint64) chan struct{} +} + +type loopDetector interface { + AddNodes(dependantJob string, dependeeJob ...string) error + RemoveNode(dependantJob string) error +} + +type dependency struct { + cancelFunc context.CancelFunc + job *structs.Job + dependees []string +} + +type Coordinator struct { + mainContext context.Context + logger hclog.Logger + l sync.RWMutex + + dependencies map[evalID]*dependency + loopDetector loopDetector + blockedEvals evalUnblocker +} + +// NewCoordinator returns a Coordinator that uses the given loop detector and eval unblocker. +func NewCoordinator(logger hclog.Logger, loopDetector loopDetector, + blockedEvals evalUnblocker) *Coordinator { + return &Coordinator{ + mainContext: context.Background(), + logger: logger.Named("dependency-coordinator"), + dependencies: make(map[evalID]*dependency), + loopDetector: loopDetector, + blockedEvals: blockedEvals, + } +} + +func (c *Coordinator) unblockDependencies(eval *structs.Evaluation, dependeeJobs map[string]*structs.Job) error { + for _, job := range 
dependeeJobs { + + c.blockedEvals.Unblock(eval.ID, job.JobModifyIndex) + + c.l.Lock() + defer c.l.Unlock() + + delete(c.dependencies, eval.ID) + if err := c.loopDetector.RemoveNode(eval.JobID); err != nil { + c.logger.Error("failed to remove dependency", "error", err) + } + } + + return nil +} + +func (c *Coordinator) CheckDependency(state sstructs.State, job *structs.Job, eval *structs.Evaluation) (bool, error) { + + if job.Dependencies == nil { + return true, nil + } + + djSet := map[string]struct{}{} + for _, depJob := range job.Dependencies.Jobs { + if depJob == nil || depJob.Name == "" { + continue + } + + djSet[depJob.Name] = struct{}{} + } + + djIDs := make([]string, 0, len(djSet)) + for jobID := range djSet { + djIDs = append(djIDs, jobID) + } + + djs := map[string]*structs.Job{} + for _, jID := range djIDs { + j, err := state.JobByID(nil, job.Namespace, jID) + if err != nil { + c.logger.Error("failed to get job by ID", "error", err) + continue + } + djs[jID] = j + } + + ready, err := c.verifyDependencies(job, djs) + if err != nil { + c.logger.Error("failed to verify dependencies", "error", err) + } + + if ready { + return true, nil + } + + c.loopDetector.AddNodes(eval.JobID, djIDs...) + + ctx, cancel := context.WithDeadline(c.mainContext, time.Now().Add(dependencyTimeout(job))) + c.dependencies[eval.JobID] = &dependency{ + cancelFunc: cancel, + job: job, + dependees: djIDs, + } + + go c.waitForDependency(ctx, state, eval, djIDs...) 
+ + return false, nil +} + +func (c *Coordinator) waitForDependency(ctx context.Context, state sstructs.State, + eval *structs.Evaluation, dependeeJobIDs ...string) { + + for { + ws := memdb.NewWatchSet() + dj := map[string]*structs.Job{} + + for _, jID := range dependeeJobIDs { + j, err := state.JobByID(ws, eval.Namespace, jID) + if err != nil { + c.logger.Error("failed to get job by ID", "error", err) + } + + dj[jID] = j + } + + select { + case <-ws.WatchCh(ctx): + + ready, err := c.verifyDependencies(c.dependencies[eval.JobID].job, dj) + if err != nil { + c.logger.Error("failed to verify dependency", "error", err) + continue + } + + if ready { + err := c.unblockDependencies(eval, dj) + if err != nil { + c.logger.Error("failed to unblock job", "error", err) + } + return + } + + case <-ctx.Done(): + c.unblockDependencies(eval, dj) + c.logger.Debug("dependency timeout reached", "jobID", eval.JobID) + return + } + } +} + +func (c *Coordinator) verifyDependencies(dependantJob *structs.Job, jobs map[string]*structs.Job) (bool, error) { + var mErr multierror.Error + ready := true + + for _, depJob := range dependantJob.Dependencies.Jobs { + if depJob == nil { + continue + } + + job, ok := jobs[depJob.Name] + if !ok { + mErr.Errors = append(mErr.Errors, errors.New("unable to check dependency for job: "+depJob.Name)) + ready = false + break + } + + if job == nil || !statusMatches(job.Status, depJob.Status) { + c.logger.Debug("job not preset yet", "jobID", depJob.Name) + ready = false + break + } + } + + return ready, mErr.ErrorOrNil() +} + +func statusMatches(actual, expected string) bool { + if expected == "" { + return actual == "" + } + + if expected == "completed" { + return actual == structs.JobStatusDead + } + + return actual == expected +} + +func dependencyTimeout(job *structs.Job) time.Duration { + timeout := DefaultTimeout + if job.Dependencies != nil && job.Dependencies.Timeout > 0 { + timeout = job.Dependencies.Timeout + } + + if timeout <= 0 { + return 
DefaultTimeout + } + + return timeout +} + +func (c *Coordinator) Stop() { + c.mainContext.Done() + c.dependencies = nil +} + +func (c *Coordinator) HasDependencies(j *structs.Job) (bool, error) { + err := c.loopDetector.RemoveNode(j.ID) + if err != nil { + if errors.Is(err, loop_detection.ErrNodeIsDependency) { + return true, nil + } + + if !errors.Is(err, loop_detection.ErrNodeNotFound) { + return false, err + } + } + + return false, nil +} + +func (c *Coordinator) Reload(state sstructs.State, evals ...*structs.Evaluation) { + for _, eval := range evals { + job, err := state.JobByID(nil, eval.Namespace, eval.JobID) + if err != nil { + c.logger.Error("failed to get job by ID", "error", err) + continue + } + _, err = c.CheckDependency(state, job, eval) + if err != nil { + c.logger.Error("failed to check dependency", "error", err) + } + } +} diff --git a/scheduler/feasible/context.go b/scheduler/feasible/context.go index 62d602c2379..df7aaefb1ae 100644 --- a/scheduler/feasible/context.go +++ b/scheduler/feasible/context.go @@ -18,6 +18,7 @@ import ( // Context is used to track contextual information used for placement type Context interface { + ConstraintContext // State is used to inspect the current global state State() sstructs.State @@ -27,9 +28,6 @@ type Context interface { // Logger provides a way to log Logger() log.Logger - // Metrics returns the current metrics - Metrics() *structs.AllocMetric - // Reset is invoked after making a placement Reset() @@ -38,15 +36,6 @@ type Context interface { // placements. 
ProposedAllocs(nodeID string) ([]*structs.Allocation, error) - // RegexpCache is a cache of regular expressions - RegexpCache() map[string]*regexp.Regexp - - // VersionConstraintCache is a cache of version constraints - VersionConstraintCache() map[string]VerConstraints - - // SemverConstraintCache is a cache of semver constraints - SemverConstraintCache() map[string]VerConstraints - // Eligibility returns a tracker for node eligibility in the context of the // eval. Eligibility() *EvalEligibility @@ -263,7 +252,8 @@ func (e *EvalEligibility) Reset() { // at the job and task group level. func (e *EvalEligibility) SetJob(job *structs.Job) { // Determine whether the job has escaped constraints. - e.jobEscaped = len(structs.EscapedConstraints(job.Constraints)) != 0 + e.jobEscaped = len(structs.EscapedConstraints(job.Constraints)) != 0 || + job.Dependencies != nil // Determine the escaped constraints per task group. for _, tg := range job.TaskGroups { diff --git a/scheduler/feasible/stack.go b/scheduler/feasible/stack.go index 62e68d853eb..765f53ca032 100644 --- a/scheduler/feasible/stack.go +++ b/scheduler/feasible/stack.go @@ -32,6 +32,8 @@ type Stack interface { // Select is used to select a node for the task group Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode + + SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) } type SelectOptions struct { diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index bdc3cdcb2b9..8528fab2a62 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -62,7 +62,7 @@ type GenericScheduler struct { plan *structs.Plan planResult *structs.PlanResult ctx *feasible.EvalContext - stack *feasible.GenericStack + stack feasible.Stack // followUpEvals are evals with WaitUntil set, which are delayed until that time // before being rescheduled @@ -70,14 +70,16 @@ type GenericScheduler struct { deployment *structs.Deployment - blocked *structs.Evaluation - failedTGAllocs 
map[string]*structs.AllocMetric - queuedAllocs map[string]int - planAnnotations *structs.PlanAnnotations + blocked *structs.Evaluation + failedTGAllocs map[string]*structs.AllocMetric + queuedAllocs map[string]int + planAnnotations *structs.PlanAnnotations + getAvailableNodes func(job *structs.Job) ([]*structs.Node, map[string]int, error) } // NewServiceScheduler is a factory function to instantiate a new service scheduler -func NewServiceScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner) sstructs.Scheduler { +func NewServiceScheduler(logger log.Logger, eventsCh chan<- interface{}, + state sstructs.State, planner sstructs.Planner, _ ...sstructs.SchedulerOption) sstructs.Scheduler { s := &GenericScheduler{ logger: logger.Named("service_sched"), eventsCh: eventsCh, @@ -85,18 +87,9 @@ func NewServiceScheduler(logger log.Logger, eventsCh chan<- interface{}, state s planner: planner, batch: false, } - return s -} -// NewBatchScheduler is a factory function to instantiate a new batch scheduler -func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner) sstructs.Scheduler { - s := &GenericScheduler{ - logger: logger.Named("batch_sched"), - eventsCh: eventsCh, - state: state, - planner: planner, - batch: true, - } + s.getAvailableNodes = s.setNodes + return s } @@ -484,11 +477,15 @@ func (s *GenericScheduler) computePlacements( ) error { // Get the base nodes - nodes, byDC, err := s.setNodes(s.job) + nodes, byDC, err := s.getAvailableNodes(s.job) if err != nil { return err } + /* if len(nodes) == 0 { + return fmt.Errorf("no nodes available/pending dependencies to place on") + } */ + var deploymentID string if s.deployment != nil && s.deployment.Active() { deploymentID = s.deployment.ID @@ -546,7 +543,7 @@ func (s *GenericScheduler) computePlacements( s.setJob(downgradedJob) if needsToSetNodes(downgradedJob, s.job) { - nodes, byDC, err = 
s.setNodes(downgradedJob)
+					nodes, byDC, err = s.getAvailableNodes(downgradedJob)
 					if err != nil {
 						return err
 					}
@@ -588,7 +585,7 @@ func (s *GenericScheduler) computePlacements(
 				s.setJob(s.job)
 
 				if needsToSetNodes(downgradedJob, s.job) {
-					nodes, byDC, err = s.setNodes(s.job)
+					nodes, byDC, err = s.getAvailableNodes(s.job)
 					if err != nil {
 						return err
 					}
diff --git a/scheduler/loop_detection/README.md b/scheduler/loop_detection/README.md
new file mode 100644
index 00000000000..3c4eed807c2
--- /dev/null
+++ b/scheduler/loop_detection/README.md
@@ -0,0 +1,91 @@
+# Loop Detection Dependency Graph
+
+This folder provides a small dependency graph implementation for detecting circular dependencies between jobs.
+
+Package name: `loop_detection`
+
+## Public API
+
+```go
+type Graph interface {
+    AddNodes(nodeID string, dependencies ...string) error
+    RemoveNode(nodeID string) error
+}
+```
+
+Constructor:
+
+```go
+g := loop_detection.New(logger) // logger is an hclog.Logger
+```
+
+## Internal Model
+
+The implementation keeps both of these structures:
+
+1. An array of linked lists (`allLists`) for all nodes.
+2. A map from node ID to its linked list (`byNode`).
+
+It also tracks adjacency for efficient checks:
+
+- `deps`: node -> direct dependencies
+- `dependents`: node -> direct dependents
+
+## Behavior
+
+### AddNodes
+
+`AddNodes(nodeID, deps...)` does the following:
+
+1. Validates IDs are non-empty.
+2. Rejects self-dependency (`nodeID` depending on itself).
+3. Creates missing nodes on demand.
+4. Ignores duplicate dependencies in the same call.
+5. Detects cycles before adding each edge.
+
+Cycle rule:
+
+- When adding `A -> B`, it checks whether `B` already reaches `A`.
+- If yes, the new edge would create a loop and returns an error.
+
+### RemoveNode
+
+`RemoveNode(nodeID)` does the following:
+
+1. Returns `ErrNodeNotFound` if the node does not exist.
+2. Returns `ErrNodeIsDependency` if any other node depends on it.
+3. Removes the node if it has no dependents.
+4.
Prunes orphan dependency branches recursively.
+
+Orphan pruning means that if a removed node had dependencies that are no longer required by anyone else, those dependency nodes are also removed.
+
+## Errors
+
+- `ErrEmptyNodeID`
+- `ErrSelfDependency`
+- `ErrNodeNotFound`
+- `ErrNodeIsDependency`
+
+## Example
+
+```go
+g := loop_detection.New(hclog.NewNullLogger())
+
+_ = g.AddNodes("jobA", "jobB", "jobC")
+_ = g.AddNodes("jobB", "jobD")
+
+// Would fail: jobD -> jobA closes a cycle jobA -> jobB -> jobD -> jobA
+if err := g.AddNodes("jobD", "jobA"); err != nil {
+    // handle cycle error
+}
+
+// Would fail while jobA depends on jobB
+if err := g.RemoveNode("jobB"); err != nil {
+    // handle ErrNodeIsDependency
+}
+```
+
+## Tests
+
+See test cases in `loop_detection_test.go`.
+Each test includes an ASCII graph diagram before the test function.
diff --git a/scheduler/loop_detection/loop_detection.go b/scheduler/loop_detection/loop_detection.go
new file mode 100644
index 00000000000..4bf757aab94
--- /dev/null
+++ b/scheduler/loop_detection/loop_detection.go
@@ -0,0 +1,260 @@
+// Copyright IBM Corp. 2015, 2026
+// SPDX-License-Identifier: BUSL-1.1
+
+package loop_detection
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+
+	"github.com/hashicorp/go-hclog"
+)
+
+var (
+	ErrEmptyNodeID      = errors.New("node id cannot be empty")
+	ErrSelfDependency   = errors.New("node cannot depend on itself")
+	ErrNodeNotFound     = errors.New("node not found")
+	ErrNodeIsDependency = errors.New("cannot remove node: another node depends on it")
+)
+
+// Store implements Graph.
+// Internally it keeps:
+// 1) an array of linked lists
+// 2) a map[nodeID]*linkedList
+type Store struct {
+	logger hclog.Logger
+	mu     sync.RWMutex
+
+	allLists []*linkedList
+	byNode   map[string]*linkedList
+	index    map[string]int // nodeID -> position in allLists
+
+	// adjacency: node -> dependencies
+	deps map[string]map[string]struct{}
+	// reverse adjacency: node -> dependents
+	dependents map[string]map[string]struct{}
+}
+
+// listNode is one element of a linkedList; id is the node ID it stands for.
+type listNode struct {
+	id   string
+	next *listNode
+}
+
+// linkedList is a singly linked list whose head is the owning node and whose
+// remaining elements are the dependencies appended for that owner.
+type linkedList struct {
+	head *listNode // head is the owner node
+	tail *listNode
+}
+
+// newLinkedList returns a list that contains only the owner node.
+func newLinkedList(owner string) *linkedList {
+	h := &listNode{id: owner}
+	return &linkedList{head: h, tail: h}
+}
+
+// appendUnique appends dep to the end of the list unless it is already among
+// the elements after the head, and reports whether it was appended. On a list
+// without a head, dep becomes the head.
+func (l *linkedList) appendUnique(dep string) bool {
+	if l.head == nil {
+		l.head = &listNode{id: dep}
+		l.tail = l.head
+		return true
+	}
+	// The scan starts after the head: the head is the owner, not a dependency.
+	for n := l.head.next; n != nil; n = n.next {
+		if n.id == dep {
+			return false
+		}
+	}
+	n := &listNode{id: dep}
+	l.tail.next = n
+	l.tail = n
+	return true
+}
+
+// New creates an empty dependency graph.
+func New(logger hclog.Logger) *Store {
+	return &Store{
+		logger:     logger.Named("loop-detection"),
+		allLists:   make([]*linkedList, 0),
+		byNode:     make(map[string]*linkedList),
+		index:      make(map[string]int),
+		deps:       make(map[string]map[string]struct{}),
+		dependents: make(map[string]map[string]struct{}),
+	}
+}
+
+// AddNodes adds/updates nodeID with dependencies.
+// It prevents circular dependencies.
+//
+// The call is all-or-nothing: every dependency is validated and checked for
+// cycles before the graph is modified, so a rejected call leaves no partially
+// added nodes or edges behind. Duplicate dependencies and edges that already
+// exist are ignored.
+//
+// It returns ErrEmptyNodeID for an empty node or dependency ID,
+// ErrSelfDependency when nodeID lists itself, and a descriptive error when an
+// edge would close a cycle.
+func (s *Store) AddNodes(nodeID string, dependencies ...string) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if nodeID == "" {
+		return ErrEmptyNodeID
+	}
+
+	// Validation pass: nothing is written yet. All new edges start at nodeID,
+	// so whether a dependency reaches nodeID cannot depend on the other edges
+	// of this call and can be decided on the graph as it is now.
+	unique := make([]string, 0, len(dependencies))
+	seen := make(map[string]struct{}, len(dependencies))
+	for _, dep := range dependencies {
+		if dep == "" {
+			return ErrEmptyNodeID
+		}
+		if dep == nodeID {
+			return ErrSelfDependency
+		}
+		if _, ok := seen[dep]; ok {
+			continue
+		}
+		seen[dep] = struct{}{}
+
+		// If dep already reaches nodeID, adding nodeID->dep creates a cycle.
+		if s.reaches(dep, nodeID) {
+			return fmt.Errorf("circular dependency detected: %s -> %s would create a loop", nodeID, dep)
+		}
+		unique = append(unique, dep)
+	}
+
+	// Mutation pass: all IDs are known to be valid at this point.
+	if err := s.ensureNode(nodeID); err != nil {
+		return err
+	}
+	for _, dep := range unique {
+		if err := s.ensureNode(dep); err != nil {
+			return err
+		}
+
+		s.deps[nodeID][dep] = struct{}{}
+		s.dependents[dep][nodeID] = struct{}{}
+		s.byNode[nodeID].appendUnique(dep)
+	}
+
+	return nil
+}
+
+// RemoveNode removes nodeID if no other node depends on it.
+// Also prunes orphan dependency branches that become unreferenced.
+func (s *Store) RemoveNode(nodeID string) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if nodeID == "" {
+		return ErrEmptyNodeID
+	}
+	if _, ok := s.byNode[nodeID]; !ok {
+		return ErrNodeNotFound
+	}
+	if len(s.dependents[nodeID]) > 0 {
+		return ErrNodeIsDependency
+	}
+
+	children := keysSet(s.deps[nodeID])
+
+	// Remove outgoing edges nodeID -> child.
+	for child := range s.deps[nodeID] {
+		delete(s.dependents[child], nodeID)
+	}
+
+	delete(s.deps, nodeID)
+	delete(s.dependents, nodeID)
+	s.removeList(nodeID)
+
+	// Remove orphan sub-branches.
+	for _, child := range children {
+		s.pruneOrphan(child)
+	}
+	return nil
+}
+
+// ensureNode registers nodeID if it is not known yet: it creates the node's
+// linked list, records its position in allLists and initialises its adjacency
+// maps. For a node that already exists it only recreates adjacency maps that
+// are missing. It returns ErrEmptyNodeID for an empty ID.
+func (s *Store) ensureNode(nodeID string) error {
+	if nodeID == "" {
+		return ErrEmptyNodeID
+	}
+	if _, ok := s.byNode[nodeID]; ok {
+		if _, ok := s.deps[nodeID]; !ok {
+			s.deps[nodeID] = make(map[string]struct{})
+		}
+		if _, ok := s.dependents[nodeID]; !ok {
+			s.dependents[nodeID] = make(map[string]struct{})
+		}
+		return nil
+	}
+
+	ll := newLinkedList(nodeID)
+	s.byNode[nodeID] = ll
+	s.index[nodeID] = len(s.allLists)
+	s.allLists = append(s.allLists, ll)
+	s.deps[nodeID] = make(map[string]struct{})
+	s.dependents[nodeID] = make(map[string]struct{})
+	return nil
+}
+
+// reaches checks if start depends (directly/indirectly) on target.
+// A node is considered to reach itself. The search is an iterative
+// depth-first walk over deps that visits each node at most once.
+func (s *Store) reaches(start, target string) bool {
+	if start == target {
+		return true
+	}
+	visited := map[string]struct{}{}
+	stack := []string{start}
+
+	for len(stack) > 0 {
+		// Pop the most recently pushed node.
+		n := stack[len(stack)-1]
+		stack = stack[:len(stack)-1]
+
+		if _, ok := visited[n]; ok {
+			continue
+		}
+		visited[n] = struct{}{}
+
+		for dep := range s.deps[n] {
+			if dep == target {
+				return true
+			}
+			stack = append(stack, dep)
+		}
+	}
+	return false
+}
+
+// pruneOrphan removes nodeID when it exists and no node depends on it, and
+// then applies the same rule recursively to each of its former dependencies.
+func (s *Store) pruneOrphan(nodeID string) {
+	if _, ok := s.byNode[nodeID]; !ok {
+		return
+	}
+	if len(s.dependents[nodeID]) > 0 {
+		return
+	}
+
+	deps := s.deps[nodeID]
+	// Copy the dependency IDs first; the adjacency entry is deleted below.
+	children := keysSet(deps)
+
+	for child := range s.deps[nodeID] {
+		delete(s.dependents[child], nodeID)
+	}
+	delete(s.deps, nodeID)
+	delete(s.dependents, nodeID)
+	s.removeList(nodeID)
+
+	for _, child := range children {
+		s.pruneOrphan(child)
+	}
+}
+
+// removeList drops nodeID's linked list from allLists, byNode and index. The
+// last list is moved into the freed slot so allLists stays dense, and the
+// moved list's owner gets its index updated.
+func (s *Store) removeList(nodeID string) {
+	i, ok := s.index[nodeID]
+	if !ok {
+		return
+	}
+	last := len(s.allLists) - 1
+	if i != last {
+		s.allLists[i] = s.allLists[last]
+		owner := s.allLists[i].head.id
+		s.index[owner] = i
+	}
+	// Clear the vacated slot so the slice no longer references the list.
+	s.allLists[last] = nil
+	s.allLists = s.allLists[:last]
+	delete(s.index, nodeID)
+	delete(s.byNode, nodeID)
+}
+
+// keysSet returns the keys of m as a slice, in no particular order.
+func keysSet(m map[string]struct{})
[]string {
+	out := make([]string, 0, len(m))
+	for k := range m {
+		out = append(out, k)
+	}
+	return out
+}
diff --git a/scheduler/loop_detection/loop_detection_test.go b/scheduler/loop_detection/loop_detection_test.go
new file mode 100644
index 00000000000..92561847fad
--- /dev/null
+++ b/scheduler/loop_detection/loop_detection_test.go
@@ -0,0 +1,138 @@
+// Copyright IBM Corp. 2015, 2026
+// SPDX-License-Identifier: BUSL-1.1
+
+package loop_detection
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/hashicorp/go-hclog"
+)
+
+// Test case diagram:
+//
+//	jobA --> jobB
+//	  |
+//	  +----> jobC
+//
+// Expected:
+//   - Add succeeds
+//   - No cycle error
+func TestAddNodesNoCycle(t *testing.T) {
+	g := New(hclog.NewNullLogger())
+
+	if err := g.AddNodes("jobA", "jobB", "jobC"); err != nil {
+		t.Fatalf("unexpected error adding non-cyclic deps: %v", err)
+	}
+}
+
+// Test case diagram (the back edge jobC -> jobA is the one being added):
+//
+//	jobA --> jobB --> jobC
+//	  ^                 |
+//	  +-----------------+
+//
+// Operation:
+//   - add edge jobC -> jobA
+//
+// Expected:
+//   - cycle detected (error)
+func TestAddNodesDetectsCycle(t *testing.T) {
+	g := New(hclog.NewNullLogger())
+
+	if err := g.AddNodes("jobA", "jobB"); err != nil {
+		t.Fatalf("setup failed: %v", err)
+	}
+	if err := g.AddNodes("jobB", "jobC"); err != nil {
+		t.Fatalf("setup failed: %v", err)
+	}
+
+	err := g.AddNodes("jobC", "jobA")
+	if err == nil {
+		t.Fatal("expected cycle error, got nil")
+	}
+}
+
+// Test case diagram:
+//
+//	jobA --> jobB
+//
+// Operation:
+//   - remove jobB
+//
+// Expected:
+//   - blocked, because jobA depends on jobB
+//   - returns ErrNodeIsDependency
+func TestRemoveNodeBlockedIfDependedUpon(t *testing.T) {
+	g := New(hclog.NewNullLogger())
+
+	if err := g.AddNodes("jobA", "jobB"); err != nil {
+		t.Fatalf("setup failed: %v", err)
+	}
+
+	err := g.RemoveNode("jobB")
+	if !errors.Is(err, ErrNodeIsDependency) {
+		t.Fatalf("expected ErrNodeIsDependency, got: %v", err)
+	}
+}
+
+// Test case diagram:
+// jobA --> jobB --> jobC
+//
+// Operation:
+//   - remove jobA
+//
+// Expected:
+//   - jobA removed
+//   - jobB and jobC pruned as orphans (no other dependents)
+func TestRemoveNodePrunesOrphanChain(t *testing.T) {
+	g := New(hclog.NewNullLogger())
+
+	if err := g.AddNodes("jobA", "jobB"); err != nil {
+		t.Fatalf("setup failed: %v", err)
+	}
+	if err := g.AddNodes("jobB", "jobC"); err != nil {
+		t.Fatalf("setup failed: %v", err)
+	}
+
+	if err := g.RemoveNode("jobA"); err != nil {
+		t.Fatalf("unexpected remove error: %v", err)
+	}
+
+	// Pruned nodes are gone, so removing them again must report not found.
+	if err := g.RemoveNode("jobB"); !errors.Is(err, ErrNodeNotFound) {
+		t.Fatalf("expected ErrNodeNotFound for pruned jobB, got: %v", err)
+	}
+	if err := g.RemoveNode("jobC"); !errors.Is(err, ErrNodeNotFound) {
+		t.Fatalf("expected ErrNodeNotFound for pruned jobC, got: %v", err)
+	}
+}
+
+// Test case diagram:
+//
+//	jobX --> jobY
+//	   \\----> jobY (duplicate edge in same call)
+//
+// Expected:
+//   - add succeeds
+//   - duplicate dependency ignored (no error)
+func TestAddNodesDuplicateDependenciesIgnored(t *testing.T) {
+	g := New(hclog.NewNullLogger())
+
+	if err := g.AddNodes("jobX", "jobY", "jobY"); err != nil {
+		t.Fatalf("unexpected error for duplicate dependency: %v", err)
+	}
+}
+
+// Test case diagram:
+//
+//	jobZ --> jobZ (self loop)
+//
+// Expected:
+//   - returns ErrSelfDependency
+func TestAddNodesSelfDependencyRejected(t *testing.T) {
+	g := New(hclog.NewNullLogger())
+
+	err := g.AddNodes("jobZ", "jobZ")
+	if !errors.Is(err, ErrSelfDependency) {
+		t.Fatalf("expected ErrSelfDependency, got: %v", err)
+	}
+}
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 267827829e0..2c230e5345c 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -8,6 +8,7 @@ import (
 	log "github.com/hashicorp/go-hclog"
 
 	"github.com/hashicorp/nomad/scheduler/structs"
+	sstructs "github.com/hashicorp/nomad/scheduler/structs" // NOTE(review): same import path as the unaliased structs import above
 )
 
 const (
@@ -27,10 +28,20 @@ var BuiltinSchedulers = map[string]structs.Factory{
 	"sysbatch": NewSysBatchScheduler,
 }
 
+func
WithDependencyChecker(dependencyChecker DependencyChecker) sstructs.SchedulerOption {
+	return func(s sstructs.Scheduler) error {
+		if bs, ok := s.(*BatchScheduler); ok {
+			bs.dependencyChecker = dependencyChecker
+		}
+		return nil
+	}
+}
+
 // NewScheduler is used to instantiate and return a new scheduler
 // given the scheduler name, initial state, and planner.
 func NewScheduler(
 	name string, logger log.Logger, eventsCh chan<- interface{}, state structs.State, planner structs.Planner,
+	opts ...structs.SchedulerOption,
 ) (structs.Scheduler, error) {
 	// Lookup the factory function
 	factory, ok := BuiltinSchedulers[name]
@@ -39,6 +50,6 @@ func NewScheduler(
 	}
 
 	// Instantiate the scheduler
-	sched := factory(logger, eventsCh, state, planner)
+	sched := factory(logger, eventsCh, state, planner, opts...)
 	return sched, nil
 }
diff --git a/scheduler/scheduler_batch.go b/scheduler/scheduler_batch.go
new file mode 100644
index 00000000000..8c264628d2e
--- /dev/null
+++ b/scheduler/scheduler_batch.go
@@ -0,0 +1,56 @@
+// Copyright IBM Corp.
2015, 2026
+// SPDX-License-Identifier: BUSL-1.1
+
+package scheduler
+
+import (
+	log "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad/nomad/structs"
+	sstructs "github.com/hashicorp/nomad/scheduler/structs"
+)
+
+// DependencyChecker decides whether a job may be placed yet. The batch
+// scheduler treats a true result as "ready" and a false result as "keep
+// waiting".
+type DependencyChecker interface {
+	CheckDependency(state sstructs.State, job *structs.Job, eval *structs.Evaluation) (bool, error)
+}
+
+// BatchScheduler is a GenericScheduler in batch mode that consults an optional
+// DependencyChecker before offering nodes for placement.
+type BatchScheduler struct {
+	dependencyChecker DependencyChecker
+	GenericScheduler
+}
+
+// NewBatchScheduler is a factory function to instantiate a new batch scheduler
+//
+// The factory signature has no error result, so an option that fails is
+// logged and skipped instead of aborting construction.
+func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State,
+	planner sstructs.Planner, opts ...sstructs.SchedulerOption) sstructs.Scheduler {
+	bs := &BatchScheduler{
+		GenericScheduler: GenericScheduler{
+			logger:   logger.Named("batch_sched"),
+			eventsCh: eventsCh,
+			state:    state,
+			planner:  planner,
+			batch:    true,
+		},
+	}
+
+	for _, opt := range opts {
+		if opt == nil {
+			continue
+		}
+		if err := opt(bs); err != nil {
+			bs.logger.Error("failed to apply scheduler option", "error", err)
+		}
+	}
+
+	// Route node selection through the dependency-aware setNodes below.
+	bs.getAvailableNodes = bs.setNodes
+
+	return bs
+}
+
+// setNodes returns the nodes the job may be placed on. Without a dependency
+// checker, or when the checker reports the job as ready, it defers to
+// GenericScheduler.setNodes. While the dependencies are not ready it returns
+// an empty node list together with the per-datacenter counts from
+// readyNodesInDCsAndPool.
+func (bs *BatchScheduler) setNodes(job *structs.Job) ([]*structs.Node, map[string]int, error) {
+	// The checker is only set through WithDependencyChecker; schedulers
+	// created without that option must not dereference a nil interface.
+	if bs.dependencyChecker == nil {
+		return bs.GenericScheduler.setNodes(job)
+	}
+
+	ready, err := bs.dependencyChecker.CheckDependency(bs.state, job, bs.eval)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	if !ready {
+		_, _, byDC, err := readyNodesInDCsAndPool(bs.state, job.Datacenters, job.NodePool)
+		return []*structs.Node{}, byDC, err
+	}
+
+	return bs.GenericScheduler.setNodes(job)
+}
diff --git a/scheduler/scheduler_sysbatch.go b/scheduler/scheduler_sysbatch.go
index 1a2646e3f07..dfe3253af80 100644
--- a/scheduler/scheduler_sysbatch.go
+++ b/scheduler/scheduler_sysbatch.go
@@ -50,7 +50,9 @@ type SysBatchScheduler struct {
 	planAnnotations *structs.PlanAnnotations
 }
 
-func NewSysBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner) sstructs.Scheduler {
+func NewSysBatchScheduler(logger log.Logger, eventsCh chan<- interface{},
+	state sstructs.State, planner sstructs.Planner,
+	_
...sstructs.SchedulerOption) sstructs.Scheduler { return &SysBatchScheduler{ logger: logger.Named("sysbatch_sched"), eventsCh: eventsCh, diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index b323f111bb3..c07249aa865 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -59,7 +59,9 @@ type SystemScheduler struct { // NewSystemScheduler is a factory function to instantiate a new system // scheduler. -func NewSystemScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner) sstructs.Scheduler { +func NewSystemScheduler(logger log.Logger, eventsCh chan<- interface{}, + state sstructs.State, planner sstructs.Planner, + _ ...sstructs.SchedulerOption) sstructs.Scheduler { return &SystemScheduler{ logger: logger.Named("system_sched"), eventsCh: eventsCh, diff --git a/scheduler/structs/interfaces.go b/scheduler/structs/interfaces.go index 448a7e2d853..8c014eb5a3f 100644 --- a/scheduler/structs/interfaces.go +++ b/scheduler/structs/interfaces.go @@ -16,8 +16,10 @@ import ( * This package contains top-level interfaces used throughout the scheduler. */ +type SchedulerOption func(Scheduler) error + // Factory is used to instantiate a new Scheduler -type Factory func(log.Logger, chan<- interface{}, State, Planner) Scheduler +type Factory func(log.Logger, chan<- interface{}, State, Planner, ...SchedulerOption) Scheduler // Scheduler is the top level instance for a scheduler. A scheduler is // meant to only encapsulate business logic, pushing the various plumbing diff --git a/scheduler/util.go b/scheduler/util.go index 955da6a658b..93f7e604e5b 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -165,6 +165,11 @@ var same = comparison{modified: false} // tasksUpdated creates a comparison between task groups to see if the tasks, their // drivers, environment variables or config have been modified. 
func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) comparison { + if (jobA.Dependencies == nil) != (jobB.Dependencies == nil) || + (jobA.Dependencies != nil && !jobA.Dependencies.Equal(jobB.Dependencies)) { + return difference("job dependencies", jobA.Dependencies, jobB.Dependencies) + } + a := jobA.LookupTaskGroup(taskGroup) b := jobB.LookupTaskGroup(taskGroup) diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 001bbbd802c..55fc1ad5a92 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -455,6 +455,22 @@ func TestTasksUpdated(t *testing.T) { must.True(t, tasksUpdated(j32, j33, name).modified) + // Change job dependency timeout + j34 := mock.Job() + j34.Dependencies = &structs.Dependency{ + Timeout: 10 * time.Minute, + ActionOnTimeout: "reject", + Jobs: []*structs.JobDependency{{ + Name: "service-123", + Status: "completed", + }}, + } + j35 := j34.Copy() + must.False(t, tasksUpdated(j34, j35, name).modified) + + j35.Dependencies.Timeout = 15 * time.Minute + must.True(t, tasksUpdated(j34, j35, name).modified) + } func TestTasksUpdated_connectServiceUpdated(t *testing.T) {