From 712b6cbcc36c9303add815954418fd6550948fea Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Thu, 21 May 2026 11:45:57 +0200 Subject: [PATCH 1/3] job: Close the job health reporters To support huge amounts of jobs that start and stop during the runtime close the job health scopes instead of marking them stopped to avoid accumulating an ever growing number of health entries. Signed-off-by: Jussi Maki --- job/observer.go | 5 +---- job/oneshot.go | 4 +--- job/timer.go | 3 +-- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/job/observer.go b/job/observer.go index 6c757f6..d093216 100644 --- a/job/observer.go +++ b/job/observer.go @@ -78,6 +78,7 @@ func (jo *jobObserver[T]) start(ctx context.Context, health cell.Health, options } jo.health = health.NewScope("observer-job-" + jo.name) + defer jo.health.Close() reportTicker := time.NewTicker(10 * time.Second) defer reportTicker.Stop() @@ -85,7 +86,6 @@ func (jo *jobObserver[T]) start(ctx context.Context, health cell.Health, options "name", jo.name, "func", internal.FuncNameAndLocation(jo.fn)) - l.Debug("Observer job started") jo.health.OK("Primed") var msgCount uint64 @@ -135,10 +135,7 @@ func (jo *jobObserver[T]) start(ctx context.Context, health cell.Health, options <-done - jo.health.Stopped("observer job done") if err != nil && !errors.Is(err, context.Canceled) { l.Error("Observer job stopped with an error", "error", err) - } else { - l.Debug("Observer job stopped") } } diff --git a/job/oneshot.go b/job/oneshot.go index d363320..c0cadcb 100644 --- a/job/oneshot.go +++ b/job/oneshot.go @@ -112,7 +112,7 @@ func (jos *jobOneShot) start(ctx context.Context, health cell.Health, options op } jos.health = health.NewScope("job-" + jos.name) - defer jos.health.Stopped("one-shot job done") + defer jos.health.Close() l := options.logger.With( "name", jos.name, @@ -133,8 +133,6 @@ func (jos *jobOneShot) start(ctx context.Context, health cell.Health, options op } } - l.Debug("Starting one-shot job") - jos.health.OK("Running") start := time.Now() err = jos.fn(ctx, jos.health) diff --git a/job/timer.go b/job/timer.go index 5da6fb3..7043b31 100644 --- a/job/timer.go +++ b/job/timer.go @@ -155,6 +155,7 @@ func (jt *jobTimer) start(ctx context.Context, health cell.Health, options optio } jt.health = health.NewScope("timer-job-" + jt.name) + defer jt.health.Close() l := options.logger.With( "name", jt.name, @@ -172,13 +173,11 @@ func (jt *jobTimer) start(ctx context.Context, health cell.Health, options optio triggerChan = jt.trigger.c } - l.Debug("Starting timer job") jt.health.OK("Primed") for { select { case <-ctx.Done(): - jt.health.Stopped("timer job context done") return case <-tickerChan: case <-triggerChan: From f61f9d2711bc7587e5f9e04127ae0a272983060d Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Thu, 21 May 2026 11:47:36 +0200 Subject: [PATCH 2/3] cell: Add support for pre-stop hooks The runtime lifecycle of the job registry is being stopped too late which leads to jobs started at runtime to being stopped after all the other stop hooks. This may lead to a dependency of a job being stopped before the job. To fix this add support for "pre-stop" hooks that are called before other stop hooks. Signed-off-by: Jussi Maki --- cell/lifecycle.go | 72 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 13 deletions(-) diff --git a/cell/lifecycle.go b/cell/lifecycle.go index 4721ee1..bbf23ab 100644 --- a/cell/lifecycle.go +++ b/cell/lifecycle.go @@ -36,6 +36,14 @@ type HookDescriptiveInterface interface { HookInfo() string } +// PreStopHook marks the hook as a pre-stop hook. This is used by the +// [job.registry] to ensure that the jobs started at runtime are +// stopped before anything else to ensure their dependencies are not +// stopped before the jobs. +type PreStopHook interface { + PreStopHookMarker() +} + // Hook is a pair of start and stop callbacks. Both are optional. // They're paired up to make sure that on failed start all corresponding // stop hooks are executed. @@ -82,12 +90,13 @@ type DefaultLifecycle struct { type augmentedHook struct { HookInterface moduleID FullModuleID + stopped bool } func NewDefaultLifecycle(hooks []HookInterface, numStarted int, logThreshold time.Duration) *DefaultLifecycle { h := make([]augmentedHook, 0, len(hooks)) for _, hook := range hooks { - h = append(h, augmentedHook{hook, nil}) + h = append(h, augmentedHook{hook, nil, false}) } return &DefaultLifecycle{ mu: sync.Mutex{}, @@ -101,7 +110,7 @@ func (lc *DefaultLifecycle) Append(hook HookInterface) { lc.mu.Lock() defer lc.mu.Unlock() - lc.hooks = append(lc.hooks, augmentedHook{hook, nil}) + lc.hooks = append(lc.hooks, augmentedHook{hook, nil, false}) } func (lc *DefaultLifecycle) Start(log *slog.Logger, ctx context.Context) error { @@ -116,6 +125,8 @@ func (lc *DefaultLifecycle) Start(log *slog.Logger, ctx context.Context) error { from := lc.numStarted for i, hook := range lc.hooks[from:] { + lc.hooks[from+i].stopped = false + fnName, exists := getHookFuncName(hook, true) if !exists { @@ -164,15 +175,10 @@ func (lc *DefaultLifecycle) Stop(log *slog.Logger, ctx context.Context) error { defer cancel() var errs error - for ; lc.numStarted > 0; lc.numStarted-- { - if ctx.Err() != nil { - return ctx.Err() - } - hook := lc.hooks[lc.numStarted-1] - + runStopHook := func(hook augmentedHook) { fnName, exists := getHookFuncName(hook, false) if !exists { - continue + return } l := log.With("function", fnName) @@ -195,6 +201,35 @@ func (lc *DefaultLifecycle) Stop(log *slog.Logger, ctx context.Context) error { } } } + + // Stop pre-stop hooks first. We can't update [lc.numStarted] + // since we're skipping hooks, so instead we mark the hook + // as already stopped to retain idempotency. + for i := lc.numStarted - 1; i >= 0; i-- { + if ctx.Err() != nil { + return ctx.Err() + } + hook := lc.hooks[i] + if hook.stopped { + continue + } + if _, ok := hook.HookInterface.(PreStopHook); ok { + runStopHook(hook) + lc.hooks[i].stopped = true + } + } + + // Finally stop the normal hooks. + for i := lc.numStarted - 1; i >= 0; i-- { + if ctx.Err() != nil { + return ctx.Err() + } + hook := lc.hooks[i] + if !hook.stopped { + runStopHook(hook) + } + lc.numStarted-- + } return errs } @@ -212,14 +247,25 @@ func (lc *DefaultLifecycle) PrintHooks(w io.Writer) { } fmt.Fprintf(w, "\nStop hooks:\n\n") - for i := len(lc.hooks) - 1; i >= 0; i-- { - hook := lc.hooks[i] + printHook := func(hook augmentedHook) { fnName, exists := getHookFuncName(hook.HookInterface, false) if !exists { - continue + return } fmt.Fprintf(w, " • %s (%s)\n", fnName, hook.moduleID) } + for i := len(lc.hooks) - 1; i >= 0; i-- { + hook := lc.hooks[i] + if _, ok := hook.HookInterface.(PreStopHook); ok { + printHook(hook) + } + } + for i := len(lc.hooks) - 1; i >= 0; i-- { + hook := lc.hooks[i] + if _, ok := hook.HookInterface.(PreStopHook); !ok { + printHook(hook) + } + } } type augmentedLifecycle struct { @@ -231,7 +277,7 @@ func (lc augmentedLifecycle) Append(hook HookInterface) { lc.mu.Lock() defer lc.mu.Unlock() - lc.hooks = append(lc.hooks, augmentedHook{hook, lc.moduleID}) + lc.hooks = append(lc.hooks, augmentedHook{hook, lc.moduleID, false}) } func getHookFuncName(hook HookInterface, start bool) (name string, hasHook bool) { From ff7aa60bd325c82ad298b013277b5630f6c1ad3b Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Thu, 21 May 2026 11:49:09 +0200 Subject: [PATCH 3/3] job: Support removing jobs from runtime lifecycle To support large numbers of jobs started and stopped at runtime we need to avoid accumulating state related to them. Earlier commit already fixed the accumulating of health entries by closing the health scope instead of marking it stopped. We were additionally appending each "queuedJob" to the "dynamicLC" and never removing from it. This introduces a new job-specific lifecycle (jobLifecycle) that maintains a linked-list of jobs started at runtime with support for removing from this list when a job stops before the application stops. Signed-off-by: Jussi Maki --- job/job.go | 91 +++++++++++------ job/job_test.go | 163 ++++++++++------------------- job/lifecycle.go | 80 +++++++++++++++ job/lifecycle_test.go | 232 ++++++++++++++++++++++++++++++++++++++++++ job/observer.go | 8 +- job/oneshot.go | 33 ++++-- job/timer.go | 7 +- 7 files changed, 463 insertions(+), 151 deletions(-) create mode 100644 job/lifecycle.go create mode 100644 job/lifecycle_test.go diff --git a/job/job.go b/job/job.go index 54ae9f6..bbc81e4 100644 --- a/job/job.go +++ b/job/job.go @@ -10,6 +10,7 @@ import ( "log/slog" "runtime/pprof" "sync" + "time" "github.com/cilium/hive" "github.com/cilium/hive/cell" @@ -44,12 +45,20 @@ type registry struct { logger *slog.Logger shutdowner hive.Shutdowner - mu sync.Mutex + // appLifecycle is the main application appLifecycle. Jobs that are + // added before the registry is started are appended here. This ensures + // that the job starting order is interleaved with the start hooks and + // that we don't start the jobs before a dependency's start hook has ran. + appLifecycle cell.Lifecycle + + // runtimeLifecycle is the lifecycle used after registry has started. + runtimeLifecycle jobLifecycle + + // mu protects the fields below + mu sync.Mutex + groups []*group started bool - - lifecycle cell.Lifecycle - dynamicLC *cell.DefaultLifecycle } var _ cell.HookInterface = (*registry)(nil) @@ -60,9 +69,9 @@ func newRegistry( lc cell.Lifecycle, ) Registry { r := ®istry{ - logger: logger, - shutdowner: shutdowner, - lifecycle: lc, + logger: logger, + shutdowner: shutdowner, + appLifecycle: lc, } lc.Append(r) return r @@ -76,7 +85,6 @@ func (c *registry) Start(cell.HookContext) error { return nil } c.started = true - c.dynamicLC = cell.NewDefaultLifecycle(nil, 0, 0) return nil } @@ -87,15 +95,18 @@ func (c *registry) Stop(ctx cell.HookContext) error { return nil } c.started = false - c.dynamicLC.Stop(c.logger, ctx) - return nil + return c.runtimeLifecycle.stop(ctx) } +// PreStopHookMarker tells [cell.DefaultLifecycle] that this +// hook should be stopped before any other hook. +func (c *registry) PreStopHookMarker() {} + func (c *registry) WithLifecycle(lifecycle cell.Lifecycle) Registry { r := ®istry{ - logger: c.logger, - shutdowner: c.shutdowner, - lifecycle: lifecycle, + logger: c.logger, + shutdowner: c.shutdowner, + appLifecycle: lifecycle, } lifecycle.Append(r) return r @@ -136,7 +147,7 @@ func (c *registry) addJobs(health cell.Health, opts options, jobs ...Job) { if !c.started { for _, job := range jobs { - c.lifecycle.Append(&queuedJob{ + c.appLifecycle.Append(&queuedJob{ registry: c, job: job, health: health, @@ -147,15 +158,15 @@ func (c *registry) addJobs(health cell.Health, opts options, jobs ...Job) { } for _, job := range jobs { - qj := &queuedJob{ - registry: c, - job: job, - health: health, - options: opts, - } - c.dynamicLC.Append(qj) - // Start the newly appended job immediately. - c.dynamicLC.Start(c.logger, context.Background()) + c.runtimeLifecycle.insertAndStart( + &queuedJob{ + registry: c, + job: job, + health: health, + options: opts, + runtimeJob: true, + }, + ) } } @@ -180,22 +191,40 @@ type Job interface { } type queuedJob struct { - registry *registry - job Job - health cell.Health - options options - cancel context.CancelFunc - done chan struct{} + registry *registry + job Job + health cell.Health + options options + cancel context.CancelFunc + done chan struct{} + startedAt time.Time + prev *queuedJob + next *queuedJob + runtimeJob bool } // Start implements cell.HookInterface. func (qj *queuedJob) Start(cell.HookContext) error { + qj.startedAt = time.Now() qj.done = make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) qj.cancel = cancel pprof.Do(ctx, qj.options.pprofLabels, func(ctx context.Context) { go func() { - defer close(qj.done) + defer func() { + qj.registry.runtimeLifecycle.remove(qj) + if qj.runtimeJob { + qj.registry.logger.Info("Job stopped", + "job", qj.HookInfo(), + "duration", time.Since(qj.startedAt)) + } + close(qj.done) + }() + if qj.runtimeJob { + // We only log this for runtime jobs since we already have the + // lifecycle logging for the jobs added before starting. + qj.registry.logger.Info("Job started", "job", qj.HookInfo()) + } qj.job.start(ctx, qj.health, qj.options) }() }) @@ -217,7 +246,7 @@ func (qj *queuedJob) Stop(ctx cell.HookContext) error { } } -var _ cell.HookInterface = &queuedJob{} +var _ cell.HookDescriptiveInterface = &queuedJob{} type group struct { registry *registry diff --git a/job/job_test.go b/job/job_test.go index 879cebe..6ed6943 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -5,10 +5,10 @@ package job import ( "context" - "fmt" "log/slog" "runtime" "runtime/pprof" + "slices" "strings" "sync" "sync/atomic" @@ -221,12 +221,10 @@ func TestJobLifecycleOrderingAcrossGroups(t *testing.T) { g2 Group ) - staticStarts := []string{"g1-first", "g1-second", "g2-first", "g1-third"} - expectStarts := append(append([]string{}, staticStarts...), "g2-dynamic", "g1-dynamic") - expectStops := []string{"g1-third", "g2-first", "g1-second", "g1-first", "g1-dynamic", "g2-dynamic"} + started, stopped := &orderCollector{}, &orderCollector{} addJob := func(g Group, name string) { - g.Add(&orderingJob{name: name}) + g.Add(&orderingJob{started: started, stopped: stopped, name: name}) } h := fixture(func(r Registry, s cell.Health, l cell.Lifecycle) { @@ -239,28 +237,48 @@ func TestJobLifecycleOrderingAcrossGroups(t *testing.T) { addJob(g1, "g1-third") }) - log, logs := newRecordingLogger() + log := hivetest.Logger(t) ctx := context.Background() require.NoError(t, h.Start(log, ctx)) - waitForDetails(t, logs, len(staticStarts)) - startDetails := logs.attrs("detail") - assert.Equal(t, staticStarts, startDetails) + assert.Eventually(t, + func() bool { + return len(started.get()) == 4 + }, + time.Second, 10*time.Millisecond, "expected 4 static jobs to have started") // Add jobs dynamically after the lifecycle has already started. addJob(g2, "g2-dynamic") addJob(g1, "g1-dynamic") - waitForDetails(t, logs, len(expectStarts)) - startDetails = logs.attrs("detail") - assert.Equal(t, expectStarts, startDetails) - logs.clear() + assert.Eventually(t, + func() bool { + return len(started.get()) == 6 + }, + time.Second, 10*time.Millisecond, "expected 4 static jobs + 2 dynamic jobs") + + // While the order of [started.get] is non-deterministic the order in which + // we stop is. + expectedStops := []string{ + "g1-dynamic", + "g2-dynamic", + "g1-third", + "g2-first", + "g1-second", + "g1-first", + } + + // The order in which these goroutines started is non-deterministic, so just + // check that all of them are running as expected + startedJobs := started.get() + slices.Sort(startedJobs) + expectedStarts := slices.Clone(expectedStops) + slices.Sort(expectedStarts) + assert.Equal(t, startedJobs, expectedStarts) require.NoError(t, h.Stop(log, ctx)) - waitForDetails(t, logs, len(expectStops)) - stopDetails := logs.attrs("detail") - assert.Equal(t, expectStops, stopDetails) + assert.Equal(t, expectedStops, stopped.get()) } func TestModuleDecoratedGroup(t *testing.T) { @@ -361,108 +379,33 @@ func Test_sanitizeName(t *testing.T) { } } -type orderingJob struct { - name string -} - -func (oj *orderingJob) start(ctx context.Context, _ cell.Health, _ options) { - <-ctx.Done() -} - -func (oj *orderingJob) info() string { - return oj.name -} - -type logRecord struct { - msg string - attrs map[string]string -} - -type logCollector struct { +type orderCollector struct { mu sync.Mutex - records []logRecord + records []string } -func (lc *logCollector) add(rec logRecord) { - lc.mu.Lock() - defer lc.mu.Unlock() - lc.records = append(lc.records, rec) +func (oc *orderCollector) add(name string) { + oc.mu.Lock() + oc.records = append(oc.records, name) + oc.mu.Unlock() } - -func (lc *logCollector) clear() { - lc.mu.Lock() - defer lc.mu.Unlock() - lc.records = nil +func (oc *orderCollector) get() []string { + oc.mu.Lock() + defer oc.mu.Unlock() + return slices.Clone(oc.records) } -func (lc *logCollector) attrs(attr string) []string { - lc.mu.Lock() - defer lc.mu.Unlock() - - var out []string - for _, rec := range lc.records { - value, ok := rec.attrs[attr] - if !ok { - continue - } - out = append(out, value) - } - return out -} - -type recordingHandler struct { - collector *logCollector - attrs []slog.Attr -} - -func newRecordingLogger() (*slog.Logger, *logCollector) { - collector := &logCollector{} - return slog.New(&recordingHandler{collector: collector}), collector -} - -func (rh *recordingHandler) Enabled(_ context.Context, lvl slog.Level) bool { - return lvl >= slog.LevelInfo -} - -func (rh *recordingHandler) Handle(_ context.Context, r slog.Record) error { - rec := logRecord{ - msg: r.Message, - attrs: make(map[string]string), - } - for _, attr := range rh.attrs { - rec.attrs[attr.Key] = fmt.Sprint(attr.Value) - } - r.Attrs(func(a slog.Attr) bool { - rec.attrs[a.Key] = fmt.Sprint(a.Value) - return true - }) - rh.collector.add(rec) - return nil -} - -func (rh *recordingHandler) WithAttrs(attrs []slog.Attr) slog.Handler { - nrh := &recordingHandler{ - collector: rh.collector, - attrs: append(append([]slog.Attr{}, rh.attrs...), attrs...), - } - return nrh +type orderingJob struct { + started, stopped *orderCollector + name string } -func (rh *recordingHandler) WithGroup(string) slog.Handler { - return &recordingHandler{ - collector: rh.collector, - attrs: append([]slog.Attr{}, rh.attrs...), - } +func (oj *orderingJob) start(ctx context.Context, _ cell.Health, _ options) { + oj.started.add(oj.name) + defer oj.stopped.add(oj.name) + <-ctx.Done() } -func waitForDetails(t *testing.T, logs *logCollector, expectedLen int) { - t.Helper() - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - if len(logs.attrs("detail")) >= expectedLen { - return - } - time.Sleep(tick) - } - t.Fatalf("timeout waiting for logs (expected %d); have %v", expectedLen, logs.attrs("detail")) +func (oj *orderingJob) info() string { + return oj.name } diff --git a/job/lifecycle.go b/job/lifecycle.go new file mode 100644 index 0000000..b89af2b --- /dev/null +++ b/job/lifecycle.go @@ -0,0 +1,80 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package job + +import ( + "context" + "sync" + + "github.com/cilium/hive/cell" +) + +// jobLifecycle tracks jobs started after the [registry] has been started +// (and thus not appended to application lifecycle) and to stop them when +// [registry] stops. Jobs that are stopped earlier are removed from the +// lifecycle. +type jobLifecycle struct { + mu sync.Mutex + + // jobs is a doubly linked-list of started jobs. The head is the latest + // started jobs, e.g. they're in the order they should be stopped. + jobs *queuedJob +} + +func (r *jobLifecycle) insertAndStart(job *queuedJob) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.jobs != nil { + r.jobs.prev = job + } + job.next = r.jobs + r.jobs = job + job.Start(context.Background()) +} + +func (r *jobLifecycle) remove(qj *queuedJob) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.jobs == nil { + qj.prev = nil + qj.next = nil + return + } + + if qj.next != nil { + qj.next.prev = qj.prev + } + if qj == r.jobs { + r.jobs = qj.next + } else if qj.prev != nil { + qj.prev.next = qj.next + } + qj.prev = nil + qj.next = nil +} + +func (r *jobLifecycle) stop(ctx cell.HookContext) error { + // Collect jobs to stop and unlink them. We must stop them without holding the + // lock as [queuedJob.Stop] will try to call [jobLifecycle.remove]. + var jobsToStop []*queuedJob + r.mu.Lock() + var next *queuedJob + for job := r.jobs; job != nil; job = next { + next = job.next + job.next = nil + job.prev = nil + jobsToStop = append(jobsToStop, job) + } + r.jobs = nil + r.mu.Unlock() + for _, job := range jobsToStop { + job.Stop(ctx) + if ctx.Err() != nil { + break + } + } + return ctx.Err() +} diff --git a/job/lifecycle_test.go b/job/lifecycle_test.go new file mode 100644 index 0000000..0dea481 --- /dev/null +++ b/job/lifecycle_test.go @@ -0,0 +1,232 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package job + +import ( + "context" + "io" + "log/slog" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cilium/hive/cell" +) + +func TestJobLifecycleInsertAndRemove(t *testing.T) { + t.Parallel() + + r := ®istry{logger: lifecycleTestLogger()} + first := newBlockingLifecycleJob(t, r, "first", nil) + second := newBlockingLifecycleJob(t, r, "second", nil) + third := newBlockingLifecycleJob(t, r, "third", nil) + defer stopLifecycleJobs(t, first, second, third) + + insertRuntimeLifecycleJobs(r, first, second, third) + waitForLifecycleJobs(t, first, second, third) + + requireRuntimeLifecycleList(t, r, third, second, first) + + removeRuntimeLifecycleJob(r, second) + requireRuntimeLifecycleList(t, r, third, first) + requireRuntimeLifecycleJobUnlinked(t, r, second) + + removeRuntimeLifecycleJob(r, third) + requireRuntimeLifecycleList(t, r, first) + requireRuntimeLifecycleJobUnlinked(t, r, third) + + removeRuntimeLifecycleJob(r, first) + requireRuntimeLifecycleList(t, r) + requireRuntimeLifecycleJobUnlinked(t, r, first) +} + +func TestJobLifecycleRemovesCompletedJobs(t *testing.T) { + t.Parallel() + + r := ®istry{logger: lifecycleTestLogger()} + qj := &queuedJob{ + registry: r, + job: &completingLifecycleJob{started: make(chan struct{})}, + } + + insertRuntimeLifecycleJobs(r, qj) + <-qj.job.(*completingLifecycleJob).started + + require.EventuallyWithT(t, func(c *assert.CollectT) { + r.runtimeLifecycle.mu.Lock() + defer r.runtimeLifecycle.mu.Unlock() + + assert.Nil(c, r.runtimeLifecycle.jobs) + assert.Nil(c, qj.prev) + assert.Nil(c, qj.next) + }, timeout, tick) +} + +func TestJobLifecycleStopStopsJobsInReverseStartOrder(t *testing.T) { + t.Parallel() + + var ( + mu sync.Mutex + stopped []string + ) + recordStop := func(name string) { + mu.Lock() + defer mu.Unlock() + stopped = append(stopped, name) + } + + r := ®istry{logger: lifecycleTestLogger()} + first := newBlockingLifecycleJob(t, r, "first", recordStop) + second := newBlockingLifecycleJob(t, r, "second", recordStop) + third := newBlockingLifecycleJob(t, r, "third", recordStop) + + insertRuntimeLifecycleJobs(r, first, second, third) + waitForLifecycleJobs(t, first, second, third) + + require.NoError(t, stopRuntimeLifecycle(r, context.Background())) + + requireRuntimeLifecycleList(t, r) + + mu.Lock() + defer mu.Unlock() + assert.Equal(t, []string{"third", "second", "first"}, stopped) +} + +func TestJobLifecycleStopReturnsContextError(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + var lifecycle jobLifecycle + assert.ErrorIs(t, lifecycle.stop(ctx), context.Canceled) +} + +type blockingLifecycleJob struct { + name string + started chan struct{} + startOnce sync.Once + onStop func(string) +} + +func newBlockingLifecycleJob(t *testing.T, r *registry, name string, onStop func(string)) *queuedJob { + t.Helper() + + return &queuedJob{ + registry: r, + job: &blockingLifecycleJob{ + name: name, + started: make(chan struct{}), + onStop: onStop, + }, + } +} + +func lifecycleTestLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +func (j *blockingLifecycleJob) start(ctx context.Context, _ cell.Health, _ options) { + j.startOnce.Do(func() { + close(j.started) + }) + <-ctx.Done() + if j.onStop != nil { + j.onStop(j.name) + } +} + +func (j *blockingLifecycleJob) info() string { + return j.name +} + +type completingLifecycleJob struct { + started chan struct{} +} + +func (j *completingLifecycleJob) start(context.Context, cell.Health, options) { + close(j.started) +} + +func (j *completingLifecycleJob) info() string { + return "completed" +} + +func waitForLifecycleJobs(t *testing.T, jobs ...*queuedJob) { + t.Helper() + + for _, qj := range jobs { + <-qj.job.(*blockingLifecycleJob).started + } +} + +func stopLifecycleJobs(t *testing.T, jobs ...*queuedJob) { + t.Helper() + + for _, qj := range jobs { + if qj.cancel == nil { + continue + } + require.NoError(t, qj.Stop(context.Background())) + } +} + +func insertRuntimeLifecycleJobs(r *registry, jobs ...*queuedJob) { + for _, qj := range jobs { + r.runtimeLifecycle.insertAndStart(qj) + } +} + +func removeRuntimeLifecycleJob(r *registry, qj *queuedJob) { + r.runtimeLifecycle.remove(qj) +} + +func stopRuntimeLifecycle(r *registry, ctx context.Context) error { + return r.runtimeLifecycle.stop(ctx) +} + +func requireRuntimeLifecycleList(t *testing.T, r *registry, want ...*queuedJob) { + t.Helper() + + r.runtimeLifecycle.mu.Lock() + defer r.runtimeLifecycle.mu.Unlock() + + requireLifecycleList(t, r.runtimeLifecycle.jobs, want...) +} + +func requireRuntimeLifecycleJobUnlinked(t *testing.T, r *registry, qj *queuedJob) { + t.Helper() + + r.runtimeLifecycle.mu.Lock() + defer r.runtimeLifecycle.mu.Unlock() + + assert.Nil(t, qj.prev) + assert.Nil(t, qj.next) +} + +func requireLifecycleList(t *testing.T, head *queuedJob, want ...*queuedJob) { + t.Helper() + + var got []*queuedJob + for qj := head; qj != nil; qj = qj.next { + got = append(got, qj) + } + + require.Len(t, got, len(want)) + for i := range want { + require.Same(t, want[i], got[i]) + if i == 0 { + assert.Nil(t, got[i].prev) + } else { + assert.Same(t, got[i-1], got[i].prev) + } + if i == len(want)-1 { + assert.Nil(t, got[i].next) + } else { + assert.Same(t, got[i+1], got[i].next) + } + } +} diff --git a/job/observer.go b/job/observer.go index d093216..b0ee209 100644 --- a/job/observer.go +++ b/job/observer.go @@ -106,8 +106,12 @@ func (jo *jobObserver[T]) start(ctx context.Context, health cell.Health, options return } - jo.health.Degraded("observer job errored", err) - l.Error("Observer job errored", "error", err) + msg := fmt.Sprintf("Observer job failed (duration %s)", duration) + jo.health.Degraded(msg, err) + l.Error("Observer job errored", + "error", err, + "duration", duration, + ) if options.metrics != nil { options.metrics.JobError(jo.name, err) diff --git a/job/oneshot.go b/job/oneshot.go index c0cadcb..632c7ef 100644 --- a/job/oneshot.go +++ b/job/oneshot.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "strconv" "time" "github.com/cilium/hive" @@ -119,9 +120,9 @@ func (jos *jobOneShot) start(ctx context.Context, health cell.Health, options op "func", internal.FuncNameAndLocation(jos.fn)) var err error + var timeout time.Duration for i := 0; jos.retry < 0 || i <= jos.retry; i++ { if i != 0 { - timeout := jos.backoff.Wait() options.logger.Debug("Delaying retry attempt", "backoff", timeout, "retry-count", i, @@ -137,16 +138,36 @@ func (jos *jobOneShot) start(ctx context.Context, health cell.Health, options op start := time.Now() err = jos.fn(ctx, jos.health) + duration := time.Since(start) if options.metrics != nil { - duration := time.Since(start) options.metrics.OneShotRunDuration(jos.name, duration) } - if err == nil { + switch { + case err == nil: + jos.health.OK("Finished (" + duration.String() + ")") return - } else if !errors.Is(err, context.Canceled) { - jos.health.Degraded("one-shot job errored", err) - l.Error("one-shot job errored", "error", err) + case errors.Is(err, context.Canceled) || ctx.Err() != nil: + return + default: + if jos.backoff != nil && (jos.retry < 0 || i < jos.retry) { + timeout = jos.backoff.Wait() + } + retriesRemain := strconv.FormatInt(int64(jos.retry-i), 10) + if jos.retry < 0 { + retriesRemain = "" + } else if jos.retry == 0 { + retriesRemain = "" + } + msg := fmt.Sprintf("Failed (duration %s, retry %d/%s in %s)", duration, i+1, retriesRemain, timeout) + jos.health.Degraded(msg, err) + l.Error("Failed", + "error", err, + "retry", i+1, + "remaining", retriesRemain, + "timeout", timeout, + "duration", duration, + ) if options.metrics != nil { options.metrics.JobError(jos.name, err) } diff --git a/job/timer.go b/job/timer.go index 7043b31..a8d5031 100644 --- a/job/timer.go +++ b/job/timer.go @@ -201,8 +201,11 @@ func (jt *jobTimer) start(ctx context.Context, health cell.Health, options optio jt.health.OK("OK (" + duration.String() + ")") l.Debug("Timer job finished") } else if !errors.Is(err, context.Canceled) { - jt.health.Degraded("timer job errored", err) - l.Error("Timer job errored", "error", err) + msg := fmt.Sprintf("Timer job failed (duration %s)", duration) + jt.health.Degraded(msg, err) + l.Error("Timer job failed", + "error", err, + "duration", duration) if options.metrics != nil { options.metrics.JobError(jt.name, err)