From 9acdde7fa00c80a0c85e67ef6dc7e962499fa694 Mon Sep 17 00:00:00 2001
From: steiler
Date: Thu, 28 May 2026 16:35:38 +0200
Subject: [PATCH 1/2] fix(stream): goroutine handoff to prevent post-SyncResponse notification drops
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fixes #440.

buildTreeSyncWithDatastore previously called syncToRunning inline, blocking
the consumer goroutine for the duration of ApplyToRunning → performRevert.
Any notification arriving while the goroutine was blocked waited up to
notificationSendTimeout (5 s) and was then silently and permanently dropped,
leaving the running view of the device incomplete until the datastore was
recreated.

Fix: on SyncResponse and ticker events, snapshot the local syncTree, reset
it to a fresh empty tree, and run syncToRunning in a background goroutine.
The main select loop never blocks on a commit and keeps draining updChan at
all times. Post-SyncResponse notifications land in the fresh syncTree and
are committed by the next ticker (≤ 5 s lag, self-healing, no permanent
loss).

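A minimal sketch of the handoff described above, not the code in this patch:
a plain string slice stands in for syncTree, and runLoop, commit and demo are
hypothetical names introduced only for illustration. It assumes Go 1.19 or
newer for atomic.Bool and atomic.Int64.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// commit is a hypothetical stand-in for syncToRunning: it only simulates a
// slow apply and reports how many updates the batch contained.
func commit(batch []string, slow time.Duration) int {
	time.Sleep(slow)
	return len(batch)
}

// runLoop accumulates updates and hands every commit off to a goroutine, so
// the select loop keeps draining updates while a commit is in flight.
func runLoop(updates <-chan string, syncResp, done <-chan struct{}, tick time.Duration, committed *atomic.Int64) {
	var (
		batch       []string         // plays the role of syncTree
		tickerChan  <-chan time.Time // nil until the first sync response
		syncRunning atomic.Bool      // true while a commit goroutine runs
		wg          sync.WaitGroup
	)
	handoff := func() {
		toCommit := batch // snapshot; the goroutine owns it exclusively
		batch = nil       // fresh accumulator for the main loop
		syncRunning.Store(true)
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer syncRunning.Store(false)
			committed.Add(int64(commit(toCommit, 50*time.Millisecond)))
		}()
	}
	for {
		select {
		case <-done:
			wg.Wait()
			return
		case u := <-updates:
			batch = append(batch, u)
		case <-syncResp:
			if tickerChan == nil { // activate the ticker lazily
				t := time.NewTicker(tick)
				defer t.Stop()
				tickerChan = t.C
			}
			handoff() // the initial sync commit is never skipped
		case <-tickerChan:
			if syncRunning.Load() {
				continue // skip-if-busy: a later tick commits the batch
			}
			handoff()
		}
	}
}

// demo sends one pre-sync update, a sync signal, and five post-sync updates
// over an unbuffered channel, then returns the number of committed updates.
func demo() int64 {
	updates := make(chan string) // unbuffered: a blocked loop would stall senders
	syncResp := make(chan struct{}, 1)
	done := make(chan struct{})
	exited := make(chan struct{})
	var committed atomic.Int64

	go func() {
		defer close(exited)
		runLoop(updates, syncResp, done, 10*time.Millisecond, &committed)
	}()

	updates <- "pre-sync"
	syncResp <- struct{}{}
	for i := 0; i < 5; i++ {
		updates <- fmt.Sprintf("post-sync-%d", i)
	}
	deadline := time.Now().Add(2 * time.Second)
	for committed.Load() < 6 && time.Now().Before(deadline) {
		time.Sleep(5 * time.Millisecond)
	}
	close(done)
	<-exited
	return committed.Load()
}

func main() {
	fmt.Println("committed:", demo())
}
```

In this sketch the post-sync sends succeed while the first commit is still
sleeping, because the loop itself never calls commit. The updates that arrive
in that window are picked up by the first tick that finds syncRunning false.
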
Implementation details:
- atomic.Bool syncRunning prevents ticker goroutines from stacking while a
  commit is in flight (skip-if-busy)
- tickerChan starts as nil (never selects) and is activated lazily on the
  first SyncResponse, replacing the previous tickerActive bool flag
- syncToRunning signature simplified: no mutex parameter, no tree return
  value
- updChanBufSize and notifSendTimeout are configurable fields (defaults
  unchanged) to enable deterministic tests
- NewEmptyTree failure after handoff exits the loop rather than continuing
  with a shared tree

Decision rationale: docs/adr/0003-streamsync-goroutine-handoff-for-notification-drop-prevention.md

Co-authored-by: Cursor
---
 ...andoff-for-notification-drop-prevention.md |  48 ++++
 pkg/datastore/target/gnmi/stream.go           | 117 +++++---
 pkg/datastore/target/gnmi/stream_test.go      | 256 ++++++++++++++++++
 3 files changed, 384 insertions(+), 37 deletions(-)
 create mode 100644 docs/adr/0003-streamsync-goroutine-handoff-for-notification-drop-prevention.md
 create mode 100644 pkg/datastore/target/gnmi/stream_test.go

diff --git a/docs/adr/0003-streamsync-goroutine-handoff-for-notification-drop-prevention.md b/docs/adr/0003-streamsync-goroutine-handoff-for-notification-drop-prevention.md
new file mode 100644
index 00000000..bfd206a7
--- /dev/null
+++ b/docs/adr/0003-streamsync-goroutine-handoff-for-notification-drop-prevention.md
@@ -0,0 +1,48 @@
+# ADR 0003: StreamSync goroutine handoff to prevent post-SyncResponse notification drops
+
+## Status
+
+Accepted — 2026-05-28
+
+## Context
+
+`StreamSync.buildTreeSyncWithDatastore` ran a single `for/select` loop that both drained the incoming notification channel (`updChan`) **and** called `syncToRunning` inline. `syncToRunning` calls `ApplyToRunning` → `performRevert`, which loads all non-running intents, validates the merged tree, and pushes revert deltas to the device. This can take several seconds.
+
+While `syncToRunning` held the goroutine, no notifications could be consumed from `updChan`. Pool workers delivering notifications blocked until `notificationSendTimeout` (5 s) expired, at which point the update was silently and permanently dropped. Any notification dropped during this window left the running view of the device permanently incomplete until the datastore was recreated.
+
+A concurrent race (problem B) compounded this: the `SyncResponse` signal was sent directly to the consumer goroutine while in-flight pool workers were still writing their notifications to `updChan`. Some post-`SyncResponse` onChange notifications arrived while the goroutine was blocked in `syncToRunning`, causing the drops.
+
+A real CI failure confirmed the impact: the mandatory `password` leaf for the `admin` user was permanently absent from the running tree, causing every subsequent `TransactionSet` validation to fail for 34+ minutes.
+
+## Decision
+
+**Primary fix (Problem A):** apply a goroutine handoff in `buildTreeSyncWithDatastore`.
+
+On each `syncResponse` or ticker commit event, the current `syncTree` is snapshotted as `treeToCommit`, `syncTree` is immediately reset to a fresh empty tree (from `NewEmptyTree`), and `syncToRunning` is called in a background goroutine. The main loop never blocks on a commit — it keeps draining `updChan` at all times.
+
+Key implementation details:
+- An `atomic.Bool syncRunning` tracks whether a commit goroutine is in flight. The ticker case skips if `syncRunning` is true (the running goroutine will commit the accumulated state). The `syncResponse` case always spawns since the initial sync commit must not be skipped.
+- Both cases set `syncRunning.Store(true)` before spawning; the goroutine calls `defer syncRunning.Store(false)`.
+- If `NewEmptyTree` fails, `buildTreeSyncWithDatastore` logs and returns rather than continuing with a shared tree that is also in use by the spawned goroutine.
+- The local `syncTreeMutex` previously passed to `syncToRunning` is removed: with the handoff, each goroutine owns its own snapshot so there is no shared mutable local tree. Concurrent calls to `ApplyToRunning` are serialized by `d.syncTreeMutex` inside the datastore, which was always the authoritative guard.
+- The `syncToRunning` signature is simplified: it no longer returns a `*tree.RootEntry` (the fresh tree is created in the main loop before spawning) and no longer accepts a `*sync.Mutex` parameter.
+
+**Problem B (post-SyncResponse notification lag):** with Problem A fixed, notifications that race past the `SyncResponse` signal land in the fresh `syncTree` and are committed by the next 5-second ticker. This is a bounded, self-healing lag (≤ 5 s) rather than permanent loss. A full per-window drain (Option 4 in the issue) was considered but deferred: it adds pool lifecycle complexity and the bounded lag is acceptable given that `ApplyToRunning` is idempotent and the ticker commits continuously.
+
+## Alternatives considered
+
+- **Increase `updChan` buffer** — raises the threshold at which drops occur, does not eliminate the root cause.
+- **Remove drop timeout** — pushes the problem upstream; a stuck `performRevert` would freeze the whole stream pipeline instead of logging a drop.
+- **Per-window drain (`CloseAndWait`)** — eliminates the 5-second Problem B lag but must be combined with the goroutine handoff to be safe; deferred as a follow-up.
+- **Decouple `performRevert` from `ApplyToRunning`** — architecturally cleanest long-term direction (the `// TODO` comment in `sync.go` already flags this); larger refactor, tracked separately.
+
+## Consequences
+
+- **Positive:** Post-`SyncResponse` notifications are never permanently dropped regardless of how long `performRevert` takes.
+- **Positive:** The `syncToRunning` signature is simpler (no mutex parameter, no tree return value).
+- **Negative:** A bounded lag (≤ 5 s) remains for the narrow race where post-`SyncResponse` onChange notifications arrive before the ticker fires; acceptable given idempotent apply and continuous ticker commits.
+- **Observability:** Any `NewEmptyTree` failure now causes `buildTreeSyncWithDatastore` to exit and log, surfacing a previously silent degraded state.
+
+## Related
+
+- [GitHub issue #440](https://github.com/sdcio/data-server/issues/440)
diff --git a/pkg/datastore/target/gnmi/stream.go b/pkg/datastore/target/gnmi/stream.go
index b8816924..2ce3d5af 100644
--- a/pkg/datastore/target/gnmi/stream.go
+++ b/pkg/datastore/target/gnmi/stream.go
@@ -3,7 +3,7 @@ package gnmi
 import (
 	"context"
 	"errors"
-	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/openconfig/gnmi/proto/gnmi"
@@ -25,7 +25,8 @@ import (
 
 const (
 	syncSignalBufferSize         = 1
-	notificationSendTimeout      = 5 * time.Second
+	defaultUpdChanBufSize        = 20
+	defaultNotifSendTimeout      = 5 * time.Second
 	notificationSlowSendWarnTime = 500 * time.Millisecond
 )
 
@@ -37,6 +38,13 @@ type StreamSync struct {
 	runningStore types.RunningStore
 	schemaClient dsutils.SchemaClientBound
 	vpoolFactory pool.VirtualPoolFactory
+
+	// updChanBufSize controls the notification channel buffer. Default is
+	// defaultUpdChanBufSize; tests may override before calling Start.
+	updChanBufSize int
+	// notifSendTimeout is the per-notification delivery deadline. Default is
+	// defaultNotifSendTimeout; tests may override before calling Start.
+	notifSendTimeout time.Duration
 }
 
 func NewStreamSync(ctx context.Context, target SyncTarget, c *config.SyncProtocol, runningStore types.RunningStore, schemaClient dsutils.SchemaClientBound, vpoolFactory pool.VirtualPoolFactory) *StreamSync {
@@ -47,13 +55,15 @@ func NewStreamSync(ctx context.Context, target SyncTarget, c *config.SyncProtoco
 	ctx = logger.IntoContext(ctx, log)
 
 	return &StreamSync{
-		config:       c,
-		target:       target,
-		cancel:       cancel,
-		runningStore: runningStore,
-		schemaClient: schemaClient,
-		ctx:          ctx,
-		vpoolFactory: vpoolFactory,
+		config:           c,
+		target:           target,
+		cancel:           cancel,
+		runningStore:     runningStore,
+		schemaClient:     schemaClient,
+		ctx:              ctx,
+		vpoolFactory:     vpoolFactory,
+		updChanBufSize:   defaultUpdChanBufSize,
+		notifSendTimeout: defaultNotifSendTimeout,
 	}
 }
 
@@ -110,7 +120,7 @@ func (s *StreamSync) Start() error {
 	log := logger.FromContext(s.ctx)
 	log.Info("Starting Sync")
 
-	updChan := make(chan *NotificationData, 20)
+	updChan := make(chan *NotificationData, s.updChanBufSize)
 	// Keep a single pending sync signal to avoid blocking the subscribe loop.
 	syncResponse := make(chan struct{}, syncSignalBufferSize)
 
@@ -128,6 +138,13 @@ func (s *StreamSync) Start() error {
 	return nil
 }
 
+// buildTreeSyncWithDatastore accumulates incoming notifications into a local
+// syncTree and commits it to Running on SyncResponse and every ticker tick.
+//
+// The commit is handed off to a background goroutine so the main select loop
+// never blocks on ApplyToRunning. This prevents notifications from timing out
+// and being silently dropped while a slow performRevert holds the goroutine
+// (see GitHub issue #440).
 func (s *StreamSync) buildTreeSyncWithDatastore(cUS <-chan *NotificationData, syncResponse <-chan struct{}) {
 	log := logger.FromContext(s.ctx)
 	syncTree, err := s.runningStore.NewEmptyTree(s.ctx)
@@ -135,15 +152,18 @@ func (s *StreamSync) buildTreeSyncWithDatastore(cUS <-chan *NotificationData, sy
 		log.Error(err, "failure creating new sync tree")
 		return
 	}
-	syncTreeMutex := &sync.Mutex{}
-
-	ticker := time.NewTicker(5 * time.Second)
-	defer ticker.Stop()
-	// disable ticker until after the initial full sync is done
-	tickerActive := false
 
 	uif := treetypes.NewUpdateInsertFlags()
 
+	// tickerChan starts as nil (receives from a nil channel block forever in
+	// Go, so the ticker case never fires). It is set to a real ticker after
+	// the initial SyncResponse so the ticker only runs post-initial-sync.
+	var tickerChan <-chan time.Time
+
+	// syncRunning is true while a syncToRunning goroutine is in flight.
+	// The ticker case skips when true; the syncResponse case always proceeds.
+	var syncRunning atomic.Bool
+
 	for {
 		select {
 		case <-s.ctx.Done():
@@ -159,21 +179,42 @@ func (s *StreamSync) buildTreeSyncWithDatastore(cUS <-chan *NotificationData, sy
 			}
 			syncTree.GetTreeContext().ExplicitDeletes().Add(consts.RunningIntentName, consts.RunningValuesPrio, noti.deletes)
 		case <-syncResponse:
-			syncTree, err = s.syncToRunning(syncTree, syncTreeMutex, true)
-			tickerActive = true
+			treeToCommit := syncTree
+			syncTree, err = s.runningStore.NewEmptyTree(s.ctx)
 			if err != nil {
-				log.Error(err, "failed committing synctree to running")
+				log.Error(err, "failure creating new sync tree after sync response")
+				return
+			}
+			if tickerChan == nil {
+				t := time.NewTicker(5 * time.Second)
+				defer t.Stop()
+				tickerChan = t.C
 			}
-		case <-ticker.C:
-			if !tickerActive {
-				log.Info("Skipping a sync tick - initial sync not finished yet")
+			syncRunning.Store(true)
+			go func() {
+				defer syncRunning.Store(false)
+				if err := s.syncToRunning(treeToCommit, true); err != nil {
+					log.Error(err, "failed committing synctree to running")
+				}
+			}()
+		case <-tickerChan:
+			if syncRunning.Load() {
 				continue
 			}
 			log.Info("SyncRunning due to ticker")
-			syncTree, err = s.syncToRunning(syncTree, syncTreeMutex, true)
+			treeToCommit := syncTree
+			syncTree, err = s.runningStore.NewEmptyTree(s.ctx)
 			if err != nil {
-				log.Error(err, "failed committing synctree to running")
+				log.Error(err, "failure creating new sync tree after ticker")
+				return
 			}
+			syncRunning.Store(true)
+			go func() {
+				defer syncRunning.Store(false)
+				if err := s.syncToRunning(treeToCommit, true); err != nil {
+					log.Error(err, "failed committing synctree to running")
+				}
+			}()
 		}
 	}
 }
@@ -186,8 +227,8 @@ func (s *StreamSync) gnmiSubscribe(subReq *gnmi.SubscribeRequest, updChan chan<-
 	respChan, errChan := s.target.Subscribe(s.ctx, subReq, s.config.Name)
 
 	taskPool := s.vpoolFactory.NewVirtualPool(pool.VirtualTolerant)
-	defer taskPool.CloseForSubmit()
-	taskParams := NewNotificationProcessorTaskParameters(updChan, s.schemaClient)
+	defer func() { taskPool.CloseForSubmit() }()
+	taskParams := NewNotificationProcessorTaskParameters(updChan, s.schemaClient, s.notifSendTimeout)
 
 	syncStartTime := time.Now()
 	for {
@@ -228,10 +269,11 @@ func (s *StreamSync) gnmiSubscribe(subReq *gnmi.SubscribeRequest, updChan chan<-
 	}
 }
 
-func (s *StreamSync) syncToRunning(syncTree *tree.RootEntry, m *sync.Mutex, logCount bool) (*tree.RootEntry, error) {
+// syncToRunning exports syncTree and applies it to Running. It is called from
+// background goroutines spawned by buildTreeSyncWithDatastore; the caller owns
+// syncTree exclusively and no mutex is needed.
+func (s *StreamSync) syncToRunning(syncTree *tree.RootEntry, logCount bool) error {
 	log := logger.FromContext(s.ctx)
-	m.Lock()
-	defer m.Unlock()
 
 	startTime := time.Now()
 	result, err := ops.TreeExport(syncTree.Entry, consts.RunningIntentName, consts.RunningValuesPrio, false)
@@ -239,11 +281,10 @@ func (s *StreamSync) syncToRunning(syncTree *tree.RootEntry, m *sync.Mutex, logC
 	if err != nil {
 		if errors.Is(err, ops.ErrorIntentNotPresent) {
 			log.Info("sync no config changes")
-			// all good no data present
-			return syncTree, nil
+			return nil
 		}
 		log.Error(err, "sync tree export error")
-		return s.runningStore.NewEmptyTree(s.ctx)
+		return err
 	}
 	// extract the explicit deletes
 	deletes := result.ExplicitDeletes
@@ -259,10 +300,10 @@ func (s *StreamSync) syncToRunning(syncTree *tree.RootEntry, m *sync.Mutex, logC
 	err = s.runningStore.ApplyToRunning(s.ctx, deletes, proto.NewProtoTreeImporter(result))
 	if err != nil {
 		log.Error(err, "failed importing sync to running")
-		return s.runningStore.NewEmptyTree(s.ctx)
+		return err
 	}
 	log.V(logger.VTrace).Info("import to running tree done", "duration", time.Since(startTime).String())
-	return s.runningStore.NewEmptyTree(s.ctx)
+	return nil
 }
 
 type SyncTarget interface {
@@ -282,12 +323,14 @@ type notificationProcessorTask struct {
 type NotificationProcessorTaskParameters struct {
 	notificationResult chan<- *NotificationData
 	schemaClientBound  dsutils.SchemaClientBound
+	sendTimeout        time.Duration
 }
 
-func NewNotificationProcessorTaskParameters(notificationResult chan<- *NotificationData, scb dsutils.SchemaClientBound) *NotificationProcessorTaskParameters {
+func NewNotificationProcessorTaskParameters(notificationResult chan<- *NotificationData, scb dsutils.SchemaClientBound, sendTimeout time.Duration) *NotificationProcessorTaskParameters {
 	return &NotificationProcessorTaskParameters{
 		notificationResult: notificationResult,
 		schemaClientBound:  scb,
+		sendTimeout:        sendTimeout,
 	}
 }
 
@@ -310,8 +353,8 @@ func (t *notificationProcessorTask) sendNotificationData(ctx context.Context, da
 			log.Info("slow notification delivery to sync loop", "duration", duration.String())
 		}
 		return nil
-	case <-time.After(notificationSendTimeout):
-		log.Error(context.DeadlineExceeded, "notification delivery timeout, dropping update", "timeout", notificationSendTimeout.String())
+	case <-time.After(t.params.sendTimeout):
+		log.Error(context.DeadlineExceeded, "notification delivery timeout, dropping update", "timeout", t.params.sendTimeout.String())
 		return nil
 	}
 }
diff --git a/pkg/datastore/target/gnmi/stream_test.go b/pkg/datastore/target/gnmi/stream_test.go
new file mode 100644
index 00000000..ec332bb5
--- /dev/null
+++ b/pkg/datastore/target/gnmi/stream_test.go
@@ -0,0 +1,256 @@
+package gnmi
+
+import (
+	"context"
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/openconfig/gnmi/proto/gnmi"
+	schemaClientPkg "github.com/sdcio/data-server/pkg/datastore/clients/schema"
+	"github.com/sdcio/data-server/pkg/config"
+	"github.com/sdcio/data-server/pkg/pool"
+	"github.com/sdcio/data-server/pkg/tree"
+	treeimporter "github.com/sdcio/data-server/pkg/tree/importer"
+	"github.com/sdcio/data-server/pkg/utils/testhelper"
+	sdcpb "github.com/sdcio/sdc-protos/sdcpb"
+)
+
+// fakeRunningStore uses real tree creation but intercepts ApplyToRunning.
+// The first call blocks until firstUnblock is closed; all subsequent calls
+// return immediately so the test can observe the call count.
+type fakeRunningStore struct {
+	sc *schemaClientPkg.SchemaClientBoundImpl
+	vp pool.VirtualPoolFactory
+
+	firstUnblock     chan struct{}
+	firstStarted     chan struct{}
+	firstStartedOnce sync.Once
+
+	calls atomic.Int32
+}
+
+func newFakeRunningStore(sc *schemaClientPkg.SchemaClientBoundImpl, vp pool.VirtualPoolFactory) *fakeRunningStore {
+	return &fakeRunningStore{
+		sc:           sc,
+		vp:           vp,
+		firstUnblock: make(chan struct{}),
+		firstStarted: make(chan struct{}),
+	}
+}
+
+func (f *fakeRunningStore) ApplyToRunning(ctx context.Context, _ []*sdcpb.Path, _ treeimporter.ImportConfigAdapter) error {
+	f.firstStartedOnce.Do(func() { close(f.firstStarted) })
+	select {
+	case <-f.firstUnblock:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+	f.calls.Add(1)
+	return nil
+}
+
+func (f *fakeRunningStore) NewEmptyTree(ctx context.Context) (*tree.RootEntry, error) {
+	tc := tree.NewTreeContext(f.sc, f.vp)
+	return tree.NewTreeRoot(ctx, tc)
+}
+
+// fakeSyncTarget implements SyncTarget with channels the test controls.
+type fakeSyncTarget struct {
+	respChan chan *gnmi.SubscribeResponse
+	errChan  chan error
+}
+
+func (f *fakeSyncTarget) Subscribe(_ context.Context, _ *gnmi.SubscribeRequest, _ string) (chan *gnmi.SubscribeResponse, chan error) {
+	return f.respChan, f.errChan
+}
+
+// interfaceDescriptionNotif returns a gNMI SubscribeResponse_Update for
+// /interface[name=ifname]/description = desc.
+func interfaceDescriptionNotif(ifname, desc string) *gnmi.SubscribeResponse {
+	return &gnmi.SubscribeResponse{
+		Response: &gnmi.SubscribeResponse_Update{
+			Update: &gnmi.Notification{
+				Update: []*gnmi.Update{
+					{
+						Path: &gnmi.Path{
+							Elem: []*gnmi.PathElem{
+								{Name: "interface", Key: map[string]string{"name": ifname}},
+								{Name: "description"},
+							},
+						},
+						Val: &gnmi.TypedValue{
+							Value: &gnmi.TypedValue_StringVal{StringVal: desc},
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+func syncRespMsg() *gnmi.SubscribeResponse {
+	return &gnmi.SubscribeResponse{
+		Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true},
+	}
+}
+
+// TestBuildTreeSyncWithDatastore_PostSyncNotificationsNotDropped is the
+// regression test for GitHub issue #440.
+//
+// Scenario: ApplyToRunning is slow. Notifications that arrive after the
+// SyncResponse is processed but while ApplyToRunning is blocked must not be
+// dropped — they must be committed by the next ticker.
+//
+// Observable invariant: ApplyToRunning is called at least twice (once for the
+// initial sync commit, and once for the ticker commit of post-sync
+// notifications). If notifications were silently dropped, the fresh syncTree
+// would be empty and the ticker's syncToRunning would return early without
+// calling ApplyToRunning, leaving the count at 1.
+func TestBuildTreeSyncWithDatastore_PostSyncNotificationsNotDropped(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	sc, schemaConf, err := testhelper.InitSDCIOSchema()
+	if err != nil {
+		t.Fatalf("init schema: %v", err)
+	}
+	scb := schemaClientPkg.NewSchemaClientBound(schemaConf, sc)
+
+	sharedPool := pool.NewSharedTaskPool(ctx, runtime.GOMAXPROCS(0))
+	store := newFakeRunningStore(scb, sharedPool)
+
+	respChan := make(chan *gnmi.SubscribeResponse, 20)
+	target := &fakeSyncTarget{
+		respChan: respChan,
+		errChan:  make(chan error, 1),
+	}
+
+	ss := NewStreamSync(ctx, target, &config.SyncProtocol{
+		Name:  "test",
+		Paths: []string{"/"},
+		Mode:  "on-change",
+	}, store, scb, sharedPool)
+	// Zero-buffer channel + short timeout forces a deterministic drop with the
+	// buggy implementation: pool workers block immediately (no receiver while
+	// the consumer goroutine is in ApplyToRunning) and time out after 20 ms.
+	// With the fix the goroutine handoff keeps the main loop draining, so all
+	// notifications are delivered before the timeout fires.
+	ss.updChanBufSize = 0
+	ss.notifSendTimeout = 20 * time.Millisecond
+
+	if err := ss.Start(); err != nil {
+		t.Fatalf("Start: %v", err)
+	}
+	defer ss.Stop()
+
+	// Initial sync: one notification + SyncResponse.
+	respChan <- interfaceDescriptionNotif("ethernet-1/1", "pre-sync")
+	respChan <- syncRespMsg()
+
+	// Wait for the first ApplyToRunning call to start (it will block there).
+	select {
+	case <-store.firstStarted:
+	case <-ctx.Done():
+		t.Fatal("timed out waiting for first ApplyToRunning to start")
+	}
+
+	// While ApplyToRunning is blocked, send post-sync notifications.
+	// With the buggy implementation the consumer goroutine is blocked, so
+	// pool workers cannot deliver to the zero-buffer updChan and will drop
+	// after 20 ms. With the fix the goroutine handoff keeps the consumer
+	// running and all notifications reach the fresh syncTree.
+	const postSyncCount = 5
+	for i := 0; i < postSyncCount; i++ {
+		respChan <- interfaceDescriptionNotif("ethernet-1/2", "post-sync")
+	}
+
+	// Wait longer than notifSendTimeout so that drops occur with the old code.
+	time.Sleep(100 * time.Millisecond)
+
+	// Unblock the first ApplyToRunning.
+	close(store.firstUnblock)
+
+	// Wait for the second ApplyToRunning call (ticker commit of post-sync tree).
+	// The ticker fires every 5 s; allow up to 3 windows.
+	deadline := time.Now().Add(20 * time.Second)
+	for time.Now().Before(deadline) {
+		if store.calls.Load() >= 2 {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	if got := store.calls.Load(); got < 2 {
+		t.Errorf("ApplyToRunning called %d time(s); want ≥2 — post-sync notifications appear to have been dropped", got)
+	}
+}
+
+// TestBuildTreeSyncWithDatastore_NewEmptyTreeFailureExits verifies that
+// buildTreeSyncWithDatastore exits cleanly rather than panicking when
+// NewEmptyTree fails after the initial sync handoff.
+func TestBuildTreeSyncWithDatastore_NewEmptyTreeFailureExits(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	sc, schemaConf, err := testhelper.InitSDCIOSchema()
+	if err != nil {
+		t.Fatalf("init schema: %v", err)
+	}
+	scb := schemaClientPkg.NewSchemaClientBound(schemaConf, sc)
+	sharedPool := pool.NewSharedTaskPool(ctx, runtime.GOMAXPROCS(0))
+
+	store := &failAfterFirstNewEmptyTree{
+		inner: newFakeRunningStore(scb, sharedPool),
+	}
+
+	respChan := make(chan *gnmi.SubscribeResponse, 10)
+	target := &fakeSyncTarget{
+		respChan: respChan,
+		errChan:  make(chan error, 1),
+	}
+
+	ss := NewStreamSync(ctx, target, &config.SyncProtocol{
+		Name:  "test",
+		Paths: []string{"/"},
+		Mode:  "on-change",
+	}, store, scb, sharedPool)
+	ss.updChanBufSize = 0
+	ss.notifSendTimeout = 20 * time.Millisecond
+
+	if err := ss.Start(); err != nil {
+		t.Fatalf("Start: %v", err)
+	}
+	defer ss.Stop()
+
+	// Close the inner firstUnblock immediately so the first ApplyToRunning
+	// does not stall; we just want NewEmptyTree to fail on the second call.
+	close(store.inner.firstUnblock)
+
+	respChan <- interfaceDescriptionNotif("ethernet-1/1", "init")
+	respChan <- syncRespMsg()
+
+	// The goroutine should exit within a short timeout; the test passes if
+	// there is no panic and the context is not leaked.
+	time.Sleep(2 * time.Second)
+}
+
+// failAfterFirstNewEmptyTree wraps fakeRunningStore but returns an error on
+// the second NewEmptyTree call (simulating the post-handoff allocation failure).
+type failAfterFirstNewEmptyTree struct {
+	inner *fakeRunningStore
+	calls atomic.Int32
+}
+
+func (f *failAfterFirstNewEmptyTree) ApplyToRunning(ctx context.Context, d []*sdcpb.Path, i treeimporter.ImportConfigAdapter) error {
+	return f.inner.ApplyToRunning(ctx, d, i)
+}
+
+func (f *failAfterFirstNewEmptyTree) NewEmptyTree(ctx context.Context) (*tree.RootEntry, error) {
+	if f.calls.Add(1) > 1 {
+		return nil, context.DeadlineExceeded
+	}
+	return f.inner.NewEmptyTree(ctx)
+}

From 409ee23d78161ae21f6aa5d701f11c319861c7c6 Mon Sep 17 00:00:00 2001
From: Markus Vahlenkamp
Date: Thu, 28 May 2026 16:51:56 +0200
Subject: [PATCH 2/2] Delete docs/adr/0003-streamsync-goroutine-handoff-for-notification-drop-prevention.md

---
 ...andoff-for-notification-drop-prevention.md | 48 -------------------
 1 file changed, 48 deletions(-)
 delete mode 100644 docs/adr/0003-streamsync-goroutine-handoff-for-notification-drop-prevention.md

diff --git a/docs/adr/0003-streamsync-goroutine-handoff-for-notification-drop-prevention.md b/docs/adr/0003-streamsync-goroutine-handoff-for-notification-drop-prevention.md
deleted file mode 100644
index bfd206a7..00000000
--- a/docs/adr/0003-streamsync-goroutine-handoff-for-notification-drop-prevention.md
+++ /dev/null
@@ -1,48 +0,0 @@
-# ADR 0003: StreamSync goroutine handoff to prevent post-SyncResponse notification drops
-
-## Status
-
-Accepted — 2026-05-28
-
-## Context
-
-`StreamSync.buildTreeSyncWithDatastore` ran a single `for/select` loop that both drained the incoming notification channel (`updChan`) **and** called `syncToRunning` inline. `syncToRunning` calls `ApplyToRunning` → `performRevert`, which loads all non-running intents, validates the merged tree, and pushes revert deltas to the device. This can take several seconds.
-
-While `syncToRunning` held the goroutine, no notifications could be consumed from `updChan`. Pool workers delivering notifications blocked until `notificationSendTimeout` (5 s) expired, at which point the update was silently and permanently dropped. Any notification dropped during this window left the running view of the device permanently incomplete until the datastore was recreated.
-
-A concurrent race (problem B) compounded this: the `SyncResponse` signal was sent directly to the consumer goroutine while in-flight pool workers were still writing their notifications to `updChan`. Some post-`SyncResponse` onChange notifications arrived while the goroutine was blocked in `syncToRunning`, causing the drops.
-
-A real CI failure confirmed the impact: the mandatory `password` leaf for the `admin` user was permanently absent from the running tree, causing every subsequent `TransactionSet` validation to fail for 34+ minutes.
-
-## Decision
-
-**Primary fix (Problem A):** apply a goroutine handoff in `buildTreeSyncWithDatastore`.
-
-On each `syncResponse` or ticker commit event, the current `syncTree` is snapshotted as `treeToCommit`, `syncTree` is immediately reset to a fresh empty tree (from `NewEmptyTree`), and `syncToRunning` is called in a background goroutine. The main loop never blocks on a commit — it keeps draining `updChan` at all times.
-
-Key implementation details:
-- An `atomic.Bool syncRunning` tracks whether a commit goroutine is in flight. The ticker case skips if `syncRunning` is true (the running goroutine will commit the accumulated state). The `syncResponse` case always spawns since the initial sync commit must not be skipped.
-- Both cases set `syncRunning.Store(true)` before spawning; the goroutine calls `defer syncRunning.Store(false)`.
-- If `NewEmptyTree` fails, `buildTreeSyncWithDatastore` logs and returns rather than continuing with a shared tree that is also in use by the spawned goroutine.
-- The local `syncTreeMutex` previously passed to `syncToRunning` is removed: with the handoff, each goroutine owns its own snapshot so there is no shared mutable local tree. Concurrent calls to `ApplyToRunning` are serialized by `d.syncTreeMutex` inside the datastore, which was always the authoritative guard.
-- The `syncToRunning` signature is simplified: it no longer returns a `*tree.RootEntry` (the fresh tree is created in the main loop before spawning) and no longer accepts a `*sync.Mutex` parameter.
-
-**Problem B (post-SyncResponse notification lag):** with Problem A fixed, notifications that race past the `SyncResponse` signal land in the fresh `syncTree` and are committed by the next 5-second ticker. This is a bounded, self-healing lag (≤ 5 s) rather than permanent loss. A full per-window drain (Option 4 in the issue) was considered but deferred: it adds pool lifecycle complexity and the bounded lag is acceptable given that `ApplyToRunning` is idempotent and the ticker commits continuously.
-
-## Alternatives considered
-
-- **Increase `updChan` buffer** — raises the threshold at which drops occur, does not eliminate the root cause.
-- **Remove drop timeout** — pushes the problem upstream; a stuck `performRevert` would freeze the whole stream pipeline instead of logging a drop.
-- **Per-window drain (`CloseAndWait`)** — eliminates the 5-second Problem B lag but must be combined with the goroutine handoff to be safe; deferred as a follow-up.
-- **Decouple `performRevert` from `ApplyToRunning`** — architecturally cleanest long-term direction (the `// TODO` comment in `sync.go` already flags this); larger refactor, tracked separately.
-
-## Consequences
-
-- **Positive:** Post-`SyncResponse` notifications are never permanently dropped regardless of how long `performRevert` takes.
-- **Positive:** The `syncToRunning` signature is simpler (no mutex parameter, no tree return value).
-- **Negative:** A bounded lag (≤ 5 s) remains for the narrow race where post-`SyncResponse` onChange notifications arrive before the ticker fires; acceptable given idempotent apply and continuous ticker commits.
-- **Observability:** Any `NewEmptyTree` failure now causes `buildTreeSyncWithDatastore` to exit and log, surfacing a previously silent degraded state.
-
-## Related
-
-- [GitHub issue #440](https://github.com/sdcio/data-server/issues/440)