Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ go 1.25.0

replace github.com/openconfig/goyang v1.6.0 => github.com/sdcio/goyang v1.6.2-2

// Pinned to open PRs; replace with tagged releases once merged:
// - sdcio/goyang PR#4: fix(ApplyDeviate): replace Exts on deviate replace, not append
// - sdcio/sdc-protos PR#123: feat(sensitive): schema.Path + LeafSchema.sensitive flag
replace (
github.com/sdcio/goyang v1.6.2-2 => github.com/sdcio/goyang v1.6.2-2.0.20260608121857-4668a077cf72
github.com/sdcio/sdc-protos v0.0.54 => github.com/sdcio/sdc-protos v0.0.55-0.20260610090020-aeb8edf494c4
)

require (
github.com/AlekSi/pointer v1.2.0
github.com/beevik/etree v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ github.com/sdcio/logger v0.0.3 h1:IFUbObObGry+S8lHGwOQKKRxJSuOphgRU/hxVhOdMOM=
github.com/sdcio/logger v0.0.3/go.mod h1:yWaOxK/G6vszjg8tKZiMqiEjlZouHsjFME4zSk+SAEA=
github.com/sdcio/schema-server v0.0.34 h1:NNDOkvtUMONtBA7cVvN96F+FWGD/Do6HNqfchy9B8eI=
github.com/sdcio/schema-server v0.0.34/go.mod h1:6t8HLXpqUqEJmE5yNZh29u/KZw0jlOICdNWns7zE4GE=
github.com/sdcio/sdc-protos v0.0.54 h1:1EbtU9ZbbJHFPOFGi5aW8Th79cuY9i+AJaP0ABVx8hw=
github.com/sdcio/sdc-protos v0.0.54/go.mod h1:YMLHbey0/aL1qtLW8csSYVPafsgnnn7aY54HkV5dbyQ=
github.com/sdcio/sdc-protos v0.0.55-0.20260610090020-aeb8edf494c4 h1:M1C1wzt2ni0fssvPEaMNPdHNKqTzBtT8VTT3/EzbrCE=
github.com/sdcio/sdc-protos v0.0.55-0.20260610090020-aeb8edf494c4/go.mod h1:NsvzvHnTonLcwQ/WNzxzBCauQmqxpuviaW0wh7Lkrts=
github.com/sdcio/yang-parser v0.0.12 h1:RSSeqfAOIsJx5Lno5u4/ezyOmQYUduQ22rBfU/mtpJ4=
github.com/sdcio/yang-parser v0.0.12/go.mod h1:CBqn3Miq85qmFVGHxHXHLluXkaIOsTzV06IM4DW6+D4=
github.com/sirikothe/gotextfsm v1.0.1-0.20200816110946-6aa2cfd355e4 h1:FHUL2HofYJuslFOQdy/JjjP36zxqIpd/dcoiwLMIs7k=
Expand Down
484 changes: 20 additions & 464 deletions mocks/mocktreeentry/entry.go

Large diffs are not rendered by default.

61 changes: 47 additions & 14 deletions pkg/datastore/datastore_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package datastore
import (
"context"
"errors"
"fmt"
"runtime"
"sync"
"time"
Expand All @@ -35,7 +36,10 @@ import (
"github.com/sdcio/data-server/pkg/pool"
"github.com/sdcio/data-server/pkg/schema"
"github.com/sdcio/data-server/pkg/tree"
"github.com/sdcio/data-server/pkg/tree/ops"
"github.com/sdcio/data-server/pkg/tree/processors"
treetypes "github.com/sdcio/data-server/pkg/tree/types"
tree_persist "github.com/sdcio/sdc-protos/tree_persist"
)

type Datastore struct {
Expand Down Expand Up @@ -69,6 +73,10 @@ type Datastore struct {
syncTree *tree.RootEntry
syncTreeMutex *sync.RWMutex

// sensitivePathIndex is the in-memory reverse index of sensitive paths.
// It is populated at startup and updated on every intent write or delete.
sensitivePathIndex *treetypes.SensitivePathIndex

taskPool *pool.SharedTaskPool
}

Expand Down Expand Up @@ -104,24 +112,31 @@ func New(ctx context.Context, c *config.DatastoreConfig, sc schema.Client, cc ca
ccb := cache.NewCacheClientBound(c.Name, cc)

ds := &Datastore{
config: c,
schemaClient: scb,
ctx: ctx,
cfn: cancel,
cacheClient: ccb,
m: &sync.RWMutex{},
dmutex: &sync.Mutex{},
deviationClients: make(map[sdcpb.DataServer_WatchDeviationsServer]string),
syncTree: syncTreeRoot,
syncTreeMutex: &sync.RWMutex{},
taskPool: pool.NewSharedTaskPool(ctx, runtime.GOMAXPROCS(0)),
config: c,
schemaClient: scb,
ctx: ctx,
cfn: cancel,
cacheClient: ccb,
m: &sync.RWMutex{},
dmutex: &sync.Mutex{},
deviationClients: make(map[sdcpb.DataServer_WatchDeviationsServer]string),
syncTree: syncTreeRoot,
syncTreeMutex: &sync.RWMutex{},
sensitivePathIndex: treetypes.NewSensitivePathIndex(),
taskPool: pool.NewSharedTaskPool(ctx, runtime.GOMAXPROCS(0)),
}
ds.transactionManager = types.NewTransactionManager(NewDatastoreRollbackAdapter(ds))

// create cache instance if needed
// this is a blocking call
ds.initCache(ctx)

// populate the sensitive-path index from all existing intents in cache.
if err := populateSensitivePathIndex(ctx, ds.sensitivePathIndex, ccb); err != nil {
cancel()
return nil, fmt.Errorf("failed to populate sensitive path index: %w", err)
}

go func() {
// init sbi, this is a blocking call
err := ds.connectSBI(ctx, opts...)
Expand Down Expand Up @@ -228,7 +243,7 @@ func (d *Datastore) Stop(ctx context.Context) error {
return nil
}

func (d *Datastore) BlameConfig(ctx context.Context, includeDefaults bool) (*sdcpb.BlameTreeElement, error) {
func (d *Datastore) BlameConfig(ctx context.Context, includeDefaults, exposeSensitive bool) (*sdcpb.BlameTreeElement, error) {
log := logger.FromContext(ctx).WithName("BlameConfig")
ctx = logger.IntoContext(ctx, log)

Expand All @@ -239,13 +254,19 @@ func (d *Datastore) BlameConfig(ctx context.Context, includeDefaults bool) (*sdc
if err != nil {
return nil, err
}
// load all intents
// load all intents into the tree for blame; sensitive paths come from the index.
_, err = d.LoadAllButRunningIntents(ctx, root)
if err != nil {
return nil, err
}

bcp := processors.NewBlameConfigProcessor(&processors.BlameConfigProcessorParams{IncludeDefaults: includeDefaults})
bcp := processors.NewBlameConfigProcessor(&processors.BlameConfigProcessorParams{
IncludeDefaults: includeDefaults,
RenderOpts: ops.RenderOpts{
IncludeSensitive: exposeSensitive,
SensitivePathSet: d.sensitivePathIndex,
},
})
bte, err := bcp.Run(ctx, root.Entry, d.taskPool)

// set the root level elements name to the target name
Expand All @@ -272,3 +293,15 @@ func (dra *DatastoreRollbackAdapter) TransactionRollback(ctx context.Context, tr

// Assure the types.RollbackInterface is implemented by the DatastoreRollbackAdapter
var _ types.RollbackInterface = &DatastoreRollbackAdapter{}

// populateSensitivePathIndex scans all intents in cache and loads their
// sensitive_paths into s. It is called once during Datastore startup, before
// the first northbound read is served.
func populateSensitivePathIndex(ctx context.Context, s *treetypes.SensitivePathIndex, cc cache.CacheClientBound) error {
return forEachIntent(ctx, cc, nil, func(intent *tree_persist.Intent) error {
if len(intent.GetSensitivePaths()) > 0 {
s.Set(intent.GetIntentName(), intent.GetSensitivePaths())
}
return nil
})
}
15 changes: 14 additions & 1 deletion pkg/datastore/deviations.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,16 @@ func (d DeviationsStats) LogValue() map[string]map[string]int {
return stats
}

// SendDeviations reads deviation entries from ch and forwards them to all
// active deviation clients. Sensitivity redaction is applied upstream in the
// ops layer (GetDeviations with IncludeSensitive=false), so no masking is done
// here.
func (d *Datastore) SendDeviations(ctx context.Context, ch <-chan *treetypes.DeviationEntry, deviationClients map[string]sdcpb.DataServer_WatchDeviationsServer) {
log := logf.FromContext(ctx)
deviationsStats := make(DeviationsStats)
for deviation := range ch {
deviationsStats.Add(deviation)

for clientIdentifier, dc := range deviationClients {
if dc.Context().Err() != nil {
continue
Expand Down Expand Up @@ -212,7 +217,15 @@ func (d *Datastore) calculateDeviations(ctx context.Context) (<-chan *treetypes.
deviationChan <- treetypes.NewDeviationEntry(n, treetypes.DeviationReasonIntentExists, nil)
}

err := ops.GetDeviations(ctx, deviationTree.Entry, &ops.GetDeviationParams{Ch: deviationChan}, d.taskPool)
// IncludeSensitive=false: redaction is applied in the ops layer so that
// sensitive values never leave the tree as plaintext.
err := ops.GetDeviations(ctx, deviationTree.Entry, &ops.GetDeviationParams{
Ch: deviationChan,
RenderOpts: ops.RenderOpts{
IncludeSensitive: false,
SensitivePathSet: d.sensitivePathIndex,
},
}, d.taskPool)
if err != nil {
log.Error(err, "failed to run deviation processor")
}
Expand Down
183 changes: 183 additions & 0 deletions pkg/datastore/deviations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package datastore

import (
"context"
"runtime"
"sync"
"testing"

"github.com/openconfig/ygot/ygot"
"github.com/sdcio/data-server/mocks/mockcacheclient"
"github.com/sdcio/data-server/pkg/config"
"github.com/sdcio/data-server/pkg/pool"
schemaClientPkg "github.com/sdcio/data-server/pkg/datastore/clients/schema"
treetypes "github.com/sdcio/data-server/pkg/tree/types"
"github.com/sdcio/data-server/pkg/utils/testhelper"
sdcio_schema "github.com/sdcio/data-server/tests/sdcioygot"
tree_persist "github.com/sdcio/sdc-protos/tree_persist"
sdcpb "github.com/sdcio/sdc-protos/sdcpb"
"go.uber.org/mock/gomock"
"google.golang.org/grpc/metadata"
)

// fakeDeviationStream is a minimal in-memory DataServer_WatchDeviationsServer.
// Only Send and Context are called by SendDeviations; everything else is a no-op.
type fakeDeviationStream struct {
ctx context.Context
mu sync.Mutex
received []*sdcpb.WatchDeviationResponse
}

func newFakeDeviationStream(ctx context.Context) *fakeDeviationStream {
return &fakeDeviationStream{ctx: ctx}
}

func (f *fakeDeviationStream) Send(r *sdcpb.WatchDeviationResponse) error {
f.mu.Lock()
defer f.mu.Unlock()
f.received = append(f.received, r)
return nil
}

func (f *fakeDeviationStream) updateResponses() []*sdcpb.WatchDeviationResponse {
f.mu.Lock()
defer f.mu.Unlock()
var out []*sdcpb.WatchDeviationResponse
for _, r := range f.received {
if r.GetEvent() == sdcpb.DeviationEvent_UPDATE {
out = append(out, r)
}
}
return out
}

func (f *fakeDeviationStream) Context() context.Context { return f.ctx }
func (f *fakeDeviationStream) SetHeader(metadata.MD) error { return nil }
func (f *fakeDeviationStream) SendHeader(metadata.MD) error { return nil }
func (f *fakeDeviationStream) SetTrailer(metadata.MD) {}
func (f *fakeDeviationStream) SendMsg(any) error { return nil }
func (f *fakeDeviationStream) RecvMsg(any) error { return nil }

var _ sdcpb.DataServer_WatchDeviationsServer = (*fakeDeviationStream)(nil)

// deviationEntry builds a DeviationEntry with string-valued ExpectedValue and CurrentValue.
func deviationEntry(path *sdcpb.Path, expected, current string) *treetypes.DeviationEntry {
return treetypes.NewDeviationEntry("test-intent", treetypes.DeviationReasonNotApplied, path).
SetExpectedValue(&sdcpb.TypedValue{Value: &sdcpb.TypedValue_StringVal{StringVal: expected}}).
SetCurrentValue(&sdcpb.TypedValue{Value: &sdcpb.TypedValue_StringVal{StringVal: current}})
}

// simplePath builds a root-based sdcpb.Path with a single element.
func simplePath(name string) *sdcpb.Path {
return &sdcpb.Path{
IsRootBased: true,
Elem: []*sdcpb.PathElem{{Name: name}},
}
}

// TestSendDeviations_PassthroughUnredacted verifies that SendDeviations forwards
// deviation entries to clients without modifying values. Sensitivity redaction is
// now the responsibility of the ops layer (GetDeviations), not the datastore layer.
func TestSendDeviations_PassthroughUnredacted(t *testing.T) {
ctx := context.Background()

ds := &Datastore{
config: &config.DatastoreConfig{Name: "test-ds", Validation: config.NewValidationConfig()},
}

path := simplePath("plain-leaf")
ch := make(chan *treetypes.DeviationEntry, 1)
ch <- deviationEntry(path, "expected-value", "current-value")
close(ch)

stream := newFakeDeviationStream(ctx)
ds.SendDeviations(ctx, ch, map[string]sdcpb.DataServer_WatchDeviationsServer{"fake": stream})

updates := stream.updateResponses()
if len(updates) != 1 {
t.Fatalf("expected 1 UPDATE response, got %d", len(updates))
}
if got := updates[0].GetExpectedValue().GetStringVal(); got != "expected-value" {
t.Errorf("ExpectedValue = %q, want %q", got, "expected-value")
}
if got := updates[0].GetCurrentValue().GetStringVal(); got != "current-value" {
t.Errorf("CurrentValue = %q, want %q", got, "current-value")
}
}

// TestWatchDeviations_SensitivePathMasking is the integration acceptance test
// for Issue 05.
//
// Scenario: one intent sets "patterntest" to "running-secret"; a second (marker)
// intent marks /patterntest in its sensitive_paths. The sync tree has no value
// for patterntest, so the deviation engine sees a mismatch. Both ExpectedValue
// and CurrentValue in the emitted deviation must be "***".
func TestWatchDeviations_SensitivePathMasking(t *testing.T) {
ctx := context.Background()

sc, schema, err := testhelper.InitSDCIOSchema()
if err != nil {
t.Fatal(err)
}
// Plain schema client — patterntest is NOT schema-sensitive.
scb := schemaClientPkg.NewSchemaClientBound(schema, sc)

runningDevice := &sdcio_schema.Device{Patterntest: ygot.String("running-secret")}
dataIntent := buildTestIntent(t, ctx, scb, runningDevice, "data-intent", 10, nil)
markerIntent := buildTestIntent(t, ctx, scb, nil, "marker-intent", 5, []*sdcpb.Path{{Elem: []*sdcpb.PathElem{{Name: "patterntest"}}, IsRootBased: true}})

ctrl := gomock.NewController(t)
defer ctrl.Finish()

ccb := mockcacheclient.NewMockCacheClientBound(ctrl)
ccb.EXPECT().
IntentGetAll(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, _ []string, intentChan chan<- *tree_persist.Intent, errChan chan<- error) {
intentChan <- dataIntent
intentChan <- markerIntent
close(intentChan)
close(errChan)
})

// Pre-populate index with marker-intent's sensitive paths.
idx := treetypes.NewSensitivePathIndex()
idx.Set("marker-intent", markerIntent.GetSensitivePaths())

// Empty sync tree → running device has no patterntest → triggers deviation.
syncRoot := buildEmptySyncTree(t, ctx, scb)

ds := &Datastore{
config: &config.DatastoreConfig{Name: "test-ds", Validation: config.NewValidationConfig()},
syncTree: syncRoot,
syncTreeMutex: &sync.RWMutex{},
taskPool: pool.NewSharedTaskPool(ctx, runtime.GOMAXPROCS(0)),
cacheClient: ccb,
schemaClient: scb,
sensitivePathIndex: idx,
}

deviationChan, calcErr := ds.calculateDeviations(ctx)
if calcErr != nil {
t.Fatalf("calculateDeviations() error: %v", calcErr)
}

stream := newFakeDeviationStream(ctx)
ds.SendDeviations(ctx, deviationChan, map[string]sdcpb.DataServer_WatchDeviationsServer{"fake": stream})

found := false
for _, r := range stream.updateResponses() {
elems := r.GetPath().GetElem()
if len(elems) > 0 && elems[len(elems)-1].GetName() == "patterntest" {
found = true
if got := r.GetExpectedValue().GetStringVal(); got != "***" {
t.Errorf("patterntest ExpectedValue = %q, want %q", got, "***")
}
if got := r.GetCurrentValue().GetStringVal(); got != "***" {
t.Errorf("patterntest CurrentValue = %q, want %q", got, "***")
}
}
}
if !found {
t.Error("no deviation found for patterntest — check that the deviation engine emits it")
}
}
Loading
Loading