diff --git a/cell/lifecycle.go b/cell/lifecycle.go index 4721ee1..bbf23ab 100644 --- a/cell/lifecycle.go +++ b/cell/lifecycle.go @@ -36,6 +36,14 @@ type HookDescriptiveInterface interface { HookInfo() string } +// PreStopHook marks the hook as a pre-stop hook. This is used by the +// [job.registry] to ensure that the jobs started at runtime are +// stopped before anything else to ensure their dependencies are not +// stopped before the jobs. +type PreStopHook interface { + PreStopHookMarker() +} + // Hook is a pair of start and stop callbacks. Both are optional. // They're paired up to make sure that on failed start all corresponding // stop hooks are executed. @@ -82,12 +90,13 @@ type DefaultLifecycle struct { type augmentedHook struct { HookInterface moduleID FullModuleID + stopped bool } func NewDefaultLifecycle(hooks []HookInterface, numStarted int, logThreshold time.Duration) *DefaultLifecycle { h := make([]augmentedHook, 0, len(hooks)) for _, hook := range hooks { - h = append(h, augmentedHook{hook, nil}) + h = append(h, augmentedHook{hook, nil, false}) } return &DefaultLifecycle{ mu: sync.Mutex{}, @@ -101,7 +110,7 @@ func (lc *DefaultLifecycle) Append(hook HookInterface) { lc.mu.Lock() defer lc.mu.Unlock() - lc.hooks = append(lc.hooks, augmentedHook{hook, nil}) + lc.hooks = append(lc.hooks, augmentedHook{hook, nil, false}) } func (lc *DefaultLifecycle) Start(log *slog.Logger, ctx context.Context) error { @@ -116,6 +125,8 @@ func (lc *DefaultLifecycle) Start(log *slog.Logger, ctx context.Context) error { from := lc.numStarted for i, hook := range lc.hooks[from:] { + lc.hooks[from+i].stopped = false + fnName, exists := getHookFuncName(hook, true) if !exists { @@ -164,15 +175,10 @@ func (lc *DefaultLifecycle) Stop(log *slog.Logger, ctx context.Context) error { defer cancel() var errs error - for ; lc.numStarted > 0; lc.numStarted-- { - if ctx.Err() != nil { - return ctx.Err() - } - hook := lc.hooks[lc.numStarted-1] - + runStopHook := func(hook augmentedHook) { fnName, exists := getHookFuncName(hook, false) if !exists { - continue + return } l := log.With("function", fnName) @@ -195,6 +201,35 @@ func (lc *DefaultLifecycle) Stop(log *slog.Logger, ctx context.Context) error { } } } + + // Stop pre-stop hooks first. We can't update [lc.numStarted] + // since we're skipping hooks, so instead we mark the hook + // as already stopped to retain idempotency. + for i := lc.numStarted - 1; i >= 0; i-- { + if ctx.Err() != nil { + return ctx.Err() + } + hook := lc.hooks[i] + if hook.stopped { + continue + } + if _, ok := hook.HookInterface.(PreStopHook); ok { + runStopHook(hook) + lc.hooks[i].stopped = true + } + } + + // Finally stop the normal hooks. + for i := lc.numStarted - 1; i >= 0; i-- { + if ctx.Err() != nil { + return ctx.Err() + } + hook := lc.hooks[i] + if !hook.stopped { + runStopHook(hook) + } + lc.numStarted-- + } return errs } @@ -212,14 +247,25 @@ func (lc *DefaultLifecycle) PrintHooks(w io.Writer) { } fmt.Fprintf(w, "\nStop hooks:\n\n") - for i := len(lc.hooks) - 1; i >= 0; i-- { - hook := lc.hooks[i] + printHook := func(hook augmentedHook) { fnName, exists := getHookFuncName(hook.HookInterface, false) if !exists { - continue + return } fmt.Fprintf(w, " • %s (%s)\n", fnName, hook.moduleID) } + for i := len(lc.hooks) - 1; i >= 0; i-- { + hook := lc.hooks[i] + if _, ok := hook.HookInterface.(PreStopHook); ok { + printHook(hook) + } + } + for i := len(lc.hooks) - 1; i >= 0; i-- { + hook := lc.hooks[i] + if _, ok := hook.HookInterface.(PreStopHook); !ok { + printHook(hook) + } + } } type augmentedLifecycle struct { @@ -231,7 +277,7 @@ func (lc augmentedLifecycle) Append(hook HookInterface) { lc.mu.Lock() defer lc.mu.Unlock() - lc.hooks = append(lc.hooks, augmentedHook{hook, lc.moduleID}) + lc.hooks = append(lc.hooks, augmentedHook{hook, lc.moduleID, false}) } func getHookFuncName(hook HookInterface, start bool) (name string, hasHook bool) { diff --git a/job/job.go b/job/job.go index 54ae9f6..bbc81e4 100644 --- a/job/job.go +++ b/job/job.go @@ -10,6 +10,7 @@ import ( "log/slog" "runtime/pprof" "sync" + "time" "github.com/cilium/hive" "github.com/cilium/hive/cell" @@ -44,12 +45,20 @@ type registry struct { logger *slog.Logger shutdowner hive.Shutdowner - mu sync.Mutex + // appLifecycle is the main application appLifecycle. Jobs that are + // added before the registry is started are appended here. This ensures + // that the job starting order is interleaved with the start hooks and + // that we don't start the jobs before a dependency's start hook has ran. + appLifecycle cell.Lifecycle + + // runtimeLifecycle is the lifecycle used after registry has started. + runtimeLifecycle jobLifecycle + + // mu protects the fields below + mu sync.Mutex + groups []*group started bool - - lifecycle cell.Lifecycle - dynamicLC *cell.DefaultLifecycle } var _ cell.HookInterface = (*registry)(nil) @@ -60,9 +69,9 @@ func newRegistry( lc cell.Lifecycle, ) Registry { r := ®istry{ - logger: logger, - shutdowner: shutdowner, - lifecycle: lc, + logger: logger, + shutdowner: shutdowner, + appLifecycle: lc, } lc.Append(r) return r @@ -76,7 +85,6 @@ func (c *registry) Start(cell.HookContext) error { return nil } c.started = true - c.dynamicLC = cell.NewDefaultLifecycle(nil, 0, 0) return nil } @@ -87,15 +95,18 @@ func (c *registry) Stop(ctx cell.HookContext) error { return nil } c.started = false - c.dynamicLC.Stop(c.logger, ctx) - return nil + return c.runtimeLifecycle.stop(ctx) } +// PreStopHookMarker tells [cell.DefaultLifecycle] that this +// hook should be stopped before any other hook. +func (c *registry) PreStopHookMarker() {} + func (c *registry) WithLifecycle(lifecycle cell.Lifecycle) Registry { r := ®istry{ - logger: c.logger, - shutdowner: c.shutdowner, - lifecycle: lifecycle, + logger: c.logger, + shutdowner: c.shutdowner, + appLifecycle: lifecycle, } lifecycle.Append(r) return r @@ -136,7 +147,7 @@ func (c *registry) addJobs(health cell.Health, opts options, jobs ...Job) { if !c.started { for _, job := range jobs { - c.lifecycle.Append(&queuedJob{ + c.appLifecycle.Append(&queuedJob{ registry: c, job: job, health: health, @@ -147,15 +158,15 @@ func (c *registry) addJobs(health cell.Health, opts options, jobs ...Job) { } for _, job := range jobs { - qj := &queuedJob{ - registry: c, - job: job, - health: health, - options: opts, - } - c.dynamicLC.Append(qj) - // Start the newly appended job immediately. - c.dynamicLC.Start(c.logger, context.Background()) + c.runtimeLifecycle.insertAndStart( + &queuedJob{ + registry: c, + job: job, + health: health, + options: opts, + runtimeJob: true, + }, + ) } } @@ -180,22 +191,40 @@ type Job interface { } type queuedJob struct { - registry *registry - job Job - health cell.Health - options options - cancel context.CancelFunc - done chan struct{} + registry *registry + job Job + health cell.Health + options options + cancel context.CancelFunc + done chan struct{} + startedAt time.Time + prev *queuedJob + next *queuedJob + runtimeJob bool } // Start implements cell.HookInterface. func (qj *queuedJob) Start(cell.HookContext) error { + qj.startedAt = time.Now() qj.done = make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) qj.cancel = cancel pprof.Do(ctx, qj.options.pprofLabels, func(ctx context.Context) { go func() { - defer close(qj.done) + defer func() { + qj.registry.runtimeLifecycle.remove(qj) + if qj.runtimeJob { + qj.registry.logger.Info("Job stopped", + "job", qj.HookInfo(), + "duration", time.Since(qj.startedAt)) + } + close(qj.done) + }() + if qj.runtimeJob { + // We only log this for runtime jobs since we already have the + // lifecycle logging for the jobs added before starting. + qj.registry.logger.Info("Job started", "job", qj.HookInfo()) + } qj.job.start(ctx, qj.health, qj.options) }() }) @@ -217,7 +246,7 @@ func (qj *queuedJob) Stop(ctx cell.HookContext) error { } } -var _ cell.HookInterface = &queuedJob{} +var _ cell.HookDescriptiveInterface = &queuedJob{} type group struct { registry *registry diff --git a/job/job_test.go b/job/job_test.go index 879cebe..6ed6943 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -5,10 +5,10 @@ package job import ( "context" - "fmt" "log/slog" "runtime" "runtime/pprof" + "slices" "strings" "sync" "sync/atomic" @@ -221,12 +221,10 @@ func TestJobLifecycleOrderingAcrossGroups(t *testing.T) { g2 Group ) - staticStarts := []string{"g1-first", "g1-second", "g2-first", "g1-third"} - expectStarts := append(append([]string{}, staticStarts...), "g2-dynamic", "g1-dynamic") - expectStops := []string{"g1-third", "g2-first", "g1-second", "g1-first", "g1-dynamic", "g2-dynamic"} + started, stopped := &orderCollector{}, &orderCollector{} addJob := func(g Group, name string) { - g.Add(&orderingJob{name: name}) + g.Add(&orderingJob{started: started, stopped: stopped, name: name}) } h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { @@ -239,28 +237,48 @@ func TestJobLifecycleOrderingAcrossGroups(t *testing.T) { addJob(g1, "g1-third") }) - log, logs := newRecordingLogger() + log := hivetest.Logger(t) ctx := context.Background() require.NoError(t, h.Start(log, ctx)) - waitForDetails(t, logs, len(staticStarts)) - startDetails := logs.attrs("detail") - assert.Equal(t, staticStarts, startDetails) + assert.Eventually(t, + func() bool { + return len(started.get()) == 4 + }, + time.Second, 10*time.Millisecond, "expected 4 static jobs to have started") // Add jobs dynamically after the lifecycle has already started. addJob(g2, "g2-dynamic") addJob(g1, "g1-dynamic") - waitForDetails(t, logs, len(expectStarts)) - startDetails = logs.attrs("detail") - assert.Equal(t, expectStarts, startDetails) - logs.clear() + assert.Eventually(t, + func() bool { + return len(started.get()) == 6 + }, + time.Second, 10*time.Millisecond, "expected 4 static jobs + 2 dynamic jobs") + + // While the order of [started.get] is non-deterministic the order in which + // we stop is. + expectedStops := []string{ + "g1-dynamic", + "g2-dynamic", + "g1-third", + "g2-first", + "g1-second", + "g1-first", + } + + // The order in which these goroutines started is non-deterministic, so just + // check that all of them are running as expected + startedJobs := started.get() + slices.Sort(startedJobs) + expectedStarts := slices.Clone(expectedStops) + slices.Sort(expectedStarts) + assert.Equal(t, startedJobs, expectedStarts) require.NoError(t, h.Stop(log, ctx)) - waitForDetails(t, logs, len(expectStops)) - stopDetails := logs.attrs("detail") - assert.Equal(t, expectStops, stopDetails) + assert.Equal(t, expectedStops, stopped.get()) } func TestModuleDecoratedGroup(t *testing.T) { @@ -361,108 +379,33 @@ func Test_sanitizeName(t *testing.T) { } } -type orderingJob struct { - name string -} - -func (oj *orderingJob) start(ctx context.Context, _ cell.Health, _ options) { - <-ctx.Done() -} - -func (oj *orderingJob) info() string { - return oj.name -} - -type logRecord struct { - msg string - attrs map[string]string -} - -type logCollector struct { +type orderCollector struct { mu sync.Mutex - records []logRecord + records []string } -func (lc *logCollector) add(rec logRecord) { - lc.mu.Lock() - defer lc.mu.Unlock() - lc.records = append(lc.records, rec) +func (oc *orderCollector) add(name string) { + oc.mu.Lock() + oc.records = append(oc.records, name) + oc.mu.Unlock() } - -func (lc *logCollector) clear() { - lc.mu.Lock() - defer lc.mu.Unlock() - lc.records = nil +func (oc *orderCollector) get() []string { + oc.mu.Lock() + defer oc.mu.Unlock() + return slices.Clone(oc.records) } -func (lc *logCollector) attrs(attr string) []string { - lc.mu.Lock() - defer lc.mu.Unlock() - - var out []string - for _, rec := range lc.records { - value, ok := rec.attrs[attr] - if !ok { - continue - } - out = append(out, value) - } - return out -} - -type recordingHandler struct { - collector *logCollector - attrs []slog.Attr -} - -func newRecordingLogger() (*slog.Logger, *logCollector) { - collector := &logCollector{} - return slog.New(&recordingHandler{collector: collector}), collector -} - -func (rh *recordingHandler) Enabled(_ context.Context, lvl slog.Level) bool { - return lvl >= slog.LevelInfo -} - -func (rh *recordingHandler) Handle(_ context.Context, r slog.Record) error { - rec := logRecord{ - msg: r.Message, - attrs: make(map[string]string), - } - for _, attr := range rh.attrs { - rec.attrs[attr.Key] = fmt.Sprint(attr.Value) - } - r.Attrs(func(a slog.Attr) bool { - rec.attrs[a.Key] = fmt.Sprint(a.Value) - return true - }) - rh.collector.add(rec) - return nil -} - -func (rh *recordingHandler) WithAttrs(attrs []slog.Attr) slog.Handler { - nrh := &recordingHandler{ - collector: rh.collector, - attrs: append(append([]slog.Attr{}, rh.attrs...), attrs...), - } - return nrh +type orderingJob struct { + started, stopped *orderCollector + name string } -func (rh *recordingHandler) WithGroup(string) slog.Handler { - return &recordingHandler{ - collector: rh.collector, - attrs: append([]slog.Attr{}, rh.attrs...), - } +func (oj *orderingJob) start(ctx context.Context, _ cell.Health, _ options) { + oj.started.add(oj.name) + defer oj.stopped.add(oj.name) + <-ctx.Done() } -func waitForDetails(t *testing.T, logs *logCollector, expectedLen int) { - t.Helper() - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - if len(logs.attrs("detail")) >= expectedLen { - return - } - time.Sleep(tick) - } - t.Fatalf("timeout waiting for logs (expected %d); have %v", expectedLen, logs.attrs("detail")) +func (oj *orderingJob) info() string { + return oj.name } diff --git a/job/lifecycle.go b/job/lifecycle.go new file mode 100644 index 0000000..b89af2b --- /dev/null +++ b/job/lifecycle.go @@ -0,0 +1,80 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package job + +import ( + "context" + "sync" + + "github.com/cilium/hive/cell" +) + +// jobLifecycle tracks jobs started after the [registry] has been started +// (and thus not appended to application lifecycle) and to stop them when +// [registry] stops. Jobs that are stopped earlier are removed from the +// lifecycle. +type jobLifecycle struct { + mu sync.Mutex + + // jobs is a doubly linked-list of started jobs. The head is the latest + // started jobs, e.g. they're in the order they should be stopped. + jobs *queuedJob +} + +func (r *jobLifecycle) insertAndStart(job *queuedJob) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.jobs != nil { + r.jobs.prev = job + } + job.next = r.jobs + r.jobs = job + job.Start(context.Background()) +} + +func (r *jobLifecycle) remove(qj *queuedJob) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.jobs == nil { + qj.prev = nil + qj.next = nil + return + } + + if qj.next != nil { + qj.next.prev = qj.prev + } + if qj == r.jobs { + r.jobs = qj.next + } else if qj.prev != nil { + qj.prev.next = qj.next + } + qj.prev = nil + qj.next = nil +} + +func (r *jobLifecycle) stop(ctx cell.HookContext) error { + // Collect jobs to stop and unlink them. We must stop them without holding the + // lock as [queuedJob.Stop] will try to call [jobLifecycle.remove]. + var jobsToStop []*queuedJob + r.mu.Lock() + var next *queuedJob + for job := r.jobs; job != nil; job = next { + next = job.next + job.next = nil + job.prev = nil + jobsToStop = append(jobsToStop, job) + } + r.jobs = nil + r.mu.Unlock() + for _, job := range jobsToStop { + job.Stop(ctx) + if ctx.Err() != nil { + break + } + } + return ctx.Err() +} diff --git a/job/lifecycle_test.go b/job/lifecycle_test.go new file mode 100644 index 0000000..0dea481 --- /dev/null +++ b/job/lifecycle_test.go @@ -0,0 +1,232 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package job + +import ( + "context" + "io" + "log/slog" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cilium/hive/cell" +) + +func TestJobLifecycleInsertAndRemove(t *testing.T) { + t.Parallel() + + r := ®istry{logger: lifecycleTestLogger()} + first := newBlockingLifecycleJob(t, r, "first", nil) + second := newBlockingLifecycleJob(t, r, "second", nil) + third := newBlockingLifecycleJob(t, r, "third", nil) + defer stopLifecycleJobs(t, first, second, third) + + insertRuntimeLifecycleJobs(r, first, second, third) + waitForLifecycleJobs(t, first, second, third) + + requireRuntimeLifecycleList(t, r, third, second, first) + + removeRuntimeLifecycleJob(r, second) + requireRuntimeLifecycleList(t, r, third, first) + requireRuntimeLifecycleJobUnlinked(t, r, second) + + removeRuntimeLifecycleJob(r, third) + requireRuntimeLifecycleList(t, r, first) + requireRuntimeLifecycleJobUnlinked(t, r, third) + + removeRuntimeLifecycleJob(r, first) + requireRuntimeLifecycleList(t, r) + requireRuntimeLifecycleJobUnlinked(t, r, first) +} + +func TestJobLifecycleRemovesCompletedJobs(t *testing.T) { + t.Parallel() + + r := ®istry{logger: lifecycleTestLogger()} + qj := &queuedJob{ + registry: r, + job: &completingLifecycleJob{started: make(chan struct{})}, + } + + insertRuntimeLifecycleJobs(r, qj) + <-qj.job.(*completingLifecycleJob).started + + require.EventuallyWithT(t, func(c *assert.CollectT) { + r.runtimeLifecycle.mu.Lock() + defer r.runtimeLifecycle.mu.Unlock() + + assert.Nil(c, r.runtimeLifecycle.jobs) + assert.Nil(c, qj.prev) + assert.Nil(c, qj.next) + }, timeout, tick) +} + +func TestJobLifecycleStopStopsJobsInReverseStartOrder(t *testing.T) { + t.Parallel() + + var ( + mu sync.Mutex + stopped []string + ) + recordStop := func(name string) { + mu.Lock() + defer mu.Unlock() + stopped = append(stopped, name) + } + + r := ®istry{logger: lifecycleTestLogger()} + first := newBlockingLifecycleJob(t, r, "first", recordStop) + second := newBlockingLifecycleJob(t, r, "second", recordStop) + third := newBlockingLifecycleJob(t, r, "third", recordStop) + + insertRuntimeLifecycleJobs(r, first, second, third) + waitForLifecycleJobs(t, first, second, third) + + require.NoError(t, stopRuntimeLifecycle(r, context.Background())) + + requireRuntimeLifecycleList(t, r) + + mu.Lock() + defer mu.Unlock() + assert.Equal(t, []string{"third", "second", "first"}, stopped) +} + +func TestJobLifecycleStopReturnsContextError(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + var lifecycle jobLifecycle + assert.ErrorIs(t, lifecycle.stop(ctx), context.Canceled) +} + +type blockingLifecycleJob struct { + name string + started chan struct{} + startOnce sync.Once + onStop func(string) +} + +func newBlockingLifecycleJob(t *testing.T, r *registry, name string, onStop func(string)) *queuedJob { + t.Helper() + + return &queuedJob{ + registry: r, + job: &blockingLifecycleJob{ + name: name, + started: make(chan struct{}), + onStop: onStop, + }, + } +} + +func lifecycleTestLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +func (j *blockingLifecycleJob) start(ctx context.Context, _ cell.Health, _ options) { + j.startOnce.Do(func() { + close(j.started) + }) + <-ctx.Done() + if j.onStop != nil { + j.onStop(j.name) + } +} + +func (j *blockingLifecycleJob) info() string { + return j.name +} + +type completingLifecycleJob struct { + started chan struct{} +} + +func (j *completingLifecycleJob) start(context.Context, cell.Health, options) { + close(j.started) +} + +func (j *completingLifecycleJob) info() string { + return "completed" +} + +func waitForLifecycleJobs(t *testing.T, jobs ...*queuedJob) { + t.Helper() + + for _, qj := range jobs { + <-qj.job.(*blockingLifecycleJob).started + } +} + +func stopLifecycleJobs(t *testing.T, jobs ...*queuedJob) { + t.Helper() + + for _, qj := range jobs { + if qj.cancel == nil { + continue + } + require.NoError(t, qj.Stop(context.Background())) + } +} + +func insertRuntimeLifecycleJobs(r *registry, jobs ...*queuedJob) { + for _, qj := range jobs { + r.runtimeLifecycle.insertAndStart(qj) + } +} + +func removeRuntimeLifecycleJob(r *registry, qj *queuedJob) { + r.runtimeLifecycle.remove(qj) +} + +func stopRuntimeLifecycle(r *registry, ctx context.Context) error { + return r.runtimeLifecycle.stop(ctx) +} + +func requireRuntimeLifecycleList(t *testing.T, r *registry, want ...*queuedJob) { + t.Helper() + + r.runtimeLifecycle.mu.Lock() + defer r.runtimeLifecycle.mu.Unlock() + + requireLifecycleList(t, r.runtimeLifecycle.jobs, want...) +} + +func requireRuntimeLifecycleJobUnlinked(t *testing.T, r *registry, qj *queuedJob) { + t.Helper() + + r.runtimeLifecycle.mu.Lock() + defer r.runtimeLifecycle.mu.Unlock() + + assert.Nil(t, qj.prev) + assert.Nil(t, qj.next) +} + +func requireLifecycleList(t *testing.T, head *queuedJob, want ...*queuedJob) { + t.Helper() + + var got []*queuedJob + for qj := head; qj != nil; qj = qj.next { + got = append(got, qj) + } + + require.Len(t, got, len(want)) + for i := range want { + require.Same(t, want[i], got[i]) + if i == 0 { + assert.Nil(t, got[i].prev) + } else { + assert.Same(t, got[i-1], got[i].prev) + } + if i == len(want)-1 { + assert.Nil(t, got[i].next) + } else { + assert.Same(t, got[i+1], got[i].next) + } + } +} diff --git a/job/observer.go b/job/observer.go index 6c757f6..b0ee209 100644 --- a/job/observer.go +++ b/job/observer.go @@ -78,6 +78,7 @@ func (jo *jobObserver[T]) start(ctx context.Context, health cell.Health, options } jo.health = health.NewScope("observer-job-" + jo.name) + defer jo.health.Close() reportTicker := time.NewTicker(10 * time.Second) defer reportTicker.Stop() @@ -85,7 +86,6 @@ func (jo *jobObserver[T]) start(ctx context.Context, health cell.Health, options "name", jo.name, "func", internal.FuncNameAndLocation(jo.fn)) - l.Debug("Observer job started") jo.health.OK("Primed") var msgCount uint64 @@ -106,8 +106,12 @@ func (jo *jobObserver[T]) start(ctx context.Context, health cell.Health, options return } - jo.health.Degraded("observer job errored", err) - l.Error("Observer job errored", "error", err) + msg := fmt.Sprintf("Observer job failed (duration %s)", duration) + jo.health.Degraded(msg, err) + l.Error("Observer job errored", + "error", err, + "duration", duration, + ) if options.metrics != nil { options.metrics.JobError(jo.name, err) @@ -135,10 +139,7 @@ func (jo *jobObserver[T]) start(ctx context.Context, health cell.Health, options <-done - jo.health.Stopped("observer job done") if err != nil && !errors.Is(err, context.Canceled) { l.Error("Observer job stopped with an error", "error", err) - } else { - l.Debug("Observer job stopped") } } diff --git a/job/oneshot.go b/job/oneshot.go index d363320..632c7ef 100644 --- a/job/oneshot.go +++ b/job/oneshot.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "strconv" "time" "github.com/cilium/hive" @@ -112,16 +113,16 @@ func (jos *jobOneShot) start(ctx context.Context, health cell.Health, options op } jos.health = health.NewScope("job-" + jos.name) - defer jos.health.Stopped("one-shot job done") + defer jos.health.Close() l := options.logger.With( "name", jos.name, "func", internal.FuncNameAndLocation(jos.fn)) var err error + var timeout time.Duration for i := 0; jos.retry < 0 || i <= jos.retry; i++ { if i != 0 { - timeout := jos.backoff.Wait() options.logger.Debug("Delaying retry attempt", "backoff", timeout, "retry-count", i, @@ -133,22 +134,40 @@ func (jos *jobOneShot) start(ctx context.Context, health cell.Health, options op } } - l.Debug("Starting one-shot job") - jos.health.OK("Running") start := time.Now() err = jos.fn(ctx, jos.health) + duration := time.Since(start) if options.metrics != nil { - duration := time.Since(start) options.metrics.OneShotRunDuration(jos.name, duration) } - if err == nil { + switch { + case err == nil: + jos.health.OK("Finished (" + duration.String() + ")") + return + case errors.Is(err, context.Canceled) || ctx.Err() != nil: return - } else if !errors.Is(err, context.Canceled) { - jos.health.Degraded("one-shot job errored", err) - l.Error("one-shot job errored", "error", err) + default: + if jos.backoff != nil && (jos.retry < 0 || i < jos.retry) { + timeout = jos.backoff.Wait() + } + retriesRemain := strconv.FormatInt(int64(jos.retry-i), 10) + if jos.retry < 0 { + retriesRemain = "" + } else if jos.retry == 0 { + retriesRemain = "" + } + msg := fmt.Sprintf("Failed (duration %s, retry %d/%s in %s)", duration, i+1, retriesRemain, timeout) + jos.health.Degraded(msg, err) + l.Error("Failed", + "error", err, + "retry", i+1, + "remaining", retriesRemain, + "timeout", timeout, + "duration", duration, + ) if options.metrics != nil { options.metrics.JobError(jos.name, err) } diff --git a/job/timer.go b/job/timer.go index 5da6fb3..a8d5031 100644 --- a/job/timer.go +++ b/job/timer.go @@ -155,6 +155,7 @@ func (jt *jobTimer) start(ctx context.Context, health cell.Health, options optio } jt.health = health.NewScope("timer-job-" + jt.name) + defer jt.health.Close() l := options.logger.With( "name", jt.name, @@ -172,13 +173,11 @@ func (jt *jobTimer) start(ctx context.Context, health cell.Health, options optio triggerChan = jt.trigger.c } - l.Debug("Starting timer job") jt.health.OK("Primed") for { select { case <-ctx.Done(): - jt.health.Stopped("timer job context done") return case <-tickerChan: case <-triggerChan: @@ -202,8 +201,11 @@ func (jt *jobTimer) start(ctx context.Context, health cell.Health, options optio jt.health.OK("OK (" + duration.String() + ")") l.Debug("Timer job finished") } else if !errors.Is(err, context.Canceled) { - jt.health.Degraded("timer job errored", err) - l.Error("Timer job errored", "error", err) + msg := fmt.Sprintf("Timer job failed (duration %s)", duration) + jt.health.Degraded(msg, err) + l.Error("Timer job failed", + "error", err, + "duration", duration) if options.metrics != nil { options.metrics.JobError(jt.name, err)