Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 59 additions & 13 deletions cell/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ type HookDescriptiveInterface interface {
HookInfo() string
}

// PreStopHook marks the hook as a pre-stop hook. This is used by the
// [job.registry] to ensure that the jobs started at runtime are
// stopped before anything else to ensure their dependencies are not
// stopped before the jobs.
type PreStopHook interface {
PreStopHookMarker()
}

// Hook is a pair of start and stop callbacks. Both are optional.
// They're paired up to make sure that on failed start all corresponding
// stop hooks are executed.
Expand Down Expand Up @@ -82,12 +90,13 @@ type DefaultLifecycle struct {
type augmentedHook struct {
HookInterface
moduleID FullModuleID
stopped bool
}

func NewDefaultLifecycle(hooks []HookInterface, numStarted int, logThreshold time.Duration) *DefaultLifecycle {
h := make([]augmentedHook, 0, len(hooks))
for _, hook := range hooks {
h = append(h, augmentedHook{hook, nil})
h = append(h, augmentedHook{hook, nil, false})
}
return &DefaultLifecycle{
mu: sync.Mutex{},
Expand All @@ -101,7 +110,7 @@ func (lc *DefaultLifecycle) Append(hook HookInterface) {
lc.mu.Lock()
defer lc.mu.Unlock()

lc.hooks = append(lc.hooks, augmentedHook{hook, nil})
lc.hooks = append(lc.hooks, augmentedHook{hook, nil, false})
}

func (lc *DefaultLifecycle) Start(log *slog.Logger, ctx context.Context) error {
Expand All @@ -116,6 +125,8 @@ func (lc *DefaultLifecycle) Start(log *slog.Logger, ctx context.Context) error {

from := lc.numStarted
for i, hook := range lc.hooks[from:] {
lc.hooks[from+i].stopped = false

fnName, exists := getHookFuncName(hook, true)

if !exists {
Expand Down Expand Up @@ -164,15 +175,10 @@ func (lc *DefaultLifecycle) Stop(log *slog.Logger, ctx context.Context) error {
defer cancel()

var errs error
for ; lc.numStarted > 0; lc.numStarted-- {
if ctx.Err() != nil {
return ctx.Err()
}
hook := lc.hooks[lc.numStarted-1]

runStopHook := func(hook augmentedHook) {
fnName, exists := getHookFuncName(hook, false)
if !exists {
continue
return
}

l := log.With("function", fnName)
Expand All @@ -195,6 +201,35 @@ func (lc *DefaultLifecycle) Stop(log *slog.Logger, ctx context.Context) error {
}
}
}

// Stop pre-stop hooks first. We can't update [lc.numStarted]
// since we're skipping hooks, so instead we mark the hook
// as already stopped to retain idempotency.
for i := lc.numStarted - 1; i >= 0; i-- {
if ctx.Err() != nil {
return ctx.Err()
}
hook := lc.hooks[i]
if hook.stopped {
continue
}
if _, ok := hook.HookInterface.(PreStopHook); ok {
runStopHook(hook)
lc.hooks[i].stopped = true
}
}

// Finally stop the normal hooks.
for i := lc.numStarted - 1; i >= 0; i-- {
if ctx.Err() != nil {
return ctx.Err()
}
hook := lc.hooks[i]
if !hook.stopped {
runStopHook(hook)
}
lc.numStarted--
}
return errs
}

Expand All @@ -212,14 +247,25 @@ func (lc *DefaultLifecycle) PrintHooks(w io.Writer) {
}

fmt.Fprintf(w, "\nStop hooks:\n\n")
for i := len(lc.hooks) - 1; i >= 0; i-- {
hook := lc.hooks[i]
printHook := func(hook augmentedHook) {
fnName, exists := getHookFuncName(hook.HookInterface, false)
if !exists {
continue
return
}
fmt.Fprintf(w, " • %s (%s)\n", fnName, hook.moduleID)
}
for i := len(lc.hooks) - 1; i >= 0; i-- {
hook := lc.hooks[i]
if _, ok := hook.HookInterface.(PreStopHook); ok {
printHook(hook)
}
}
for i := len(lc.hooks) - 1; i >= 0; i-- {
hook := lc.hooks[i]
if _, ok := hook.HookInterface.(PreStopHook); !ok {
printHook(hook)
}
}
}

type augmentedLifecycle struct {
Expand All @@ -231,7 +277,7 @@ func (lc augmentedLifecycle) Append(hook HookInterface) {
lc.mu.Lock()
defer lc.mu.Unlock()

lc.hooks = append(lc.hooks, augmentedHook{hook, lc.moduleID})
lc.hooks = append(lc.hooks, augmentedHook{hook, lc.moduleID, false})
}

func getHookFuncName(hook HookInterface, start bool) (name string, hasHook bool) {
Expand Down
91 changes: 60 additions & 31 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log/slog"
"runtime/pprof"
"sync"
"time"

"github.com/cilium/hive"
"github.com/cilium/hive/cell"
Expand Down Expand Up @@ -44,12 +45,20 @@ type registry struct {
logger *slog.Logger
shutdowner hive.Shutdowner

mu sync.Mutex
// appLifecycle is the main application appLifecycle. Jobs that are
// added before the registry is started are appended here. This ensures
// that the job starting order is interleaved with the start hooks and
// that we don't start the jobs before a dependency's start hook has ran.
appLifecycle cell.Lifecycle

// runtimeLifecycle is the lifecycle used after registry has started.
runtimeLifecycle jobLifecycle

// mu protects the fields below
mu sync.Mutex

groups []*group
started bool

lifecycle cell.Lifecycle
dynamicLC *cell.DefaultLifecycle
}

var _ cell.HookInterface = (*registry)(nil)
Expand All @@ -60,9 +69,9 @@ func newRegistry(
lc cell.Lifecycle,
) Registry {
r := &registry{
logger: logger,
shutdowner: shutdowner,
lifecycle: lc,
logger: logger,
shutdowner: shutdowner,
appLifecycle: lc,
}
lc.Append(r)
return r
Expand All @@ -76,7 +85,6 @@ func (c *registry) Start(cell.HookContext) error {
return nil
}
c.started = true
c.dynamicLC = cell.NewDefaultLifecycle(nil, 0, 0)
return nil
}

Expand All @@ -87,15 +95,18 @@ func (c *registry) Stop(ctx cell.HookContext) error {
return nil
}
c.started = false
c.dynamicLC.Stop(c.logger, ctx)
return nil
return c.runtimeLifecycle.stop(ctx)
}

// PreStopHookMarker tells [cell.DefaultLifecycle] that this
// hook should be stopped before any other hook.
func (c *registry) PreStopHookMarker() {}

func (c *registry) WithLifecycle(lifecycle cell.Lifecycle) Registry {
r := &registry{
logger: c.logger,
shutdowner: c.shutdowner,
lifecycle: lifecycle,
logger: c.logger,
shutdowner: c.shutdowner,
appLifecycle: lifecycle,
}
lifecycle.Append(r)
return r
Expand Down Expand Up @@ -136,7 +147,7 @@ func (c *registry) addJobs(health cell.Health, opts options, jobs ...Job) {

if !c.started {
for _, job := range jobs {
c.lifecycle.Append(&queuedJob{
c.appLifecycle.Append(&queuedJob{
registry: c,
job: job,
health: health,
Expand All @@ -147,15 +158,15 @@ func (c *registry) addJobs(health cell.Health, opts options, jobs ...Job) {
}

for _, job := range jobs {
qj := &queuedJob{
registry: c,
job: job,
health: health,
options: opts,
}
c.dynamicLC.Append(qj)
// Start the newly appended job immediately.
c.dynamicLC.Start(c.logger, context.Background())
c.runtimeLifecycle.insertAndStart(
&queuedJob{
registry: c,
job: job,
health: health,
options: opts,
runtimeJob: true,
},
)
}
}

Expand All @@ -180,22 +191,40 @@ type Job interface {
}

type queuedJob struct {
registry *registry
job Job
health cell.Health
options options
cancel context.CancelFunc
done chan struct{}
registry *registry
job Job
health cell.Health
options options
cancel context.CancelFunc
done chan struct{}
startedAt time.Time
prev *queuedJob
next *queuedJob
runtimeJob bool
}

// Start implements cell.HookInterface.
func (qj *queuedJob) Start(cell.HookContext) error {
qj.startedAt = time.Now()
qj.done = make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
qj.cancel = cancel
pprof.Do(ctx, qj.options.pprofLabels, func(ctx context.Context) {
go func() {
defer close(qj.done)
defer func() {
qj.registry.runtimeLifecycle.remove(qj)
if qj.runtimeJob {
qj.registry.logger.Info("Job stopped",
"job", qj.HookInfo(),
"duration", time.Since(qj.startedAt))
}
close(qj.done)
}()
if qj.runtimeJob {
// We only log this for runtime jobs since we already have the
// lifecycle logging for the jobs added before starting.
qj.registry.logger.Info("Job started", "job", qj.HookInfo())
}
qj.job.start(ctx, qj.health, qj.options)
}()
})
Expand All @@ -217,7 +246,7 @@ func (qj *queuedJob) Stop(ctx cell.HookContext) error {
}
}

var _ cell.HookInterface = &queuedJob{}
var _ cell.HookDescriptiveInterface = &queuedJob{}

type group struct {
registry *registry
Expand Down
Loading
Loading