diff --git a/pkg/datastore/target/gnmi/stream.go b/pkg/datastore/target/gnmi/stream.go index b8816924..2ce3d5af 100644 --- a/pkg/datastore/target/gnmi/stream.go +++ b/pkg/datastore/target/gnmi/stream.go @@ -3,7 +3,7 @@ package gnmi import ( "context" "errors" - "sync" + "sync/atomic" "time" "github.com/openconfig/gnmi/proto/gnmi" @@ -25,7 +25,8 @@ import ( const ( syncSignalBufferSize = 1 - notificationSendTimeout = 5 * time.Second + defaultUpdChanBufSize = 20 + defaultNotifSendTimeout = 5 * time.Second notificationSlowSendWarnTime = 500 * time.Millisecond ) @@ -37,6 +38,13 @@ type StreamSync struct { runningStore types.RunningStore schemaClient dsutils.SchemaClientBound vpoolFactory pool.VirtualPoolFactory + + // updChanBufSize controls the notification channel buffer. Default is + // defaultUpdChanBufSize; tests may override before calling Start. + updChanBufSize int + // notifSendTimeout is the per-notification delivery deadline. Default is + // defaultNotifSendTimeout; tests may override before calling Start. 
+ notifSendTimeout time.Duration } func NewStreamSync(ctx context.Context, target SyncTarget, c *config.SyncProtocol, runningStore types.RunningStore, schemaClient dsutils.SchemaClientBound, vpoolFactory pool.VirtualPoolFactory) *StreamSync { @@ -47,13 +55,15 @@ func NewStreamSync(ctx context.Context, target SyncTarget, c *config.SyncProtoco ctx = logger.IntoContext(ctx, log) return &StreamSync{ - config: c, - target: target, - cancel: cancel, - runningStore: runningStore, - schemaClient: schemaClient, - ctx: ctx, - vpoolFactory: vpoolFactory, + config: c, + target: target, + cancel: cancel, + runningStore: runningStore, + schemaClient: schemaClient, + ctx: ctx, + vpoolFactory: vpoolFactory, + updChanBufSize: defaultUpdChanBufSize, + notifSendTimeout: defaultNotifSendTimeout, } } @@ -110,7 +120,7 @@ func (s *StreamSync) Start() error { log := logger.FromContext(s.ctx) log.Info("Starting Sync") - updChan := make(chan *NotificationData, 20) + updChan := make(chan *NotificationData, s.updChanBufSize) // Keep a single pending sync signal to avoid blocking the subscribe loop. syncResponse := make(chan struct{}, syncSignalBufferSize) @@ -128,6 +138,13 @@ func (s *StreamSync) Start() error { return nil } +// buildTreeSyncWithDatastore accumulates incoming notifications into a local +// syncTree and commits it to Running on SyncResponse and every ticker tick. +// +// The commit is handed off to a background goroutine so the main select loop +// never blocks on ApplyToRunning. This prevents notifications from timing out +// and being silently dropped while a slow performRevert holds the goroutine +// (see GitHub issue #440). 
func (s *StreamSync) buildTreeSyncWithDatastore(cUS <-chan *NotificationData, syncResponse <-chan struct{}) { log := logger.FromContext(s.ctx) syncTree, err := s.runningStore.NewEmptyTree(s.ctx) @@ -135,15 +152,18 @@ func (s *StreamSync) buildTreeSyncWithDatastore(cUS <-chan *NotificationData, sy log.Error(err, "failure creating new sync tree") return } - syncTreeMutex := &sync.Mutex{} - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - // disable ticker until after the initial full sync is done - tickerActive := false uif := treetypes.NewUpdateInsertFlags() + // tickerChan starts as nil (receives from a nil channel block forever in + // Go, so the ticker case never fires). It is set to a real ticker after + // the initial SyncResponse so the ticker only runs post-initial-sync. + var tickerChan <-chan time.Time + + // syncRunning is true while a syncToRunning goroutine is in flight. + // The ticker case skips when true; the syncResponse case always proceeds. + var syncRunning atomic.Bool + for { select { case <-s.ctx.Done(): @@ -159,21 +179,42 @@ func (s *StreamSync) buildTreeSyncWithDatastore(cUS <-chan *NotificationData, sy } syncTree.GetTreeContext().ExplicitDeletes().Add(consts.RunningIntentName, consts.RunningValuesPrio, noti.deletes) case <-syncResponse: - syncTree, err = s.syncToRunning(syncTree, syncTreeMutex, true) - tickerActive = true + treeToCommit := syncTree + syncTree, err = s.runningStore.NewEmptyTree(s.ctx) if err != nil { - log.Error(err, "failed committing synctree to running") + log.Error(err, "failure creating new sync tree after sync response") + return + } + if tickerChan == nil { + t := time.NewTicker(5 * time.Second) + defer t.Stop() + tickerChan = t.C } - case <-ticker.C: - if !tickerActive { - log.Info("Skipping a sync tick - initial sync not finished yet") + syncRunning.Store(true) + go func() { + defer syncRunning.Store(false) + if err := s.syncToRunning(treeToCommit, true); err != nil { + log.Error(err, "failed 
committing synctree to running") + } + }() + case <-tickerChan: + if syncRunning.Load() { continue } log.Info("SyncRunning due to ticker") - syncTree, err = s.syncToRunning(syncTree, syncTreeMutex, true) + treeToCommit := syncTree + syncTree, err = s.runningStore.NewEmptyTree(s.ctx) if err != nil { - log.Error(err, "failed committing synctree to running") + log.Error(err, "failure creating new sync tree after ticker") + return } + syncRunning.Store(true) + go func() { + defer syncRunning.Store(false) + if err := s.syncToRunning(treeToCommit, true); err != nil { + log.Error(err, "failed committing synctree to running") + } + }() } } } @@ -186,8 +227,8 @@ func (s *StreamSync) gnmiSubscribe(subReq *gnmi.SubscribeRequest, updChan chan<- respChan, errChan := s.target.Subscribe(s.ctx, subReq, s.config.Name) taskPool := s.vpoolFactory.NewVirtualPool(pool.VirtualTolerant) - defer taskPool.CloseForSubmit() - taskParams := NewNotificationProcessorTaskParameters(updChan, s.schemaClient) + defer func() { taskPool.CloseForSubmit() }() + taskParams := NewNotificationProcessorTaskParameters(updChan, s.schemaClient, s.notifSendTimeout) syncStartTime := time.Now() for { @@ -228,10 +269,11 @@ func (s *StreamSync) gnmiSubscribe(subReq *gnmi.SubscribeRequest, updChan chan<- } } -func (s *StreamSync) syncToRunning(syncTree *tree.RootEntry, m *sync.Mutex, logCount bool) (*tree.RootEntry, error) { +// syncToRunning exports syncTree and applies it to Running. It is called from +// background goroutines spawned by buildTreeSyncWithDatastore; the caller owns +// syncTree exclusively and no mutex is needed. 
+func (s *StreamSync) syncToRunning(syncTree *tree.RootEntry, logCount bool) error { log := logger.FromContext(s.ctx) - m.Lock() - defer m.Unlock() startTime := time.Now() result, err := ops.TreeExport(syncTree.Entry, consts.RunningIntentName, consts.RunningValuesPrio, false) @@ -239,11 +281,10 @@ func (s *StreamSync) syncToRunning(syncTree *tree.RootEntry, m *sync.Mutex, logC if err != nil { if errors.Is(err, ops.ErrorIntentNotPresent) { log.Info("sync no config changes") - // all good no data present - return syncTree, nil + return nil } log.Error(err, "sync tree export error") - return s.runningStore.NewEmptyTree(s.ctx) + return err } // extract the explicit deletes deletes := result.ExplicitDeletes @@ -259,10 +300,10 @@ func (s *StreamSync) syncToRunning(syncTree *tree.RootEntry, m *sync.Mutex, logC err = s.runningStore.ApplyToRunning(s.ctx, deletes, proto.NewProtoTreeImporter(result)) if err != nil { log.Error(err, "failed importing sync to running") - return s.runningStore.NewEmptyTree(s.ctx) + return err } log.V(logger.VTrace).Info("import to running tree done", "duration", time.Since(startTime).String()) - return s.runningStore.NewEmptyTree(s.ctx) + return nil } type SyncTarget interface { @@ -282,12 +323,14 @@ type notificationProcessorTask struct { type NotificationProcessorTaskParameters struct { notificationResult chan<- *NotificationData schemaClientBound dsutils.SchemaClientBound + sendTimeout time.Duration } -func NewNotificationProcessorTaskParameters(notificationResult chan<- *NotificationData, scb dsutils.SchemaClientBound) *NotificationProcessorTaskParameters { +func NewNotificationProcessorTaskParameters(notificationResult chan<- *NotificationData, scb dsutils.SchemaClientBound, sendTimeout time.Duration) *NotificationProcessorTaskParameters { return &NotificationProcessorTaskParameters{ notificationResult: notificationResult, schemaClientBound: scb, + sendTimeout: sendTimeout, } } @@ -310,8 +353,8 @@ func (t *notificationProcessorTask) 
sendNotificationData(ctx context.Context, da log.Info("slow notification delivery to sync loop", "duration", duration.String()) } return nil - case <-time.After(notificationSendTimeout): - log.Error(context.DeadlineExceeded, "notification delivery timeout, dropping update", "timeout", notificationSendTimeout.String()) + case <-time.After(t.params.sendTimeout): + log.Error(context.DeadlineExceeded, "notification delivery timeout, dropping update", "timeout", t.params.sendTimeout.String()) return nil } } diff --git a/pkg/datastore/target/gnmi/stream_test.go b/pkg/datastore/target/gnmi/stream_test.go new file mode 100644 index 00000000..ec332bb5 --- /dev/null +++ b/pkg/datastore/target/gnmi/stream_test.go @@ -0,0 +1,256 @@ +package gnmi + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/openconfig/gnmi/proto/gnmi" + schemaClientPkg "github.com/sdcio/data-server/pkg/datastore/clients/schema" + "github.com/sdcio/data-server/pkg/config" + "github.com/sdcio/data-server/pkg/pool" + "github.com/sdcio/data-server/pkg/tree" + treeimporter "github.com/sdcio/data-server/pkg/tree/importer" + "github.com/sdcio/data-server/pkg/utils/testhelper" + sdcpb "github.com/sdcio/sdc-protos/sdcpb" +) + +// fakeRunningStore uses real tree creation but intercepts ApplyToRunning. +// The first call blocks until firstUnblock is closed; all subsequent calls +// return immediately so the test can observe the call count. 
+type fakeRunningStore struct { + sc *schemaClientPkg.SchemaClientBoundImpl + vp pool.VirtualPoolFactory + + firstUnblock chan struct{} + firstStarted chan struct{} + firstStartedOnce sync.Once + + calls atomic.Int32 +} + +func newFakeRunningStore(sc *schemaClientPkg.SchemaClientBoundImpl, vp pool.VirtualPoolFactory) *fakeRunningStore { + return &fakeRunningStore{ + sc: sc, + vp: vp, + firstUnblock: make(chan struct{}), + firstStarted: make(chan struct{}), + } +} + +func (f *fakeRunningStore) ApplyToRunning(ctx context.Context, _ []*sdcpb.Path, _ treeimporter.ImportConfigAdapter) error { + f.firstStartedOnce.Do(func() { close(f.firstStarted) }) + select { + case <-f.firstUnblock: + case <-ctx.Done(): + return ctx.Err() + } + f.calls.Add(1) + return nil +} + +func (f *fakeRunningStore) NewEmptyTree(ctx context.Context) (*tree.RootEntry, error) { + tc := tree.NewTreeContext(f.sc, f.vp) + return tree.NewTreeRoot(ctx, tc) +} + +// fakeSyncTarget implements SyncTarget with channels the test controls. +type fakeSyncTarget struct { + respChan chan *gnmi.SubscribeResponse + errChan chan error +} + +func (f *fakeSyncTarget) Subscribe(_ context.Context, _ *gnmi.SubscribeRequest, _ string) (chan *gnmi.SubscribeResponse, chan error) { + return f.respChan, f.errChan +} + +// interfaceDescriptionNotif returns a gNMI SubscribeResponse_Update for +// /interface[name=ifname]/description = desc. 
+func interfaceDescriptionNotif(ifname, desc string) *gnmi.SubscribeResponse { + return &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_Update{ + Update: &gnmi.Notification{ + Update: []*gnmi.Update{ + { + Path: &gnmi.Path{ + Elem: []*gnmi.PathElem{ + {Name: "interface", Key: map[string]string{"name": ifname}}, + {Name: "description"}, + }, + }, + Val: &gnmi.TypedValue{ + Value: &gnmi.TypedValue_StringVal{StringVal: desc}, + }, + }, + }, + }, + }, + } +} + +func syncRespMsg() *gnmi.SubscribeResponse { + return &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}, + } +} + +// TestBuildTreeSyncWithDatastore_PostSyncNotificationsNotDropped is the +// regression test for GitHub issue #440. +// +// Scenario: ApplyToRunning is slow. Notifications that arrive after the +// SyncResponse is processed but while ApplyToRunning is blocked must not be +// dropped — they must be committed by the next ticker. +// +// Observable invariant: ApplyToRunning is called at least twice (once for the +// initial sync commit, and once for the ticker commit of post-sync +// notifications). If notifications were silently dropped, the fresh syncTree +// would be empty and the ticker's syncToRunning would return early without +// calling ApplyToRunning, leaving the count at 1. 
+func TestBuildTreeSyncWithDatastore_PostSyncNotificationsNotDropped(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + sc, schemaConf, err := testhelper.InitSDCIOSchema() + if err != nil { + t.Fatalf("init schema: %v", err) + } + scb := schemaClientPkg.NewSchemaClientBound(schemaConf, sc) + + sharedPool := pool.NewSharedTaskPool(ctx, runtime.GOMAXPROCS(0)) + store := newFakeRunningStore(scb, sharedPool) + + respChan := make(chan *gnmi.SubscribeResponse, 20) + target := &fakeSyncTarget{ + respChan: respChan, + errChan: make(chan error, 1), + } + + ss := NewStreamSync(ctx, target, &config.SyncProtocol{ + Name: "test", + Paths: []string{"/"}, + Mode: "on-change", + }, store, scb, sharedPool) + // Zero-buffer channel + short timeout forces a deterministic drop with the + // buggy implementation: pool workers block immediately (no receiver while + // the consumer goroutine is in ApplyToRunning) and time out after 20 ms. + // With the fix the goroutine handoff keeps the main loop draining, so all + // notifications are delivered before the timeout fires. + ss.updChanBufSize = 0 + ss.notifSendTimeout = 20 * time.Millisecond + + if err := ss.Start(); err != nil { + t.Fatalf("Start: %v", err) + } + defer ss.Stop() + + // Initial sync: one notification + SyncResponse. + respChan <- interfaceDescriptionNotif("ethernet-1/1", "pre-sync") + respChan <- syncRespMsg() + + // Wait for the first ApplyToRunning call to start (it will block there). + select { + case <-store.firstStarted: + case <-ctx.Done(): + t.Fatal("timed out waiting for first ApplyToRunning to start") + } + + // While ApplyToRunning is blocked, send post-sync notifications. + // With the buggy implementation the consumer goroutine is blocked, so + // pool workers cannot deliver to the zero-buffer updChan and will drop + // after 20 ms. With the fix the goroutine handoff keeps the consumer + // running and all notifications reach the fresh syncTree. 
+ const postSyncCount = 5 + for i := 0; i < postSyncCount; i++ { + respChan <- interfaceDescriptionNotif("ethernet-1/2", "post-sync") + } + + // Wait longer than notifSendTimeout so that drops occur with the old code. + time.Sleep(100 * time.Millisecond) + + // Unblock the first ApplyToRunning. + close(store.firstUnblock) + + // Wait for the second ApplyToRunning call (ticker commit of post-sync tree). + // The ticker fires every 5 s; allow up to 3 windows. + deadline := time.Now().Add(20 * time.Second) + for time.Now().Before(deadline) { + if store.calls.Load() >= 2 { + break + } + time.Sleep(100 * time.Millisecond) + } + + if got := store.calls.Load(); got < 2 { + t.Errorf("ApplyToRunning called %d time(s); want ≥2 — post-sync notifications appear to have been dropped", got) + } +} + +// TestBuildTreeSyncWithDatastore_NewEmptyTreeFailureExits verifies that +// buildTreeSyncWithDatastore exits cleanly rather than panicking when +// NewEmptyTree fails after the initial sync handoff. +func TestBuildTreeSyncWithDatastore_NewEmptyTreeFailureExits(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + sc, schemaConf, err := testhelper.InitSDCIOSchema() + if err != nil { + t.Fatalf("init schema: %v", err) + } + scb := schemaClientPkg.NewSchemaClientBound(schemaConf, sc) + sharedPool := pool.NewSharedTaskPool(ctx, runtime.GOMAXPROCS(0)) + + store := &failAfterFirstNewEmptyTree{ + inner: newFakeRunningStore(scb, sharedPool), + } + + respChan := make(chan *gnmi.SubscribeResponse, 10) + target := &fakeSyncTarget{ + respChan: respChan, + errChan: make(chan error, 1), + } + + ss := NewStreamSync(ctx, target, &config.SyncProtocol{ + Name: "test", + Paths: []string{"/"}, + Mode: "on-change", + }, store, scb, sharedPool) + ss.updChanBufSize = 0 + ss.notifSendTimeout = 20 * time.Millisecond + + if err := ss.Start(); err != nil { + t.Fatalf("Start: %v", err) + } + defer ss.Stop() + + // Close the inner firstUnblock 
immediately so the first ApplyToRunning + // does not stall; we just want NewEmptyTree to fail on the second call. + close(store.inner.firstUnblock) + + respChan <- interfaceDescriptionNotif("ethernet-1/1", "init") + respChan <- syncRespMsg() + + // The goroutine should exit within a short timeout; the test passes if + // there is no panic and the context is not leaked. + time.Sleep(2 * time.Second) +} + +// failAfterFirstNewEmptyTree wraps fakeRunningStore but returns an error on +// every NewEmptyTree call after the first (simulating the post-handoff allocation failure). +type failAfterFirstNewEmptyTree struct { + inner *fakeRunningStore + calls atomic.Int32 +} + +func (f *failAfterFirstNewEmptyTree) ApplyToRunning(ctx context.Context, d []*sdcpb.Path, i treeimporter.ImportConfigAdapter) error { + return f.inner.ApplyToRunning(ctx, d, i) +} + +func (f *failAfterFirstNewEmptyTree) NewEmptyTree(ctx context.Context) (*tree.RootEntry, error) { + if f.calls.Add(1) > 1 { + return nil, context.DeadlineExceeded + } + return f.inner.NewEmptyTree(ctx) +}