Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions api/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

package api

import (
"errors"
"fmt"
)

const (
ConstraintDistinctProperty = "distinct_property"
ConstraintDistinctHosts = "distinct_hosts"
Expand Down Expand Up @@ -31,3 +36,123 @@ func NewConstraint(left, operand, right string) *Constraint {
Operand: operand,
}
}

type JobDependency struct {
Name string `hcl:"name,optional"`
Status string `hcl:"status,optional"`
}

// JobDepdendency is kept as an alias for compatibility with callers using the
// legacy misspelled type name.
type JobDepdendency = JobDependency

func NewJobDependency(name, status string) *JobDependency {
return &JobDependency{
Name: name,
Status: status,
}
}

func (d *JobDependency) Canonicalize() {
if d.Status == "" {
d.Status = "completed"
}
}

func (d *JobDependency) Copy() *JobDependency {
if d == nil {
return nil
}

copy := *d
return &copy
}

func (d *JobDependency) Validate() error {
if d.Name == "" {
return errors.New("dependency job name is required")
}

if d.Status == "" {
return errors.New("dependency job status is required")
}

return nil
}

// Dependency is used to serialize a job placement dependency.
type Dependency struct {
Timeout string `hcl:"timeout,optional"`
ActionOnTimeout string `hcl:"action_on_timeout,optional"`
Jobs []*JobDependency `hcl:"job,block"`
}

func NewDependency(timeout, actionOnTimeout string, jobs ...JobDepdendency) *Dependency {
copyJobs := make([]*JobDependency, 0, len(jobs))
for _, job := range jobs {
copyJobs = append(copyJobs, (&job).Copy())
}

return &Dependency{
Timeout: timeout,
Jobs: copyJobs,
ActionOnTimeout: actionOnTimeout,
}
}

func (d *Dependency) Canonicalize() {
if d.ActionOnTimeout == "" {
d.ActionOnTimeout = "reject"
}

for _, job := range d.Jobs {
job.Canonicalize()
}
}

func (d *Dependency) Copy() *Dependency {
if d == nil {
return nil
}

jobs := make([]*JobDependency, 0, len(d.Jobs))
for _, job := range d.Jobs {
jobs = append(jobs, job.Copy())
}

return &Dependency{
Timeout: d.Timeout,
ActionOnTimeout: d.ActionOnTimeout,
Jobs: jobs,
}
}

func (d *Dependency) Validate() error {
if d == nil {
return nil
}

if d.Timeout == "" {
return errors.New("dependency timeout is required")
}

if d.ActionOnTimeout == "" {
return errors.New("dependency action_on_timeout is required")
}

if d.ActionOnTimeout != "reject" {
return fmt.Errorf("invalid dependency action_on_timeout %q", d.ActionOnTimeout)
}

if len(d.Jobs) == 0 {
return errors.New("dependency requires at least one job block")
}

for _, job := range d.Jobs {
if err := job.Validate(); err != nil {
return err
}
}

return nil
}
41 changes: 41 additions & 0 deletions api/constraint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,44 @@ func TestCompose_Constraints(t *testing.T) {
}
must.Eq(t, expect, c)
}

func TestCompose_Dependencies(t *testing.T) {
testutil.Parallel(t)

d := NewDependency("10m", "reject", JobDepdendency{Name: "service-123", Status: "completed"})
d.Canonicalize()

must.Eq(t, "10m", d.Timeout)
must.Eq(t, "reject", d.ActionOnTimeout)
must.Len(t, 1, d.Jobs)
must.Eq(t, "service-123", d.Jobs[0].Name)
must.Eq(t, "completed", d.Jobs[0].Status)
must.NoError(t, d.Validate())

copy := d.Copy()
must.Eq(t, d, copy)
must.True(t, d.Jobs[0] != copy.Jobs[0])
}

func TestCompose_Dependencies_DefaultsAndValidation(t *testing.T) {
testutil.Parallel(t)

d := &Dependency{
Timeout: "10m",
Jobs: []*JobDependency{{
Name: "service-123",
}},
}
d.Canonicalize()

must.Eq(t, "reject", d.ActionOnTimeout)
must.Eq(t, "completed", d.Jobs[0].Status)
must.NoError(t, d.Validate())

bad := &Dependency{
Timeout: "10m",
ActionOnTimeout: "continue",
Jobs: []*JobDependency{{Name: "service-123", Status: "completed"}},
}
must.Error(t, bad.Validate())
}
19 changes: 15 additions & 4 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,22 @@ const (
// For Client configuration, if no region information is given,
// the client node will default to be part of the GlobalRegion.
GlobalRegion = "global"
)

const (
// RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by
// enforcing the job modify index during registers.
RegisterEnforceIndexErrPrefix = "Enforcing job modify index"
)

const (
// JobPeriodicLaunchSuffix is the string appended to the periodic jobs ID
// when launching derived instances of it.
JobPeriodicLaunchSuffix = "/periodic-"

// JobDispatchLaunchSuffix is the string appended to the parameterized job's ID
// when dispatching instances of it.
JobDispatchLaunchSuffix = "/dispatch-"

JobStatusPending = "pending" // Pending means the job is waiting on scheduling
JobStatusRunning = "running" // Running means the job has non-terminal allocations
JobStatusDead = "dead" // Dead means all evaluation's and allocations are terminal
)

// Jobs is used to access the job-specific endpoints.
Expand Down Expand Up @@ -1115,6 +1115,7 @@ type Job struct {
Datacenters []string `hcl:"datacenters,optional"`
NodePool *string `mapstructure:"node_pool" hcl:"node_pool,optional"`
Constraints []*Constraint `hcl:"constraint,block"`
Dependencies []*Dependency `hcl:"dependency,block"`
Affinities []*Affinity `hcl:"affinity,block"`
TaskGroups []*TaskGroup `hcl:"group,block"`
Update *UpdateStrategy `hcl:"update,block"`
Expand Down Expand Up @@ -1247,6 +1248,10 @@ func (j *Job) Canonicalize() {
a.Canonicalize()
}

for _, d := range j.Dependencies {
d.Canonicalize()
}

if j.UI != nil {
j.UI.Canonicalize()
}
Expand Down Expand Up @@ -1399,6 +1404,12 @@ func (j *Job) Constrain(c *Constraint) *Job {
return j
}

// Depend is used to add a dependency to a job.
func (j *Job) Depend(d *Dependency) *Job {
j.Dependencies = append(j.Dependencies, d)
return j
}

// AddAffinity is used to add an affinity to a job.
func (j *Job) AddAffinity(a *Affinity) *Job {
j.Affinities = append(j.Affinities, a)
Expand Down
42 changes: 42 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"slices"
"strconv"
"strings"
"time"

"github.com/golang/snappy"
"github.com/hashicorp/nomad/acl"
Expand Down Expand Up @@ -1145,6 +1146,7 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
Version: *job.Version,
Constraints: ApiConstraintsToStructs(job.Constraints),
Affinities: ApiAffinitiesToStructs(job.Affinities),
Dependencies: ApiDependenciesToStructs(job.Dependencies),
UI: ApiJobUIConfigToStructs(job.UI),
VersionTag: ApiJobVersionTagToStructs(job.VersionTag),
}
Expand Down Expand Up @@ -2262,6 +2264,46 @@ func ApiAffinitiesToStructs(in []*api.Affinity) []*structs.Affinity {
return out
}

func ApiDependenciesToStructs(in []*api.Dependency) *structs.Dependency {
if len(in) == 0 {
return nil
}

for _, dep := range in {
if dep != nil {
return ApiDependencyToStructs(dep)
}
}

return nil
}

func ApiDependencyToStructs(in *api.Dependency) *structs.Dependency {
if in == nil {
return nil
}

jobs := make([]*structs.JobDependency, 0, len(in.Jobs))
for _, j := range in.Jobs {
if j == nil {
continue
}

jobs = append(jobs, &structs.JobDependency{
Name: j.Name,
Status: j.Status,
})
}

timeout, _ := time.ParseDuration(in.Timeout)

return &structs.Dependency{
Timeout: timeout,
ActionOnTimeout: in.ActionOnTimeout,
Jobs: jobs,
}
}

func ApiJobUIConfigToStructs(jobUI *api.JobUIConfig) *structs.JobUIConfig {
if jobUI == nil {
return nil
Expand Down
21 changes: 21 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4676,3 +4676,24 @@ func TestConversion_ApiJobVersionTagToStructs(t *testing.T) {
must.Eq(t, expected, result)
})
}

func TestConversion_ApiDependencyToStructs(t *testing.T) {
t.Run("nil dependency", func(t *testing.T) {
must.Nil(t, ApiDependencyToStructs(nil))
})

t.Run("maps timeout, action and nested jobs", func(t *testing.T) {
in := &api.Dependency{
Timeout: "10m",
ActionOnTimeout: "reject",
Jobs: []*api.JobDependency{
{Name: "service-123", Status: "completed"},
},
}

out := ApiDependencyToStructs(in)
must.Eq(t, 10*time.Minute, out.Timeout)
must.Eq(t, "reject", out.ActionOnTimeout)
must.Eq(t, []*structs.JobDependency{{Name: "service-123", Status: "completed"}}, out.Jobs)
})
}
6 changes: 6 additions & 0 deletions jobspec2/hcl_conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func newHCLDecoder() *gohcl.Decoder {
// custom nomad types
decoder.RegisterBlockDecoder(reflect.TypeOf(api.Affinity{}), decodeAffinity)
decoder.RegisterBlockDecoder(reflect.TypeOf(api.Constraint{}), decodeConstraint)
decoder.RegisterBlockDecoder(reflect.TypeOf(api.Dependency{}), decodeDependency)

return decoder
}
Expand Down Expand Up @@ -261,6 +262,11 @@ func decodeConstraint(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.
return diags
}

func decodeDependency(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics {
d := val.(*api.Dependency)
return gohcl.DecodeBody(body, ctx, d)
}

func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics {
tg := val.(*api.TaskGroup)

Expand Down
28 changes: 28 additions & 0 deletions jobspec2/parse_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,16 @@ func normalizeJob(jc *jobConfig) {
j.Periodic.SpecType = &v
}

if len(j.Dependencies) == 0 && len(jc.Dependencies) != 0 {
j.Dependencies = jc.Dependencies
}

normalizeVault(jc.Vault)

for _, d := range j.Dependencies {
normalizeDependency(d)
}

if len(jc.Tasks) != 0 {
alone := make([]*api.TaskGroup, 0, len(jc.Tasks))
for _, t := range jc.Tasks {
Expand Down Expand Up @@ -96,6 +104,26 @@ func normalizeVault(v *api.Vault) {
}
}

func normalizeDependency(d *api.Dependency) {
if d == nil {
return
}

if d.ActionOnTimeout == "" {
d.ActionOnTimeout = "reject"
}

for _, depJob := range d.Jobs {
if depJob == nil {
continue
}

if depJob.Status == "" {
depJob.Status = "completed"
}
}
}

func normalizeNetworkPorts(networks []*api.NetworkResource) {
if networks == nil {
return
Expand Down
Loading
Loading