feat: drain-on-Stop and NewTicker primitive#4
Conversation
- Stop now waits for runFunc to return on its own when WithDrain is set, closing the channel returned by Stopping(ctx) instead of cancelling. - Falls back to ctx cancel + wait once the drain timeout elapses, so stuck runFuncs still surface as DeadlineExceeded. - No-op for existing callers — Stop semantics are unchanged when WithDrain is not used. - Strengthen the existing stop-timeout test to assert ctx-cancel behavior when WithDrain is absent.
- NewTicker(interval, tick, opts...) wraps the standard time.NewTicker + select-loop pattern that periodic workers reimplement. - Composes with WithDrain (in-flight tick finishes; loop exits without firing a new tick) and WithRecoverer (panics in tick are caught). - Skip queued ticks accumulated while a slow tick was running once Stop has been called, so shutdown isn't delayed by stale t.C buffer. - Add examples/ticker-with-drain showing the full SIGTERM-safe shape (ticker + drain + recoverer + signal.NotifyContext).
- examples/main.go: drop unreachable return after infinite loop; capture and defer the cancel from context.WithTimeout to fix the ctx leak. - runnable_test.go, runnable_group_test.go: replace single-case selects with plain channel receives (S1000). - with_recoverer_test.go: drop unreachable returns after panic.
- with_recoverer_test.go: drop forced string type assertion in Report; use %v formatting so non-string panic values don't crash. - examples/ticker-with-drain: silence errcheck on os.Stderr.Write.
Concurrent Stop callers raced to close the same stoppingChan, causing "close of closed channel" panic on the second close. Pre-PR Stop was safe because context.CancelFunc tolerates repeat calls; WithDrain silently regressed that contract — and K8s does occasionally fire SIGTERM twice on shutdown. Claim ownership under the mutex: the first Stop copies and nils stoppingChan, drives the drain. Subsequent Stops see nil, skip the close, and fall through to the existing runCancel + <-runStop wait (which is already idempotent). All callers observe the same outcome: runFunc returns, then Stop returns. Adds TestWithDrain/Stop_is_concurrency_safe — 10 concurrent Stops, no panic, every caller returns nil or ErrNotRunning.
Stop's drain fall-through path was indistinguishable from a clean drain — both returned nil. Callers had no way to detect runFuncs that ignored Stopping(ctx) and only exited via ctx.Done(). Track the fall-through and return ErrDrainTimedOut once runFunc finally exits. ctx-cancel paths still return ctx.Err() unchanged. The runnable is fully stopped either way; the sentinel is observability.
- Stopping(ctx) godoc and README example: callers must select on both ctx.Done() and Stopping(ctx). A loop that observes only Stopping hangs on outer-ctx cancellation. README example now shows the full three-case select. - README also reflects ErrDrainTimedOut on the fall-through path. - NewTicker godoc: composing with WithRetry resets the ticker cadence on every retry (tick error bails the loop, WithRetry re-enters runFunc, fresh ticker). Document the behavior; users who need stable cadence should retry inside the tick handler.
context.WithTimeout(ctx, drainTimeout) derived the drain ctx from the caller's ctx, so a Stop deadline shorter than drainTimeout made <-ctx.Done() and the drain expiry race in the same select. The <-ctx.Done() branch returned ctx.Err() *without calling r.runCancel()* — leaving the runnable alive after Stop returned. The other branch could misreport ErrDrainTimedOut when the caller's deadline was the real cause. Switch to time.NewTimer(drainTimeout) and let <-ctx.Done() in the drain select fall through to r.runCancel() so the runnable always gets force-cancelled before Stop returns. drainTimedOut is only set when the standalone timer fires, so the sentinel is no longer spuriously returned on caller-ctx cancellation. Adds TestWithDrain/Stop_forces_cancel_when_caller_ctx_expires_during_drain which exercises a 100ms caller ctx against a 10s drain.
The example passed signal.NotifyContext's ctx to rc.Run. On SIGTERM that ctx cancels immediately — the ticker's runFunc observes <-ctx.Done() and exits before Stop ever closes Stopping(ctx), completely defeating WithDrain. Pass context.Background() to Run; use sigCtx only to detect when to call Stop. Stop's drain budget is now the sole driver of shutdown for the drain-enabled worker.
The previous concurrency-safe fix avoided the double-close panic but introduced two regressions Codex caught on re-review: 1. Secondary Stop callers fell through to r.runCancel() unconditionally, hard-cancelling the runCtx mid-drain. The drain that the primary caller was honoring got bypassed — exactly the failure mode WithDrain exists to prevent. 2. After fixing (1) by making secondary callers wait on runStop, a later Stop with a shorter deadline could no longer enforce it: it would return DeadlineExceeded but never escalate, so the runnable kept draining until the primary caller's drainTimeout expired. Restructure the secondary path: wait on runStop, but escalate to r.runCancel() if the caller's ctx expires first. The shortest deadline among concurrent callers wins, and the drain is only bypassed when some caller actively asks for it via deadline expiry. Adds two tests: - "concurrent Stop preserves drain semantics" — strengthened to assert runFunc observes Stopping(ctx), not ctx.Done() - "secondary Stop with shorter deadline escalates runCancel" — new regression for Codex's escalation finding
main was reading runErr only inside the SIGTERM branch, so if the worker exited early (tick error, recovered panic) main would block on sigCtx forever. Select on either sigCtx or runErr; an early worker exit now propagates the failure without waiting for a signal.
Stop captured runStop and stoppingChan under the mutex but read r.runCancel via the field after waiting in a select. With drain enabled, Stop's drainTimer / ctx.Done() branches fall through to runCancel after a wait. If the runnable exited and Run was called again before that wait returned, r.runCancel pointed at the *new* run's cancel — and Stop tore down the freshly started worker. Snapshot runCancel alongside the other fields so the Stop call operates on the runnable it observed under the lock, regardless of what the next Run does to the field. Adds "late runCancel does not tear down a subsequent Run" — runs Stop followed by an immediate fresh Run and asserts the second run isn't prematurely cancelled. Also adds "WithRetry stops retrying after Stopping fires" as part of the same drain-correctness sweep — see follow-up commit.
WithRetry had no awareness of WithDrain. After Stop was called and Stopping(ctx) closed, a transient error from the in-flight attempt would still trigger a retry — re-entering runFunc and starting fresh work mid-shutdown, defeating drain semantics. Add a non-blocking Stopping(ctx) check between attempts. When drain is not configured, Stopping returns nil and the default branch runs unchanged.
The previous "late runCancel" test created two separate *runnable instances for round 1 and round 2. They share no state, so a stale runCancel from round 1's Stop could not affect round 2 — the test passed even with the snapshot fix reverted. Replace with a single-runnable test: Run, then a primary Stop + secondary Stop with already-cancelled ctx (exercising the runCancel escalation path), then re-Run on the *same* runnable. Round 2 must run undisturbed until explicitly Stopped. The snapshot fix prevents round 1's secondary runCancel from cancelling round 2's runCtx via the field overwrite, so this is the closest deterministic regression shape available without runtime hooks.
A reviewer empirically reverted the runCancel-snapshot fix and ran the test 20×; all 20 passed. The test gates round 2 strictly after both round-1 Stops have returned, so by the time round 2 starts the field-overwrite vs. snapshot distinction is no longer observable. Rename to "runnable can be re-Run after a concurrent-Stop lifecycle" and document that it's a lifecycle smoke test, not a snapshot-fix regression. Deterministic coverage of that race needs testing/synctest (Go 1.25+) or a runtime hook — both out of scope. The snapshot fix is verified by inspection.
The lastTime field on withRetry persisted across Run cycles. After a Stop and re-Run, the first iteration of the new cycle compared against the prior cycle's last attempt — usually far in the past — which always triggered the i=0 reset. The reset was a no-op (i was already 0), so behavior didn't visibly change, but the field carried state across what callers reasonably treat as independent invocations. Move lastTime to a function-local variable inside the closure so each Run cycle gets a fresh timer. Field on the struct is removed. Adds "retry budget is per-Run-cycle" — runs the same WithRetry runnable twice and asserts both cycles exhaust the same retry budget.
Reviewer reverted the lastTime-scoping fix and ran the test against the buggy code: it passed. The bug is purely code-hygiene (state on a struct that shouldn't have it) — observable behavior is unchanged because the i=0 reset is a no-op when i is already 0. There's nothing to assert. The commit message on the fix accurately describes what changed; a test that lies about its coverage is worse than no test.
Alternative design: drain as a runFunc adapterTL;DRSame user-visible drain semantics —
What the PR gets right
The pushback below is about where drain logic lives, not whether it's needed. Concerns with the current design1. Drain logic is interleaved into the core lifecycle
2. Two recent fixes exist only because of the coupling
3.
|
WithDrain is now an adapter (adapters.Draining); core lifecycle has no drain knowledge.
Functionality moved to adapters.Draining and adapters.Ticker.
sigCtx can now be passed directly to Run; Draining intercepts cancellation.
- runnable.go: comment on runCancel snapshot was describing v0.1's failure mode (calling runCancel after waiting for drain). The actual defense in v0.2 is memory-model: Run overwrites the field across cycles, so reading it without synchronization races. Reword. - README: drop the contrived `<-time.After` case from the Draining example body and pull the "always select on both" note out as prose. The full pattern lives in examples/ticker-with-drain.
Panics in work ran on Draining's spawned goroutine, where the outer runnable.WithRecoverer's defer cannot reach them — recover only fires on the goroutine where the deferred call lives. The runtime would crash the whole process. The example explicitly composed Draining + Ticker + WithRecoverer and implied recovery worked; it didn't. Recover inside Draining's goroutine and surface the panic as an error containing the panic value and stack trace. Update the example to drop WithRecoverer (no longer load-bearing in this composition) and document the trade-off: WithRecoverer's Report/StackPrinter hooks do not fire; tick panics arrive as errors on runErr. A future Recovering adapter could give back the rich hooks for users who want them; deferred to a follow-up. Adds TestDraining_RecoversPanicAsError — fails on the unfixed code (work goroutine panic crashes the test process).
Old form asserted 2 <= count <= 4 after 175ms of wall sleep with a
50ms ticker. The upper bound flaked under load — Stop races t.C and
loaded CI runners can queue extra ticks. New form counts signals via
a channel until 3 ticks are observed, then stops. Pins the behavioral
claim ("fires repeatedly on interval") without depending on wall-clock
arithmetic.
Two pre-existing examples used `select { case <-ctx.Done(): ...; default: }`
followed by `time.Sleep(1s)`, which means Stop has to wait up to a
full second for the in-flight sleep before the loop observes
cancellation. The pattern directly contradicts the new "Adapters"
section's narrative about responsive shutdown via Draining/Stopping.
Replace with `case <-time.After(time.Second)` in the select itself so
cancellation is immediate. Also captures+defers the cancel from
context.WithTimeout in the timeout example (the discarded form was a
context-leak warning from go vet on master).
setup-go was pinned to 1.20 while go.mod declares 1.21 and the Draining adapter introduced in this PR uses context.WithoutCancel (Go 1.21+). Switch to go-version-file so the workflow tracks go.mod. Bump actions/checkout to v4 and actions/setup-go to v5; the v3/v4 versions both hit the Node 20 deprecation warning on the runner.
Introduce runnable.RunFunc and runnable.Adapter (func(next RunFunc) RunFunc)
as the core extension point, and runnable.WithAdapters Option that
wraps the runnable's runFunc with the supplied adapters left-to-right
(first listed = outermost wrapper).
Reshape adapters.Draining and adapters.Ticker as config-only
constructors that return runnable.Adapter — work is supplied via
runnable.New, not via the adapter call. Composition becomes:
runnable.New(reconcile, runnable.WithAdapters(
adapters.Draining(10*time.Second),
adapters.Ticker(2*time.Second),
))
The adapters package now imports runnable for the Adapter type;
runnable does not import adapters. Stop and runnable lifecycle are
unchanged.
Add adapters.Recovering (with a single PanicHandler callback, collapsing the v0.1 RecoveryReporter / StackPrinter split) and adapters.Retry, and remove the runnable.WithRecoverer and runnable.WithRetry Options they replace. Drop Status.Restarts: the field counted WithRetry re-entries via the onStart coupling and has no clean way to surface now that retry lives outside core. A later release can reintroduce per-attempt observability via an explicit event channel. Update examples/main.go and examples/ticker-with-drain to use the new adapters via runnable.WithAdapters.
Update the Adapters and Migrating-from-v0.1 sections of README and the adapters package doc comment to show the runnable.WithAdapters Option shape. Add Recovering and Retry to the migration table and call out the Status.Restarts removal.
Trim doc comments on Adapter / WithAdapters / Draining / Ticker / Recovering / Retry to one or two purpose-stating lines each. Drop restating of implementation details that the function body conveys.
runnable v0.2 — moves cross-cutting behaviors (drain-on-shutdown, periodic execution, panic recovery, retry) out of the core lifecycle and into the new
runnable/adapterssubpackage as chi-style middleware. Motivated by the OMSX Frontegg reconciler (and a queue of similar workers across Polygon services) needing shutdown-safe periodic execution. CoreRun/Stopsemantics for existing callers are unchanged unless they opt in.Final shape:
Each adapter is a config-only constructor returning
runnable.Adapter(func(next RunFunc) RunFunc).runnable.WithAdapterswrapsrunFuncleft-to-right (first listed = outermost).What lands
Core (
runnable/)runnable.RunFuncandrunnable.Adaptertypes — the extension point.runnable.WithAdapters(...Adapter) Option— applies a list of adapters, stacking left-to-right.Stop's drain logic is gone from core; drain is now an opt-in adapter concern.WithDrain,NewTicker,WithRetry,WithRecovererOptions are removed. Replacements live inadapters/.Status.Restartsremoved (returns in the stacked v0.3 PR via the event Publisher).go-version-file: 'go.mod'and bumpedactions/checkout@v4+actions/setup-go@v5(the previous pin to Go 1.20 broke oncecontext.WithoutCancelwas introduced).Adapters (
runnable/adapters/)Draining(timeout)— graceful shutdown. Outer ctx cancellation triggers drain: work hastimeoutto return viaadapters.Stopping(ctx)before its ctx is force-cancelled andErrDrainTimedOutis returned. Internally recovers panics on its spawned goroutine as a safety net.Ticker(interval)— periodic execution. Composes withDraining: an in-flight tick is allowed to finish before the loop exits.Recovering()— turns panics in the wrapped work into errors. Pairs withrunnable.WithPublisherif you want a handler (lands in stacked feat: Publisher interface and adapter event types #7).Retry(maxRetries, resetAfter)— re-invokes on non-context errors; budget resets after a quiet period.adapters.Stopping(ctx) <-chan struct{}— drain-start signal for long-running work to select on.adapters.ErrDrainTimedOut— sentinel returned when the drain window expires.Migration
The migration table and
Migrating from v0.1 to v0.2section live in the README. v0.2 is not yet released, so the prior intermediate API in earlier commits of this branch never escaped.Test plan
go test -race -count=1 ./...— clean (runnable~6s,runnable/adapters~3s).golangci-lint run ./...— clean.examples/ticker-with-drain/main.gobuilds end-to-end (full SIGTERM-safe shape).Stopdouble-close, drain timer inheriting caller ctx,runCancelfield race vs subsequentRun,WithRetryre-entry after drain (now moot — retry is adapter-side), panics inDraining's goroutine.runCancel).Notes for review
runnable.Adapter) sorunnable.WithAdaptersdoesn't have to import theadapterssubpackage. Dependency direction isadapters → runnable, never the reverse.NewGroupinteraction — drain-enabled children ofNewGroupnow drain correctly when the group is stopped. In v0.1 this was silently broken (child observedgroupCtx.Done()and exited before seeing its drain signal); the adapter design fixes it by construction. Load-bearing test atgroup_drain_test.go.Stop(ctx)'s ctx no longer shortens the drain window. In v0.1 a shorter caller ctx would force-cancel mid-drain; in v0.2,Stop's ctx governs only how long the caller waits to hear back — the drain timer is fixed byDraining's argument.WithDrain,NewTicker) being introduced and then removed. They're retained for the audit trail; only the final state lands externally.Stacked on top
#7 adds
runnable.Publisherand typed events (RetryEvent,DrainStartedEvent,PanicRecoveredEvent, etc.). RestoresStatus.Restartsvia subscription rather than the oldWithRetryonStartside-channel.Closes #6 (chi-style middleware reshape, now folded into this PR).