Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions base/dcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type DCPClientOptions struct {
Terminator chan bool // optional channel that can be closed to terminate the DCP feed, this will be replaced with a context option.
FromLatestSequence bool // If true, start at latest sequence.
FeedContent sgbucket.FeedContent // feedContent specifies whether the DCP feed should include values, xattrs, or both
MetadataStore DataStore
}

// NewDCPClient creates a new DCPClient to receive events from a bucket.
Expand All @@ -64,6 +65,8 @@ func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DC
return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix cannot be provided when MetadataStoreType is InMemory")
} else if opts.MetadataStoreType == DCPMetadataStoreCS && opts.CheckpointPrefix == "" {
return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix must be provided when MetadataStoreType is persistent")
} else if opts.MetadataStoreType == DCPMetadataStoreCS && opts.MetadataStore == nil {
return nil, fmt.Errorf("DCPClientOptions.MetadataStore must be provided when MetadataStoreType is persistent")
}
underlyingBucket := GetBaseBucket(bucket)
if _, ok := underlyingBucket.(*rosmar.Bucket); ok {
Expand Down Expand Up @@ -122,6 +125,7 @@ func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DC
FailOnRollback: opts.FailOnRollback,
InitialMetadata: opts.InitialMetadata,
FeedContent: opts.FeedContent,
MetadataStore: opts.MetadataStore,
}

if opts.FromLatestSequence {
Expand Down
11 changes: 11 additions & 0 deletions base/dcp_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestOneShotDCP(t *testing.T) {
OneShot: true,
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
Callback: counterCallback,
MetadataStore: bucket.GetMetadataStore(),
}

dcpClient, err := NewDCPClient(ctx, bucket, dcpOptions)
Expand Down Expand Up @@ -117,6 +118,7 @@ func TestTerminateDCPFeed(t *testing.T) {
OneShot: false,
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
Callback: counterCallback,
MetadataStore: bucket.GetMetadataStore(),
}

dcpClient, err := NewDCPClient(ctx, bucket, dcpOptions)
Expand Down Expand Up @@ -226,6 +228,7 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) {
FailOnRollback: true,
CollectionIDs: collectionIDs,
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
MetadataStore: bucket.GetMetadataStore(),
}

gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket)
Expand Down Expand Up @@ -263,6 +266,7 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) {
OneShot: true,
CollectionIDs: collectionIDs,
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
MetadataStore: bucket.GetMetadataStore(),
}
dcpClient2, err := NewGocbDCPClient(ctx, counterCallback, dcpClientOpts, gocbv2Bucket)
require.NoError(t, err)
Expand All @@ -281,6 +285,7 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) {
OneShot: true,
CollectionIDs: collectionIDs,
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
MetadataStore: bucket.GetMetadataStore(),
}

dcpClient3, err := NewGocbDCPClient(ctx, counterCallback, dcpClientOpts, gocbv2Bucket)
Expand Down Expand Up @@ -460,6 +465,7 @@ func TestResumeStoppedFeed(t *testing.T) {
OneShot: true,
FailOnRollback: false,
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
MetadataStore: bucket.GetMetadataStore(),
}

dcpClient = newDCPClientWithFastCheckpointing(t, bucket, dcpClientOpts)
Expand Down Expand Up @@ -494,6 +500,7 @@ func TestResumeStoppedFeed(t *testing.T) {
Callback: secondCallback,
CollectionNames: NewCollectionNameSet(dataStore),
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
MetadataStore: bucket.GetMetadataStore(),
}

dcpClient2 := newDCPClientWithFastCheckpointing(t, bucket, dcpClientOpts)
Expand Down Expand Up @@ -664,6 +671,7 @@ func TestDCPFeedEventTypes(t *testing.T) {
CollectionNames: NewCollectionNameSet(collection),
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
Callback: callback,
MetadataStore: bucket.GetMetadataStore(),
}
dcpClient, err := NewDCPClient(ctx, bucket, clientOptions)
require.NoError(t, err)
Expand Down Expand Up @@ -814,6 +822,7 @@ func TestDCPFeedContentBodyOnlyDocs(t *testing.T) {
CollectionNames: NewCollectionNameSet(dataStore),
FeedContent: mode.feedContent,
FromLatestSequence: live,
MetadataStore: bucket.GetMetadataStore(),
}
dcpClient, err := NewDCPClient(ctx, bucket, feedArgs)
require.NoError(t, err)
Expand Down Expand Up @@ -963,6 +972,7 @@ func TestDCPCheckpointCleanup(t *testing.T) {
CheckpointPrefix: checkpointPrefix,
Callback: callback,
MetadataStoreType: DCPMetadataStoreCS,
MetadataStore: bucket.GetMetadataStore(),
}

dcpClient := newDCPClientWithFastCheckpointing(t, bucket, dcpOptions)
Expand Down Expand Up @@ -1113,6 +1123,7 @@ func TestDCPDataType(t *testing.T) {
OneShot: true,
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
Callback: callback,
MetadataStore: bucket.GetMetadataStore(),
}

dcpClient, err := NewDCPClient(ctx, bucket, dcpOptions)
Expand Down
10 changes: 8 additions & 2 deletions base/gocb_dcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type GoCBDCPClientOptions struct {
CheckpointPrefix string
NumVBuckets uint16 // NumVBuckets is used to set the number of vbuckets for the DCP feed, if not set, it will be determined from the bucket when creating the client
FeedContent sgbucket.FeedContent // FeedContent specifies whether the DCP feed should include values, xattrs, or both
MetadataStore DataStore // MetadataStore is used for DCPMetadata persistence when MetadataStoreType is DCPMetadataStoreCS
}

// MemdDcpOpenFlag returns the memd.DcpOpenFlag to use for the given FeedContent option.
Expand Down Expand Up @@ -152,8 +153,6 @@ func NewGocbDCPClient(ctx context.Context, callback sgbucket.FeedEventCallbackFu
agentPriority: options.AgentPriority,
collectionIDs: options.CollectionIDs,
feedContent: options.FeedContent,
// TODO:: Change to metadataStore passed in for dual metadataStore
metadataStore: bucket.DefaultDataStore(ctx),
}

// Initialize active vbuckets
Expand All @@ -164,6 +163,13 @@ func NewGocbDCPClient(ctx context.Context, callback sgbucket.FeedEventCallbackFu

switch options.MetadataStoreType {
case DCPMetadataStoreCS:
// Persistent DCP metadata requires a backing store. Validate here so direct
// callers get a regular constructor error instead of a nil dereference while
// loading checkpoints.
if options.MetadataStore == nil {
return nil, fmt.Errorf("MetadataStore must be provided when MetadataStoreType is persistent")
}
client.metadataStore = options.MetadataStore
client.metadata = NewDCPMetadataCS(ctx, client.metadataStore, numVbuckets, numWorkers, options.CheckpointPrefix)
case DCPMetadataStoreInMemory:
client.metadata = NewDCPMetadataMem(numVbuckets)
Expand Down
124 changes: 0 additions & 124 deletions base/gocb_dcp_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ package base

import (
"context"
"expvar"
"fmt"

"github.com/couchbase/gocbcore/v10"
sgbucket "github.com/couchbase/sg-bucket"
)

// getHighSeqMetadata returns metadata to feed into a DCP client based on the last sequence numbers stored in memory
Expand Down Expand Up @@ -46,125 +44,3 @@ func getHighSeqMetadata(ctx context.Context, cbstore CouchbaseBucketStore) ([]DC
}
return metadata, nil
}

// StartGocbDCPFeed starts a DCP Feed.
func StartGocbDCPFeed(ctx context.Context, bucket *GocbV2Bucket, bucketName string, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, metadataStoreType DCPMetadataStoreType, groupID string) error {

feedName, err := GenerateDcpStreamName(args.ID)
if err != nil {
return err
}

var collectionIDs []uint32
if bucket.IsSupported(sgbucket.BucketStoreFeatureCollections) {
cm, err := bucket.GetCollectionManifest()
if err != nil {
return err
}

// should only be one args.Scope so cheaper to iterate this way around
for scopeName, collections := range args.Scopes {
scopeFound := false
for _, manifestScope := range cm.Scopes {
if scopeName != manifestScope.Name {
continue
}
scopeFound = true
collectionsFound := make(map[string]struct{})
// should be less than or equal number of args.collections than cm.scope.collections, so iterate this way so that the inner loop completes quicker on average
for _, manifestCollection := range manifestScope.Collections {
for _, collectionName := range collections {
if collectionName != manifestCollection.Name {
continue
}
collectionIDs = append(collectionIDs, manifestCollection.UID)
collectionsFound[collectionName] = struct{}{}
}
}
if len(collectionsFound) != len(collections) {
for _, collectionName := range collections {
if _, ok := collectionsFound[collectionName]; !ok {
return RedactErrorf("collection %s not found in scope %s %+v", MD(collectionName), MD(manifestScope.Name), manifestScope.Collections)
}
}
}
break
}
if !scopeFound {
return RedactErrorf("scope %s not found", MD(scopeName))
}
}
}
options := GoCBDCPClientOptions{
MetadataStoreType: metadataStoreType,
DbStats: dbStats,
CollectionIDs: collectionIDs,
AgentPriority: gocbcore.DcpAgentPriorityMed,
CheckpointPrefix: args.CheckpointPrefix,
FeedID: args.ID,
FeedContent: args.FeedContent,
}

if args.Backfill == sgbucket.FeedNoBackfill {
metadata, err := getHighSeqMetadata(ctx, bucket)
if err != nil {
return err
}
options.InitialMetadata = metadata
}

dcpClient, err := NewGocbDCPClient(
ctx,
callback,
options,
bucket)
if err != nil {
return err
}

doneChan, err := dcpClient.Start()
if err != nil {
ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err)
// simplify in CBG-2234
closeErr := dcpClient.Close()
ErrorfCtx(ctx, "Finished called async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName))
if closeErr != nil {
ErrorfCtx(ctx, "Close error from DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), closeErr)
}
asyncCloseErr := <-doneChan
ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), asyncCloseErr)
return err
}
InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName))
go func() {
select {
case dcpCloseError := <-doneChan:
// simplify close in CBG-2234
// This is a close because DCP client closed on its own, which should never happen since once
// DCP feed is started, there is nothing that will close it
InfofCtx(ctx, KeyDCP, "Forced closed DCP Feed %q for %q", feedName, MD(bucketName))
// wait for channel close
<-doneChan
if dcpCloseError != nil {
WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseError)
}
// FIXME: close dbContext here
break
case <-args.Terminator:
InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName))
dcpCloseErr := dcpClient.Close()
if dcpCloseErr != nil {
WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr)
}
dcpCloseErr = <-doneChan
if dcpCloseErr != nil {
WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr)
}
break
}
if args.DoneChan != nil {
close(args.DoneChan)
}
}()
return err
}
8 changes: 6 additions & 2 deletions base/rosmar_dcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (dc *RosmarDCPClient) Start() (chan error, error) {
Scopes: dc.opts.CollectionNames.ToCollectionNames(),
Backfill: sgbucket.FeedResume,
FeedContent: dc.opts.FeedContent,
MetadataStore: dc.opts.MetadataStore,
}
if dc.opts.FromLatestSequence {
feedArgs.Backfill = sgbucket.FeedNoBackfill
Expand Down Expand Up @@ -90,12 +91,15 @@ func (dc *RosmarDCPClient) GetMetadataKeyPrefix() string {
return dc.opts.CheckpointPrefix
}

func (dc *RosmarDCPClient) metadataStore(ctx context.Context) sgbucket.DataStore {
func (dc *RosmarDCPClient) getMetadataStore(ctx context.Context) sgbucket.DataStore {
if dc.opts.MetadataStore != nil {
return dc.opts.MetadataStore
}
return dc.bucket.DefaultDataStore(ctx)
}

// PurgeCheckpoints deletes the checkpoint document for the feed. Calling this function while the feed is running
// will not alter the feed nor remove the checkpoint for the future.
func (dc *RosmarDCPClient) PurgeCheckpoints(ctx context.Context) error {
return PurgeDCPCheckpoints(ctx, dc.metadataStore(ctx), dc.opts.CheckpointPrefix, DCPFeedRosmar)
return PurgeDCPCheckpoints(ctx, dc.getMetadataStore(ctx), dc.opts.CheckpointPrefix, DCPFeedRosmar)
}
1 change: 1 addition & 0 deletions db/attachment_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ func getCompactionDCPClientOptions(db *Database, compactionID string, collection
Callback: callback,
FeedID: fmt.Sprintf("att_compaction:%v_%v", compactionID, phase),
CheckpointPrefix: GetAttachmentCompactionDCPCheckpointPrefix(db.DatabaseContext, compactionID, phase),
MetadataStore: db.MetadataStore,
}
}

Expand Down
5 changes: 3 additions & 2 deletions db/background_mgr_attachment_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string
if err != nil {
return err
}
dcpOptions := a.getDCPClientOptions(a.MigrationID, scopes, callback)
dcpOptions := a.getDCPClientOptions(a.MigrationID, scopes, callback, db.MetadataStore)

// check for mismatch in collection id's between current collections on the db and prev run

Expand Down Expand Up @@ -297,7 +297,7 @@ func (a *AttachmentMigrationManager) getCheckpointPrefix(migrationID string) str

// getMigrationDCPClientOptions returns options for DCP client for attachment migration. CollectionIDs represent the Couchbase Server
// CollectionIDs and prefix represents the checkpoint prefix for checkpoint documents.
func (a *AttachmentMigrationManager) getDCPClientOptions(migrationID string, scopes base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc) base.DCPClientOptions {
func (a *AttachmentMigrationManager) getDCPClientOptions(migrationID string, scopes base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc, metadataStore base.DataStore) base.DCPClientOptions {
return base.DCPClientOptions{
FeedID: fmt.Sprintf("att_migration:%v", migrationID),
OneShot: true,
Expand All @@ -306,6 +306,7 @@ func (a *AttachmentMigrationManager) getDCPClientOptions(migrationID string, sco
CollectionNames: scopes,
CheckpointPrefix: a.getCheckpointPrefix(migrationID),
Callback: callback,
MetadataStore: metadataStore,
}
}

Expand Down
12 changes: 4 additions & 8 deletions db/background_mgr_attachment_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,10 @@ func TestAttachmentMigrationCheckpointPrefix(t *testing.T) {
defer db.Close(ctx)

mgr := NewAttachmentMigrationManager(db)
clientOptions := mgr.Process.(*AttachmentMigrationManager).getDCPClientOptions(
migrationID,
db.collectionNameSet(),
func(sgbucket.FeedEvent) bool {
require.FailNow(t, "DCP callback should not be called")
return false
},
)
clientOptions := mgr.Process.(*AttachmentMigrationManager).getDCPClientOptions(migrationID, db.collectionNameSet(), func(sgbucket.FeedEvent) bool {
require.FailNow(t, "DCP callback should not be called")
return false
}, db.MetadataStore)

dcpClient, err := base.NewDCPClient(
ctx,
Expand Down
1 change: 1 addition & 0 deletions db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ func (r *ResyncManagerDCP) getDCPClientOptions(db *DatabaseContext, resyncID str
CollectionNames: collectionNames,
CheckpointPrefix: GetResyncDCPCheckpointPrefix(db, resyncID, distributed),
Callback: callback,
MetadataStore: db.MetadataStore,
}
}

Expand Down
1 change: 1 addition & 0 deletions db/import_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (il *importListener) StartImportFeed(dbContext *DatabaseContext) (err error
CollectionNames: dbContext.collectionNameSet(),
Callback: il.ProcessFeedEvent,
DBStats: dbContext.DbStats.Database().ImportFeedMapStats.Map,
MetadataStore: dbContext.MetadataStore,
}

_, err := base.StartDCPFeed(ctx, dbContext.Bucket, feedArgs)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ require (
github.com/couchbase/gocb/v2 v2.12.1
github.com/couchbase/gocbcore/v10 v10.9.2-0.20260430103215-edcade542007
github.com/couchbase/gomemcached v0.2.1
github.com/couchbase/sg-bucket v0.0.0-20260511142200-015dc3334b19
github.com/couchbase/sg-bucket v0.0.0-20260518141224-124a6a2b318e
github.com/couchbasedeps/fast-skiplist v0.0.0-20250722125747-e0dd031fe2ac
github.com/couchbaselabs/go-fleecedelta v0.0.0-20220909152808-6d09efa7a338
github.com/couchbaselabs/gocbconnstr v1.0.5
github.com/couchbaselabs/rosmar v0.0.0-20260511150952-d9eb548723c3
github.com/couchbaselabs/rosmar v0.0.0-20260518154355-0e98444f7414
github.com/elastic/gosigar v0.14.4
github.com/felixge/fgprof v0.9.5
github.com/go-jose/go-jose/v4 v4.1.4
Expand Down
Loading
Loading