Skip to content

Commit 7490961

Browse files
authored
fix(serve): degrade large watcher trees instead of failing startup (#443)
## Summary

- Cap recursive file watcher setup at an internal process-wide budget of 8192 directories. When the budget is exhausted or `fsnotify.Add` returns `EMFILE`/`ENOSPC`, the affected root degrades to the existing periodic polling path instead of bubbling up as a fatal startup error.
- Defer file watcher startup until after the backend listener is bound, the managed Caddy proxy is reachable, and the state file/startup lock are published. Slow recursive watch setup can no longer block the web UI from coming up or stall daemon discovery.
- Report polled roots after the existing `Watching N directories` line so the operator can tell when degradation is in effect.

Fixes #364.

Co-authored-by: Wes McKinney <wesm@users.noreply.github.com>
1 parent 57dcb2b commit 7490961

3 files changed

Lines changed: 117 additions & 21 deletions

File tree

cmd/agentsview/main.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const (
3232
periodicSyncInterval = 15 * time.Minute
3333
unwatchedPollInterval = 2 * time.Minute
3434
watcherDebounce = 500 * time.Millisecond
35+
recursiveWatchBudget = 8192
3536
)
3637

3738
func main() {
@@ -142,9 +143,6 @@ func runServe(cfg config.Config) {
142143
return
143144
}
144145

145-
stopWatcher, unwatchedDirs := startFileWatcher(cfg, engine)
146-
defer stopWatcher()
147-
148146
// Backfill runs in the background. On a large DB (e.g.
149147
// after copying tens of thousands of orphaned sessions
150148
// during a resync), walking every row to recompute
@@ -164,9 +162,6 @@ func runServe(cfg config.Config) {
164162
}()
165163

166164
go startPeriodicSync(engine, database)
167-
if len(unwatchedDirs) > 0 {
168-
go startUnwatchedPoll(engine)
169-
}
170165
}
171166

172167
// Seed model_pricing after any resync swap so the new DB
@@ -249,6 +244,14 @@ func runServe(cfg config.Config) {
249244
}
250245
fmt.Printf("Database: %s\n", cfg.DBPath)
251246

247+
if engine != nil {
248+
stopWatcher, unwatchedDirs := startFileWatcher(cfg, engine)
249+
defer stopWatcher()
250+
if len(unwatchedDirs) > 0 {
251+
go startUnwatchedPoll(engine)
252+
}
253+
}
254+
252255
if err := waitForServerRuntime(ctx, srv, rt); err != nil {
253256
fatal("%v", err)
254257
}
@@ -498,6 +501,7 @@ func startFileWatcher(
498501

499502
var totalWatched int
500503
var shallowWatched int
504+
remaining := recursiveWatchBudget
501505
for _, r := range roots {
502506
if r.shallow {
503507
if watcher.WatchShallow(r.root) {
@@ -508,14 +512,19 @@ func startFileWatcher(
508512
}
509513
continue
510514
}
511-
watched, uw, _ := watcher.WatchRecursive(r.root)
512-
totalWatched += watched
513-
if uw > 0 {
515+
result := watcher.WatchRecursiveBudgeted(r.root, remaining)
516+
totalWatched += result.Watched
517+
remaining -= result.Watched
518+
if result.Unwatched > 0 || result.BudgetExhausted ||
519+
result.ResourceExhausted || result.Err != nil {
514520
unwatchedDirs = append(unwatchedDirs, r.dir)
515521
log.Printf(
516522
"Couldn't watch %d directories under %s, will poll every %s",
517-
uw, r.dir, unwatchedPollInterval,
523+
result.Unwatched, r.dir, unwatchedPollInterval,
518524
)
525+
if result.Err != nil {
526+
log.Printf("watching %s: %v", r.dir, result.Err)
527+
}
519528
}
520529
}
521530

@@ -530,6 +539,12 @@ func startFileWatcher(
530539
totalWatched, time.Since(t).Round(time.Millisecond),
531540
)
532541
}
542+
if len(unwatchedDirs) > 0 {
543+
fmt.Printf(
544+
"Polling %d roots every %s for changes\n",
545+
len(unwatchedDirs), unwatchedPollInterval,
546+
)
547+
}
533548
watcher.Start()
534549
return watcher.Stop, unwatchedDirs
535550
}

internal/sync/watcher.go

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,31 @@
11
package sync
22

33
import (
4+
"errors"
45
"fmt"
56
"io/fs"
67
"log"
8+
"math"
79
"os"
810
"path/filepath"
911
"slices"
1012
"strings"
1113
"sync"
14+
"syscall"
1215
"time"
1316

1417
"github.com/fsnotify/fsnotify"
1518
)
1619

20+
type RecursiveWatchResult struct {
21+
Watched int
22+
Unwatched int
23+
Err error
24+
BudgetExhausted bool
25+
ResourceExhausted bool
26+
ResourceExhaustedAt string
27+
}
28+
1729
// Watcher uses fsnotify to watch session directories for changes
1830
// and triggers a callback with debouncing.
1931
type Watcher struct {
@@ -60,27 +72,58 @@ func NewWatcher(debounce time.Duration, onChange func(paths []string), excludes
6072
// subdirectories to the watch list. Returns the number
6173
// of directories watched and unwatched (failed to add).
6274
func (w *Watcher) WatchRecursive(root string) (watched int, unwatched int, err error) {
75+
result := w.WatchRecursiveBudgeted(root, math.MaxInt)
76+
return result.Watched, result.Unwatched, result.Err
77+
}
78+
79+
// WatchRecursiveBudgeted walks a directory tree and adds at most
80+
// budget directories (the root included) to the watch list. The walk stops as soon
81+
// as the budget is exhausted or fsnotify reports resource
82+
// exhaustion, so the caller can degrade the rest of the tree to
83+
// polling without continuing to traverse it.
84+
func (w *Watcher) WatchRecursiveBudgeted(root string, budget int) RecursiveWatchResult {
85+
var result RecursiveWatchResult
6386
root = filepath.Clean(root)
6487
w.addRoot(root)
65-
err = filepath.WalkDir(root,
88+
89+
remaining := budget
90+
result.Err = filepath.WalkDir(root,
6691
func(path string, d fs.DirEntry, err error) error {
6792
if err != nil {
6893
return nil // skip inaccessible dirs
6994
}
70-
if d.IsDir() {
71-
// Skip entire excluded subtrees, but always keep the root.
72-
if path != root && w.shouldExcludeForRoot(path, root) {
73-
return filepath.SkipDir
74-
}
75-
if addErr := w.watcher.Add(path); addErr != nil {
76-
unwatched++
77-
} else {
78-
watched++
95+
if !d.IsDir() {
96+
return nil
97+
}
98+
// Skip entire excluded subtrees, but always keep the root.
99+
if path != root && w.shouldExcludeForRoot(path, root) {
100+
return filepath.SkipDir
101+
}
102+
if remaining <= 0 {
103+
result.BudgetExhausted = true
104+
return filepath.SkipAll
105+
}
106+
if addErr := w.watcher.Add(path); addErr != nil {
107+
result.Unwatched++
108+
if isWatchResourceExhaustion(addErr) {
109+
result.ResourceExhausted = true
110+
result.ResourceExhaustedAt = path
111+
return filepath.SkipAll
79112
}
113+
return nil
80114
}
115+
remaining--
116+
result.Watched++
81117
return nil
82118
})
83-
return watched, unwatched, err
119+
if errors.Is(result.Err, filepath.SkipAll) {
120+
result.Err = nil
121+
}
122+
return result
123+
}
124+
125+
func isWatchResourceExhaustion(err error) bool {
126+
return errors.Is(err, syscall.EMFILE) || errors.Is(err, syscall.ENOSPC)
84127
}
85128

86129
// WatchShallow adds only the root directory to the watch list,

internal/sync/watcher_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package sync
22

33
import (
44
"errors"
5+
"fmt"
56
"os"
67
"path/filepath"
78
"slices"
89
"sync"
10+
"syscall"
911
"testing"
1012
"time"
1113

@@ -351,6 +353,42 @@ func TestWatchRecursive_ExcludesDirectoryNames(t *testing.T) {
351353
}
352354
}
353355

356+
func TestWatchRecursiveBudget_DegradesWhenBudgetExhausted(t *testing.T) {
357+
root := t.TempDir()
358+
for i := range 5 {
359+
if err := os.MkdirAll(filepath.Join(root, fmt.Sprintf("dir-%d", i)), 0o755); err != nil {
360+
t.Fatalf("MkdirAll: %v", err)
361+
}
362+
}
363+
364+
w, err := NewWatcher(time.Second, func(_ []string) {}, nil)
365+
if err != nil {
366+
t.Fatalf("NewWatcher: %v", err)
367+
}
368+
w.Start()
369+
t.Cleanup(func() { w.Stop() })
370+
371+
result := w.WatchRecursiveBudgeted(root, 3)
372+
if result.Watched != 3 {
373+
t.Fatalf("Watched = %d, want 3", result.Watched)
374+
}
375+
if !result.BudgetExhausted {
376+
t.Fatal("BudgetExhausted = false, want true")
377+
}
378+
}
379+
380+
func TestIsWatchResourceExhaustion(t *testing.T) {
381+
if !isWatchResourceExhaustion(syscall.EMFILE) {
382+
t.Fatal("EMFILE should be resource exhaustion")
383+
}
384+
if !isWatchResourceExhaustion(syscall.ENOSPC) {
385+
t.Fatal("ENOSPC should be resource exhaustion")
386+
}
387+
if isWatchResourceExhaustion(os.ErrNotExist) {
388+
t.Fatal("ErrNotExist should not be resource exhaustion")
389+
}
390+
}
391+
354392
func TestWatcherAutoWatchesNewDirs_RespectsExcludes(t *testing.T) {
355393
pathsCh := make(chan []string, 10)
356394
w, err := NewWatcher(20*time.Millisecond, func(paths []string) {

0 commit comments

Comments
 (0)