Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 67 additions & 21 deletions internal/api/dbhealth.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,42 @@ type DBPinger interface {
// atomic boolean. The HTTP middleware below short-circuits /api/* traffic
// with a 503 when the flag is false, preventing goroutine pile-up on pool
// acquisition when the DB is unreachable.
//
// To avoid false-positive 503 windows under load — e.g. SQLite with
// MaxOpen=1 where a busy write transiently blocks the 2s health ping —
// the poller flips to unhealthy only after failureThreshold consecutive
// failed pings. A single successful ping clears the counter and restores
// healthy immediately.
type DBHealth struct {
	// db is the handle whose PingContext is called on every health check.
	db DBPinger
	// driver is used as the label value when the DBUp metric is updated.
	driver string
	// interval is the poll interval; NewDBHealth sets it to 5s.
	interval time.Duration
	// healthy is the gate flag: false once failureThreshold consecutive
	// pings have failed, true again after any successful ping.
	healthy atomic.Bool
	// consecutiveFails counts failed pings since the last success.
	consecutiveFails atomic.Int32
	// failureThreshold is the number of consecutive failures required to
	// flip healthy to false. It is a plain int32 that is read and written
	// through atomic.LoadInt32 / atomic.StoreInt32.
	failureThreshold int32
	// metrics is optional; a nil value (or a nil DBUp) disables metric
	// updates.
	metrics *telemetry.Metrics
	// stopCh and doneCh are created by NewDBHealth.
	// NOTE(review): presumably they signal poller shutdown and completion;
	// confirm against loop/Stop, which are not visible here.
	stopCh chan struct{}
	doneCh chan struct{}
}

// defaultFailureThreshold is the number of consecutive failed pings required
// before the middleware flips into 503-serving mode. At the default 5s poll
// interval, three failures correspond to roughly 15s of real outage before
// the gate trips, which comfortably absorbs transient connection-pool
// contention on single-writer drivers like SQLite.
const defaultFailureThreshold = 3

// NewDBHealth constructs a health poller. Default poll interval is 5s; ping
// timeout is 2s per attempt; failureThreshold defaults to 3 consecutive
// failed pings before the gate flips. Start() must be called to begin polling.
func NewDBHealth(db DBPinger, driver string, metrics *telemetry.Metrics) *DBHealth {
h := &DBHealth{
db: db,
driver: driver,
interval: 5 * time.Second,
metrics: metrics,
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
db: db,
driver: driver,
interval: 5 * time.Second,
failureThreshold: defaultFailureThreshold,
metrics: metrics,
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
// Assume healthy until the first ping proves otherwise — avoids a
// spurious 503 window at startup.
Expand All @@ -51,6 +67,16 @@ func NewDBHealth(db DBPinger, driver string, metrics *telemetry.Metrics) *DBHeal
return h
}

// SetFailureThreshold overrides the number of consecutive failed pings
// before the middleware flips to 503. n <= 0 normalises to 1 (legacy
// behaviour: any single failure trips the gate). Values larger than an
// int32 can hold are clamped to the int32 maximum instead of wrapping: a
// wrapped value could come out non-positive, which ping treats as a
// threshold of 1 — the opposite of what the caller asked for.
func (h *DBHealth) SetFailureThreshold(n int) {
	const maxThreshold = 1<<31 - 1 // largest value an int32 can hold

	switch {
	case n <= 0:
		n = 1
	case n > maxThreshold:
		n = maxThreshold
	}
	atomic.StoreInt32(&h.failureThreshold, int32(n)) // #nosec G115 -- n clamped to [1, maxThreshold] above
}

// Start launches the background poller.
func (h *DBHealth) Start(ctx context.Context) {
go h.loop(ctx)
Expand Down Expand Up @@ -97,14 +123,34 @@ func (h *DBHealth) ping(parent context.Context) {
pctx, cancel := context.WithTimeout(parent, 2*time.Second)
defer cancel()
err := h.db.PingContext(pctx)
up := err == nil
if err == nil {
// A single success clears the failure streak and restores healthy
// immediately — recovery should not be debounced.
h.consecutiveFails.Store(0)
h.markHealthy(true)
return
}
fails := h.consecutiveFails.Add(1)
threshold := atomic.LoadInt32(&h.failureThreshold)
if threshold < 1 {
threshold = 1
}
if fails >= threshold {
h.markHealthy(false)
}
// Below threshold: leave the gate where it was. A transient pool-contention
// timeout under SQLite MaxOpen=1 must not produce a user-visible 503 window.
}

func (h *DBHealth) markHealthy(up bool) {
h.healthy.Store(up)
if h.metrics != nil && h.metrics.DBUp != nil {
if up {
h.metrics.DBUp.WithLabelValues(h.driver).Set(1)
} else {
h.metrics.DBUp.WithLabelValues(h.driver).Set(0)
}
if h.metrics == nil || h.metrics.DBUp == nil {
return
}
if up {
h.metrics.DBUp.WithLabelValues(h.driver).Set(1)
} else {
h.metrics.DBUp.WithLabelValues(h.driver).Set(0)
}
}

Expand Down
97 changes: 97 additions & 0 deletions internal/api/dbhealth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,103 @@ func TestDBHealth_TogglesOnPingFailure(t *testing.T) {
}
}

// TestDBHealth_SingleFailureDoesNotTripGate covers the debounce added by the
// consecutive-failure threshold. Previously one failed ping set
// healthy=false and every /api/* request got a 503 until the next good poll,
// up to 5s later. With SQLite (MaxOpen=1) under real load, a poll that
// collides with an in-flight write is enough to cause that, so a short
// contended ping must stay invisible to API clients.
func TestDBHealth_SingleFailureDoesNotTripGate(t *testing.T) {
	pinger := &stubPinger{}
	health := NewDBHealth(pinger, "sqlite", nil)
	// ping() is called synchronously (no goroutine, no ticker) so the
	// failure sequence is exact and free of timing flakes.
	bg := context.Background()

	// Start from a successful ping.
	health.ping(bg)
	if ok := health.Healthy(); !ok {
		t.Fatalf("expected healthy=true after first successful ping")
	}

	// First and second failure: both stay below the default threshold of 3,
	// so the gate must remain open after each.
	pinger.fail.Store(true)
	for _, msg := range []string{
		"single failure flipped healthy=false; threshold=%d should debounce",
		"two failures flipped healthy=false; threshold=%d should debounce",
	} {
		health.ping(bg)
		if !health.Healthy() {
			t.Fatalf(msg, health.failureThreshold)
		}
	}
}

// TestDBHealth_FlipsAtThreshold checks that the gate trips on exactly the
// N-th consecutive failed ping (N defaults to 3) and not earlier; fewer
// failures are treated as transient pool contention.
func TestDBHealth_FlipsAtThreshold(t *testing.T) {
	pinger := &stubPinger{}
	pinger.fail.Store(true)
	health := NewDBHealth(pinger, "sqlite", nil)
	bg := context.Background()

	// Every failure before the threshold-th must leave the gate open.
	limit := int(health.failureThreshold)
	for n := 1; n < limit; n++ {
		health.ping(bg)
		if health.Healthy() {
			continue
		}
		t.Fatalf("flipped after %d failures; expected only after %d",
			n, health.failureThreshold)
	}

	// The threshold-th failure is the one that has to close it.
	health.ping(bg)
	if stillUp := health.Healthy(); stillUp {
		t.Fatalf("did not flip after %d consecutive failures", health.failureThreshold)
	}
}

// TestDBHealth_SuccessResetsCounter checks that one successful ping wipes
// the failure streak: fail, fail, succeed, fail, fail must leave the gate
// open because three failures never occurred back to back.
func TestDBHealth_SuccessResetsCounter(t *testing.T) {
	pinger := &stubPinger{}
	health := NewDBHealth(pinger, "sqlite", nil)
	bg := context.Background()

	// drive sets the stub's failure mode and then pings the given number
	// of times.
	drive := func(failing bool, times int) {
		pinger.fail.Store(failing)
		for i := 0; i < times; i++ {
			health.ping(bg)
		}
	}

	drive(true, 2) // failures 1 and 2
	if !health.Healthy() {
		t.Fatalf("flipped before threshold")
	}

	drive(false, 1) // success resets the streak
	if !health.Healthy() {
		t.Fatalf("expected healthy=true after success")
	}

	drive(true, 2) // failures 1 and 2 of a fresh streak
	if !health.Healthy() {
		t.Fatalf("flipped after only 2 fresh failures; counter did not reset on success")
	}
}

// TestDBHealth_SetFailureThresholdNormalisesNonPositive confirms that a
// non-positive threshold collapses to 1 — restoring the legacy "any single
// failure trips the gate" behaviour for callers that explicitly opt out.
func TestDBHealth_SetFailureThresholdNormalisesNonPositive(t *testing.T) {
p := &stubPinger{}
h := NewDBHealth(p, "sqlite", nil)

for _, n := range []int{0, -1, -100} {
h.SetFailureThreshold(n)
if got := atomic.LoadInt32(&h.failureThreshold); got != 1 {
t.Fatalf("SetFailureThreshold(%d) → %d, want 1", n, got)
}
}
}

func waitFor(t *testing.T, d time.Duration, cond func() bool) {
t.Helper()
deadline := time.Now().Add(d)
Expand Down
Loading