From 964f7ef89dbe53981a69fb262a54233d9627a2fe Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Thu, 28 May 2026 17:08:13 +0200 Subject: [PATCH 01/10] func: add new dependency constraint --- api/constraint.go | 36 +++++++++ api/jobs.go | 19 ++++- command/agent/job_endpoint.go | 26 ++++++ jobspec2/hcl_conversions.go | 33 ++++++++ jobspec2/parse_job.go | 22 ++++++ jobspec2/parse_test.go | 35 +++++++- jobspec2/types.config.go | 37 ++++++--- nomad/fsm.go | 92 +++++++++++++++++++-- nomad/structs/structs.go | 48 +++++++++++ scheduler/feasible/context.go | 16 +--- scheduler/feasible/dependencies.go | 66 ++++++++++++++++ scheduler/feasible/flowchart.md | 123 +++++++++++++++++++++++++++++ scheduler/feasible/stack.go | 7 +- 13 files changed, 521 insertions(+), 39 deletions(-) create mode 100644 scheduler/feasible/dependencies.go create mode 100644 scheduler/feasible/flowchart.md diff --git a/api/constraint.go b/api/constraint.go index ac85b682fc5..dac50ab314c 100644 --- a/api/constraint.go +++ b/api/constraint.go @@ -31,3 +31,39 @@ func NewConstraint(left, operand, right string) *Constraint { Operand: operand, } } + +// Dependency is used to serialize a job placement dependency. 
+type Dependency struct { + Name string `hcl:"name,label"` + Output string `hcl:"output,optional"` + Job string `hcl:"job"` +} + +func NewDependency(name, job, output string) *Dependency { + return &Dependency{ + Name: name, + Job: job, + Output: output, + } +} + +func (d *Dependency) Canonicalize() { + if d.Name == "" { + d.Name = d.Job + } + + if d.Output == "" { + d.Output = "dead" + } +} + +func (d *Dependency) Copy() *Dependency { + if d == nil { + return nil + } + return &Dependency{ + Job: d.Job, + Output: d.Output, + Name: d.Name, + } +} diff --git a/api/jobs.go b/api/jobs.go index 3ccc272c498..4b8e640b2f5 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -46,15 +46,11 @@ const ( // For Client configuration, if no region information is given, // the client node will default to be part of the GlobalRegion. GlobalRegion = "global" -) -const ( // RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by // enforcing the job modify index during registers. RegisterEnforceIndexErrPrefix = "Enforcing job modify index" -) -const ( // JobPeriodicLaunchSuffix is the string appended to the periodic jobs ID // when launching derived instances of it. JobPeriodicLaunchSuffix = "/periodic-" @@ -62,6 +58,10 @@ const ( // JobDispatchLaunchSuffix is the string appended to the parameterized job's ID // when dispatching instances of it. JobDispatchLaunchSuffix = "/dispatch-" + + JobStatusPending = "pending" // Pending means the job is waiting on scheduling + JobStatusRunning = "running" // Running means the job has non-terminal allocations + JobStatusDead = "dead" // Dead means all evaluation's and allocations are terminal ) // Jobs is used to access the job-specific endpoints. 
@@ -1115,6 +1115,7 @@ type Job struct { Datacenters []string `hcl:"datacenters,optional"` NodePool *string `mapstructure:"node_pool" hcl:"node_pool,optional"` Constraints []*Constraint `hcl:"constraint,block"` + Dependencies []*Dependency `hcl:"dependency,block"` Affinities []*Affinity `hcl:"affinity,block"` TaskGroups []*TaskGroup `hcl:"group,block"` Update *UpdateStrategy `hcl:"update,block"` @@ -1247,6 +1248,10 @@ func (j *Job) Canonicalize() { a.Canonicalize() } + for _, d := range j.Dependencies { + d.Canonicalize() + } + if j.UI != nil { j.UI.Canonicalize() } @@ -1399,6 +1404,12 @@ func (j *Job) Constrain(c *Constraint) *Job { return j } +// Depend is used to add a dependency to a job. +func (j *Job) Depend(d *Dependency) *Job { + j.Dependencies = append(j.Dependencies, d) + return j +} + // AddAffinity is used to add an affinity to a job. func (j *Job) AddAffinity(a *Affinity) *Job { j.Affinities = append(j.Affinities, a) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 79206781695..655c00ea1df 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -1145,6 +1145,7 @@ func ApiJobToStructJob(job *api.Job) *structs.Job { Version: *job.Version, Constraints: ApiConstraintsToStructs(job.Constraints), Affinities: ApiAffinitiesToStructs(job.Affinities), + Dependencies: ApiDependenciesToStructs(job.Dependencies), UI: ApiJobUIConfigToStructs(job.UI), VersionTag: ApiJobVersionTagToStructs(job.VersionTag), } @@ -2262,6 +2263,31 @@ func ApiAffinitiesToStructs(in []*api.Affinity) []*structs.Affinity { return out } +func ApiDependenciesToStructs(in []*api.Dependency) []*structs.Dependency { + if in == nil { + return nil + } + + out := make([]*structs.Dependency, len(in)) + for i, dep := range in { + out[i] = ApiDependencyToStructs(dep) + } + + return out +} + +func ApiDependencyToStructs(in *api.Dependency) *structs.Dependency { + if in == nil { + return nil + } + + return &structs.Dependency{ + Job: in.Job, + 
Output: in.Output, + Name: in.Name, + } +} + func ApiJobUIConfigToStructs(jobUI *api.JobUIConfig) *structs.JobUIConfig { if jobUI == nil { return nil diff --git a/jobspec2/hcl_conversions.go b/jobspec2/hcl_conversions.go index 214c53ebd1d..c20f891def4 100644 --- a/jobspec2/hcl_conversions.go +++ b/jobspec2/hcl_conversions.go @@ -36,6 +36,7 @@ func newHCLDecoder() *gohcl.Decoder { // custom nomad types decoder.RegisterBlockDecoder(reflect.TypeOf(api.Affinity{}), decodeAffinity) decoder.RegisterBlockDecoder(reflect.TypeOf(api.Constraint{}), decodeConstraint) + decoder.RegisterBlockDecoder(reflect.TypeOf(api.Dependency{}), decodeDependency) return decoder } @@ -261,6 +262,38 @@ func decodeConstraint(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl. return diags } +var dependencySpec = hcldec.ObjectSpec{ + "job": &hcldec.AttrSpec{Name: "job", Type: cty.String, Required: true}, + "output": &hcldec.AttrSpec{Name: "output", Type: cty.String, Required: false}, + "name": &hcldec.AttrSpec{Name: "name", Type: cty.String, Required: false}, +} + +func decodeDependency(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics { + d := val.(*api.Dependency) + + v, diags := hcldec.Decode(body, dependencySpec, ctx) + if len(diags) != 0 { + return diags + } + + attr := func(name string) string { + a := v.GetAttr(name) + if a.IsNull() { + return "" + } + return a.AsString() + } + + d.Job = attr("job") + d.Output = attr("output") + + if d.Name == "" { + d.Name = attr("name") + } + + return diags +} + func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics { tg := val.(*api.TaskGroup) diff --git a/jobspec2/parse_job.go b/jobspec2/parse_job.go index 72eebfc447a..24a00e6170a 100644 --- a/jobspec2/parse_job.go +++ b/jobspec2/parse_job.go @@ -24,8 +24,16 @@ func normalizeJob(jc *jobConfig) { j.Periodic.SpecType = &v } + if len(j.Dependencies) == 0 && len(jc.Dependencies) != 0 { + j.Dependencies = jc.Dependencies + } + 
normalizeVault(jc.Vault) + for _, d := range j.Dependencies { + normalizeDependency(d) + } + if len(jc.Tasks) != 0 { alone := make([]*api.TaskGroup, 0, len(jc.Tasks)) for _, t := range jc.Tasks { @@ -96,6 +104,20 @@ func normalizeVault(v *api.Vault) { } } +func normalizeDependency(d *api.Dependency) { + if d == nil { + return + } + + if d.Output == "" { + d.Output = "completed" + } + + if d.Name == "" { + d.Name = d.Job + } +} + func normalizeNetworkPorts(networks []*api.NetworkResource) { if networks == nil { return diff --git a/jobspec2/parse_test.go b/jobspec2/parse_test.go index 1a16d5fa43e..0bc591bbaed 100644 --- a/jobspec2/parse_test.go +++ b/jobspec2/parse_test.go @@ -220,7 +220,6 @@ func TestParse_Locals(t *testing.T) { variables { region_var = "default_region" } - locals { # literal local dc = "local_dc" @@ -262,6 +261,40 @@ job "example" { }) } +func TestParse_Dependencies(t *testing.T) { + t.Parallel() + + hcl := ` +job "example" { + type = "batch" + + dependency "alpha" { + job = "upstream" + } + + group "g" { + task "t" { + driver = "raw_exec" + config { + command = "echo" + } + } + } +} +` + + out, err := ParseWithConfig(&ParseConfig{ + Path: "input.hcl", + Body: []byte(hcl), + AllowFS: true, + }) + require.NoError(t, err) + require.Len(t, out.Dependencies, 1) + require.Equal(t, "alpha", out.Dependencies[0].Name) + require.Equal(t, "upstream", out.Dependencies[0].Job) + require.Equal(t, "completed", out.Dependencies[0].Output) +} + func TestParse_FileOperators(t *testing.T) { t.Parallel() diff --git a/jobspec2/types.config.go b/jobspec2/types.config.go index ea6134c23b2..c3767811e79 100644 --- a/jobspec2/types.config.go +++ b/jobspec2/types.config.go @@ -15,12 +15,13 @@ import ( ) const ( - variablesLabel = "variables" - variableLabel = "variable" - localsLabel = "locals" - vaultLabel = "vault" - taskLabel = "task" - secretLabel = "secret" + variablesLabel = "variables" + variableLabel = "variable" + localsLabel = "locals" + vaultLabel = "vault" + 
taskLabel = "task" + secretLabel = "secret" + dependencyLabel = "dependency" inputVariablesAccessor = "var" localsAccessor = "local" @@ -32,9 +33,10 @@ type jobConfig struct { ParseConfig *ParseConfig - Vault *api.Vault `hcl:"vault,block"` - Secrets []*api.Secret `hcl:"secret,block"` - Tasks []*api.Task `hcl:"task,block"` + Vault *api.Vault `hcl:"vault,block"` + Secrets []*api.Secret `hcl:"secret,block"` + Tasks []*api.Task `hcl:"task,block"` + Dependencies []*api.Dependency `hcl:"dependency,block"` InputVariables Variables LocalVariables Variables @@ -150,7 +152,8 @@ func (c *jobConfig) decodeTopLevelExtras(content *hcl.BodyContent, ctx *hcl.Eval var foundVault *hcl.Block for _, b := range content.Blocks { - if b.Type == vaultLabel { + switch b.Type { + case vaultLabel: if foundVault != nil { diags = append(diags, &hcl.Diagnostic{ Severity: hcl.DiagError, @@ -169,20 +172,29 @@ func (c *jobConfig) decodeTopLevelExtras(content *hcl.BodyContent, ctx *hcl.Eval diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, v)...) c.Vault = v - } else if b.Type == taskLabel { + case taskLabel: t := &api.Task{} diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, t)...) if len(b.Labels) == 1 { t.Name = b.Labels[0] c.Tasks = append(c.Tasks, t) } - } else if b.Type == secretLabel { + + case secretLabel: t := &api.Secret{} diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, t)...) if len(b.Labels) == 1 { t.Name = b.Labels[0] c.Secrets = append(c.Secrets, t) } + + case dependencyLabel: + d := &api.Dependency{} + diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, d)...) 
+ if len(b.Labels) == 1 { + d.Name = b.Labels[0] + c.Dependencies = append(c.Dependencies, d) + } } } @@ -288,6 +300,7 @@ func (c *jobConfig) decodeJob(content *hcl.BodyContent, ctx *hcl.EvalContext) hc {Type: "vault"}, {Type: "secret", LabelNames: []string{"name"}}, {Type: "task", LabelNames: []string{"name"}}, + {Type: "dependency", LabelNames: []string{"name"}}, }, }) diff --git a/nomad/fsm.go b/nomad/fsm.go index 3a43d2a691f..1d1c51c9157 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -4,6 +4,7 @@ package nomad import ( + "context" "errors" "fmt" "io" @@ -17,6 +18,7 @@ import ( metrics "github.com/hashicorp/go-metrics/compat" "github.com/hashicorp/go-msgpack/v2/codec" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" @@ -767,7 +769,7 @@ func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index } if req.Deployment != nil { - // Cancel any preivous deployment. + // Cancel any previous deployment. lastDeployment, err := n.state.LatestDeploymentByJobID(ws, req.Job.Namespace, req.Job.ID) if err != nil { return fmt.Errorf("failed to retrieve latest deployment: %v", err) @@ -1053,15 +1055,17 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt // Unblock evals for the nodes computed node class if the client has // finished running an allocation. 
for _, alloc := range req.Alloc { + + nodeID := alloc.NodeID + node, err := n.state.NodeByID(ws, nodeID) + if err != nil || node == nil { + n.logger.Error("looking up node failed", "node_id", nodeID, "error", err) + return err + + } + if alloc.ClientStatus == structs.AllocClientStatusComplete || alloc.ClientStatus == structs.AllocClientStatusFailed { - nodeID := alloc.NodeID - node, err := n.state.NodeByID(ws, nodeID) - if err != nil || node == nil { - n.logger.Error("looking up node failed", "node_id", nodeID, "error", err) - return err - - } // Unblock any associated quota quota, err := n.allocQuota(alloc.ID) @@ -1073,6 +1077,7 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt n.blockedEvals.UnblockClassAndQuota(node.ComputedClass, quota, index) n.blockedEvals.UnblockNode(node.ID) } + n.blockedEvals.Unblock(node.ComputedClass, index) } // It's possible that allocs on different nodes were marked unknown in the @@ -3403,3 +3408,74 @@ type TimeTableEntry struct { Index uint64 Time time.Time } + +type dependecy struct { + cancelFunc context.CancelFunc + job *structs.Job +} + +type Coordinator struct { + logger hclog.Logger + state sstructs.State + l sync.RWMutex + dependecies map[string]*dependecy + evalBroker *nomad.EvalBroker +} + +func (c *Coordinator) AddDependecy(ctx context.Context, eval *structs.Evaluation) { + + d := time.Now().Add(100 * time.Millisecond) + + job, err := c.state.JobByID(nil, eval.Namespace, eval.ID) + if err != nil { + c.logger.Error("coordinator error: looking up job: %w", err) + } + + ctx, cancel := context.WithDeadline(ctx, d) //configuredMaximumWaitTime) + c.dependecies[eval.JobID] = &dependecy{ + cancelFunc: cancel, + job: job, + } + + go c.waitForDependency(ctx, eval) +} + +func (c *Coordinator) waitForDependency(ctx context.Context, eval *structs.Evaluation) { + + for { + ws := memdb.NewWatchSet() + dj := []*structs.Job{} + for _, dep := range c.dependecies[eval.JobID].job.Dependencies { + j, err := 
c.state.JobByID(ws, eval.Namespace, dep.Job) + if err != nil { + c.logger.Error("coordinator error: looking up job: %w", err) + } + + dj = append(dj, j) + } + + select { + case <-ws.WatchCh(ctx): + ready, err := c.verifyDependency(eval.JobID, dj...) + if err != nil { + c.logger.Error("dependecy error: %w", err) + return + } + + if ready { + c.l.Lock() + defer c.l.Unlock() + + delete(c.dependecies, eval.ID) + return + } + + case <-ctx.Done(): + return + } + } +} + +func (c *Coordinator) verifyDependency(dependantJob string, dependeeJob ...*structs.Job) (bool, error) { + return true, nil +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 722a3befd73..19838989eab 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4443,6 +4443,12 @@ type Job struct { // scheduling preferences that apply to all groups and tasks Affinities []*Affinity + // Dependencies can be specified at a job level, it used to express + // inter-job dependencies, such as "job A cannot start until job B is + // running". This can be used to express complex workflows with multiple + // jobs. + Dependencies []*Dependency + // Spread can be specified at the job level to express spreading // allocations across a desired attribute, such as datacenter Spreads []*Spread @@ -10936,3 +10942,45 @@ func NewRpcError(err error, code *int64) *RpcError { func (r *RpcError) Error() string { return r.Message } + +// A Dependency is used to restrict placement options. +type Dependency struct { + Name string + Output string + Job string +} + +// Equal checks if two dependencies are equal. 
+func (d *Dependency) Equal(o *Dependency) bool { + return d == o || + d.Output == o.Output && + d.Job == o.Job +} + +func (d *Dependency) Copy() *Dependency { + if d == nil { + return nil + } + return &Dependency{ + Output: d.Output, + Job: d.Job, + } +} + +func (d *Dependency) String() string { + return fmt.Sprintf("%s: %s %s", d.Name, d.Output, d.Job) +} + +func (d *Dependency) Validate() error { + var mErr multierror.Error + if d.Job == "" { + mErr.Errors = append(mErr.Errors, errors.New("Missing job in dependency")) + } + + return mErr.ErrorOrNil() +} + +// DiffID fulfills the DiffableWithID interface. +func (d *Dependency) DiffID() string { + return d.String() +} diff --git a/scheduler/feasible/context.go b/scheduler/feasible/context.go index 62d602c2379..39e8b514988 100644 --- a/scheduler/feasible/context.go +++ b/scheduler/feasible/context.go @@ -18,6 +18,7 @@ import ( // Context is used to track contextual information used for placement type Context interface { + ConstraintContext // State is used to inspect the current global state State() sstructs.State @@ -27,9 +28,6 @@ type Context interface { // Logger provides a way to log Logger() log.Logger - // Metrics returns the current metrics - Metrics() *structs.AllocMetric - // Reset is invoked after making a placement Reset() @@ -38,15 +36,6 @@ type Context interface { // placements. ProposedAllocs(nodeID string) ([]*structs.Allocation, error) - // RegexpCache is a cache of regular expressions - RegexpCache() map[string]*regexp.Regexp - - // VersionConstraintCache is a cache of version constraints - VersionConstraintCache() map[string]VerConstraints - - // SemverConstraintCache is a cache of semver constraints - SemverConstraintCache() map[string]VerConstraints - // Eligibility returns a tracker for node eligibility in the context of the // eval. Eligibility() *EvalEligibility @@ -263,7 +252,8 @@ func (e *EvalEligibility) Reset() { // at the job and task group level. 
func (e *EvalEligibility) SetJob(job *structs.Job) { // Determine whether the job has escaped constraints. - e.jobEscaped = len(structs.EscapedConstraints(job.Constraints)) != 0 + e.jobEscaped = len(structs.EscapedConstraints(job.Constraints)) != 0 || + len(job.Dependencies) != 0 // Determine the escaped constraints per task group. for _, tg := range job.TaskGroups { diff --git a/scheduler/feasible/dependencies.go b/scheduler/feasible/dependencies.go new file mode 100644 index 00000000000..6dacde49e7d --- /dev/null +++ b/scheduler/feasible/dependencies.go @@ -0,0 +1,66 @@ +// Copyright IBM Corp. 2015, 2025 +// SPDX-License-Identifier: BUSL-1.1 + +package feasible + +import ( + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/nomad/structs" + sstructs "github.com/hashicorp/nomad/scheduler/structs" +) + +// DependencyChecker is a FeasibilityChecker which returns nodes that match a +// given set of dependencies. This is used to filter on job, task group, and task +// dependencies. 
+type DependencyChecker struct { + log log.Logger + state sstructs.State + job *structs.Job + metrics *structs.AllocMetric + ready bool +} + +// NewDependencyChecker creates a DependencyChecker for a set of dependencies +func NewDependencyChecker(c Context, dependencies []*structs.Dependency) *DependencyChecker { + return &DependencyChecker{ + log: c.Logger().Named("dependency_checker"), + state: c.State(), + ready: true, + metrics: c.Metrics(), + job: c.Plan().Job, + } +} + +func (dc *DependencyChecker) SetDependencies(dependencies []*structs.Dependency) { + for _, dep := range dependencies { + job, err := dc.state.JobByID(nil, dc.job.Namespace, dep.Job) + if err != nil { + dc.log.Error("error looking up dependency job", "dependency_job_id", dep.Job, "error", err) + } + + ready, err := dc.verifyDependency(dc.job.ID, job) + if err != nil { + dc.log.Error("error verify dependency for job", "dependency_job_id", dep.Job, "error", err) + } + + dc.ready = dc.ready && ready + } +} + +func (dc *DependencyChecker) Feasible(option *structs.Node) bool { + if !dc.ready { + dc.metrics.FilterNode(option, "dependency_not_ready") + return false + } + + return true +} + +func (c *DependencyChecker) verifyDependency(dependantJob string, dependeeJob ...*structs.Job) (bool, error) { + return true, nil +} + +type Dependecy struct { + updates <-chan struct{} + pendingJob *structs.Job +} diff --git a/scheduler/feasible/flowchart.md b/scheduler/feasible/flowchart.md new file mode 100644 index 00000000000..55f57b349bd --- /dev/null +++ b/scheduler/feasible/flowchart.md @@ -0,0 +1,123 @@ +# Feasible Package Flow + +This package is easiest to understand in two phases: + +- stack construction: build a chain of iterators and checkers +- selection: push one task group through that chain until one node wins + +## 1. 
Generic stack construction + +```mermaid +flowchart TD + A[Base node iterator] + A --> B[FeasibilityWrapper] + B --> B1[job checks] + B1 --> B1a[DependencyChecker] + B1 --> B1b[ConstraintChecker] + B --> B2[task group checks] + B2 --> B2a[DriverChecker] + B2 --> B2b[ConstraintChecker] + B2 --> B2c[DeviceChecker] + B2 --> B2d[NetworkChecker] + B2 --> B2e[SecretsProviderChecker] + B --> B3[availability checks] + B3 --> B3a[HostVolumeChecker] + B3 --> B3b[CSIVolumeChecker] + B --> C[DistinctHostsIterator] + C --> D[DistinctPropertyIterator] + D --> E[QuotaIterator] + E --> F[FeasibleRankIterator] + F --> G[BinPackIterator] + G --> H[JobAntiAffinityIterator] + H --> I[NodeReschedulingPenaltyIterator] + I --> J[NodeAffinityIterator] + J --> K[SpreadIterator] + K --> L[PreemptionScoringIterator] + L --> M[ScoreNormalizationIterator] + M --> N[LimitIterator] + N --> O[MaxScoreIterator] +``` + +This is the main idea in [scheduler/feasible/stack.go](stack.go): + +- everything before `FeasibleRankIterator` is still filtering nodes out +- everything after `FeasibleRankIterator` is scoring or selecting among feasible nodes +- `MaxScoreIterator` is what finally chooses the winning node + +## 2. 
What happens during Select + +```mermaid +flowchart TD + A[Select called with task group and options] --> B{Preferred nodes present} + B -->|yes| C[Try only preferred nodes first] + C --> D{Found a feasible ranked node} + D -->|yes| Z[Return that node] + D -->|no| E[Restore full node set] + B -->|no| F[Reset scores and eval context] + E --> F + + F --> G[Collect task group requirements] + G --> H[drivers] + G --> I[constraints] + G --> J[devices] + G --> K[networks] + G --> L[secrets] + G --> M[volumes] + G --> N[affinities and spread] + + H --> O[Update iterators with current task group] + I --> O + J --> O + K --> O + L --> O + M --> O + N --> O + + O --> P[Begin iterating candidate nodes] + P --> Q{Computed class already known} + Q -->|ineligible| P + Q -->|eligible or unknown| R[Run job level feasibility] + + R --> R1{Dependencies ready} + R1 -->|no| P + R1 -->|yes| R2{Job constraints pass} + R2 -->|no| P + R2 -->|yes| S[Run task group feasibility] + + S --> S1{Drivers constraints devices network secrets pass} + S1 -->|no| P + S1 -->|yes| T[Run availability checks] + + T --> T1{Host volumes and CSI volumes available} + T1 -->|no| P + T1 -->|yes| U[Pass node into ranking pipeline] + + U --> V[Apply distinct host and distinct property rules] + V --> W[Apply quota check] + W --> X[Score node] + X --> X1[binpack] + X --> X2[job anti affinity] + X --> X3[reschedule penalty] + X --> X4[node affinity] + X --> X5[spread] + X --> X6[preemption scoring] + X --> Y[Normalize scores then limit search] + Y --> Z[Return highest scoring node] +``` + +## How to read the mechanism + +1. `SetNodes` chooses the starting population of nodes. In the generic stack it also shuffles them and sets a search limit. +2. `SetJob` pushes job-wide state into the iterators: job constraints, dependencies, distinctness, affinity, spread, quota context, and namespace or job IDs for volume checks. +3. 
`Select` pushes task-group-specific state into the same chain: drivers, constraints, devices, volumes, network, secrets, and any scoring context. +4. The feasibility wrapper is the hard gate. A node that fails there never reaches ranking. +5. The first important split is filter versus score. Filters answer "can this node run the task group at all". Scorers answer "which feasible node is best". +6. Dependencies are part of the job-level filter stage. If they are not ready, the node is rejected before any later ranking matters. +7. Distinct host, distinct property, and quota still behave like feasibility filters even though they are implemented as iterators later in the chain. +8. The final answer comes from the max-score step after normalization and limit logic. + +## Files to map back to code + +- [scheduler/feasible/stack.go](stack.go) builds the iterator chain and drives `Select`. +- [scheduler/feasible/feasible.go](feasible.go) contains the concrete feasibility checks. +- [scheduler/feasible/dependencies.go](dependencies.go) handles job dependency readiness. 
diff --git a/scheduler/feasible/stack.go b/scheduler/feasible/stack.go index 62e68d853eb..1eaa76c1f3b 100644 --- a/scheduler/feasible/stack.go +++ b/scheduler/feasible/stack.go @@ -55,6 +55,7 @@ type GenericStack struct { jobNamespace string jobID string jobConstraint *ConstraintChecker + jobDependencies *DependencyChecker taskGroupDrivers *DriverChecker taskGroupConstraint *ConstraintChecker taskGroupDevices *DeviceChecker @@ -110,6 +111,7 @@ func (s *GenericStack) SetJob(job *structs.Job) { s.jobID = job.ID s.jobConstraint.SetConstraints(job.Constraints) + s.jobDependencies.SetDependencies(job.Dependencies) s.distinctHostsConstraint.SetJob(job) s.distinctPropertyConstraint.SetJob(job) s.binPack.SetJob(job) @@ -406,6 +408,9 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { // Attach the job constraints. The job is filled in later. s.jobConstraint = NewConstraintChecker(ctx, nil) + // Attach the job dependencies checker. The job is filled in later. + s.jobDependencies = NewDependencyChecker(ctx, nil) + // Filter on task group drivers first as they are faster s.taskGroupDrivers = NewDriverChecker(ctx, nil) @@ -431,7 +436,7 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { // which feasibility checking can be skipped if the computed node class has // previously been marked as eligible or ineligible. Generally this will be // checks that only needs to examine the single node to determine feasibility. 
- jobs := []FeasibilityChecker{s.jobConstraint} + jobs := []FeasibilityChecker{s.jobDependencies, s.jobConstraint} tgs := []FeasibilityChecker{ s.taskGroupDrivers, s.taskGroupConstraint, From 7871d906e5038ff103aba1d8b1d68045d6312675 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Thu, 4 Jun 2026 15:43:52 +0200 Subject: [PATCH 02/10] fix dependencies --- nomad/fsm.go | 73 ------------------------ scheduler/batch_scheduler.go | 53 ++++++++++++++++++ scheduler/dependency/coordinator.go | 86 +++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 73 deletions(-) create mode 100644 scheduler/batch_scheduler.go create mode 100644 scheduler/dependency/coordinator.go diff --git a/nomad/fsm.go b/nomad/fsm.go index 1d1c51c9157..f1d7a6b0d10 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -4,7 +4,6 @@ package nomad import ( - "context" "errors" "fmt" "io" @@ -18,7 +17,6 @@ import ( metrics "github.com/hashicorp/go-metrics/compat" "github.com/hashicorp/go-msgpack/v2/codec" "github.com/hashicorp/nomad/helper/uuid" - "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" @@ -3408,74 +3406,3 @@ type TimeTableEntry struct { Index uint64 Time time.Time } - -type dependecy struct { - cancelFunc context.CancelFunc - job *structs.Job -} - -type Coordinator struct { - logger hclog.Logger - state sstructs.State - l sync.RWMutex - dependecies map[string]*dependecy - evalBroker *nomad.EvalBroker -} - -func (c *Coordinator) AddDependecy(ctx context.Context, eval *structs.Evaluation) { - - d := time.Now().Add(100 * time.Millisecond) - - job, err := c.state.JobByID(nil, eval.Namespace, eval.ID) - if err != nil { - c.logger.Error("coordinator error: looking up job: %w", err) - } - - ctx, cancel := context.WithDeadline(ctx, d) //configuredMaximumWaitTime) - c.dependecies[eval.JobID] = &dependecy{ - cancelFunc: cancel, - job: 
job, - } - - go c.waitForDependency(ctx, eval) -} - -func (c *Coordinator) waitForDependency(ctx context.Context, eval *structs.Evaluation) { - - for { - ws := memdb.NewWatchSet() - dj := []*structs.Job{} - for _, dep := range c.dependecies[eval.JobID].job.Dependencies { - j, err := c.state.JobByID(ws, eval.Namespace, dep.Job) - if err != nil { - c.logger.Error("coordinator error: looking up job: %w", err) - } - - dj = append(dj, j) - } - - select { - case <-ws.WatchCh(ctx): - ready, err := c.verifyDependency(eval.JobID, dj...) - if err != nil { - c.logger.Error("dependecy error: %w", err) - return - } - - if ready { - c.l.Lock() - defer c.l.Unlock() - - delete(c.dependecies, eval.ID) - return - } - - case <-ctx.Done(): - return - } - } -} - -func (c *Coordinator) verifyDependency(dependantJob string, dependeeJob ...*structs.Job) (bool, error) { - return true, nil -} diff --git a/scheduler/batch_scheduler.go b/scheduler/batch_scheduler.go new file mode 100644 index 00000000000..27261630a7a --- /dev/null +++ b/scheduler/batch_scheduler.go @@ -0,0 +1,53 @@ +// Copyright IBM Corp. 
2015, 2025 +// SPDX-License-Identifier: BUSL-1.1 + +package scheduler + +/* +type BatchScheduler struct { + dependencyChecker DependencyChecker + GenericScheduler +} + +type DependencyChecker interface { + DependenciesMeet(job *structs.Job) (bool, error) +} + +// NewBatchScheduler is a factory function to instantiate a new batch scheduler +func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, + dc DependencyChecker, planner sstructs.Planner) sstructs.Scheduler { + s := &BatchScheduler{ + GenericScheduler: GenericScheduler{ + logger: logger.Named("batch_sched"), + eventsCh: eventsCh, + state: state, + planner: planner, + batch: true, + }, + dependencyChecker: dc, + } + + return s +} + +func (s *BatchScheduler) evaluateDependencies(ws memdb.WatchSet) (bool, error) { + + var mErr multierror.Error + ready := true + + for _, dep := range s.job.Dependencies { + s.logger.Debug("watching dependency job for changes", "dependency_job_id", dep.Job) + + job, err := s.GenericScheduler.state.JobByID(ws, s.job.Namespace, dep.Job) + if err != nil { + mErr = *multierror.Append(&mErr, err) + } + + if job == nil || job.Status != dep.Output { + ready = false + } + } + + return ready, mErr.ErrorOrNil() +} +*/ diff --git a/scheduler/dependency/coordinator.go b/scheduler/dependency/coordinator.go new file mode 100644 index 00000000000..654d078771c --- /dev/null +++ b/scheduler/dependency/coordinator.go @@ -0,0 +1,86 @@ +package dependency + +import ( + "context" + "sync" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad" + "github.com/hashicorp/nomad/nomad/structs" + sstructs "github.com/hashicorp/nomad/scheduler/structs" +) + +type dependency struct { + cancelFunc context.CancelFunc + job *structs.Job +} + +type Coordinator struct { + logger hclog.Logger + state sstructs.State + l sync.RWMutex + + dependencies map[string]*dependency + evalBroker *nomad.EvalBroker +} + +func (c 
*Coordinator) AddDependecy(ctx context.Context, eval *structs.Evaluation) { + + job, err := c.state.JobByID(nil, eval.Namespace, eval.ID) + if err != nil { + c.logger.Error("failed to get job by ID", "error", err) + return + } + + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(10*time.Minute)) + c.dependencies[eval.JobID] = &dependency{ + cancelFunc: cancel, + job: job, + } + + go c.waitForDependency(ctx, eval) +} + +func (c *Coordinator) waitForDependency(ctx context.Context, eval *structs.Evaluation) { + + for { + ws := memdb.NewWatchSet() + dj := []*structs.Job{} + + for _, dep := range c.dependencies[eval.JobID].job.Dependencies { + j, err := c.state.JobByID(ws, eval.Namespace, dep.Job) + if err != nil { + c.logger.Error("failed to get job by ID", "error", err) + } + dj = append(dj, j) + } + + select { + case <-ws.WatchCh(ctx): + ready, err := c.verifyDependency(eval.JobID, dj...) + if err != nil { + c.logger.Error("failed to verify dependency", "error", err) + continue + } + + if ready { + + c.l.Lock() + defer c.l.Unlock() + + c.evalBroker.Enqueue(eval) + delete(c.dependencies, eval.ID) + return + } + + case <-ctx.Done(): + return + } + } +} + +func (c *Coordinator) verifyDependency(dependantJob string, dependeeJob ...*structs.Job) (bool, error) { + return true, nil +} From 14d4c37f18ea1f0cdefc16a639cac2ab1a8a2ba4 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Mon, 8 Jun 2026 13:19:27 +0200 Subject: [PATCH 03/10] func: add coordinator and loop detection --- nomad/dependency/coordinator.go | 136 +++++++ .../loop_detection/loop_detection.go | 349 ++++++++++++++++++ nomad/server.go | 9 + nomad/worker.go | 3 +- scheduler/batch_scheduler.go | 46 +-- scheduler/dependency/coordinator.go | 86 ----- scheduler/feasible/dependencies.go | 66 ---- scheduler/feasible/stack.go | 9 +- scheduler/generic_sched.go | 21 +- scheduler/scheduler.go | 13 +- scheduler/scheduler_sysbatch.go | 4 +- 
scheduler/scheduler_system.go | 4 +- scheduler/structs/interfaces.go | 4 +- 13 files changed, 546 insertions(+), 204 deletions(-) create mode 100644 nomad/dependency/coordinator.go create mode 100644 nomad/dependency/loop_detection/loop_detection.go delete mode 100644 scheduler/dependency/coordinator.go delete mode 100644 scheduler/feasible/dependencies.go diff --git a/nomad/dependency/coordinator.go b/nomad/dependency/coordinator.go new file mode 100644 index 00000000000..62dbaad297c --- /dev/null +++ b/nomad/dependency/coordinator.go @@ -0,0 +1,136 @@ +package dependency + +import ( + "context" + "sync" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad" + "github.com/hashicorp/nomad/nomad/structs" + sstructs "github.com/hashicorp/nomad/scheduler/structs" +) + +type evalID = string + +type StateStorage interface { + JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) +} + +type LoopDetector interface { + AddNodes(dependantJob string, dependeeJob ...string) error + RemoveNode(dependantJob string) error +} + +type dependency struct { + cancelFunc context.CancelFunc + job *structs.Job + dependees []string +} + +type Coordinator struct { + logger hclog.Logger + l sync.RWMutex + + dependencies map[evalID]*dependency + loopDetector LoopDetector + blockedEvals *nomad.BlockedEvals +} + +// TODO: Think how to rebuild out of evals! 
+func NewCoordinator(logger hclog.Logger, loopDetector LoopDetector, + blockedEvals *nomad.BlockedEvals) *Coordinator { + return &Coordinator{ + logger: logger, + dependencies: make(map[evalID]*dependency), + loopDetector: loopDetector, + blockedEvals: blockedEvals, + } +} + +func (c *Coordinator) unblockDependencies(eval *structs.Evaluation, dependeeJobs ...*structs.Job) error { + for _, job := range dependeeJobs { + + c.blockedEvals.Unblock(eval.ID, job.JobModifyIndex) + + c.l.Lock() + defer c.l.Unlock() + + delete(c.dependencies, eval.ID) + if err := c.loopDetector.RemoveNode(eval.JobID); err != nil { + c.logger.Error("failed to remove dependency", "error", err) + } + } + + return nil +} + +func (c *Coordinator) AddDependency(ctx context.Context, state sstructs.State, eval *structs.Evaluation) error { + + job, err := state.JobByID(nil, eval.Namespace, eval.ID) + if err != nil { + c.logger.Error("failed to get job by ID", "error", err) + return err + } + + djIDs := []string{} + for _, d := range job.Dependencies { + djIDs = append(djIDs, d.Job) + } + + c.loopDetector.AddNodes(eval.JobID, djIDs...) + + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(10*time.Minute)) + c.dependencies[eval.JobID] = &dependency{ + cancelFunc: cancel, + job: job, + dependees: djIDs, + } + + go c.waitForDependency(ctx, state, eval, djIDs...) + + return nil +} + +func (c *Coordinator) waitForDependency(ctx context.Context, state sstructs.State, + eval *structs.Evaluation, dependeeJobIDs ...string) { + + for { + ws := memdb.NewWatchSet() + dj := []*structs.Job{} + + for _, jID := range dependeeJobIDs { + j, err := state.JobByID(ws, eval.Namespace, jID) + if err != nil { + c.logger.Error("failed to get job by ID", "error", err) + } + + dj = append(dj, j) + } + + select { + case <-ws.WatchCh(ctx): + ready, err := c.verifyDependency(c.dependencies[eval.JobID].job, dj...) 
+ if err != nil { + c.logger.Error("failed to verify dependency", "error", err) + continue + } + + if ready { + err := c.unblockDependencies(eval, dj...) + if err != nil { + c.logger.Error("failed to unblock job", "error", err) + } + return + } + + case <-ctx.Done(): + return + } + } +} + +func (c *Coordinator) verifyDependency(dependantJob *structs.Job, dependeeJob ...*structs.Job) (bool, error) { + return true, nil +} diff --git a/nomad/dependency/loop_detection/loop_detection.go b/nomad/dependency/loop_detection/loop_detection.go new file mode 100644 index 00000000000..f6a18f36b3b --- /dev/null +++ b/nomad/dependency/loop_detection/loop_detection.go @@ -0,0 +1,349 @@ +package loop_detection + +import ( + "fmt" + "sort" + "sync" + + log "github.com/hashicorp/go-hclog" +) + +// Loop Detection + Dependency Chain Update Flow +// +// [Start: incoming dependency edge job -> dependee] +// | +// v +// [Ensure nodes exist in graph index/map] +// | +// v +// [Check whether job or dependee already has chains] +// | +// v +// [Build/lookup candidate chain path(s)] +// | +// v +// [Run cycle/loop detection on path] +// | +// +---------+---------+ +// | | +// v v +// [Loop found? YES] [Loop found? NO] +// | | +// v v +// [Return error/reject] [Add edge job -> dependee] +// | +// v +// [Update node->chains mapping/index] +// | +// v +// [Dependency accepted / continue] +// +// ------------------------------------------------------- +// +// [Event: job becomes unblocked / completed / removed] +// | +// v +// [Remove job node and its outgoing/incoming edges] +// | +// v +// [Recompute/prune affected dependency chains] +// | +// v +// [Update node->chains mapping/index] +// | +// v +// [Done] + +// Detector tracks a directed dependency graph and maintains a chains index +// for each node. Edges are directed as "job -> dependee". +type Detector struct { + logger log.Logger + + mu sync.RWMutex + + // deps stores outgoing edges: job -> dependees. 
+ deps map[string]map[string]struct{} + + // reverse stores incoming edges: dependee -> jobs that depend on it. + reverse map[string]map[string]struct{} + + // chains stores all root-to-leaf dependency chains for each node. + chains map[string][][]string + + // nodes stores pointers to graph nodes by job id. + nodes map[string]*Node + + // graphs stores dependency chain paths as arrays of node pointers. + graphs [][]*Node +} + +// Node represents a graph node and its outgoing dependencies. +type Node struct { + ID string + Dependencies []string +} + +// NewDetector creates an empty dependency detector. +func NewDetector(logger log.Logger) *Detector { + return &Detector{ + logger: logger.Named("loop_detector"), + deps: make(map[string]map[string]struct{}), + reverse: make(map[string]map[string]struct{}), + chains: make(map[string][][]string), + nodes: make(map[string]*Node), + graphs: make([][]*Node, 0), + } +} + +func (d *Detector) AddNodes(nodeID string, dependencies ...string) error { + if err := d.addNode(Node{ID: nodeID, Dependencies: dependencies}); err != nil { + return fmt.Errorf("failed to add node %q: %v", nodeID, err) + } + return nil +} + +// AddNode ensures node exists and adds edges node.ID -> dependency for each +// listed dependency. +// +// If any dependency would create a loop, AddNode returns an error and the +// graph remains unchanged. +func (d *Detector) addNode(node Node) error { + d.mu.Lock() + defer d.mu.Unlock() + + if node.ID == "" { + return fmt.Errorf("node id must be non-empty") + } + + d.ensureNode(node.ID) + + uniqueDeps := make(map[string]struct{}, len(node.Dependencies)) + for _, dep := range node.Dependencies { + if dep == "" { + return fmt.Errorf("dependency for %q must be non-empty", node.ID) + } + if dep == node.ID { + return fmt.Errorf("self-dependency is not allowed for %q", node.ID) + } + uniqueDeps[dep] = struct{}{} + } + + // Validate all candidate edges first so updates are all-or-nothing. 
+ for dep := range uniqueDeps { + d.ensureNode(dep) + if _, exists := d.deps[node.ID][dep]; exists { + continue + } + if d.pathExists(dep, node.ID) { + return fmt.Errorf("adding dependency %q -> %q creates a loop", node.ID, dep) + } + } + + for dep := range uniqueDeps { + d.deps[node.ID][dep] = struct{}{} + d.reverse[dep][node.ID] = struct{}{} + } + + d.recomputeChainsLocked() + d.syncNodeIndexesLocked() + + return nil +} + +// RemoveNode removes the target node and its direct dependees (one hop only). +// Removal is non-cascading. +func (d *Detector) RemoveNode(node string) error { + d.mu.Lock() + defer d.mu.Unlock() + + if node == "" { + return fmt.Errorf("node id must be non-empty") + } + + if _, exists := d.deps[node]; !exists { + return nil + } + + directDependees := make([]string, 0, len(d.deps[node])) + for dependee := range d.deps[node] { + directDependees = append(directDependees, dependee) + } + + d.removeSingleNodeLocked(node) + for _, dependee := range directDependees { + d.removeSingleNodeLocked(dependee) + } + + d.recomputeChainsLocked() + d.syncNodeIndexesLocked() + + return nil +} + +func (d *Detector) ensureNode(node string) { + if _, ok := d.deps[node]; !ok { + d.deps[node] = make(map[string]struct{}) + } + if _, ok := d.reverse[node]; !ok { + d.reverse[node] = make(map[string]struct{}) + } + if _, ok := d.nodes[node]; !ok { + d.nodes[node] = &Node{ID: node} + } +} + +func (d *Detector) pathExists(from, to string) bool { + if from == to { + return true + } + + seen := make(map[string]struct{}) + stack := []string{from} + + for len(stack) > 0 { + n := stack[len(stack)-1] + stack = stack[:len(stack)-1] + + if n == to { + return true + } + if _, ok := seen[n]; ok { + continue + } + seen[n] = struct{}{} + + for next := range d.deps[n] { + if _, ok := seen[next]; !ok { + stack = append(stack, next) + } + } + } + + return false +} + +func (d *Detector) recomputeChainsLocked() { + allNodes := make(map[string]struct{}, len(d.deps)+len(d.reverse)) + for n 
:= range d.deps { + allNodes[n] = struct{}{} + } + for n := range d.reverse { + allNodes[n] = struct{}{} + } + + d.chains = make(map[string][][]string, len(allNodes)) + memo := make(map[string][][]string, len(allNodes)) + visiting := make(map[string]bool, len(allNodes)) + + for n := range allNodes { + d.chains[n] = d.buildChainsFromLocked(n, memo, visiting) + } +} + +func (d *Detector) removeSingleNodeLocked(node string) { + // Remove outgoing edges: node -> dependee. + for dependee := range d.deps[node] { + delete(d.reverse[dependee], node) + if len(d.reverse[dependee]) == 0 { + delete(d.reverse, dependee) + } + } + + // Remove incoming edges: depender -> node. + for depender := range d.reverse[node] { + delete(d.deps[depender], node) + if len(d.deps[depender]) == 0 { + delete(d.deps, depender) + } + } + + delete(d.deps, node) + delete(d.reverse, node) + delete(d.chains, node) + delete(d.nodes, node) +} + +func (d *Detector) syncNodeIndexesLocked() { + for id := range d.deps { + d.ensureNode(id) + deps := make([]string, 0, len(d.deps[id])) + for dep := range d.deps[id] { + deps = append(deps, dep) + } + sort.Strings(deps) + d.nodes[id].Dependencies = deps + } + + for id := range d.nodes { + if _, ok := d.deps[id]; !ok { + delete(d.nodes, id) + } + } + + graphs := make([][]*Node, 0) + for _, paths := range d.chains { + for _, path := range paths { + nodePath := make([]*Node, 0, len(path)) + for _, id := range path { + n, ok := d.nodes[id] + if !ok { + d.ensureNode(id) + n = d.nodes[id] + } + nodePath = append(nodePath, n) + } + graphs = append(graphs, nodePath) + } + } + d.graphs = graphs +} + +func (d *Detector) buildChainsFromLocked(node string, memo map[string][][]string, visiting map[string]bool) [][]string { + if cached, ok := memo[node]; ok { + return copyPaths(cached) + } + + if visiting[node] { + // Guard for unexpected cycles. Cycles should not be reachable because + // AddDependency rejects loop-inducing edges. 
+ return [][]string{{node}} + } + + visiting[node] = true + defer func() { visiting[node] = false }() + + nextNodes := make([]string, 0, len(d.deps[node])) + for next := range d.deps[node] { + nextNodes = append(nextNodes, next) + } + sort.Strings(nextNodes) + + if len(nextNodes) == 0 { + memo[node] = [][]string{{node}} + return copyPaths(memo[node]) + } + + paths := make([][]string, 0) + for _, next := range nextNodes { + subPaths := d.buildChainsFromLocked(next, memo, visiting) + for _, sub := range subPaths { + path := make([]string, 0, len(sub)+1) + path = append(path, node) + path = append(path, sub...) + paths = append(paths, path) + } + } + + memo[node] = copyPaths(paths) + return copyPaths(paths) +} + +func copyPaths(paths [][]string) [][]string { + out := make([][]string, len(paths)) + for i, p := range paths { + cp := make([]string, len(p)) + copy(cp, p) + out[i] = cp + } + return out +} diff --git a/nomad/server.go b/nomad/server.go index 04e833bf801..04c9dfa45a6 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -43,6 +43,8 @@ import ( "github.com/hashicorp/nomad/helper/tlsutil" "github.com/hashicorp/nomad/lib/auth/oidc" "github.com/hashicorp/nomad/nomad/auth" + "github.com/hashicorp/nomad/nomad/dependency" + "github.com/hashicorp/nomad/nomad/dependency/loop_detection" "github.com/hashicorp/nomad/nomad/deploymentwatcher" "github.com/hashicorp/nomad/nomad/drainer" "github.com/hashicorp/nomad/nomad/lock" @@ -210,6 +212,8 @@ type Server struct { // capacity changes. 
blockedEvals *BlockedEvals + dependencyCoordinator *dependency.Coordinator + // evalBroker is used to manage the in-progress evaluations // that are waiting to be brokered to a sub-scheduler evalBroker *EvalBroker @@ -403,6 +407,11 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc // Create the blocked evals s.blockedEvals = NewBlockedEvals(s.evalBroker, s.logger) + // Create the dependency Coordinator + depCoordinator := dependency.NewCoordinator(s.logger, + loop_detection.NewDetector(s.logger), s.blockedEvals) + s.dependencyCoordinator = depCoordinator + // Create the RPC handler s.rpcHandler = newRpcHandler(s) diff --git a/nomad/worker.go b/nomad/worker.go index 8b316b51d43..02f1810c394 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -625,7 +625,8 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua if eval.Type == structs.JobTypeCore { sched = NewCoreScheduler(w.srv, snap, w) } else { - sched, err = scheduler.NewScheduler(eval.Type, w.logger, w.srv.workersEventCh, snap, w) + sched, err = scheduler.NewScheduler(eval.Type, w.logger, w.srv.workersEventCh, + snap, w, scheduler.WithDependencyChecker(w.srv.dependencyCoordinator)) if err != nil { return fmt.Errorf("failed to instantiate scheduler: %v", err) } diff --git a/scheduler/batch_scheduler.go b/scheduler/batch_scheduler.go index 27261630a7a..73a7720e2cd 100644 --- a/scheduler/batch_scheduler.go +++ b/scheduler/batch_scheduler.go @@ -3,19 +3,26 @@ package scheduler -/* +import ( + "context" + + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/nomad/structs" + sstructs "github.com/hashicorp/nomad/scheduler/structs" +) + +type DependencyChecker interface { + AddDependency(ctx context.Context, state sstructs.State, eval *structs.Evaluation) error +} + type BatchScheduler struct { dependencyChecker DependencyChecker GenericScheduler } -type DependencyChecker interface { - DependenciesMeet(job *structs.Job) (bool, error) -} - // 
NewBatchScheduler is a factory function to instantiate a new batch scheduler func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, - dc DependencyChecker, planner sstructs.Planner) sstructs.Scheduler { + planner sstructs.Planner, opts ...sstructs.SchedulerOption) sstructs.Scheduler { s := &BatchScheduler{ GenericScheduler: GenericScheduler{ logger: logger.Named("batch_sched"), @@ -24,30 +31,15 @@ func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sst planner: planner, batch: true, }, - dependencyChecker: dc, + } + + for _, opt := range opts { + opt(s) } return s } -func (s *BatchScheduler) evaluateDependencies(ws memdb.WatchSet) (bool, error) { - - var mErr multierror.Error - ready := true - - for _, dep := range s.job.Dependencies { - s.logger.Debug("watching dependency job for changes", "dependency_job_id", dep.Job) - - job, err := s.GenericScheduler.state.JobByID(ws, s.job.Namespace, dep.Job) - if err != nil { - mErr = *multierror.Append(&mErr, err) - } - - if job == nil || job.Status != dep.Output { - ready = false - } - } - - return ready, mErr.ErrorOrNil() +func (s *BatchScheduler) setNodes(job *structs.Job) ([]*structs.Node, map[string]int, error) { + return s.GenericScheduler.setNodes(job) } -*/ diff --git a/scheduler/dependency/coordinator.go b/scheduler/dependency/coordinator.go deleted file mode 100644 index 654d078771c..00000000000 --- a/scheduler/dependency/coordinator.go +++ /dev/null @@ -1,86 +0,0 @@ -package dependency - -import ( - "context" - "sync" - "time" - - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/nomad" - "github.com/hashicorp/nomad/nomad/structs" - sstructs "github.com/hashicorp/nomad/scheduler/structs" -) - -type dependency struct { - cancelFunc context.CancelFunc - job *structs.Job -} - -type Coordinator struct { - logger hclog.Logger - state sstructs.State - l sync.RWMutex - - dependencies map[string]*dependency - 
evalBroker *nomad.EvalBroker -} - -func (c *Coordinator) AddDependecy(ctx context.Context, eval *structs.Evaluation) { - - job, err := c.state.JobByID(nil, eval.Namespace, eval.ID) - if err != nil { - c.logger.Error("failed to get job by ID", "error", err) - return - } - - ctx, cancel := context.WithDeadline(ctx, time.Now().Add(10*time.Minute)) - c.dependencies[eval.JobID] = &dependency{ - cancelFunc: cancel, - job: job, - } - - go c.waitForDependency(ctx, eval) -} - -func (c *Coordinator) waitForDependency(ctx context.Context, eval *structs.Evaluation) { - - for { - ws := memdb.NewWatchSet() - dj := []*structs.Job{} - - for _, dep := range c.dependencies[eval.JobID].job.Dependencies { - j, err := c.state.JobByID(ws, eval.Namespace, dep.Job) - if err != nil { - c.logger.Error("failed to get job by ID", "error", err) - } - dj = append(dj, j) - } - - select { - case <-ws.WatchCh(ctx): - ready, err := c.verifyDependency(eval.JobID, dj...) - if err != nil { - c.logger.Error("failed to verify dependency", "error", err) - continue - } - - if ready { - - c.l.Lock() - defer c.l.Unlock() - - c.evalBroker.Enqueue(eval) - delete(c.dependencies, eval.ID) - return - } - - case <-ctx.Done(): - return - } - } -} - -func (c *Coordinator) verifyDependency(dependantJob string, dependeeJob ...*structs.Job) (bool, error) { - return true, nil -} diff --git a/scheduler/feasible/dependencies.go b/scheduler/feasible/dependencies.go deleted file mode 100644 index 6dacde49e7d..00000000000 --- a/scheduler/feasible/dependencies.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright IBM Corp. 2015, 2025 -// SPDX-License-Identifier: BUSL-1.1 - -package feasible - -import ( - log "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/nomad/structs" - sstructs "github.com/hashicorp/nomad/scheduler/structs" -) - -// DependencyChecker is a FeasibilityChecker which returns nodes that match a -// given set of dependencies. This is used to filter on job, task group, and task -// dependencies. 
-type DependencyChecker struct { - log log.Logger - state sstructs.State - job *structs.Job - metrics *structs.AllocMetric - ready bool -} - -// NewDependencyChecker creates a DependencyChecker for a set of dependencies -func NewDependencyChecker(c Context, dependencies []*structs.Dependency) *DependencyChecker { - return &DependencyChecker{ - log: c.Logger().Named("dependency_checker"), - state: c.State(), - ready: true, - metrics: c.Metrics(), - job: c.Plan().Job, - } -} - -func (dc *DependencyChecker) SetDependencies(dependencies []*structs.Dependency) { - for _, dep := range dependencies { - job, err := dc.state.JobByID(nil, dc.job.Namespace, dep.Job) - if err != nil { - dc.log.Error("error looking up dependency job", "dependency_job_id", dep.Job, "error", err) - } - - ready, err := dc.verifyDependency(dc.job.ID, job) - if err != nil { - dc.log.Error("error verify dependency for job", "dependency_job_id", dep.Job, "error", err) - } - - dc.ready = dc.ready && ready - } -} - -func (dc *DependencyChecker) Feasible(option *structs.Node) bool { - if !dc.ready { - dc.metrics.FilterNode(option, "dependency_not_ready") - return false - } - - return true -} - -func (c *DependencyChecker) verifyDependency(dependantJob string, dependeeJob ...*structs.Job) (bool, error) { - return true, nil -} - -type Dependecy struct { - updates <-chan struct{} - pendingJob *structs.Job -} diff --git a/scheduler/feasible/stack.go b/scheduler/feasible/stack.go index 1eaa76c1f3b..765f53ca032 100644 --- a/scheduler/feasible/stack.go +++ b/scheduler/feasible/stack.go @@ -32,6 +32,8 @@ type Stack interface { // Select is used to select a node for the task group Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode + + SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) } type SelectOptions struct { @@ -55,7 +57,6 @@ type GenericStack struct { jobNamespace string jobID string jobConstraint *ConstraintChecker - jobDependencies *DependencyChecker taskGroupDrivers 
*DriverChecker taskGroupConstraint *ConstraintChecker taskGroupDevices *DeviceChecker @@ -111,7 +112,6 @@ func (s *GenericStack) SetJob(job *structs.Job) { s.jobID = job.ID s.jobConstraint.SetConstraints(job.Constraints) - s.jobDependencies.SetDependencies(job.Dependencies) s.distinctHostsConstraint.SetJob(job) s.distinctPropertyConstraint.SetJob(job) s.binPack.SetJob(job) @@ -408,9 +408,6 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { // Attach the job constraints. The job is filled in later. s.jobConstraint = NewConstraintChecker(ctx, nil) - // Attach the job dependencies checker. The job is filled in later. - s.jobDependencies = NewDependencyChecker(ctx, nil) - // Filter on task group drivers first as they are faster s.taskGroupDrivers = NewDriverChecker(ctx, nil) @@ -436,7 +433,7 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { // which feasibility checking can be skipped if the computed node class has // previously been marked as eligible or ineligible. Generally this will be // checks that only needs to examine the single node to determine feasibility. 
- jobs := []FeasibilityChecker{s.jobDependencies, s.jobConstraint} + jobs := []FeasibilityChecker{s.jobConstraint} tgs := []FeasibilityChecker{ s.taskGroupDrivers, s.taskGroupConstraint, diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index bdc3cdcb2b9..bba891d1109 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -62,7 +62,7 @@ type GenericScheduler struct { plan *structs.Plan planResult *structs.PlanResult ctx *feasible.EvalContext - stack *feasible.GenericStack + stack feasible.Stack // followUpEvals are evals with WaitUntil set, which are delayed until that time // before being rescheduled @@ -77,7 +77,8 @@ type GenericScheduler struct { } // NewServiceScheduler is a factory function to instantiate a new service scheduler -func NewServiceScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner) sstructs.Scheduler { +func NewServiceScheduler(logger log.Logger, eventsCh chan<- interface{}, + state sstructs.State, planner sstructs.Planner, _ ...sstructs.SchedulerOption) sstructs.Scheduler { s := &GenericScheduler{ logger: logger.Named("service_sched"), eventsCh: eventsCh, @@ -88,18 +89,6 @@ func NewServiceScheduler(logger log.Logger, eventsCh chan<- interface{}, state s return s } -// NewBatchScheduler is a factory function to instantiate a new batch scheduler -func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner) sstructs.Scheduler { - s := &GenericScheduler{ - logger: logger.Named("batch_sched"), - eventsCh: eventsCh, - state: state, - planner: planner, - batch: true, - } - return s -} - // Process is used to handle a single evaluation func (s *GenericScheduler) Process(eval *structs.Evaluation) (err error) { @@ -489,6 +478,10 @@ func (s *GenericScheduler) computePlacements( return err } + if len(nodes) == 0 { + return fmt.Errorf("no nodes available/pending dependencies to place on") + } + var 
deploymentID string if s.deployment != nil && s.deployment.Active() { deploymentID = s.deployment.ID diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 267827829e0..2c230e5345c 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -8,6 +8,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/scheduler/structs" + sstructs "github.com/hashicorp/nomad/scheduler/structs" ) const ( @@ -27,10 +28,20 @@ var BuiltinSchedulers = map[string]structs.Factory{ "sysbatch": NewSysBatchScheduler, } +func WithDependencyChecker(dependecyChecker DependencyChecker) sstructs.SchedulerOption { + return func(s sstructs.Scheduler) error { + if B, ok := s.(*BatchScheduler); ok { + B.dependencyChecker = dependecyChecker + } + return nil + } +} + // NewScheduler is used to instantiate and return a new scheduler // given the scheduler name, initial state, and planner. func NewScheduler( name string, logger log.Logger, eventsCh chan<- interface{}, state structs.State, planner structs.Planner, + opts ...structs.SchedulerOption, ) (structs.Scheduler, error) { // Lookup the factory function factory, ok := BuiltinSchedulers[name] @@ -39,6 +50,6 @@ func NewScheduler( } // Instantiate the scheduler - sched := factory(logger, eventsCh, state, planner) + sched := factory(logger, eventsCh, state, planner, opts...) 
return sched, nil } diff --git a/scheduler/scheduler_sysbatch.go b/scheduler/scheduler_sysbatch.go index 1a2646e3f07..dfe3253af80 100644 --- a/scheduler/scheduler_sysbatch.go +++ b/scheduler/scheduler_sysbatch.go @@ -50,7 +50,9 @@ type SysBatchScheduler struct { planAnnotations *structs.PlanAnnotations } -func NewSysBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner) sstructs.Scheduler { +func NewSysBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, + state sstructs.State, planner sstructs.Planner, + _ ...sstructs.SchedulerOption) sstructs.Scheduler { return &SysBatchScheduler{ logger: logger.Named("sysbatch_sched"), eventsCh: eventsCh, diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index b323f111bb3..c07249aa865 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -59,7 +59,9 @@ type SystemScheduler struct { // NewSystemScheduler is a factory function to instantiate a new system // scheduler. -func NewSystemScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner) sstructs.Scheduler { +func NewSystemScheduler(logger log.Logger, eventsCh chan<- interface{}, + state sstructs.State, planner sstructs.Planner, + _ ...sstructs.SchedulerOption) sstructs.Scheduler { return &SystemScheduler{ logger: logger.Named("system_sched"), eventsCh: eventsCh, diff --git a/scheduler/structs/interfaces.go b/scheduler/structs/interfaces.go index 448a7e2d853..8c014eb5a3f 100644 --- a/scheduler/structs/interfaces.go +++ b/scheduler/structs/interfaces.go @@ -16,8 +16,10 @@ import ( * This package contains top-level interfaces used throughout the scheduler. 
*/ +type SchedulerOption func(Scheduler) error + // Factory is used to instantiate a new Scheduler -type Factory func(log.Logger, chan<- interface{}, State, Planner) Scheduler +type Factory func(log.Logger, chan<- interface{}, State, Planner, ...SchedulerOption) Scheduler // Scheduler is the top level instance for a scheduler. A scheduler is // meant to only encapsulate business logic, pushing the various plumbing From 38d785370100527b0cd548c290faa62d5c697b78 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Mon, 8 Jun 2026 19:36:38 +0200 Subject: [PATCH 04/10] connect all the moving parts --- nomad/fsm.go | 19 ++- nomad/server.go | 4 +- scheduler/batch_scheduler.go | 20 ++- .../dependency/coordinator.go | 18 +-- scheduler/feasible/flowchart.md | 123 ------------------ scheduler/generic_sched.go | 18 ++- .../loop_detection/loop_detection.go | 4 + 7 files changed, 49 insertions(+), 157 deletions(-) rename {nomad => scheduler}/dependency/coordinator.go (88%) delete mode 100644 scheduler/feasible/flowchart.md rename {nomad/dependency => scheduler}/loop_detection/loop_detection.go (99%) diff --git a/nomad/fsm.go b/nomad/fsm.go index f1d7a6b0d10..3a43d2a691f 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -767,7 +767,7 @@ func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index } if req.Deployment != nil { - // Cancel any previous deployment. + // Cancel any preivous deployment. lastDeployment, err := n.state.LatestDeploymentByJobID(ws, req.Job.Namespace, req.Job.ID) if err != nil { return fmt.Errorf("failed to retrieve latest deployment: %v", err) @@ -1053,17 +1053,15 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt // Unblock evals for the nodes computed node class if the client has // finished running an allocation. 
for _, alloc := range req.Alloc { - - nodeID := alloc.NodeID - node, err := n.state.NodeByID(ws, nodeID) - if err != nil || node == nil { - n.logger.Error("looking up node failed", "node_id", nodeID, "error", err) - return err - - } - if alloc.ClientStatus == structs.AllocClientStatusComplete || alloc.ClientStatus == structs.AllocClientStatusFailed { + nodeID := alloc.NodeID + node, err := n.state.NodeByID(ws, nodeID) + if err != nil || node == nil { + n.logger.Error("looking up node failed", "node_id", nodeID, "error", err) + return err + + } // Unblock any associated quota quota, err := n.allocQuota(alloc.ID) @@ -1075,7 +1073,6 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt n.blockedEvals.UnblockClassAndQuota(node.ComputedClass, quota, index) n.blockedEvals.UnblockNode(node.ID) } - n.blockedEvals.Unblock(node.ComputedClass, index) } // It's possible that allocs on different nodes were marked unknown in the diff --git a/nomad/server.go b/nomad/server.go index 04c9dfa45a6..f381f1d9981 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -43,8 +43,6 @@ import ( "github.com/hashicorp/nomad/helper/tlsutil" "github.com/hashicorp/nomad/lib/auth/oidc" "github.com/hashicorp/nomad/nomad/auth" - "github.com/hashicorp/nomad/nomad/dependency" - "github.com/hashicorp/nomad/nomad/dependency/loop_detection" "github.com/hashicorp/nomad/nomad/deploymentwatcher" "github.com/hashicorp/nomad/nomad/drainer" "github.com/hashicorp/nomad/nomad/lock" @@ -55,6 +53,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/nomad/volumewatcher" "github.com/hashicorp/nomad/scheduler" + "github.com/hashicorp/nomad/scheduler/dependency" + "github.com/hashicorp/nomad/scheduler/loop_detection" sstructs "github.com/hashicorp/nomad/scheduler/structs" ) diff --git a/scheduler/batch_scheduler.go b/scheduler/batch_scheduler.go index 73a7720e2cd..a4fb0c3d511 100644 --- a/scheduler/batch_scheduler.go +++ 
b/scheduler/batch_scheduler.go @@ -1,4 +1,4 @@ -// Copyright IBM Corp. 2015, 2025 +// Copyright IBM Corp. 2015, 2026 // SPDX-License-Identifier: BUSL-1.1 package scheduler @@ -23,7 +23,7 @@ type BatchScheduler struct { // NewBatchScheduler is a factory function to instantiate a new batch scheduler func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, planner sstructs.Planner, opts ...sstructs.SchedulerOption) sstructs.Scheduler { - s := &BatchScheduler{ + bs := &BatchScheduler{ GenericScheduler: GenericScheduler{ logger: logger.Named("batch_sched"), eventsCh: eventsCh, @@ -34,12 +34,20 @@ func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sst } for _, opt := range opts { - opt(s) + opt(bs) } - return s + bs.getAvailableNodes = bs.setNodes + + return bs } -func (s *BatchScheduler) setNodes(job *structs.Job) ([]*structs.Node, map[string]int, error) { - return s.GenericScheduler.setNodes(job) +func (bs *BatchScheduler) setNodes(job *structs.Job) ([]*structs.Node, map[string]int, error) { + if len(job.Dependencies) > 0 { + if err := bs.dependencyChecker.AddDependency(context.Background(), bs.state, bs.eval); err != nil { + return []*structs.Node{}, nil, err + } + } + + return bs.GenericScheduler.setNodes(job) } diff --git a/nomad/dependency/coordinator.go b/scheduler/dependency/coordinator.go similarity index 88% rename from nomad/dependency/coordinator.go rename to scheduler/dependency/coordinator.go index 62dbaad297c..a8b10216ced 100644 --- a/nomad/dependency/coordinator.go +++ b/scheduler/dependency/coordinator.go @@ -1,3 +1,6 @@ +// Copyright IBM Corp. 
2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + package dependency import ( @@ -7,18 +10,17 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/structs" sstructs "github.com/hashicorp/nomad/scheduler/structs" ) type evalID = string -type StateStorage interface { - JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) +type evalUnblocker interface { + Unblock(computedClass string, index uint64) chan struct{} } -type LoopDetector interface { +type loopDetector interface { AddNodes(dependantJob string, dependeeJob ...string) error RemoveNode(dependantJob string) error } @@ -34,13 +36,13 @@ type Coordinator struct { l sync.RWMutex dependencies map[evalID]*dependency - loopDetector LoopDetector - blockedEvals *nomad.BlockedEvals + loopDetector loopDetector + blockedEvals evalUnblocker } // TODO: Think how to rebuild out of evals! -func NewCoordinator(logger hclog.Logger, loopDetector LoopDetector, - blockedEvals *nomad.BlockedEvals) *Coordinator { +func NewCoordinator(logger hclog.Logger, loopDetector loopDetector, + blockedEvals evalUnblocker) *Coordinator { return &Coordinator{ logger: logger, dependencies: make(map[evalID]*dependency), diff --git a/scheduler/feasible/flowchart.md b/scheduler/feasible/flowchart.md deleted file mode 100644 index 55f57b349bd..00000000000 --- a/scheduler/feasible/flowchart.md +++ /dev/null @@ -1,123 +0,0 @@ -# Feasible Package Flow - -This package is easiest to understand in two phases: - -- stack construction: build a chain of iterators and checkers -- selection: push one task group through that chain until one node wins - -## 1. 
Generic stack construction - -```mermaid -flowchart TD - A[Base node iterator] - A --> B[FeasibilityWrapper] - B --> B1[job checks] - B1 --> B1a[DependencyChecker] - B1 --> B1b[ConstraintChecker] - B --> B2[task group checks] - B2 --> B2a[DriverChecker] - B2 --> B2b[ConstraintChecker] - B2 --> B2c[DeviceChecker] - B2 --> B2d[NetworkChecker] - B2 --> B2e[SecretsProviderChecker] - B --> B3[availability checks] - B3 --> B3a[HostVolumeChecker] - B3 --> B3b[CSIVolumeChecker] - B --> C[DistinctHostsIterator] - C --> D[DistinctPropertyIterator] - D --> E[QuotaIterator] - E --> F[FeasibleRankIterator] - F --> G[BinPackIterator] - G --> H[JobAntiAffinityIterator] - H --> I[NodeReschedulingPenaltyIterator] - I --> J[NodeAffinityIterator] - J --> K[SpreadIterator] - K --> L[PreemptionScoringIterator] - L --> M[ScoreNormalizationIterator] - M --> N[LimitIterator] - N --> O[MaxScoreIterator] -``` - -This is the main idea in [scheduler/feasible/stack.go](/Users/juanita.delacuestamorales/go/src/github.com/hashicorp/nomad/scheduler/feasible/stack.go): - -- everything before `FeasibleRankIterator` is still filtering nodes out -- everything after `FeasibleRankIterator` is scoring or selecting among feasible nodes -- `MaxScoreIterator` is what finally chooses the winning node - -## 2. 
What happens during Select - -```mermaid -flowchart TD - A[Select called with task group and options] --> B{Preferred nodes present} - B -->|yes| C[Try only preferred nodes first] - C --> D{Found a feasible ranked node} - D -->|yes| Z[Return that node] - D -->|no| E[Restore full node set] - B -->|no| F[Reset scores and eval context] - E --> F - - F --> G[Collect task group requirements] - G --> H[drivers] - G --> I[constraints] - G --> J[devices] - G --> K[networks] - G --> L[secrets] - G --> M[volumes] - G --> N[affinities and spread] - - H --> O[Update iterators with current task group] - I --> O - J --> O - K --> O - L --> O - M --> O - N --> O - - O --> P[Begin iterating candidate nodes] - P --> Q{Computed class already known} - Q -->|ineligible| P - Q -->|eligible or unknown| R[Run job level feasibility] - - R --> R1{Dependencies ready} - R1 -->|no| P - R1 -->|yes| R2{Job constraints pass} - R2 -->|no| P - R2 -->|yes| S[Run task group feasibility] - - S --> S1{Drivers constraints devices network secrets pass} - S1 -->|no| P - S1 -->|yes| T[Run availability checks] - - T --> T1{Host volumes and CSI volumes available} - T1 -->|no| P - T1 -->|yes| U[Pass node into ranking pipeline] - - U --> V[Apply distinct host and distinct property rules] - V --> W[Apply quota check] - W --> X[Score node] - X --> X1[binpack] - X --> X2[job anti affinity] - X --> X3[reschedule penalty] - X --> X4[node affinity] - X --> X5[spread] - X --> X6[preemption scoring] - X --> Y[Normalize scores then limit search] - Y --> Z[Return highest scoring node] -``` - -## How to read the mechanism - -1. `SetNodes` chooses the starting population of nodes. In the generic stack it also shuffles them and sets a search limit. -2. `SetJob` pushes job-wide state into the iterators: job constraints, dependencies, distinctness, affinity, spread, quota context, and namespace or job IDs for volume checks. -3. 
`Select` pushes task-group-specific state into the same chain: drivers, constraints, devices, volumes, network, secrets, and any scoring context. -4. The feasibility wrapper is the hard gate. A node that fails there never reaches ranking. -5. The first important split is filter versus score. Filters answer "can this node run the task group at all". Scorers answer "which feasible node is best". -6. Dependencies are part of the job-level filter stage. If they are not ready, the node is rejected before any later ranking matters. -7. Distinct host, distinct property, and quota still behave like feasibility filters even though they are implemented as iterators later in the chain. -8. The final answer comes from the max-score step after normalization and limit logic. - -## Files to map back to code - -- [scheduler/feasible/stack.go](/Users/juanita.delacuestamorales/go/src/github.com/hashicorp/nomad/scheduler/feasible/stack.go) builds the iterator chain and drives `Select`. -- [scheduler/feasible/feasible.go](/Users/juanita.delacuestamorales/go/src/github.com/hashicorp/nomad/scheduler/feasible/feasible.go) contains the concrete feasibility checks. -- [scheduler/feasible/dependencies.go](/Users/juanita.delacuestamorales/go/src/github.com/hashicorp/nomad/scheduler/feasible/dependencies.go) handles job dependency readiness. 
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index bba891d1109..8a6c4d41dfb 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -70,10 +70,11 @@ type GenericScheduler struct { deployment *structs.Deployment - blocked *structs.Evaluation - failedTGAllocs map[string]*structs.AllocMetric - queuedAllocs map[string]int - planAnnotations *structs.PlanAnnotations + blocked *structs.Evaluation + failedTGAllocs map[string]*structs.AllocMetric + queuedAllocs map[string]int + planAnnotations *structs.PlanAnnotations + getAvailableNodes func(job *structs.Job) ([]*structs.Node, map[string]int, error) } // NewServiceScheduler is a factory function to instantiate a new service scheduler @@ -86,6 +87,9 @@ func NewServiceScheduler(logger log.Logger, eventsCh chan<- interface{}, planner: planner, batch: false, } + + s.getAvailableNodes = s.setNodes + return s } @@ -473,7 +477,7 @@ func (s *GenericScheduler) computePlacements( ) error { // Get the base nodes - nodes, byDC, err := s.setNodes(s.job) + nodes, byDC, err := s.getAvailableNodes(s.job) if err != nil { return err } @@ -539,7 +543,7 @@ func (s *GenericScheduler) computePlacements( s.setJob(downgradedJob) if needsToSetNodes(downgradedJob, s.job) { - nodes, byDC, err = s.setNodes(downgradedJob) + nodes, byDC, err = s.getAvailableNodes(downgradedJob) if err != nil { return err } @@ -581,7 +585,7 @@ func (s *GenericScheduler) computePlacements( s.setJob(s.job) if needsToSetNodes(downgradedJob, s.job) { - nodes, byDC, err = s.setNodes(s.job) + nodes, byDC, err = s.getAvailableNodes(s.job) if err != nil { return err } diff --git a/nomad/dependency/loop_detection/loop_detection.go b/scheduler/loop_detection/loop_detection.go similarity index 99% rename from nomad/dependency/loop_detection/loop_detection.go rename to scheduler/loop_detection/loop_detection.go index f6a18f36b3b..74a2f807874 100644 --- a/nomad/dependency/loop_detection/loop_detection.go +++ 
b/scheduler/loop_detection/loop_detection.go @@ -1,3 +1,6 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + package loop_detection import ( @@ -8,6 +11,7 @@ import ( log "github.com/hashicorp/go-hclog" ) +// WIP // Loop Detection + Dependency Chain Update Flow // // [Start: incoming dependency edge job -> dependee] From 47ef343207bf60f65f2e22739386875adf1d25b0 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Tue, 9 Jun 2026 13:02:36 +0200 Subject: [PATCH 05/10] improve on the loop detection package --- scheduler/dependency/coordinator.go | 4 +- scheduler/loop_detection/README.md | 91 ++++ scheduler/loop_detection/loop_detection.go | 444 +++++++----------- .../loop_detection/loop_detection_test.go | 130 +++++ 4 files changed, 398 insertions(+), 271 deletions(-) create mode 100644 scheduler/loop_detection/README.md create mode 100644 scheduler/loop_detection/loop_detection_test.go diff --git a/scheduler/dependency/coordinator.go b/scheduler/dependency/coordinator.go index a8b10216ced..f594cdfdaec 100644 --- a/scheduler/dependency/coordinator.go +++ b/scheduler/dependency/coordinator.go @@ -113,7 +113,7 @@ func (c *Coordinator) waitForDependency(ctx context.Context, state sstructs.Stat select { case <-ws.WatchCh(ctx): - ready, err := c.verifyDependency(c.dependencies[eval.JobID].job, dj...) + ready, err := c.verifyDependencies(c.dependencies[eval.JobID].job, dj...) 
if err != nil { c.logger.Error("failed to verify dependency", "error", err) continue @@ -133,6 +133,6 @@ func (c *Coordinator) waitForDependency(ctx context.Context, state sstructs.Stat } } -func (c *Coordinator) verifyDependency(dependantJob *structs.Job, dependeeJob ...*structs.Job) (bool, error) { +func (c *Coordinator) verifyDependencies(dependantJob *structs.Job, dependeeJob ...*structs.Job) (bool, error) { return true, nil } diff --git a/scheduler/loop_detection/README.md b/scheduler/loop_detection/README.md new file mode 100644 index 00000000000..3c4eed807c2 --- /dev/null +++ b/scheduler/loop_detection/README.md @@ -0,0 +1,91 @@ +# Loop Detection Dependency Graph + +This folder provides a small dependency graph implementation for detecting circular dependencies between jobs. + +Package name: `depgraph` + +## Public API + +```go +type Graph interface { + AddNodes(nodeID string, dependencies ...string) error + RemoveNode(nodeID string) error +} +``` + +Constructor: + +```go +g := depgraph.New() +``` + +## Internal Model + +The implementation keeps both of these structures: + +1. An array of linked lists (`allLists`) for all nodes. +2. A map from node ID to its linked list (`byNode`). + +It also tracks adjacency for efficient checks: + +- `deps`: node -> direct dependencies +- `dependents`: node -> direct dependents + +## Behavior + +### AddNodes + +`AddNodes(nodeID, deps...)` does the following: + +1. Validates IDs are non-empty. +2. Rejects self-dependency (`nodeID` depending on itself). +3. Creates missing nodes on demand. +4. Ignores duplicate dependencies in the same call. +5. Detects cycles before adding each edge. + +Cycle rule: + +- When adding `A -> B`, it checks whether `B` already reaches `A`. +- If yes, the new edge would create a loop and returns an error. + +### RemoveNode + +`RemoveNode(nodeID)` does the following: + +1. Returns `ErrNodeNotFound` if the node does not exist. +2. Returns `ErrNodeIsDependency` if any other node depends on it. +3. 
Removes the node if it has no dependents. +4. Prunes orphan dependency branches recursively. + +Orphan pruning means if a removed node had dependencies that are no longer required by anyone else, those dependency nodes are also removed. + +## Errors + +- `ErrEmptyNodeID` +- `ErrSelfDependency` +- `ErrNodeNotFound` +- `ErrNodeIsDependency` + +## Example + +```go +g := depgraph.New() + +_ = g.AddNodes("jobA", "jobB", "jobC") +_ = g.AddNodes("jobB", "jobD") + +// Would fail: jobD -> jobA closes a cycle jobA -> jobB -> jobD -> jobA +if err := g.AddNodes("jobD", "jobA"); err != nil { + // handle cycle error +} + +// Would fail while jobA depends on jobB +if err := g.RemoveNode("jobB"); err != nil { + // handle ErrNodeIsDependency +} +``` + +## Tests + +See test cases in `loop_detection_test.go`. +Each test includes an ASCII graph diagram before the test function. diff --git a/scheduler/loop_detection/loop_detection.go b/scheduler/loop_detection/loop_detection.go index 74a2f807874..b20c7d32271 100644 --- a/scheduler/loop_detection/loop_detection.go +++ b/scheduler/loop_detection/loop_detection.go @@ -1,353 +1,259 @@ -// Copyright IBM Corp. 
2015, 2026 -// SPDX-License-Identifier: BUSL-1.1 - -package loop_detection +package depgraph import ( + "errors" "fmt" - "sort" "sync" +) - log "github.com/hashicorp/go-hclog" +var ( + ErrEmptyNodeID = errors.New("node id cannot be empty") + ErrSelfDependency = errors.New("node cannot depend on itself") + ErrNodeNotFound = errors.New("node not found") + ErrNodeIsDependency = errors.New("cannot remove node: another node depends on it") ) -// WIP -// Loop Detection + Dependency Chain Update Flow -// -// [Start: incoming dependency edge job -> dependee] -// | -// v -// [Ensure nodes exist in graph index/map] -// | -// v -// [Check whether job or dependee already has chains] -// | -// v -// [Build/lookup candidate chain path(s)] -// | -// v -// [Run cycle/loop detection on path] -// | -// +---------+---------+ -// | | -// v v -// [Loop found? YES] [Loop found? NO] -// | | -// v v -// [Return error/reject] [Add edge job -> dependee] -// | -// v -// [Update node->chains mapping/index] -// | -// v -// [Dependency accepted / continue] -// -// ------------------------------------------------------- -// -// [Event: job becomes unblocked / completed / removed] -// | -// v -// [Remove job node and its outgoing/incoming edges] -// | -// v -// [Recompute/prune affected dependency chains] -// | -// v -// [Update node->chains mapping/index] -// | -// v -// [Done] - -// Detector tracks a directed dependency graph and maintains a chains index -// for each node. Edges are directed as "job -> dependee". -type Detector struct { - logger log.Logger +// Graph is the only public interface. +type Graph interface { + AddNodes(nodeID string, dependencies ...string) error + RemoveNode(nodeID string) error +} +// Store implements Graph. +// Internally it keeps: +// 1) an array of linked lists +// 2) a map[nodeID]*linkedList +type Store struct { mu sync.RWMutex - // deps stores outgoing edges: job -> dependees. 
- deps map[string]map[string]struct{} - - // reverse stores incoming edges: dependee -> jobs that depend on it. - reverse map[string]map[string]struct{} + allLists []*linkedList + byNode map[string]*linkedList + index map[string]int // nodeID -> position in allLists - // chains stores all root-to-leaf dependency chains for each node. - chains map[string][][]string + // adjacency: node -> dependencies + deps map[string]map[string]struct{} + // reverse adjacency: node -> dependents + dependents map[string]map[string]struct{} +} - // nodes stores pointers to graph nodes by job id. - nodes map[string]*Node +type listNode struct { + id string + next *listNode +} - // graphs stores dependency chain paths as arrays of node pointers. - graphs [][]*Node +type linkedList struct { + head *listNode // head is the owner node + tail *listNode } -// Node represents a graph node and its outgoing dependencies. -type Node struct { - ID string - Dependencies []string +func newLinkedList(owner string) *linkedList { + h := &listNode{id: owner} + return &linkedList{head: h, tail: h} } -// NewDetector creates an empty dependency detector. 
-func NewDetector(logger log.Logger) *Detector { - return &Detector{ - logger: logger.Named("loop_detector"), - deps: make(map[string]map[string]struct{}), - reverse: make(map[string]map[string]struct{}), - chains: make(map[string][][]string), - nodes: make(map[string]*Node), - graphs: make([][]*Node, 0), +func (l *linkedList) appendUnique(dep string) bool { + if l.head == nil { + l.head = &listNode{id: dep} + l.tail = l.head + return true } + for n := l.head.next; n != nil; n = n.next { + if n.id == dep { + return false + } + } + n := &listNode{id: dep} + l.tail.next = n + l.tail = n + return true } -func (d *Detector) AddNodes(nodeID string, dependencies ...string) error { - if err := d.addNode(Node{ID: nodeID, Dependencies: dependencies}); err != nil { - return fmt.Errorf("failed to add node %q: %v", nodeID, err) +// New creates an empty dependency graph. +func New() *Store { + return &Store{ + allLists: make([]*linkedList, 0), + byNode: make(map[string]*linkedList), + index: make(map[string]int), + deps: make(map[string]map[string]struct{}), + dependents: make(map[string]map[string]struct{}), } - return nil } -// AddNode ensures node exists and adds edges node.ID -> dependency for each -// listed dependency. -// -// If any dependency would create a loop, AddNode returns an error and the -// graph remains unchanged. -func (d *Detector) addNode(node Node) error { - d.mu.Lock() - defer d.mu.Unlock() - - if node.ID == "" { - return fmt.Errorf("node id must be non-empty") - } +// AddNodes adds/updates nodeID with dependencies. +// It prevents circular dependencies. 
+func (s *Store) AddNodes(nodeID string, dependencies ...string) error { + s.mu.Lock() + defer s.mu.Unlock() - d.ensureNode(node.ID) + if nodeID == "" { + return ErrEmptyNodeID + } + if err := s.ensureNode(nodeID); err != nil { + return err + } - uniqueDeps := make(map[string]struct{}, len(node.Dependencies)) - for _, dep := range node.Dependencies { + seen := make(map[string]struct{}, len(dependencies)) + for _, dep := range dependencies { if dep == "" { - return fmt.Errorf("dependency for %q must be non-empty", node.ID) + return ErrEmptyNodeID } - if dep == node.ID { - return fmt.Errorf("self-dependency is not allowed for %q", node.ID) + if dep == nodeID { + return ErrSelfDependency } - uniqueDeps[dep] = struct{}{} - } - - // Validate all candidate edges first so updates are all-or-nothing. - for dep := range uniqueDeps { - d.ensureNode(dep) - if _, exists := d.deps[node.ID][dep]; exists { + if _, ok := seen[dep]; ok { continue } - if d.pathExists(dep, node.ID) { - return fmt.Errorf("adding dependency %q -> %q creates a loop", node.ID, dep) + seen[dep] = struct{}{} + + if err := s.ensureNode(dep); err != nil { + return err } - } - for dep := range uniqueDeps { - d.deps[node.ID][dep] = struct{}{} - d.reverse[dep][node.ID] = struct{}{} - } + // If dep already reaches nodeID, adding nodeID->dep creates a cycle. + if s.reaches(dep, nodeID) { + return fmt.Errorf("circular dependency detected: %s -> %s would create a loop", nodeID, dep) + } - d.recomputeChainsLocked() - d.syncNodeIndexesLocked() + if _, ok := s.deps[nodeID][dep]; ok { + continue // edge already exists + } + + s.deps[nodeID][dep] = struct{}{} + s.dependents[dep][nodeID] = struct{}{} + s.byNode[nodeID].appendUnique(dep) + } return nil } -// RemoveNode removes the target node and its direct dependees (one hop only). -// Removal is non-cascading. -func (d *Detector) RemoveNode(node string) error { - d.mu.Lock() - defer d.mu.Unlock() +// RemoveNode removes nodeID if no other node depends on it. 
+// Also prunes orphan dependency branches that become unreferenced. +func (s *Store) RemoveNode(nodeID string) error { + s.mu.Lock() + defer s.mu.Unlock() - if node == "" { - return fmt.Errorf("node id must be non-empty") + if nodeID == "" { + return ErrEmptyNodeID } - - if _, exists := d.deps[node]; !exists { - return nil + if _, ok := s.byNode[nodeID]; !ok { + return ErrNodeNotFound } - - directDependees := make([]string, 0, len(d.deps[node])) - for dependee := range d.deps[node] { - directDependees = append(directDependees, dependee) + if len(s.dependents[nodeID]) > 0 { + return ErrNodeIsDependency } - d.removeSingleNodeLocked(node) - for _, dependee := range directDependees { - d.removeSingleNodeLocked(dependee) + children := keysSet(s.deps[nodeID]) + + // Remove outgoing edges nodeID -> child. + for child := range s.deps[nodeID] { + delete(s.dependents[child], nodeID) } - d.recomputeChainsLocked() - d.syncNodeIndexesLocked() + delete(s.deps, nodeID) + delete(s.dependents, nodeID) + s.removeList(nodeID) + // Remove orphan sub-branches. 
+ for _, child := range children { + s.pruneOrphan(child) + } return nil } -func (d *Detector) ensureNode(node string) { - if _, ok := d.deps[node]; !ok { - d.deps[node] = make(map[string]struct{}) +func (s *Store) ensureNode(nodeID string) error { + if nodeID == "" { + return ErrEmptyNodeID } - if _, ok := d.reverse[node]; !ok { - d.reverse[node] = make(map[string]struct{}) - } - if _, ok := d.nodes[node]; !ok { - d.nodes[node] = &Node{ID: node} + if _, ok := s.byNode[nodeID]; ok { + if _, ok := s.deps[nodeID]; !ok { + s.deps[nodeID] = make(map[string]struct{}) + } + if _, ok := s.dependents[nodeID]; !ok { + s.dependents[nodeID] = make(map[string]struct{}) + } + return nil } + + ll := newLinkedList(nodeID) + s.byNode[nodeID] = ll + s.index[nodeID] = len(s.allLists) + s.allLists = append(s.allLists, ll) + s.deps[nodeID] = make(map[string]struct{}) + s.dependents[nodeID] = make(map[string]struct{}) + return nil } -func (d *Detector) pathExists(from, to string) bool { - if from == to { +// reaches checks if start depends (directly/indirectly) on target. 
+func (s *Store) reaches(start, target string) bool { + if start == target { return true } - - seen := make(map[string]struct{}) - stack := []string{from} + visited := map[string]struct{}{} + stack := []string{start} for len(stack) > 0 { n := stack[len(stack)-1] stack = stack[:len(stack)-1] - if n == to { - return true - } - if _, ok := seen[n]; ok { + if _, ok := visited[n]; ok { continue } - seen[n] = struct{}{} + visited[n] = struct{}{} - for next := range d.deps[n] { - if _, ok := seen[next]; !ok { - stack = append(stack, next) + for dep := range s.deps[n] { + if dep == target { + return true } + stack = append(stack, dep) } } - return false } -func (d *Detector) recomputeChainsLocked() { - allNodes := make(map[string]struct{}, len(d.deps)+len(d.reverse)) - for n := range d.deps { - allNodes[n] = struct{}{} - } - for n := range d.reverse { - allNodes[n] = struct{}{} - } - - d.chains = make(map[string][][]string, len(allNodes)) - memo := make(map[string][][]string, len(allNodes)) - visiting := make(map[string]bool, len(allNodes)) - - for n := range allNodes { - d.chains[n] = d.buildChainsFromLocked(n, memo, visiting) - } -} - -func (d *Detector) removeSingleNodeLocked(node string) { - // Remove outgoing edges: node -> dependee. - for dependee := range d.deps[node] { - delete(d.reverse[dependee], node) - if len(d.reverse[dependee]) == 0 { - delete(d.reverse, dependee) - } +func (s *Store) pruneOrphan(nodeID string) { + if _, ok := s.byNode[nodeID]; !ok { + return } - - // Remove incoming edges: depender -> node. 
- for depender := range d.reverse[node] { - delete(d.deps[depender], node) - if len(d.deps[depender]) == 0 { - delete(d.deps, depender) - } + if len(s.dependents[nodeID]) > 0 { + return } - delete(d.deps, node) - delete(d.reverse, node) - delete(d.chains, node) - delete(d.nodes, node) -} - -func (d *Detector) syncNodeIndexesLocked() { - for id := range d.deps { - d.ensureNode(id) - deps := make([]string, 0, len(d.deps[id])) - for dep := range d.deps[id] { - deps = append(deps, dep) - } - sort.Strings(deps) - d.nodes[id].Dependencies = deps - } + deps := s.deps[nodeID] + children := keysSet(deps) - for id := range d.nodes { - if _, ok := d.deps[id]; !ok { - delete(d.nodes, id) - } + for child := range s.deps[nodeID] { + delete(s.dependents[child], nodeID) } + delete(s.deps, nodeID) + delete(s.dependents, nodeID) + s.removeList(nodeID) - graphs := make([][]*Node, 0) - for _, paths := range d.chains { - for _, path := range paths { - nodePath := make([]*Node, 0, len(path)) - for _, id := range path { - n, ok := d.nodes[id] - if !ok { - d.ensureNode(id) - n = d.nodes[id] - } - nodePath = append(nodePath, n) - } - graphs = append(graphs, nodePath) - } + for _, child := range children { + s.pruneOrphan(child) } - d.graphs = graphs } -func (d *Detector) buildChainsFromLocked(node string, memo map[string][][]string, visiting map[string]bool) [][]string { - if cached, ok := memo[node]; ok { - return copyPaths(cached) - } - - if visiting[node] { - // Guard for unexpected cycles. Cycles should not be reachable because - // AddDependency rejects loop-inducing edges. 
- return [][]string{{node}} +func (s *Store) removeList(nodeID string) { + i, ok := s.index[nodeID] + if !ok { + return } - - visiting[node] = true - defer func() { visiting[node] = false }() - - nextNodes := make([]string, 0, len(d.deps[node])) - for next := range d.deps[node] { - nextNodes = append(nextNodes, next) - } - sort.Strings(nextNodes) - - if len(nextNodes) == 0 { - memo[node] = [][]string{{node}} - return copyPaths(memo[node]) + last := len(s.allLists) - 1 + if i != last { + s.allLists[i] = s.allLists[last] + owner := s.allLists[i].head.id + s.index[owner] = i } - - paths := make([][]string, 0) - for _, next := range nextNodes { - subPaths := d.buildChainsFromLocked(next, memo, visiting) - for _, sub := range subPaths { - path := make([]string, 0, len(sub)+1) - path = append(path, node) - path = append(path, sub...) - paths = append(paths, path) - } - } - - memo[node] = copyPaths(paths) - return copyPaths(paths) + s.allLists[last] = nil + s.allLists = s.allLists[:last] + delete(s.index, nodeID) + delete(s.byNode, nodeID) } -func copyPaths(paths [][]string) [][]string { - out := make([][]string, len(paths)) - for i, p := range paths { - cp := make([]string, len(p)) - copy(cp, p) - out[i] = cp +func keysSet(m map[string]struct{}) []string { + out := make([]string, 0, len(m)) + for k := range m { + out = append(out, k) } return out } diff --git a/scheduler/loop_detection/loop_detection_test.go b/scheduler/loop_detection/loop_detection_test.go new file mode 100644 index 00000000000..6b13044faa9 --- /dev/null +++ b/scheduler/loop_detection/loop_detection_test.go @@ -0,0 +1,130 @@ +package depgraph + +import ( + "errors" + "testing" +) + +// Test case diagram: +// jobA --> jobB +// | +// +----> jobC +// +// Expected: +// - Add succeeds +// - No cycle error +func TestAddNodesNoCycle(t *testing.T) { + g := New() + + if err := g.AddNodes("jobA", "jobB", "jobC"); err != nil { + t.Fatalf("unexpected error adding non-cyclic deps: %v", err) + } +} + +// Test case 
diagram: +// jobA --> jobB --> jobC +// ^ | +// +----------------+ +// +// Operation: +// - add edge jobC -> jobA +// +// Expected: +// - cycle detected (error) +func TestAddNodesDetectsCycle(t *testing.T) { + g := New() + + if err := g.AddNodes("jobA", "jobB"); err != nil { + t.Fatalf("setup failed: %v", err) + } + if err := g.AddNodes("jobB", "jobC"); err != nil { + t.Fatalf("setup failed: %v", err) + } + + err := g.AddNodes("jobC", "jobA") + if err == nil { + t.Fatal("expected cycle error, got nil") + } +} + +// Test case diagram: +// jobA --> jobB +// +// Operation: +// - remove jobB +// +// Expected: +// - blocked, because jobA depends on jobB +// - returns ErrNodeIsDependency +func TestRemoveNodeBlockedIfDependedUpon(t *testing.T) { + g := New() + + if err := g.AddNodes("jobA", "jobB"); err != nil { + t.Fatalf("setup failed: %v", err) + } + + err := g.RemoveNode("jobB") + if !errors.Is(err, ErrNodeIsDependency) { + t.Fatalf("expected ErrNodeIsDependency, got: %v", err) + } +} + +// Test case diagram: +// jobA --> jobB --> jobC +// +// Operation: +// - remove jobA +// +// Expected: +// - jobA removed +// - jobB and jobC pruned as orphans (no other dependents) +func TestRemoveNodePrunesOrphanChain(t *testing.T) { + g := New() + + if err := g.AddNodes("jobA", "jobB"); err != nil { + t.Fatalf("setup failed: %v", err) + } + if err := g.AddNodes("jobB", "jobC"); err != nil { + t.Fatalf("setup failed: %v", err) + } + + if err := g.RemoveNode("jobA"); err != nil { + t.Fatalf("unexpected remove error: %v", err) + } + + if err := g.RemoveNode("jobB"); !errors.Is(err, ErrNodeNotFound) { + t.Fatalf("expected ErrNodeNotFound for pruned jobB, got: %v", err) + } + if err := g.RemoveNode("jobC"); !errors.Is(err, ErrNodeNotFound) { + t.Fatalf("expected ErrNodeNotFound for pruned jobC, got: %v", err) + } +} + +// Test case diagram: +// jobX --> jobY +// \\----> jobY (duplicate edge in same call) +// +// Expected: +// - add succeeds +// - duplicate dependency ignored (no error) 
+func TestAddNodesDuplicateDependenciesIgnored(t *testing.T) { + g := New() + + if err := g.AddNodes("jobX", "jobY", "jobY"); err != nil { + t.Fatalf("unexpected error for duplicate dependency: %v", err) + } +} + +// Test case diagram: +// jobZ --> jobZ (self loop) +// +// Expected: +// - returns ErrSelfDependency +func TestAddNodesSelfDependencyRejected(t *testing.T) { + g := New() + + err := g.AddNodes("jobZ", "jobZ") + if !errors.Is(err, ErrSelfDependency) { + t.Fatalf("expected ErrSelfDependency, got: %v", err) + } +} From 0486bca24928360cdbe23e13a7845775b55e2a1f Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Tue, 16 Jun 2026 13:38:17 +0200 Subject: [PATCH 06/10] func: adapt the add dependency function to only add jobs with pending dependecies --- nomad/core_sched.go | 2 +- nomad/server.go | 2 +- nomad/worker_test.go | 8 +- scheduler/batch_scheduler.go | 17 ++-- scheduler/dependency/coordinator.go | 81 ++++++++++++++----- scheduler/generic_sched.go | 4 +- scheduler/loop_detection/loop_detection.go | 19 ++--- .../loop_detection/loop_detection_test.go | 32 +++++--- 8 files changed, 111 insertions(+), 54 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 6cfeae4b38a..8f7f2a191ef 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -42,7 +42,7 @@ type CoreScheduler struct { } // NewCoreScheduler is used to return a new system scheduler instance -func NewCoreScheduler(srv *Server, snap *state.StateSnapshot, planner sstructs.Planner) sstructs.Scheduler { +func NewCoreScheduler(srv *Server, snap *state.StateSnapshot, planner sstructs.Planner, _ ...sstructs.SchedulerOption) sstructs.Scheduler { s := &CoreScheduler{ srv: srv, snap: snap, diff --git a/nomad/server.go b/nomad/server.go index f381f1d9981..1314152cff0 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -409,7 +409,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc // Create the 
dependency Coordinator depCoordinator := dependency.NewCoordinator(s.logger, - loop_detection.NewDetector(s.logger), s.blockedEvals) + loop_detection.New(s.logger), s.blockedEvals) s.dependencyCoordinator = depCoordinator // Create the RPC handler diff --git a/nomad/worker_test.go b/nomad/worker_test.go index b825638eadc..0b623f44f2a 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -50,11 +50,11 @@ func (n *NoopScheduler) Process(eval *structs.Evaluation) error { func init() { scheduler.BuiltinSchedulers["noop"] = func( - logger log.Logger, eventsCh chan<- any, s sstructs.State, p sstructs.Planner, - ) sstructs.Scheduler { + logger log.Logger, eventsCh chan<- interface{}, state sstructs.State, + planner sstructs.Planner, opts ...sstructs.SchedulerOption) sstructs.Scheduler { n := &NoopScheduler{ - state: s, - planner: p, + state: state, + planner: planner, } return n } diff --git a/scheduler/batch_scheduler.go b/scheduler/batch_scheduler.go index a4fb0c3d511..8c264628d2e 100644 --- a/scheduler/batch_scheduler.go +++ b/scheduler/batch_scheduler.go @@ -4,15 +4,13 @@ package scheduler import ( - "context" - log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" sstructs "github.com/hashicorp/nomad/scheduler/structs" ) type DependencyChecker interface { - AddDependency(ctx context.Context, state sstructs.State, eval *structs.Evaluation) error + CheckDependency(state sstructs.State, job *structs.Job, eval *structs.Evaluation) (bool, error) } type BatchScheduler struct { @@ -43,10 +41,15 @@ func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state sst } func (bs *BatchScheduler) setNodes(job *structs.Job) ([]*structs.Node, map[string]int, error) { - if len(job.Dependencies) > 0 { - if err := bs.dependencyChecker.AddDependency(context.Background(), bs.state, bs.eval); err != nil { - return []*structs.Node{}, nil, err - } + + ready, err := bs.dependencyChecker.CheckDependency(bs.state, job, bs.eval) + if err != nil 
{ + return []*structs.Node{}, nil, err + } + + if !ready { + _, _, byDC, err := readyNodesInDCsAndPool(bs.state, job.Datacenters, job.NodePool) + return []*structs.Node{}, byDC, err } return bs.GenericScheduler.setNodes(job) diff --git a/scheduler/dependency/coordinator.go b/scheduler/dependency/coordinator.go index f594cdfdaec..47ba0b6509f 100644 --- a/scheduler/dependency/coordinator.go +++ b/scheduler/dependency/coordinator.go @@ -5,15 +5,19 @@ package dependency import ( "context" + "errors" "sync" "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/structs" sstructs "github.com/hashicorp/nomad/scheduler/structs" ) +var DefaultTimeout = 10 * time.Minute + type evalID = string type evalUnblocker interface { @@ -32,26 +36,28 @@ type dependency struct { } type Coordinator struct { - logger hclog.Logger - l sync.RWMutex + mainContext context.Context + logger hclog.Logger + l sync.RWMutex dependencies map[evalID]*dependency loopDetector loopDetector blockedEvals evalUnblocker } -// TODO: Think how to rebuild out of evals! 
+// NewCoordinator returns a Coordinator with a background root context, a logger named "dependency-coordinator" and an empty dependency map. func NewCoordinator(logger hclog.Logger, loopDetector loopDetector, blockedEvals evalUnblocker) *Coordinator { return &Coordinator{ - logger: logger, + mainContext: context.Background(), + logger: logger.Named("dependency-coordinator"), dependencies: make(map[evalID]*dependency), loopDetector: loopDetector, blockedEvals: blockedEvals, } } -func (c *Coordinator) unblockDependencies(eval *structs.Evaluation, dependeeJobs ...*structs.Job) error { +func (c *Coordinator) unblockDependencies(eval *structs.Evaluation, dependeeJobs map[string]*structs.Job) error { for _, job := range dependeeJobs { c.blockedEvals.Unblock(eval.ID, job.JobModifyIndex) @@ -68,12 +74,10 @@ func (c *Coordinator) unblockDependencies(eval *structs.Evaluation, dependeeJobs return nil } -func (c *Coordinator) AddDependency(ctx context.Context, state sstructs.State, eval *structs.Evaluation) error { +func (c *Coordinator) CheckDependency(state sstructs.State, job *structs.Job, eval *structs.Evaluation) (bool, error) { - job, err := state.JobByID(nil, eval.Namespace, eval.ID) - if err != nil { - c.logger.Error("failed to get job by ID", "error", err) - return err + if len(job.Dependencies) == 0 { + return true, nil } djIDs := []string{} @@ -81,9 +85,28 @@ func (c *Coordinator) AddDependency(ctx context.Context, state sstructs.State, e djIDs = append(djIDs, d.Job) } + djs := map[string]*structs.Job{} + for _, jID := range djIDs { + j, err := state.JobByID(nil, job.Namespace, jID) + if err != nil { + c.logger.Error("failed to get job by ID", "error", err) + continue + } + djs[jID] = j + } + + ready, err := c.verifyDependencies(job, djs) + if err != nil { + c.logger.Error("failed to verify dependencies", "error", err) + } + + if ready { + return true, nil + } + c.loopDetector.AddNodes(eval.JobID, djIDs...) 
- ctx, cancel := context.WithDeadline(ctx, time.Now().Add(10*time.Minute)) + ctx, cancel := context.WithDeadline(c.mainContext, time.Now().Add(DefaultTimeout)) c.dependencies[eval.JobID] = &dependency{ cancelFunc: cancel, job: job, @@ -92,7 +115,7 @@ func (c *Coordinator) AddDependency(ctx context.Context, state sstructs.State, e go c.waitForDependency(ctx, state, eval, djIDs...) - return nil + return false, nil } func (c *Coordinator) waitForDependency(ctx context.Context, state sstructs.State, @@ -100,7 +123,7 @@ func (c *Coordinator) waitForDependency(ctx context.Context, state sstructs.Stat for { ws := memdb.NewWatchSet() - dj := []*structs.Job{} + dj := map[string]*structs.Job{} for _, jID := range dependeeJobIDs { j, err := state.JobByID(ws, eval.Namespace, jID) @@ -108,19 +131,19 @@ func (c *Coordinator) waitForDependency(ctx context.Context, state sstructs.Stat c.logger.Error("failed to get job by ID", "error", err) } - dj = append(dj, j) + dj[jID] = j } select { case <-ws.WatchCh(ctx): - ready, err := c.verifyDependencies(c.dependencies[eval.JobID].job, dj...) + ready, err := c.verifyDependencies(c.dependencies[eval.JobID].job, dj) if err != nil { c.logger.Error("failed to verify dependency", "error", err) continue } if ready { - err := c.unblockDependencies(eval, dj...) 
+ err := c.unblockDependencies(eval, dj) if err != nil { c.logger.Error("failed to unblock job", "error", err) } @@ -133,6 +156,28 @@ func (c *Coordinator) waitForDependency(ctx context.Context, state sstructs.Stat } } -func (c *Coordinator) verifyDependencies(dependantJob *structs.Job, dependeeJob ...*structs.Job) (bool, error) { - return true, nil +func (c *Coordinator) verifyDependencies(dependantJob *structs.Job, jobs map[string]*structs.Job) (bool, error) { + var mErr multierror.Error + ready := true + + for _, d := range dependantJob.Dependencies { + job, ok := jobs[d.Job] + + if !ok { + mErr.Errors = append(mErr.Errors, errors.New("unable to check dependency for job: "+d.Job)) + continue + } + + if job.Status != d.Output { + ready = false + break + } + } + + return ready, mErr.ErrorOrNil() +} + +func (c *Coordinator) Stop() { + c.mainContext.Done() + c.dependencies = nil } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 8a6c4d41dfb..8528fab2a62 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -482,9 +482,9 @@ func (s *GenericScheduler) computePlacements( return err } - if len(nodes) == 0 { + /* if len(nodes) == 0 { return fmt.Errorf("no nodes available/pending dependencies to place on") - } + } */ var deploymentID string if s.deployment != nil && s.deployment.Active() { diff --git a/scheduler/loop_detection/loop_detection.go b/scheduler/loop_detection/loop_detection.go index b20c7d32271..4bf757aab94 100644 --- a/scheduler/loop_detection/loop_detection.go +++ b/scheduler/loop_detection/loop_detection.go @@ -1,9 +1,14 @@ -package depgraph +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package loop_detection import ( "errors" "fmt" "sync" + + "github.com/hashicorp/go-hclog" ) var ( @@ -13,18 +18,13 @@ var ( ErrNodeIsDependency = errors.New("cannot remove node: another node depends on it") ) -// Graph is the only public interface. 
-type Graph interface { - AddNodes(nodeID string, dependencies ...string) error - RemoveNode(nodeID string) error -} - // Store implements Graph. // Internally it keeps: // 1) an array of linked lists // 2) a map[nodeID]*linkedList type Store struct { - mu sync.RWMutex + logger hclog.Logger + mu sync.RWMutex allLists []*linkedList byNode map[string]*linkedList @@ -69,8 +69,9 @@ func (l *linkedList) appendUnique(dep string) bool { } // New creates an empty dependency graph. -func New() *Store { +func New(logger hclog.Logger) *Store { return &Store{ + logger: logger.Named("loop-detection"), allLists: make([]*linkedList, 0), byNode: make(map[string]*linkedList), index: make(map[string]int), diff --git a/scheduler/loop_detection/loop_detection_test.go b/scheduler/loop_detection/loop_detection_test.go index 6b13044faa9..92561847fad 100644 --- a/scheduler/loop_detection/loop_detection_test.go +++ b/scheduler/loop_detection/loop_detection_test.go @@ -1,20 +1,26 @@ -package depgraph +// Copyright IBM Corp. 
2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package loop_detection import ( "errors" "testing" + + "github.com/hashicorp/go-hclog" ) // Test case diagram: // jobA --> jobB -// | -// +----> jobC +// +// | +// +----> jobC // // Expected: // - Add succeeds // - No cycle error func TestAddNodesNoCycle(t *testing.T) { - g := New() + g := New(hclog.NewNullLogger()) if err := g.AddNodes("jobA", "jobB", "jobC"); err != nil { t.Fatalf("unexpected error adding non-cyclic deps: %v", err) @@ -23,8 +29,9 @@ func TestAddNodesNoCycle(t *testing.T) { // Test case diagram: // jobA --> jobB --> jobC -// ^ | -// +----------------+ +// +// ^ | +// +----------------+ // // Operation: // - add edge jobC -> jobA @@ -32,7 +39,7 @@ func TestAddNodesNoCycle(t *testing.T) { // Expected: // - cycle detected (error) func TestAddNodesDetectsCycle(t *testing.T) { - g := New() + g := New(hclog.NewNullLogger()) if err := g.AddNodes("jobA", "jobB"); err != nil { t.Fatalf("setup failed: %v", err) @@ -57,7 +64,7 @@ func TestAddNodesDetectsCycle(t *testing.T) { // - blocked, because jobA depends on jobB // - returns ErrNodeIsDependency func TestRemoveNodeBlockedIfDependedUpon(t *testing.T) { - g := New() + g := New(hclog.NewNullLogger()) if err := g.AddNodes("jobA", "jobB"); err != nil { t.Fatalf("setup failed: %v", err) @@ -79,7 +86,7 @@ func TestRemoveNodeBlockedIfDependedUpon(t *testing.T) { // - jobA removed // - jobB and jobC pruned as orphans (no other dependents) func TestRemoveNodePrunesOrphanChain(t *testing.T) { - g := New() + g := New(hclog.NewNullLogger()) if err := g.AddNodes("jobA", "jobB"); err != nil { t.Fatalf("setup failed: %v", err) @@ -102,13 +109,14 @@ func TestRemoveNodePrunesOrphanChain(t *testing.T) { // Test case diagram: // jobX --> jobY -// \\----> jobY (duplicate edge in same call) +// +// \\----> jobY (duplicate edge in same call) // // Expected: // - add succeeds // - duplicate dependency ignored (no error) func TestAddNodesDuplicateDependenciesIgnored(t 
*testing.T) { - g := New() + g := New(hclog.NewNullLogger()) if err := g.AddNodes("jobX", "jobY", "jobY"); err != nil { t.Fatalf("unexpected error for duplicate dependency: %v", err) @@ -121,7 +129,7 @@ func TestAddNodesDuplicateDependenciesIgnored(t *testing.T) { // Expected: // - returns ErrSelfDependency func TestAddNodesSelfDependencyRejected(t *testing.T) { - g := New() + g := New(hclog.NewNullLogger()) err := g.AddNodes("jobZ", "jobZ") if !errors.Is(err, ErrSelfDependency) { From fadb959c7bacd973a47b2f04884afabaa3efa50c Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Tue, 16 Jun 2026 17:03:06 +0200 Subject: [PATCH 07/10] func: add a function to check if a job has dependencies --- scheduler/dependency/coordinator.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/scheduler/dependency/coordinator.go b/scheduler/dependency/coordinator.go index 47ba0b6509f..5371fe2ac5e 100644 --- a/scheduler/dependency/coordinator.go +++ b/scheduler/dependency/coordinator.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/scheduler/loop_detection" sstructs "github.com/hashicorp/nomad/scheduler/structs" ) @@ -181,3 +182,15 @@ func (c *Coordinator) Stop() { c.mainContext.Done() c.dependencies = nil } + +func (c *Coordinator) HasDependencies(j *structs.Job) (bool, error) { + err := c.loopDetector.RemoveNode(j.ID) + if err != nil { + if errors.Is(err, loop_detection.ErrNodeIsDependency) { + return true, nil + } + return false, err + } + + return false, nil +} From 920e056f5044461ecc79ccadf32435ea35487991 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Thu, 18 Jun 2026 17:12:54 +0200 Subject: [PATCH 08/10] func: add the non dependencies condition to the garbage collector --- nomad/core_sched.go | 22 ++++++++++++++++++- nomad/worker.go | 2 
+- scheduler/dependency/coordinator.go | 21 +++++++++++++++--- ...{batch_scheduler.go => scheduler_batch.go} | 0 4 files changed, 40 insertions(+), 5 deletions(-) rename scheduler/{batch_scheduler.go => scheduler_batch.go} (100%) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 8f7f2a191ef..eefb53685e3 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -21,6 +21,10 @@ import ( "golang.org/x/time/rate" ) +type dependencyChecker interface { + HasDependencies(j *structs.Job) (bool, error) +} + // CoreScheduler is a special "scheduler" that is registered // as "_core". It is used to run various administrative work // across the cluster. @@ -39,10 +43,11 @@ type CoreScheduler struct { // (e.g., structs.CoreJobEvalGC) and time.Duration that will be used as GC // threshold value. customThresholdForObject map[string]*time.Duration + dependecyChecker dependencyChecker } // NewCoreScheduler is used to return a new system scheduler instance -func NewCoreScheduler(srv *Server, snap *state.StateSnapshot, planner sstructs.Planner, _ ...sstructs.SchedulerOption) sstructs.Scheduler { +func NewCoreScheduler(srv *Server, snap *state.StateSnapshot, planner sstructs.Planner, opts ...sstructs.SchedulerOption) sstructs.Scheduler { s := &CoreScheduler{ srv: srv, snap: snap, @@ -50,6 +55,11 @@ func NewCoreScheduler(srv *Server, snap *state.StateSnapshot, planner sstructs.P planner: planner, customThresholdForObject: make(map[string]*time.Duration), } + + for _, opt := range opts { + opt(s) + } + return s } @@ -161,6 +171,16 @@ OUTER: continue } + free, err := c.dependecyChecker.HasDependencies(job) + if err != nil { + c.logger.Error("job GC failed to get dependencies for job", "job", job.ID, "error", err) + continue + } + + if free { + continue + } + ws := memdb.NewWatchSet() evals, err := c.snap.EvalsByJob(ws, job.Namespace, job.ID) if err != nil { diff --git a/nomad/worker.go b/nomad/worker.go index 02f1810c394..415dd5b73b9 100644 --- a/nomad/worker.go +++ 
b/nomad/worker.go @@ -623,7 +623,7 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua // Create the scheduler, or use the special core scheduler var sched sstructs.Scheduler if eval.Type == structs.JobTypeCore { - sched = NewCoreScheduler(w.srv, snap, w) + sched = NewCoreScheduler(w.srv, snap, w, scheduler.WithDependencyChecker(w.srv.dependencyCoordinator)) } else { sched, err = scheduler.NewScheduler(eval.Type, w.logger, w.srv.workersEventCh, snap, w, scheduler.WithDependencyChecker(w.srv.dependencyCoordinator)) diff --git a/scheduler/dependency/coordinator.go b/scheduler/dependency/coordinator.go index 5371fe2ac5e..096027826bc 100644 --- a/scheduler/dependency/coordinator.go +++ b/scheduler/dependency/coordinator.go @@ -163,13 +163,14 @@ func (c *Coordinator) verifyDependencies(dependantJob *structs.Job, jobs map[str for _, d := range dependantJob.Dependencies { job, ok := jobs[d.Job] - if !ok { mErr.Errors = append(mErr.Errors, errors.New("unable to check dependency for job: "+d.Job)) - continue + ready = false + break } - if job.Status != d.Output { + if (job != nil && job.Status != d.Output) || job == nil { + c.logger.Debug("job not preset yet", "jobID", d.Job) ready = false break } @@ -194,3 +195,17 @@ func (c *Coordinator) HasDependencies(j *structs.Job) (bool, error) { return false, nil } + +func (c *Coordinator) Reload(state sstructs.State, evals ...*structs.Evaluation) { + for _, eval := range evals { + job, err := state.JobByID(nil, eval.Namespace, eval.JobID) + if err != nil { + c.logger.Error("failed to get job by ID", "error", err) + continue + } + _, err = c.CheckDependency(state, job, eval) + if err != nil { + c.logger.Error("failed to check dependency", "error", err) + } + } +} diff --git a/scheduler/batch_scheduler.go b/scheduler/scheduler_batch.go similarity index 100% rename from scheduler/batch_scheduler.go rename to scheduler/scheduler_batch.go From e8df51aa386b1c06c350f186cf498b5a975003ef Mon Sep 17 00:00:00 
2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Fri, 26 Jun 2026 12:30:22 +0200 Subject: [PATCH 09/10] modify the dependency definition to add multiple jobs and extra params --- api/constraint.go | 117 +++++++++++++++++++--- api/constraint_test.go | 41 ++++++++ command/agent/job_endpoint.go | 34 +++++-- command/agent/job_endpoint_test.go | 21 ++++ jobspec2/hcl_conversions.go | 29 +----- jobspec2/parse_job.go | 14 ++- jobspec2/parse_test.go | 56 ++++++++++- jobspec2/types.config.go | 22 ++++- nomad/structs/dependency_test.go | 76 ++++++++++++++ nomad/structs/structs.go | 148 ++++++++++++++++++++++++++-- scheduler/dependency/coordinator.go | 59 +++++++++-- scheduler/feasible/context.go | 2 +- scheduler/util.go | 5 + scheduler/util_test.go | 16 +++ 14 files changed, 554 insertions(+), 86 deletions(-) create mode 100644 nomad/structs/dependency_test.go diff --git a/api/constraint.go b/api/constraint.go index dac50ab314c..081b1757f75 100644 --- a/api/constraint.go +++ b/api/constraint.go @@ -3,6 +3,11 @@ package api +import ( + "errors" + "fmt" +) + const ( ConstraintDistinctProperty = "distinct_property" ConstraintDistinctHosts = "distinct_hosts" @@ -32,28 +37,76 @@ func NewConstraint(left, operand, right string) *Constraint { } } +type JobDependency struct { + Name string `hcl:"name,optional"` + Status string `hcl:"status,optional"` +} + +// JobDepdendency is kept as an alias for compatibility with callers using the +// legacy misspelled type name. 
+type JobDepdendency = JobDependency + +func NewJobDependency(name, status string) *JobDependency { + return &JobDependency{ + Name: name, + Status: status, + } +} + +func (d *JobDependency) Canonicalize() { + if d.Status == "" { + d.Status = "completed" + } +} + +func (d *JobDependency) Copy() *JobDependency { + if d == nil { + return nil + } + + copy := *d + return &copy +} + +func (d *JobDependency) Validate() error { + if d.Name == "" { + return errors.New("dependency job name is required") + } + + if d.Status == "" { + return errors.New("dependency job status is required") + } + + return nil +} + // Dependency is used to serialize a job placement dependency. type Dependency struct { - Name string `hcl:"name,label"` - Output string `hcl:"output,optional"` - Job string `hcl:"job"` + Timeout string `hcl:"timeout,optional"` + ActionOnTimeout string `hcl:"action_on_timeout,optional"` + Jobs []*JobDependency `hcl:"job,block"` } -func NewDependency(name, job, output string) *Dependency { +func NewDependency(timeout, actionOnTimeout string, jobs ...JobDepdendency) *Dependency { + copyJobs := make([]*JobDependency, 0, len(jobs)) + for _, job := range jobs { + copyJobs = append(copyJobs, (&job).Copy()) + } + return &Dependency{ - Name: name, - Job: job, - Output: output, + Timeout: timeout, + Jobs: copyJobs, + ActionOnTimeout: actionOnTimeout, } } func (d *Dependency) Canonicalize() { - if d.Name == "" { - d.Name = d.Job + if d.ActionOnTimeout == "" { + d.ActionOnTimeout = "reject" } - if d.Output == "" { - d.Output = "dead" + for _, job := range d.Jobs { + job.Canonicalize() } } @@ -61,9 +114,45 @@ func (d *Dependency) Copy() *Dependency { if d == nil { return nil } + + jobs := make([]*JobDependency, 0, len(d.Jobs)) + for _, job := range d.Jobs { + jobs = append(jobs, job.Copy()) + } + return &Dependency{ - Job: d.Job, - Output: d.Output, - Name: d.Name, + Timeout: d.Timeout, + ActionOnTimeout: d.ActionOnTimeout, + Jobs: jobs, + } +} + +func (d *Dependency) Validate() 
error { + if d == nil { + return nil + } + + if d.Timeout == "" { + return errors.New("dependency timeout is required") + } + + if d.ActionOnTimeout == "" { + return errors.New("dependency action_on_timeout is required") + } + + if d.ActionOnTimeout != "reject" { + return fmt.Errorf("invalid dependency action_on_timeout %q", d.ActionOnTimeout) + } + + if len(d.Jobs) == 0 { + return errors.New("dependency requires at least one job block") } + + for _, job := range d.Jobs { + if err := job.Validate(); err != nil { + return err + } + } + + return nil } diff --git a/api/constraint_test.go b/api/constraint_test.go index b408374b6d5..5688d9cc879 100644 --- a/api/constraint_test.go +++ b/api/constraint_test.go @@ -21,3 +21,44 @@ func TestCompose_Constraints(t *testing.T) { } must.Eq(t, expect, c) } + +func TestCompose_Dependencies(t *testing.T) { + testutil.Parallel(t) + + d := NewDependency("10m", "reject", JobDepdendency{Name: "service-123", Status: "completed"}) + d.Canonicalize() + + must.Eq(t, "10m", d.Timeout) + must.Eq(t, "reject", d.ActionOnTimeout) + must.Len(t, 1, d.Jobs) + must.Eq(t, "service-123", d.Jobs[0].Name) + must.Eq(t, "completed", d.Jobs[0].Status) + must.NoError(t, d.Validate()) + + copy := d.Copy() + must.Eq(t, d, copy) + must.True(t, d.Jobs[0] != copy.Jobs[0]) +} + +func TestCompose_Dependencies_DefaultsAndValidation(t *testing.T) { + testutil.Parallel(t) + + d := &Dependency{ + Timeout: "10m", + Jobs: []*JobDependency{{ + Name: "service-123", + }}, + } + d.Canonicalize() + + must.Eq(t, "reject", d.ActionOnTimeout) + must.Eq(t, "completed", d.Jobs[0].Status) + must.NoError(t, d.Validate()) + + bad := &Dependency{ + Timeout: "10m", + ActionOnTimeout: "continue", + Jobs: []*JobDependency{{Name: "service-123", Status: "completed"}}, + } + must.Error(t, bad.Validate()) +} diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 655c00ea1df..93337d22f1b 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go 
@@ -12,6 +12,7 @@ import ( "slices" "strconv" "strings" + "time" "github.com/golang/snappy" "github.com/hashicorp/nomad/acl" @@ -2263,17 +2264,18 @@ func ApiAffinitiesToStructs(in []*api.Affinity) []*structs.Affinity { return out } -func ApiDependenciesToStructs(in []*api.Dependency) []*structs.Dependency { - if in == nil { +func ApiDependenciesToStructs(in []*api.Dependency) *structs.Dependency { + if len(in) == 0 { return nil } - out := make([]*structs.Dependency, len(in)) - for i, dep := range in { - out[i] = ApiDependencyToStructs(dep) + for _, dep := range in { + if dep != nil { + return ApiDependencyToStructs(dep) + } } - return out + return nil } func ApiDependencyToStructs(in *api.Dependency) *structs.Dependency { @@ -2281,10 +2283,24 @@ func ApiDependencyToStructs(in *api.Dependency) *structs.Dependency { return nil } + jobs := make([]*structs.JobDependency, 0, len(in.Jobs)) + for _, j := range in.Jobs { + if j == nil { + continue + } + + jobs = append(jobs, &structs.JobDependency{ + Name: j.Name, + Status: j.Status, + }) + } + + timeout, _ := time.ParseDuration(in.Timeout) + return &structs.Dependency{ - Job: in.Job, - Output: in.Output, - Name: in.Name, + Timeout: timeout, + ActionOnTimeout: in.ActionOnTimeout, + Jobs: jobs, } } diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 9ad273d7b86..dbfd5fcfd91 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -4676,3 +4676,24 @@ func TestConversion_ApiJobVersionTagToStructs(t *testing.T) { must.Eq(t, expected, result) }) } + +func TestConversion_ApiDependencyToStructs(t *testing.T) { + t.Run("nil dependency", func(t *testing.T) { + must.Nil(t, ApiDependencyToStructs(nil)) + }) + + t.Run("maps timeout, action and nested jobs", func(t *testing.T) { + in := &api.Dependency{ + Timeout: "10m", + ActionOnTimeout: "reject", + Jobs: []*api.JobDependency{ + {Name: "service-123", Status: "completed"}, + }, + } + + out := 
ApiDependencyToStructs(in) + must.Eq(t, 10*time.Minute, out.Timeout) + must.Eq(t, "reject", out.ActionOnTimeout) + must.Eq(t, []*structs.JobDependency{{Name: "service-123", Status: "completed"}}, out.Jobs) + }) +} diff --git a/jobspec2/hcl_conversions.go b/jobspec2/hcl_conversions.go index c20f891def4..434276b0f4d 100644 --- a/jobspec2/hcl_conversions.go +++ b/jobspec2/hcl_conversions.go @@ -262,36 +262,9 @@ func decodeConstraint(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl. return diags } -var dependencySpec = hcldec.ObjectSpec{ - "job": &hcldec.AttrSpec{Name: "job", Type: cty.String, Required: true}, - "output": &hcldec.AttrSpec{Name: "output", Type: cty.String, Required: false}, - "name": &hcldec.AttrSpec{Name: "name", Type: cty.String, Required: false}, -} - func decodeDependency(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics { d := val.(*api.Dependency) - - v, diags := hcldec.Decode(body, dependencySpec, ctx) - if len(diags) != 0 { - return diags - } - - attr := func(name string) string { - a := v.GetAttr(name) - if a.IsNull() { - return "" - } - return a.AsString() - } - - d.Job = attr("job") - d.Output = attr("output") - - if d.Name == "" { - d.Name = attr("name") - } - - return diags + return gohcl.DecodeBody(body, ctx, d) } func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics { diff --git a/jobspec2/parse_job.go b/jobspec2/parse_job.go index 24a00e6170a..f57dbcd3a0a 100644 --- a/jobspec2/parse_job.go +++ b/jobspec2/parse_job.go @@ -109,12 +109,18 @@ func normalizeDependency(d *api.Dependency) { return } - if d.Output == "" { - d.Output = "completed" + if d.ActionOnTimeout == "" { + d.ActionOnTimeout = "reject" } - if d.Name == "" { - d.Name = d.Job + for _, depJob := range d.Jobs { + if depJob == nil { + continue + } + + if depJob.Status == "" { + depJob.Status = "completed" + } } } diff --git a/jobspec2/parse_test.go b/jobspec2/parse_test.go index 0bc591bbaed..4ee5dca5644 100644 --- 
a/jobspec2/parse_test.go +++ b/jobspec2/parse_test.go @@ -268,8 +268,11 @@ func TestParse_Dependencies(t *testing.T) { job "example" { type = "batch" - dependency "alpha" { - job = "upstream" + dependency { + timeout = "10m" + job { + name = "upstream" + } } group "g" { @@ -290,9 +293,52 @@ job "example" { }) require.NoError(t, err) require.Len(t, out.Dependencies, 1) - require.Equal(t, "alpha", out.Dependencies[0].Name) - require.Equal(t, "upstream", out.Dependencies[0].Job) - require.Equal(t, "completed", out.Dependencies[0].Output) + require.Equal(t, "10m", out.Dependencies[0].Timeout) + require.Equal(t, "reject", out.Dependencies[0].ActionOnTimeout) + require.Len(t, out.Dependencies[0].Jobs, 1) + require.Equal(t, "upstream", out.Dependencies[0].Jobs[0].Name) + require.Equal(t, "completed", out.Dependencies[0].Jobs[0].Status) +} + +func TestParse_Dependencies_OnlyOneBlockAllowed(t *testing.T) { + t.Parallel() + + hcl := ` +job "example" { + type = "batch" + + dependency { + timeout = "10m" + job { + name = "upstream-a" + } + } + + dependency { + timeout = "10m" + job { + name = "upstream-b" + } + } + + group "g" { + task "t" { + driver = "raw_exec" + config { + command = "echo" + } + } + } +} +` + + _, err := ParseWithConfig(&ParseConfig{ + Path: "input.hcl", + Body: []byte(hcl), + AllowFS: true, + }) + require.Error(t, err) + require.Contains(t, err.Error(), "Duplicate dependency block") } func TestParse_FileOperators(t *testing.T) { diff --git a/jobspec2/types.config.go b/jobspec2/types.config.go index c3767811e79..91a23ca424b 100644 --- a/jobspec2/types.config.go +++ b/jobspec2/types.config.go @@ -151,6 +151,7 @@ func (c *jobConfig) decodeTopLevelExtras(content *hcl.BodyContent, ctx *hcl.Eval var diags hcl.Diagnostics var foundVault *hcl.Block + var foundDependency *hcl.Block for _, b := range content.Blocks { switch b.Type { case vaultLabel: @@ -189,12 +190,23 @@ func (c *jobConfig) decodeTopLevelExtras(content *hcl.BodyContent, ctx *hcl.Eval } case 
dependencyLabel: + if foundDependency != nil { + diags = append(diags, &hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: fmt.Sprintf("Duplicate %s block", b.Type), + Detail: fmt.Sprintf( + "Only one block of type %q is allowed. Previous definition was at %s.", + b.Type, foundDependency.DefRange.String(), + ), + Subject: &b.DefRange, + }) + continue + } + foundDependency = b + d := &api.Dependency{} diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, d)...) - if len(b.Labels) == 1 { - d.Name = b.Labels[0] - c.Dependencies = append(c.Dependencies, d) - } + c.Dependencies = append(c.Dependencies, d) } } @@ -300,7 +312,7 @@ func (c *jobConfig) decodeJob(content *hcl.BodyContent, ctx *hcl.EvalContext) hc {Type: "vault"}, {Type: "secret", LabelNames: []string{"name"}}, {Type: "task", LabelNames: []string{"name"}}, - {Type: "dependency", LabelNames: []string{"name"}}, + {Type: "dependency"}, }, }) diff --git a/nomad/structs/dependency_test.go b/nomad/structs/dependency_test.go new file mode 100644 index 00000000000..14debc7136e --- /dev/null +++ b/nomad/structs/dependency_test.go @@ -0,0 +1,76 @@ +// Copyright IBM Corp. 
2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package structs + +import ( + "testing" + "time" + + "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" +) + +func TestDependency_CanonicalizeAndValidate(t *testing.T) { + ci.Parallel(t) + + d := &Dependency{ + Timeout: 10 * time.Minute, + Jobs: []*JobDependency{{ + Name: "service-123", + }}, + } + d.Canonicalize() + + must.Eq(t, "reject", d.ActionOnTimeout) + must.Eq(t, "completed", d.Jobs[0].Status) + must.NoError(t, d.Validate()) +} + +func TestDependency_CopyDeep(t *testing.T) { + ci.Parallel(t) + + d := &Dependency{ + Timeout: 10 * time.Minute, + ActionOnTimeout: "reject", + Jobs: []*JobDependency{{ + Name: "service-123", + Status: "completed", + }}, + } + + copy := d.Copy() + must.Eq(t, d, copy) + must.True(t, d.Jobs[0] != copy.Jobs[0]) + + copy.Jobs[0].Status = "running" + must.Eq(t, "completed", d.Jobs[0].Status) +} + +func TestJob_CopyIncludesDependencies(t *testing.T) { + ci.Parallel(t) + + j := &Job{ + ID: "job-id", + Name: "job-name", + Namespace: DefaultNamespace, + Type: JobTypeService, + TaskGroups: []*TaskGroup{{ + Name: "group", + Tasks: []*Task{{Name: "task", Driver: "raw_exec", Config: map[string]interface{}{"command": "/bin/date"}}}, + }}, + Dependencies: &Dependency{ + Timeout: 10 * time.Minute, + ActionOnTimeout: "reject", + Jobs: []*JobDependency{{ + Name: "service-123", + Status: "completed", + }}, + }, + } + + copy := j.Copy() + must.Eq(t, j.Dependencies, copy.Dependencies) + must.True(t, j.Dependencies != copy.Dependencies) + must.True(t, j.Dependencies.Jobs[0] != copy.Dependencies.Jobs[0]) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 19838989eab..2a615272375 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4447,7 +4447,7 @@ type Job struct { // inter-job dependencies, such as "job A cannot start until job B is // running". This can be used to express complex workflows with multiple // jobs. 
- Dependencies []*Dependency + Dependencies *Dependency // Spread can be specified at the job level to express spreading // allocations across a desired attribute, such as datacenter @@ -4679,6 +4679,10 @@ func (j *Job) Canonicalize() { j.Spreads = nil } + if j.Dependencies != nil { + j.Dependencies.Canonicalize() + } + // Ensure the job is in a namespace. if j.Namespace == "" { j.Namespace = DefaultNamespace @@ -4703,6 +4707,7 @@ func (j *Job) Canonicalize() { if j.Periodic != nil { j.Periodic.Canonicalize() } + } // Copy returns a deep copy of the Job. It is expected that callers use recover. @@ -4716,6 +4721,7 @@ func (j *Job) Copy() *Job { nj.Datacenters = slices.Clone(j.Datacenters) nj.Constraints = CopySliceConstraints(j.Constraints) nj.Affinities = CopySliceAffinities(j.Affinities) + nj.Dependencies = j.Dependencies.Copy() nj.Multiregion = j.Multiregion.Copy() nj.UI = j.UI.Copy() nj.VersionTag = j.VersionTag.Copy() @@ -4783,6 +4789,13 @@ func (j *Job) Validate() error { mErr.Errors = append(mErr.Errors, outer) } } + + if j.Dependencies != nil { + if err := j.Dependencies.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Dependency validation failed: %s", err)) + } + } + if j.Type == JobTypeSystem { if j.Affinities != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have an affinity block")) @@ -10943,43 +10956,158 @@ func (r *RpcError) Error() string { return r.Message } +type JobDependency struct { + Name string + Status string +} + +func (d *JobDependency) Equal(o *JobDependency) bool { + if d == nil || o == nil { + return d == o + } + + return d == o || + d.Name == o.Name && + d.Status == o.Status +} + +func (d *JobDependency) Validate() error { + if d == nil { + return errors.New("dependency job block is required") + } + + if d.Name == "" { + return errors.New("dependency job name is mandatory") + } + + if d.Status == "" { + return errors.New("dependency job status is mandatory") + } + + return nil +} + +func 
(d *JobDependency) Canonicalize() { + if d == nil { + return + } + + if d.Status == "" { + d.Status = "completed" + } +} + +func (d *JobDependency) String() string { + if d == nil { + return "" + } + + return fmt.Sprintf("%s: %s", d.Name, d.Status) +} + // A Dependency is used to restrict placement options. type Dependency struct { - Name string - Output string - Job string + Timeout time.Duration + ActionOnTimeout string + Jobs []*JobDependency } // Equal checks if two dependencies are equal. func (d *Dependency) Equal(o *Dependency) bool { + if d == nil || o == nil { + return d == o + } + + if len(d.Jobs) != len(o.Jobs) { + return false + } + + jEqual := true + for i := range d.Jobs { + jEqual = jEqual && d.Jobs[i].Equal(o.Jobs[i]) + } + return d == o || - d.Output == o.Output && - d.Job == o.Job + d.Timeout == o.Timeout && + d.ActionOnTimeout == o.ActionOnTimeout && + jEqual } func (d *Dependency) Copy() *Dependency { if d == nil { return nil } + + jobs := make([]*JobDependency, 0, len(d.Jobs)) + for _, job := range d.Jobs { + if job == nil { + jobs = append(jobs, nil) + continue + } + + copy := *job + jobs = append(jobs, &copy) + } + return &Dependency{ - Output: d.Output, - Job: d.Job, + Timeout: d.Timeout, + ActionOnTimeout: d.ActionOnTimeout, + Jobs: jobs, } } func (d *Dependency) String() string { - return fmt.Sprintf("%s: %s %s", d.Name, d.Output, d.Job) + jobs := make([]string, 0, len(d.Jobs)) + for _, j := range d.Jobs { + jobs = append(jobs, j.String()) + } + + return fmt.Sprintf("%s %s: %s", d.Timeout, d.ActionOnTimeout, strings.Join(jobs, ", ")) } func (d *Dependency) Validate() error { var mErr multierror.Error - if d.Job == "" { + if d == nil { + return nil + } + + if d.Timeout <= 0 { + mErr.Errors = append(mErr.Errors, errors.New("Missing or invalid timeout in dependency")) + } + + if d.ActionOnTimeout == "" { + mErr.Errors = append(mErr.Errors, errors.New("Missing action_on_timeout in dependency")) + } else if d.ActionOnTimeout != "reject" { + 
mErr.Errors = append(mErr.Errors, fmt.Errorf("Invalid action_on_timeout in dependency: %q", d.ActionOnTimeout)) + } + + if len(d.Jobs) == 0 { mErr.Errors = append(mErr.Errors, errors.New("Missing job in dependency")) } + for idx, job := range d.Jobs { + if err := job.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Dependency job %d validation failed: %v", idx+1, err)) + } + } + return mErr.ErrorOrNil() } +func (d *Dependency) Canonicalize() { + if d == nil { + return + } + + if d.ActionOnTimeout == "" { + d.ActionOnTimeout = "reject" + } + + for _, job := range d.Jobs { + job.Canonicalize() + } +} + // DiffID fulfills the DiffableWithID interface. func (d *Dependency) DiffID() string { return d.String() diff --git a/scheduler/dependency/coordinator.go b/scheduler/dependency/coordinator.go index 096027826bc..c77c3022c1d 100644 --- a/scheduler/dependency/coordinator.go +++ b/scheduler/dependency/coordinator.go @@ -77,13 +77,22 @@ func (c *Coordinator) unblockDependencies(eval *structs.Evaluation, dependeeJobs func (c *Coordinator) CheckDependency(state sstructs.State, job *structs.Job, eval *structs.Evaluation) (bool, error) { - if len(job.Dependencies) == 0 { + if job.Dependencies == nil { return true, nil } - djIDs := []string{} - for _, d := range job.Dependencies { - djIDs = append(djIDs, d.Job) + djSet := map[string]struct{}{} + for _, depJob := range job.Dependencies.Jobs { + if depJob == nil || depJob.Name == "" { + continue + } + + djSet[depJob.Name] = struct{}{} + } + + djIDs := make([]string, 0, len(djSet)) + for jobID := range djSet { + djIDs = append(djIDs, jobID) } djs := map[string]*structs.Job{} @@ -107,7 +116,7 @@ func (c *Coordinator) CheckDependency(state sstructs.State, job *structs.Job, ev c.loopDetector.AddNodes(eval.JobID, djIDs...) 
- ctx, cancel := context.WithDeadline(c.mainContext, time.Now().Add(DefaultTimeout)) + ctx, cancel := context.WithDeadline(c.mainContext, time.Now().Add(dependencyTimeout(job))) c.dependencies[eval.JobID] = &dependency{ cancelFunc: cancel, job: job, @@ -152,6 +161,7 @@ func (c *Coordinator) waitForDependency(ctx context.Context, state sstructs.Stat } case <-ctx.Done(): + c.logger.Debug("dependency timeout reached", "jobID", eval.JobID) return } } @@ -161,16 +171,20 @@ func (c *Coordinator) verifyDependencies(dependantJob *structs.Job, jobs map[str var mErr multierror.Error ready := true - for _, d := range dependantJob.Dependencies { - job, ok := jobs[d.Job] + for _, depJob := range dependantJob.Dependencies.Jobs { + if depJob == nil { + continue + } + + job, ok := jobs[depJob.Name] if !ok { - mErr.Errors = append(mErr.Errors, errors.New("unable to check dependency for job: "+d.Job)) + mErr.Errors = append(mErr.Errors, errors.New("unable to check dependency for job: "+depJob.Name)) ready = false break } - if (job != nil && job.Status != d.Output) || job == nil { - c.logger.Debug("job not preset yet", "jobID", d.Job) + if job == nil || !statusMatches(job.Status, depJob.Status) { + c.logger.Debug("job not preset yet", "jobID", depJob.Name) ready = false break } @@ -179,6 +193,31 @@ func (c *Coordinator) verifyDependencies(dependantJob *structs.Job, jobs map[str return ready, mErr.ErrorOrNil() } +func statusMatches(actual, expected string) bool { + if expected == "" { + return actual == "" + } + + if expected == "completed" { + return actual == structs.JobStatusDead + } + + return actual == expected +} + +func dependencyTimeout(job *structs.Job) time.Duration { + timeout := DefaultTimeout + if job.Dependencies != nil && job.Dependencies.Timeout > 0 { + timeout = job.Dependencies.Timeout + } + + if timeout <= 0 { + return DefaultTimeout + } + + return timeout +} + func (c *Coordinator) Stop() { c.mainContext.Done() c.dependencies = nil diff --git 
a/scheduler/feasible/context.go b/scheduler/feasible/context.go index 39e8b514988..df7aaefb1ae 100644 --- a/scheduler/feasible/context.go +++ b/scheduler/feasible/context.go @@ -253,7 +253,7 @@ func (e *EvalEligibility) Reset() { func (e *EvalEligibility) SetJob(job *structs.Job) { // Determine whether the job has escaped constraints. e.jobEscaped = len(structs.EscapedConstraints(job.Constraints)) != 0 || - len(job.Dependencies) != 0 + job.Dependencies != nil // Determine the escaped constraints per task group. for _, tg := range job.TaskGroups { diff --git a/scheduler/util.go b/scheduler/util.go index 955da6a658b..93f7e604e5b 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -165,6 +165,11 @@ var same = comparison{modified: false} // tasksUpdated creates a comparison between task groups to see if the tasks, their // drivers, environment variables or config have been modified. func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) comparison { + if (jobA.Dependencies == nil) != (jobB.Dependencies == nil) || + (jobA.Dependencies != nil && !jobA.Dependencies.Equal(jobB.Dependencies)) { + return difference("job dependencies", jobA.Dependencies, jobB.Dependencies) + } + a := jobA.LookupTaskGroup(taskGroup) b := jobB.LookupTaskGroup(taskGroup) diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 001bbbd802c..55fc1ad5a92 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -455,6 +455,22 @@ func TestTasksUpdated(t *testing.T) { must.True(t, tasksUpdated(j32, j33, name).modified) + // Change job dependency timeout + j34 := mock.Job() + j34.Dependencies = &structs.Dependency{ + Timeout: 10 * time.Minute, + ActionOnTimeout: "reject", + Jobs: []*structs.JobDependency{{ + Name: "service-123", + Status: "completed", + }}, + } + j35 := j34.Copy() + must.False(t, tasksUpdated(j34, j35, name).modified) + + j35.Dependencies.Timeout = 15 * time.Minute + must.True(t, tasksUpdated(j34, j35, name).modified) + } func 
TestTasksUpdated_connectServiceUpdated(t *testing.T) { From c3c986b516faa722ca9eeb23396d4725d48c0959 Mon Sep 17 00:00:00 2001 From: Juanadelacuesta <8647634+Juanadelacuesta@users.noreply.github.com> Date: Fri, 26 Jun 2026 13:37:25 +0200 Subject: [PATCH 10/10] fix: aboid sending erros if node is not found in the loop detector --- nomad/core_sched.go | 3 ++- scheduler/dependency/coordinator.go | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index eefb53685e3..ef0155cb74f 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -54,6 +54,7 @@ func NewCoreScheduler(srv *Server, snap *state.StateSnapshot, planner sstructs.P logger: srv.logger.ResetNamed("core.sched"), planner: planner, customThresholdForObject: make(map[string]*time.Duration), + dependecyChecker: srv.dependencyCoordinator, } for _, opt := range opts { @@ -171,7 +172,7 @@ OUTER: continue } - free, err := c.dependecyChecker.HasDependencies(job) + free, err := c.dependecyChecker.HasDependencies(job) if err != nil { c.logger.Error("job GC failed to get dependencies for job", "job", job.ID, "error", err) continue diff --git a/scheduler/dependency/coordinator.go b/scheduler/dependency/coordinator.go index c77c3022c1d..48c706fe0c9 100644 --- a/scheduler/dependency/coordinator.go +++ b/scheduler/dependency/coordinator.go @@ -146,6 +146,7 @@ func (c *Coordinator) waitForDependency(ctx context.Context, state sstructs.Stat select { case <-ws.WatchCh(ctx): + ready, err := c.verifyDependencies(c.dependencies[eval.JobID].job, dj) if err != nil { c.logger.Error("failed to verify dependency", "error", err) @@ -161,6 +162,7 @@ func (c *Coordinator) waitForDependency(ctx context.Context, state sstructs.Stat } case <-ctx.Done(): + c.unblockDependencies(eval, dj) c.logger.Debug("dependency timeout reached", "jobID", eval.JobID) return } @@ -229,7 +231,10 @@ func (c *Coordinator) HasDependencies(j *structs.Job) (bool, error) { if errors.Is(err, 
loop_detection.ErrNodeIsDependency) { return true, nil } - return false, err + + if !errors.Is(err, loop_detection.ErrNodeNotFound) { + return false, err + } } return false, nil