From 2880c18a37c7d6f079d8c15b7f73a30b32dba692 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Mon, 18 May 2026 15:06:16 +0100 Subject: [PATCH 1/3] CBG-5367: add metadata store to dcp client for both CBS and rosmar --- base/dcp_client.go | 4 + base/dcp_client_test.go | 8 ++ base/gocb_dcp_client.go | 5 +- base/gocb_dcp_feed.go | 124 ------------------ base/rosmar_dcp_client.go | 8 +- db/attachment_compaction.go | 1 + db/background_mgr_attachment_migration.go | 5 +- ...ackground_mgr_attachment_migration_test.go | 12 +- db/background_mgr_resync_dcp.go | 1 + db/import_listener.go | 1 + go.mod | 4 +- go.sum | 8 +- 12 files changed, 37 insertions(+), 144 deletions(-) diff --git a/base/dcp_client.go b/base/dcp_client.go index 74fd2bbac3..7ccfac2230 100644 --- a/base/dcp_client.go +++ b/base/dcp_client.go @@ -48,6 +48,7 @@ type DCPClientOptions struct { Terminator chan bool // optional channel that can be closed to terminate the DCP feed, this will be replaced with a context option. FromLatestSequence bool // If true, start at latest sequence. FeedContent sgbucket.FeedContent // feedContent specifies whether the DCP feed should include values, xattrs, or both + MetadataStore DataStore } // NewDCPClient creates a new DCPClient to receive events from a bucket. @@ -64,6 +65,8 @@ func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DC return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix cannot be provided when MetadataStoreType is InMemory") } else if opts.MetadataStoreType == DCPMetadataStoreCS && opts.CheckpointPrefix == "" { return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix must be provided when MetadataStoreType is persistent") + } else if opts.MetadataStoreType == DCPMetadataStoreCS && opts.MetadataStore == nil { + return nil, fmt.Errorf("DCPClientOptions.MetadataStore must be provided when MetadataStoreType is CS") } underlyingBucket := GetBaseBucket(bucket) if _, ok := underlyingBucket.(*rosmar.Bucket); ok { @@ -122,6 +125,7 @@ func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DC FailOnRollback: opts.FailOnRollback, InitialMetadata: opts.InitialMetadata, FeedContent: opts.FeedContent, + MetadataStore: opts.MetadataStore, } if opts.FromLatestSequence { diff --git a/base/dcp_client_test.go b/base/dcp_client_test.go index fc80ab9597..ede90cd554 100644 --- a/base/dcp_client_test.go +++ b/base/dcp_client_test.go @@ -56,6 +56,7 @@ func TestOneShotDCP(t *testing.T) { OneShot: true, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), Callback: counterCallback, + MetadataStore: bucket.GetMetadataStore(), } dcpClient, err := NewDCPClient(ctx, bucket, dcpOptions) @@ -117,6 +118,7 @@ func TestTerminateDCPFeed(t *testing.T) { OneShot: false, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), Callback: counterCallback, + MetadataStore: bucket.GetMetadataStore(), } dcpClient, err := NewDCPClient(ctx, bucket, dcpOptions) @@ -460,6 +462,7 @@ func TestResumeStoppedFeed(t *testing.T) { OneShot: true, FailOnRollback: false, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + MetadataStore: bucket.GetMetadataStore(), } dcpClient = newDCPClientWithFastCheckpointing(t, bucket, dcpClientOpts) @@ -494,6 +497,7 @@ func TestResumeStoppedFeed(t *testing.T) { Callback: secondCallback, CollectionNames: NewCollectionNameSet(dataStore), CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + MetadataStore: bucket.GetMetadataStore(), } dcpClient2 := newDCPClientWithFastCheckpointing(t, bucket, dcpClientOpts) @@ -664,6 +668,7 @@ func TestDCPFeedEventTypes(t *testing.T) { CollectionNames: NewCollectionNameSet(collection), CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), Callback: callback, + MetadataStore: bucket.GetMetadataStore(), } dcpClient, err := NewDCPClient(ctx, bucket, clientOptions) require.NoError(t, err) @@ -814,6 +819,7 @@ func TestDCPFeedContentBodyOnlyDocs(t *testing.T) { CollectionNames: NewCollectionNameSet(dataStore), FeedContent: mode.feedContent, FromLatestSequence: live, + MetadataStore: bucket.GetMetadataStore(), } dcpClient, err := NewDCPClient(ctx, bucket, feedArgs) require.NoError(t, err) @@ -963,6 +969,7 @@ func TestDCPCheckpointCleanup(t *testing.T) { CheckpointPrefix: checkpointPrefix, Callback: callback, MetadataStoreType: DCPMetadataStoreCS, + MetadataStore: bucket.GetMetadataStore(), } dcpClient := newDCPClientWithFastCheckpointing(t, bucket, dcpOptions) @@ -1113,6 +1120,7 @@ func TestDCPDataType(t *testing.T) { OneShot: true, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), Callback: callback, + MetadataStore: bucket.GetMetadataStore(), } dcpClient, err := NewDCPClient(ctx, bucket, dcpOptions) diff --git a/base/gocb_dcp_client.go b/base/gocb_dcp_client.go index 7dbecf7fb5..a1b7dcd1e3 100644 --- a/base/gocb_dcp_client.go +++ b/base/gocb_dcp_client.go @@ -84,6 +84,7 @@ type GoCBDCPClientOptions struct { CheckpointPrefix string NumVBuckets uint16 // NumVBuckets is used to set the number of vbuckets for the DCP feed, if not set, it will be determined from the bucket when creating the client FeedContent sgbucket.FeedContent // FeedContent specifies whether the DCP feed should include values, xattrs, or both + MetadataStore DataStore // MetadataStore is used for DCPMetadata persistence when MetadataStoreType is DCPMetadataStoreCS } // MemdDcpOpenFlag returns the memd.DcpOpenFlag to use for the given FeedContent option. @@ -152,8 +153,6 @@ func NewGocbDCPClient(ctx context.Context, callback sgbucket.FeedEventCallbackFu agentPriority: options.AgentPriority, collectionIDs: options.CollectionIDs, feedContent: options.FeedContent, - // TODO:: Change to metadataStore passed in for dual metadataStore - metadataStore: bucket.DefaultDataStore(ctx), } // Initialize active vbuckets @@ -164,6 +163,8 @@ func NewGocbDCPClient(ctx context.Context, callback sgbucket.FeedEventCallbackFu switch options.MetadataStoreType { case DCPMetadataStoreCS: + // Metadata store is needed to persist metadata for the DCP feed + client.metadataStore = options.MetadataStore client.metadata = NewDCPMetadataCS(ctx, client.metadataStore, numVbuckets, numWorkers, options.CheckpointPrefix) case DCPMetadataStoreInMemory: client.metadata = NewDCPMetadataMem(numVbuckets) diff --git a/base/gocb_dcp_feed.go b/base/gocb_dcp_feed.go index e57d94bb3f..69b61dafbf 100644 --- a/base/gocb_dcp_feed.go +++ b/base/gocb_dcp_feed.go @@ -10,11 +10,9 @@ package base import ( "context" - "expvar" "fmt" "github.com/couchbase/gocbcore/v10" - sgbucket "github.com/couchbase/sg-bucket" ) // getHighSeqMetadata returns metadata to feed into a DCP client based on the last sequence numbers stored in memory @@ -46,125 +44,3 @@ func getHighSeqMetadata(ctx context.Context, cbstore CouchbaseBucketStore) ([]DC } return metadata, nil } - -// StartGocbDCPFeed starts a DCP Feed. -func StartGocbDCPFeed(ctx context.Context, bucket *GocbV2Bucket, bucketName string, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, metadataStoreType DCPMetadataStoreType, groupID string) error { - - feedName, err := GenerateDcpStreamName(args.ID) - if err != nil { - return err - } - - var collectionIDs []uint32 - if bucket.IsSupported(sgbucket.BucketStoreFeatureCollections) { - cm, err := bucket.GetCollectionManifest() - if err != nil { - return err - } - - // should only be one args.Scope so cheaper to iterate this way around - for scopeName, collections := range args.Scopes { - scopeFound := false - for _, manifestScope := range cm.Scopes { - if scopeName != manifestScope.Name { - continue - } - scopeFound = true - collectionsFound := make(map[string]struct{}) - // should be less than or equal number of args.collections than cm.scope.collections, so iterate this way so that the inner loop completes quicker on average - for _, manifestCollection := range manifestScope.Collections { - for _, collectionName := range collections { - if collectionName != manifestCollection.Name { - continue - } - collectionIDs = append(collectionIDs, manifestCollection.UID) - collectionsFound[collectionName] = struct{}{} - } - } - if len(collectionsFound) != len(collections) { - for _, collectionName := range collections { - if _, ok := collectionsFound[collectionName]; !ok { - return RedactErrorf("collection %s not found in scope %s %+v", MD(collectionName), MD(manifestScope.Name), manifestScope.Collections) - } - } - } - break - } - if !scopeFound { - return RedactErrorf("scope %s not found", MD(scopeName)) - } - } - } - options := GoCBDCPClientOptions{ - MetadataStoreType: metadataStoreType, - DbStats: dbStats, - CollectionIDs: collectionIDs, - AgentPriority: gocbcore.DcpAgentPriorityMed, - CheckpointPrefix: args.CheckpointPrefix, - FeedID: args.ID, - FeedContent: args.FeedContent, - } - - if args.Backfill == sgbucket.FeedNoBackfill { - metadata, err := getHighSeqMetadata(ctx, bucket) - if err != nil { - return err - } - options.InitialMetadata = metadata - } - - dcpClient, err := NewGocbDCPClient( - ctx, - callback, - options, - bucket) - if err != nil { - return err - } - - doneChan, err := dcpClient.Start() - if err != nil { - ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err) - // simplify in CBG-2234 - closeErr := dcpClient.Close() - ErrorfCtx(ctx, "Finished called async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName)) - if closeErr != nil { - ErrorfCtx(ctx, "Close error from DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), closeErr) - } - asyncCloseErr := <-doneChan - ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), asyncCloseErr) - return err - } - InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName)) - go func() { - select { - case dcpCloseError := <-doneChan: - // simplify close in CBG-2234 - // This is a close because DCP client closed on its own, which should never happen since once - // DCP feed is started, there is nothing that will close it - InfofCtx(ctx, KeyDCP, "Forced closed DCP Feed %q for %q", feedName, MD(bucketName)) - // wait for channel close - <-doneChan - if dcpCloseError != nil { - WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseError) - } - // FIXME: close dbContext here - break - case <-args.Terminator: - InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName)) - dcpCloseErr := dcpClient.Close() - if dcpCloseErr != nil { - WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr) - } - dcpCloseErr = <-doneChan - if dcpCloseErr != nil { - WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr) - } - break - } - if args.DoneChan != nil { - close(args.DoneChan) - } - }() - return err -} diff --git a/base/rosmar_dcp_client.go b/base/rosmar_dcp_client.go index c5a448bdc2..a71cc4fc72 100644 --- a/base/rosmar_dcp_client.go +++ b/base/rosmar_dcp_client.go @@ -48,6 +48,7 @@ func (dc *RosmarDCPClient) Start() (chan error, error) { Scopes: dc.opts.CollectionNames.ToCollectionNames(), Backfill: sgbucket.FeedResume, FeedContent: dc.opts.FeedContent, + MetadataStore: dc.opts.MetadataStore, } if dc.opts.FromLatestSequence { feedArgs.Backfill = sgbucket.FeedNoBackfill @@ -90,12 +91,15 @@ func (dc *RosmarDCPClient) GetMetadataKeyPrefix() string { return dc.opts.CheckpointPrefix } -func (dc *RosmarDCPClient) metadataStore(ctx context.Context) sgbucket.DataStore { +func (dc *RosmarDCPClient) getMetadataStore(ctx context.Context) sgbucket.DataStore { + if dc.opts.MetadataStore != nil { + return dc.opts.MetadataStore + } return dc.bucket.DefaultDataStore(ctx) } // PurgeCheckpoints deletes the checkpoint document for the feed. Calling this function while the feed is running // will not alter the feed nor remove the checkpoint for the future. func (dc *RosmarDCPClient) PurgeCheckpoints(ctx context.Context) error { - return PurgeDCPCheckpoints(ctx, dc.metadataStore(ctx), dc.opts.CheckpointPrefix, DCPFeedRosmar) + return PurgeDCPCheckpoints(ctx, dc.getMetadataStore(ctx), dc.opts.CheckpointPrefix, DCPFeedRosmar) } diff --git a/db/attachment_compaction.go b/db/attachment_compaction.go index e873e82a20..7ba9b11519 100644 --- a/db/attachment_compaction.go +++ b/db/attachment_compaction.go @@ -575,6 +575,7 @@ func getCompactionDCPClientOptions(db *Database, compactionID string, collection Callback: callback, FeedID: fmt.Sprintf("att_compaction:%v_%v", compactionID, phase), CheckpointPrefix: GetAttachmentCompactionDCPCheckpointPrefix(db.DatabaseContext, compactionID, phase), + MetadataStore: db.MetadataStore, } } diff --git a/db/background_mgr_attachment_migration.go b/db/background_mgr_attachment_migration.go index 52184254b1..5bdea3e1fd 100644 --- a/db/background_mgr_attachment_migration.go +++ b/db/background_mgr_attachment_migration.go @@ -162,7 +162,7 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string if err != nil { return err } - dcpOptions := a.getDCPClientOptions(a.MigrationID, scopes, callback) + dcpOptions := a.getDCPClientOptions(a.MigrationID, scopes, callback, db.MetadataStore) // check for mismatch in collection id's between current collections on the db and prev run @@ -297,7 +297,7 @@ func (a *AttachmentMigrationManager) getCheckpointPrefix(migrationID string) str // getMigrationDCPClientOptions returns options for DCP client for attachment migration. CollectionIDs represent the Couchbase Server // CollectionIDs and prefix represents the checkpoint prefix for checkpoint documents. -func (a *AttachmentMigrationManager) getDCPClientOptions(migrationID string, scopes base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc) base.DCPClientOptions { +func (a *AttachmentMigrationManager) getDCPClientOptions(migrationID string, scopes base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc, metadataStore base.DataStore) base.DCPClientOptions { return base.DCPClientOptions{ FeedID: fmt.Sprintf("att_migration:%v", migrationID), OneShot: true, @@ -306,6 +306,7 @@ func (a *AttachmentMigrationManager) getDCPClientOptions(migrationID string, sco CollectionNames: scopes, CheckpointPrefix: a.getCheckpointPrefix(migrationID), Callback: callback, + MetadataStore: metadataStore, } } diff --git a/db/background_mgr_attachment_migration_test.go b/db/background_mgr_attachment_migration_test.go index e533f918b5..d24f9fa4a9 100644 --- a/db/background_mgr_attachment_migration_test.go +++ b/db/background_mgr_attachment_migration_test.go @@ -299,14 +299,10 @@ func TestAttachmentMigrationCheckpointPrefix(t *testing.T) { defer db.Close(ctx) mgr := NewAttachmentMigrationManager(db) - clientOptions := mgr.Process.(*AttachmentMigrationManager).getDCPClientOptions( - migrationID, - db.collectionNameSet(), - func(sgbucket.FeedEvent) bool { - require.FailNow(t, "DCP callback should not be called") - return false - }, - ) + clientOptions := mgr.Process.(*AttachmentMigrationManager).getDCPClientOptions(migrationID, db.collectionNameSet(), func(sgbucket.FeedEvent) bool { + require.FailNow(t, "DCP callback should not be called") + return false + }, db.MetadataStore) dcpClient, err := base.NewDCPClient( ctx, diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index c848d1aabd..394b678dda 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -532,6 +532,7 @@ func (r *ResyncManagerDCP) getDCPClientOptions(db *DatabaseContext, resyncID str CollectionNames: collectionNames, CheckpointPrefix: GetResyncDCPCheckpointPrefix(db, resyncID, distributed), Callback: callback, + MetadataStore: db.MetadataStore, } } diff --git a/db/import_listener.go b/db/import_listener.go index 442b212982..5884ff2342 100644 --- a/db/import_listener.go +++ b/db/import_listener.go @@ -86,6 +86,7 @@ func (il *importListener) StartImportFeed(dbContext *DatabaseContext) (err error CollectionNames: dbContext.collectionNameSet(), Callback: il.ProcessFeedEvent, DBStats: dbContext.DbStats.Database().ImportFeedMapStats.Map, + MetadataStore: dbContext.MetadataStore, } _, err := base.StartDCPFeed(ctx, dbContext.Bucket, feedArgs) diff --git a/go.mod b/go.mod index 8fc9aabd11..eb0d4088fc 100644 --- a/go.mod +++ b/go.mod @@ -13,11 +13,11 @@ require ( github.com/couchbase/gocb/v2 v2.12.1 github.com/couchbase/gocbcore/v10 v10.9.2-0.20260430103215-edcade542007 github.com/couchbase/gomemcached v0.2.1 - github.com/couchbase/sg-bucket v0.0.0-20260511142200-015dc3334b19 + github.com/couchbase/sg-bucket v0.0.0-20260518140148-dcaed0c11947 github.com/couchbasedeps/fast-skiplist v0.0.0-20250722125747-e0dd031fe2ac github.com/couchbaselabs/go-fleecedelta v0.0.0-20220909152808-6d09efa7a338 github.com/couchbaselabs/gocbconnstr v1.0.5 - github.com/couchbaselabs/rosmar v0.0.0-20260511150952-d9eb548723c3 + github.com/couchbaselabs/rosmar v0.0.0-20260518140303-b1150de004b1 github.com/elastic/gosigar v0.14.4 github.com/felixge/fgprof v0.9.5 github.com/go-jose/go-jose/v4 v4.1.4 diff --git a/go.sum b/go.sum index 17f2cc0b06..027ddfa06b 100644 --- a/go.sum +++ b/go.sum @@ -46,8 +46,8 @@ github.com/couchbase/goprotostellar v1.0.5 h1:pmR4H87zbYymIdTR1owyUZsfQ7NupkfCuN github.com/couchbase/goprotostellar v1.0.5/go.mod h1:X58ot5FRqlBTBkwG/oI4klunpu4MApjGktheqeRWQw0= github.com/couchbase/goutils v0.1.2 h1:gWr8B6XNWPIhfalHNog3qQKfGiYyh4K4VhO3P2o9BCs= github.com/couchbase/goutils v0.1.2/go.mod h1:h89Ek/tiOxxqjz30nPPlwZdQbdB8BwgnuBxeoUe/ViE= -github.com/couchbase/sg-bucket v0.0.0-20260511142200-015dc3334b19 h1:rlC0lsj4tymc5/5IOUP+2sxwLpuzYAioixy1+RGzpw8= -github.com/couchbase/sg-bucket v0.0.0-20260511142200-015dc3334b19/go.mod h1:zZspn0FpL3Q9ol/KN9dpR0u/p5f8cJpETk8GVxcegqA= +github.com/couchbase/sg-bucket v0.0.0-20260518140148-dcaed0c11947 h1:sGjQIJd/jPbCMepMf3d8YqSiCNWfaBYz6s6fmZquk2k= +github.com/couchbase/sg-bucket v0.0.0-20260518140148-dcaed0c11947/go.mod h1:zZspn0FpL3Q9ol/KN9dpR0u/p5f8cJpETk8GVxcegqA= github.com/couchbase/tools-common/cloud/v8 v8.1.3 h1:+fH2+3E8KV8xyXXzbEJNhosMqh08ahauZWlu0m5qtJA= github.com/couchbase/tools-common/cloud/v8 v8.1.3/go.mod h1:5wEMtM4rX92Zl9ylQKEJFgmYNUyFVoWzXCvo31YNO+U= github.com/couchbase/tools-common/fs v1.0.3 h1:KXhisN+hmp5yicOWkUBNjJcd/WsblHA2SVShjL2eGiY= @@ -68,8 +68,8 @@ github.com/couchbaselabs/gocbconnstr v1.0.5 h1:e0JokB5qbcz7rfnxEhNRTKz8q1svoRvDo github.com/couchbaselabs/gocbconnstr v1.0.5/go.mod h1:KV3fnIKMi8/AzX0O9zOrO9rofEqrRF1d2rG7qqjxC7o= github.com/couchbaselabs/gocbconnstr/v2 v2.0.0 h1:HU9DlAYYWR69jQnLN6cpg0fh0hxW/8d5hnglCXXjW78= github.com/couchbaselabs/gocbconnstr/v2 v2.0.0/go.mod h1:o7T431UOfFVHDNvMBUmUxpHnhivwv7BziUao/nMl81E= -github.com/couchbaselabs/rosmar v0.0.0-20260511150952-d9eb548723c3 h1:n9hQ9wtp6aAyKl7md96UhAcNVN4ELDfeng5TzGxqHvY= -github.com/couchbaselabs/rosmar v0.0.0-20260511150952-d9eb548723c3/go.mod h1:++TYMtmRwsMAou0pWuNRZFB9bCf1vk2xPx1xLPBNszw= +github.com/couchbaselabs/rosmar v0.0.0-20260518140303-b1150de004b1 h1:ZO3gJQvEwKbyih2u3eTAtqGvQxWlCIlWG6E38e9cJRY= +github.com/couchbaselabs/rosmar v0.0.0-20260518140303-b1150de004b1/go.mod h1:CdMXYG6aUHXtyy/R7IjyqkSP5PjDHppGKJCu/gEvcqw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= From 6e84e7a00c9573615ba52a363dfb26ec43f73e8c Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Mon, 18 May 2026 16:45:06 +0100 Subject: [PATCH 2/3] review address and dependency update --- base/dcp_client.go | 2 +- base/gocb_dcp_client.go | 7 ++++++- go.mod | 4 ++-- go.sum | 8 ++++---- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/base/dcp_client.go b/base/dcp_client.go index 7ccfac2230..f454bd89ab 100644 --- a/base/dcp_client.go +++ b/base/dcp_client.go @@ -66,7 +66,7 @@ func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DC } else if opts.MetadataStoreType == DCPMetadataStoreCS && opts.CheckpointPrefix == "" { return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix must be provided when MetadataStoreType is persistent") } else if opts.MetadataStoreType == DCPMetadataStoreCS && opts.MetadataStore == nil { - return nil, fmt.Errorf("DCPClientOptions.MetadataStore must be provided when MetadataStoreType is CS") + return nil, fmt.Errorf("DCPClientOptions.MetadataStore must be provided when MetadataStoreType is persistent") } underlyingBucket := GetBaseBucket(bucket) if _, ok := underlyingBucket.(*rosmar.Bucket); ok { diff --git a/base/gocb_dcp_client.go b/base/gocb_dcp_client.go index a1b7dcd1e3..a1259b08c1 100644 --- a/base/gocb_dcp_client.go +++ b/base/gocb_dcp_client.go @@ -163,7 +163,12 @@ func NewGocbDCPClient(ctx context.Context, callback sgbucket.FeedEventCallbackFu switch options.MetadataStoreType { case DCPMetadataStoreCS: - // Metadata store is needed to persist metadata for the DCP feed + // Persistent DCP metadata requires a backing store. Validate here so direct + // callers get a regular constructor error instead of a nil dereference while + // loading checkpoints. + if options.MetadataStore == nil { + return nil, fmt.Errorf("MetadataStore must be provided when MetadataStoreType is persistent") + } client.metadataStore = options.MetadataStore client.metadata = NewDCPMetadataCS(ctx, client.metadataStore, numVbuckets, numWorkers, options.CheckpointPrefix) case DCPMetadataStoreInMemory: diff --git a/go.mod b/go.mod index eb0d4088fc..ef56493b66 100644 --- a/go.mod +++ b/go.mod @@ -13,11 +13,11 @@ require ( github.com/couchbase/gocb/v2 v2.12.1 github.com/couchbase/gocbcore/v10 v10.9.2-0.20260430103215-edcade542007 github.com/couchbase/gomemcached v0.2.1 - github.com/couchbase/sg-bucket v0.0.0-20260518140148-dcaed0c11947 + github.com/couchbase/sg-bucket v0.0.0-20260518141224-124a6a2b318e github.com/couchbasedeps/fast-skiplist v0.0.0-20250722125747-e0dd031fe2ac github.com/couchbaselabs/go-fleecedelta v0.0.0-20220909152808-6d09efa7a338 github.com/couchbaselabs/gocbconnstr v1.0.5 - github.com/couchbaselabs/rosmar v0.0.0-20260518140303-b1150de004b1 + github.com/couchbaselabs/rosmar v0.0.0-20260518154355-0e98444f7414 github.com/elastic/gosigar v0.14.4 github.com/felixge/fgprof v0.9.5 github.com/go-jose/go-jose/v4 v4.1.4 diff --git a/go.sum b/go.sum index 027ddfa06b..a004800a85 100644 --- a/go.sum +++ b/go.sum @@ -46,8 +46,8 @@ github.com/couchbase/goprotostellar v1.0.5 h1:pmR4H87zbYymIdTR1owyUZsfQ7NupkfCuN github.com/couchbase/goprotostellar v1.0.5/go.mod h1:X58ot5FRqlBTBkwG/oI4klunpu4MApjGktheqeRWQw0= github.com/couchbase/goutils v0.1.2 h1:gWr8B6XNWPIhfalHNog3qQKfGiYyh4K4VhO3P2o9BCs= github.com/couchbase/goutils v0.1.2/go.mod h1:h89Ek/tiOxxqjz30nPPlwZdQbdB8BwgnuBxeoUe/ViE= -github.com/couchbase/sg-bucket v0.0.0-20260518140148-dcaed0c11947 h1:sGjQIJd/jPbCMepMf3d8YqSiCNWfaBYz6s6fmZquk2k= -github.com/couchbase/sg-bucket v0.0.0-20260518140148-dcaed0c11947/go.mod h1:zZspn0FpL3Q9ol/KN9dpR0u/p5f8cJpETk8GVxcegqA= +github.com/couchbase/sg-bucket v0.0.0-20260518141224-124a6a2b318e h1:tPaPP/sHmz3ACUgEDgRIeM68TLSrEax2zNE17zpe1Mw= +github.com/couchbase/sg-bucket v0.0.0-20260518141224-124a6a2b318e/go.mod h1:zZspn0FpL3Q9ol/KN9dpR0u/p5f8cJpETk8GVxcegqA= github.com/couchbase/tools-common/cloud/v8 v8.1.3 h1:+fH2+3E8KV8xyXXzbEJNhosMqh08ahauZWlu0m5qtJA= github.com/couchbase/tools-common/cloud/v8 v8.1.3/go.mod h1:5wEMtM4rX92Zl9ylQKEJFgmYNUyFVoWzXCvo31YNO+U= github.com/couchbase/tools-common/fs v1.0.3 h1:KXhisN+hmp5yicOWkUBNjJcd/WsblHA2SVShjL2eGiY= @@ -68,8 +68,8 @@ github.com/couchbaselabs/gocbconnstr v1.0.5 h1:e0JokB5qbcz7rfnxEhNRTKz8q1svoRvDo github.com/couchbaselabs/gocbconnstr v1.0.5/go.mod h1:KV3fnIKMi8/AzX0O9zOrO9rofEqrRF1d2rG7qqjxC7o= github.com/couchbaselabs/gocbconnstr/v2 v2.0.0 h1:HU9DlAYYWR69jQnLN6cpg0fh0hxW/8d5hnglCXXjW78= github.com/couchbaselabs/gocbconnstr/v2 v2.0.0/go.mod h1:o7T431UOfFVHDNvMBUmUxpHnhivwv7BziUao/nMl81E= -github.com/couchbaselabs/rosmar v0.0.0-20260518140303-b1150de004b1 h1:ZO3gJQvEwKbyih2u3eTAtqGvQxWlCIlWG6E38e9cJRY= -github.com/couchbaselabs/rosmar v0.0.0-20260518140303-b1150de004b1/go.mod h1:CdMXYG6aUHXtyy/R7IjyqkSP5PjDHppGKJCu/gEvcqw= +github.com/couchbaselabs/rosmar v0.0.0-20260518154355-0e98444f7414 h1:eq6+zJXkB/lcx2FRFmW9/gcHsulonOX66LARFki60JE= +github.com/couchbaselabs/rosmar v0.0.0-20260518154355-0e98444f7414/go.mod h1:026Pe8kW5vBQqxE08AyOsIy0hRfrI0D2h13sByJ+Msk= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= From b76ad906d863f7a2104ef9b49b5ac815051a3339 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Mon, 18 May 2026 17:50:25 +0100 Subject: [PATCH 3/3] test fix --- base/dcp_client_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/base/dcp_client_test.go b/base/dcp_client_test.go index ede90cd554..5c030bbb69 100644 --- a/base/dcp_client_test.go +++ b/base/dcp_client_test.go @@ -228,6 +228,7 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) { FailOnRollback: true, CollectionIDs: collectionIDs, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + MetadataStore: bucket.GetMetadataStore(), } gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket) @@ -265,6 +266,7 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) { OneShot: true, CollectionIDs: collectionIDs, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + MetadataStore: bucket.GetMetadataStore(), } dcpClient2, err := NewGocbDCPClient(ctx, counterCallback, dcpClientOpts, gocbv2Bucket) require.NoError(t, err) @@ -283,6 +285,7 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) { OneShot: true, CollectionIDs: collectionIDs, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + MetadataStore: bucket.GetMetadataStore(), } dcpClient3, err := NewGocbDCPClient(ctx, counterCallback, dcpClientOpts, gocbv2Bucket)