diff --git a/atlas_test.go b/atlas_test.go new file mode 100644 index 0000000..fff4baa --- /dev/null +++ b/atlas_test.go @@ -0,0 +1,63 @@ +package flexmgo_test + +import ( + "testing" + "time" + + "git.kanosolution.net/kano/dbflex" + "github.com/sebarcode/codekit" + "github.com/smartystreets/goconvey/convey" +) + +func TestAtlas(t *testing.T) { + convey.Convey("prepare db", t, func() { + connTxt := "mongodb+srv://coba-user:Password.1@cluster0.lobvo.mongodb.net/appdb?retryWrites=true&w=majority" + conn, err := dbflex.NewConnectionFromURI(connTxt, nil) + convey.So(err, convey.ShouldBeNil) + + convey.Convey("connect", func() { + err = conn.Connect() + convey.So(err, convey.ShouldBeNil) + defer conn.Close() + conn.SetFieldNameTag("json") + + convey.Convey("read data", func() { + dest := codekit.M{} + err = conn.Cursor(dbflex.From("info").Select().Take(1), nil).Fetch(&dest).Error() + convey.So(err, convey.ShouldBeNil) + convey.So(dest.GetInt("Version"), convey.ShouldNotEqual, 0) + convey.Println() + convey.Println(codekit.JsonString(dest)) + }) + }) + }) +} + +func TestCluster(t *testing.T) { + convey.Convey("prepare db", t, func() { + connTxt := "mongodb://devops:w0bTOURMkypnd4mIPH2ZPgbB@node01.mongo.bagong.kanosolution.app:27017,node02.mongo.bagong.kanosolution.app:27017,node03.mongo.bagong.kanosolution.app:27017/bis-stg?authSource=admin&retryWrites=true&w=majority" + conn, err := dbflex.NewConnectionFromURI(connTxt, nil) + convey.So(err, convey.ShouldBeNil) + + convey.Convey("connect", func() { + err = conn.Connect() + convey.So(err, convey.ShouldBeNil) + defer conn.Close() + conn.SetFieldNameTag("json") + + convey.Convey("insert data", func() { + _, err = conn.Execute(dbflex.From("info").Insert(), codekit.M{}.Set("data", codekit.M{}.Set("Version", 1).Set("Ts", time.Now()))) + convey.So(err, convey.ShouldBeNil) + + convey.Convey("read data", func() { + dest := codekit.M{} + err = conn.Cursor(dbflex.From("info").Select().Take(1), nil).Fetch(&dest).Error() + convey.So(err, convey.ShouldBeNil) + convey.So(dest.GetInt("Version"), convey.ShouldNotEqual, 0) + convey.Println() + convey.Println(codekit.JsonString(dest)) + }) + }) + }) + }) +} diff --git a/command_handler.go b/command_handler.go new file mode 100644 index 0000000..1e3e842 --- /dev/null +++ b/command_handler.go @@ -0,0 +1,157 @@ +package flexmgo + +import ( + "bufio" + "errors" + "fmt" + "io" + "strings" + + df "git.kanosolution.net/kano/dbflex" + "github.com/sebarcode/codekit" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo/gridfs" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func (q *Query) handleExecuteCommand(conn *Connection) (interface{}, error) { + tablename := q.Config(df.ConfigKeyTableName, "").(string) + coll := conn.db.Collection(tablename) + parts := q.Config(df.ConfigKeyGroupedQueryItems, df.QueryItems{}).(df.QueryItems) + + commands, ok := parts[df.QueryCommand] + if !ok { + return nil, fmt.Errorf("no command") + } + + where := q.Config(df.ConfigKeyWhere, codekit.M{}).(codekit.M) + cmdName := commands.Op + cmdValue := commands.Value + + var ( + bucket *gridfs.Bucket + gfsBuffSize int32 + err error + cmdParm codekit.M + mOK bool + ) + if strings.ToLower(cmdName)[:3] == "gfs" { + gfsBuffSize = int32(cmdParm.Get("size", 1024).(int)) + bucketOpt := new(options.BucketOptions) + bucketOpt.SetChunkSizeBytes(gfsBuffSize) + bucketOpt.SetName(tablename) + bucket, err = gridfs.NewBucket(conn.db, bucketOpt) + if err != nil { + return nil, fmt.Errorf("error prepare GridFS bucket. %s", err.Error()) + } + + cmdParm, mOK = cmdValue.(codekit.M) + if !mOK { + cmdParm = codekit.M{} + } + } + + switch cmdName { + case "gfswrite": + var reader io.Reader + gfsId, hasId := cmdParm["id"] + gfsMetadata, hasMetadata := cmdParm["metadata"] + gfsFileName := cmdParm.GetString("name") + if gfsFileName == "" { + gfsFileName = codekit.RandomString(32) + } + reader, readerOK := cmdParm.Get("source", nil).(io.Reader) + if !readerOK { + return nil, fmt.Errorf("invalid reader") + } + + //-- check if file exist, delete if already exist + if hasId { + bucket.Delete(gfsId) + } + + if !hasMetadata { + gfsMetadata = codekit.M{} + } + uploadOpt := new(options.UploadOptions) + uploadOpt.SetMetadata(gfsMetadata) + if gfsFileName == "" && hasId { + gfsFileName = gfsId.(string) + } + if gfsFileName == "" { + gfsFileName = codekit.RandomString(32) + } + + var objId primitive.ObjectID + if hasId { + err = bucket.UploadFromStreamWithID(gfsId, gfsFileName, reader, uploadOpt) + } else { + objId, err = bucket.UploadFromStream(gfsFileName, reader, uploadOpt) + } + if err != nil { + return nil, fmt.Errorf("error upload file to GridFS. %s", err.Error()) + } + return objId, nil + + case "gfsread": + gfsId, hasId := cmdParm["id"] + gfsFileName := cmdParm.GetString("name") + if gfsFileName == "" && hasId { + gfsFileName = gfsId.(string) + } + dest := cmdParm.Get("output", &bufio.Writer{}).(io.Writer) + var err error + + var ds *gridfs.DownloadStream + if hasId { + ds, err = bucket.OpenDownloadStream(gfsId) + } else { + ds, err = bucket.OpenDownloadStreamByName(gfsFileName) + } + defer ds.Close() + + if err != nil { + return nil, fmt.Errorf("unable to open GFS %s-%s. %s", tablename, gfsFileName, err.Error()) + } + defer ds.Close() + + io.Copy(dest, ds) + return nil, nil + + case "gfsremove", "gfsdelete": + gfsId, hasId := cmdParm["id"] + var err error + if hasId && gfsId != "" { + err = bucket.Delete(gfsId) + } + return nil, err + + case "gfstruncate": + err := bucket.Drop() + return nil, err + + case "distinct": + fieldName := "" + switch cmdValue := cmdValue.(type) { + case string: + fieldName = cmdValue + + case codekit.M: + fieldName = cmdValue.GetString("field") + if fieldName == "" { + return nil, errors.New("field attribute is mandatory") + } + + default: + return nil, errors.New("distinct only accepts string or codekit.M") + } + vs, err := coll.Distinct(conn.ctx, fieldName, where) + if err != nil { + return nil, err + } + return vs, nil + + default: + return nil, fmt.Errorf("invalid command: %v", cmdName) + } +} diff --git a/connection.go b/connection.go index 579ffaa..a5acd2f 100644 --- a/connection.go +++ b/connection.go @@ -2,13 +2,20 @@ package flexmgo import ( "context" + "crypto/tls" + "errors" + "fmt" "strings" "time" - "git.eaciitapp.com/sebar/dbflex" - "github.com/eaciit/toolkit" + "git.kanosolution.net/kano/dbflex" + "github.com/sebarcode/codekit" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readconcern" + "go.mongodb.org/mongo-driver/mongo/writeconcern" ) type Connection struct { @@ -16,50 +23,84 @@ type Connection struct { ctx context.Context client *mongo.Client db *mongo.Database + sess mongo.Session + + _disableTx bool } func (c *Connection) Connect() error { - connURI := "mongodb://" - connURI += c.Host + "/" - - opts := options.Client().ApplyURI(connURI) - //opts.SetConnectTimeout(5 * time.Second) - //opts.SetSocketTimeout(3 * time.Second) - //opts.SetServerSelectionTimeout(3 * time.Second) - if c.User != "" { - opts.SetAuth(options.Credential{ - Username: c.User, - Password: c.Password, - AuthSource: "admin", - }) - } - - for k, v := range c.Config { - klow := strings.ToLower(k) - switch klow { - case "serverselectiontimeout": - opts.SetServerSelectionTimeout( - time.Duration(toolkit.ToInt(v, toolkit.RoundingAuto)) * time.Millisecond) + var opts *options.ClientOptions + + if c.ServerInfo.ConnectionString == "" { + configString := "?" + for k, v := range c.Config { + if strings.ToLower(k) == "authmechanism" && strings.ToLower(v.(string)) == "default" { + configString += k + "=SCRAM-SHA-256&" + continue + } + configString += k + "=" + v.(string) + "&" } - } - //toolkit.Logger().Debugf("opts: %s", toolkit.JsonString(opts)) - client, err := mongo.NewClient(opts) - if err != nil { - return err + connURI := "mongodb://" + + if c.User != "" { + authPrefix := c.User + ":" + c.Password + connURI += authPrefix + "@" + } + connURI += c.Host + "/" + connURI += configString + + opts = options.Client().ApplyURI(connURI) + for k, v := range c.Config { + klow := strings.ToLower(k) + switch klow { + case "serverselectiontimeout": + opts.SetServerSelectionTimeout( + time.Duration(codekit.ToInt(v, codekit.RoundingAuto)) * time.Millisecond) + + case "replicaset": + opts.SetReplicaSet(v.(string)) + //opts.SetWriteConcern() + + case "poolsize": + poolSize := codekit.ToInt(v.(string), codekit.RoundingAuto) + if poolSize > 0 { + opts.SetMaxPoolSize(uint64(poolSize)) + } + + case "tlsinsecure": + opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) + + case "tlscertificate": + opts.SetTLSConfig(loadCerts(&tls.Config{}, strings.Split(v.(string), ",")...)) + + case "writeconcern": + wConcern := writeconcern.New(writeconcern.WMajority()) + opts.SetWriteConcern(wConcern) + + case "idle": + idle := codekit.ToInt(v.(string), codekit.RoundingAuto) + if idle > 0 { + opts.SetMaxConnIdleTime(time.Duration(idle) * time.Second) + } + } + } + } else { + opts = options.Client().ApplyURI(c.ServerInfo.ConnectionString) } - //toolkit.Logger().Debug("client generated: OK") + // logger.Logger().Debug("client generated: OK") if c.ctx == nil { - c.ctx = context.Background() + c.ctx = context.TODO() } - //toolkit.Logger().Debug("context generated: OK") - if err = client.Connect(c.ctx); err != nil { + //logger.Logger().Debugf("opts: %s", codekit.JsonString(opts)) + client, err := mongo.Connect(c.ctx, opts) + if err != nil { return err } - //toolkit.Logger().Debug("client connected: OK") + //logger.Logger().Debug("client connected: OK") if err = client.Ping(c.ctx, nil); err != nil { return err } @@ -72,6 +113,10 @@ func (c *Connection) Connect() error { return nil } +func loadCerts(cfg *tls.Config, certFiles ...string) *tls.Config { + return cfg +} + func (c *Connection) Mdb() *mongo.Database { return c.db } @@ -99,52 +144,158 @@ func (c *Connection) NewQuery() dbflex.IQuery { return q } -func (c *Connection) DropTable(name string) error { - return c.db.Collection(name).Drop(c.ctx) +func (c *Connection) EnsureTable(name string, keys []string, obj interface{}) error { + return nil } -/* -func (c *Connection) Prepare(dbflex.ICommand) (dbflex.IQuery, error) { - panic("not implemented") -} +func (c *Connection) EnsureIndex(tableName, indexName string, isUnique bool, fields ...string) error { + indexFound := false + currentIndex := bson.M{} + ctx := context.Background() + coll := c.db.Collection(tableName) + cursorIndex, e := coll.Indexes().List(ctx) + if e != nil { + return e + } -func (c *Connection) Execute(dbflex.ICommand, toolkit.M) (interface{}, error) { - panic("not implemented") -} + for cursorIndex.Next(ctx) { + if e := cursorIndex.Decode(¤tIndex); e != nil { + continue + } + + if currentIndex["name"].(string) == indexName { + indexFound = true + break + } + } + + createIndex := false + if indexFound { + keys := currentIndex["key"].(primitive.M) + unique, uniqueOK := currentIndex["unique"] + if uniqueOK && unique != isUnique { + createIndex = true + } else { + keyChecking: + for _, f := range fields { + fieldName := f + indexValue := 1 + if f[0] == '-' { + fieldName = f[1:] + indexValue = -1 + } + + if existingIndexValue, ok := keys[fieldName]; !ok { + createIndex = true + break keyChecking + } else if existingIndexValue != indexValue { + createIndex = true + break keyChecking + } + } + + } + } else { + createIndex = true + } + + if createIndex { + if indexFound { + coll.Indexes().DropOne(ctx, indexName) + } + + indexKeys := bson.D{} + for _, f := range fields { + if f[0] == '-' { + indexKeys = append(indexKeys, bson.E{f[1:], -1}) + } else { + indexKeys = append(indexKeys, bson.E{f, 1}) + } + } -func (c *Connection) Cursor(dbflex.ICommand, toolkit.M) dbflex.ICursor { - panic("not implemented") + if _, err := coll.Indexes().CreateOne(ctx, mongo.IndexModel{ + Keys: indexKeys, + Options: options.Index().SetName(indexName).SetUnique(isUnique), + }); err != nil { + return err + } + + } + return nil } -func (c *Connection) NewQuery() dbflex.IQuery { - panic("not implemented") +func (c *Connection) DropTable(name string) error { + return c.db.Collection(name).Drop(c.ctx) } -func (c *Connection) ObjectNames(dbflex.ObjTypeEnum) []string { - panic("not implemented") +func (c *Connection) BeginTx() error { + if c._disableTx { + return errors.New("tx is disabled") + } + + wc := writeconcern.Majority() + rc := readconcern.Snapshot() + txnOpts := options.Transaction().SetWriteConcern(wc).SetReadConcern(rc) + + if c.sess != nil { + return fmt.Errorf("session already exist. Pls commit or rollback last") + } + + sess, err := c.client.StartSession() + if err != nil { + return fmt.Errorf("unable to start new transaction. %s", err.Error()) + } + sess.StartTransaction(txnOpts) + c.sess = sess + return nil } -func (c *Connection) ValidateTable(interface{}, bool) error { - panic("not implemented") +func (c *Connection) Commit() error { + if c.sess == nil { + return fmt.Errorf("transaction session is not exists yet") + } + + err := c.sess.CommitTransaction(c.ctx) + if err != nil { + return fmt.Errorf("unable to commit. %s", err.Error()) + } + + c.sess = nil + return nil } -func (c *Connection) DropTable(string) error { - panic("not implemented") +func (c *Connection) RollBack() error { + if c.sess == nil { + return fmt.Errorf("transaction session is not exists yet") + } + + err := c.sess.AbortTransaction(c.ctx) + if err != nil { + return fmt.Errorf("unable to rollback. %s", err.Error()) + } + + c.sess = nil + return nil } -func (c *Connection) SetThis(dbflex.IConnection) dbflex.IConnection { - panic("not implemented") +func (c *Connection) DisableTx(disable bool) { + c._disableTx = disable } -func (c *Connection) This() dbflex.IConnection { - panic("not implemented") +func (c *Connection) IsTx() bool { + return c.sess != nil } -func (c *Connection) SetFieldNameTag(string) { - panic("not implemented") +// SupportTx to identify if underlying connection support Tx or not +func (c *Connection) SupportTx() bool { + return !c._disableTx } -func (c *Connection) FieldNameTag() string { - panic("not implemented") +func (c *Connection) ObjectNames(kind dbflex.ObjTypeEnum) []string { + if !(kind == dbflex.ObjTypeAll || kind == dbflex.ObjTypeTable) { + return []string{} + } + + names, _ := c.db.ListCollectionNames(context.TODO(), bson.D{}) + return names } -*/ diff --git a/cursor.go b/cursor.go index 5248998..c6c5006 100644 --- a/cursor.go +++ b/cursor.go @@ -5,143 +5,146 @@ import ( "io" "reflect" - "git.eaciitapp.com/sebar/dbflex" - "github.com/eaciit/toolkit" + "git.kanosolution.net/kano/dbflex" + "github.com/sebarcode/codekit" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) type Cursor struct { dbflex.CursorBase mc *mongo.Cursor - tablename string - countParm toolkit.M + //tablename string + countParm codekit.M conn *Connection cursor *mongo.Cursor } -func (cr *Cursor) Close() { +func (cr *Cursor) Close() error { + e := cr.Error() if cr.mc != nil { cr.mc.Close(cr.conn.ctx) } + return e } func (cr *Cursor) Count() int { - sr := cr.conn.db.RunCommand(cr.conn.ctx, cr.countParm) - if sr.Err() != nil { - dbflex.Logger().Errorf("unablet to get count. %s", sr.Err().Error()) - return -1 + if cr.countParm == nil || len(cr.countParm) == 0 { + return 0 } - countModel := new(struct{ N int }) - if err := sr.Decode(countModel); err != nil { - dbflex.Logger().Errorf("unablet to decode count. %s", sr.Err().Error()) - return -1 + if cr.countParm.Get("count") == "" { + return 0 } - return countModel.N + + tableName := cr.countParm.GetString("count") + where := cr.countParm.Get("query", nil) + if where == nil { + n, _ := cr.conn.db.Collection(tableName).EstimatedDocumentCount(cr.conn.ctx) //.CountDocuments(cr.conn.ctx, codekit.M{}) + return int(n) + } else { + n, _ := cr.conn.db.Collection(tableName).CountDocuments(cr.conn.ctx, where, options.Count().SetLimit(10000000)) + return int(n) + } + + /* + sr := cr.conn.db.RunCommand(cr.conn.ctx, cr.countParm) + if sr.Err() != nil { + dbflex.Logger().Errorf("unable to get count. %s, countparm: %s", + sr.Err().Error(), + codekit.JsonString(cr.countParm)) + return 0 + } + + countModel := new(struct{ N int }) + if err := sr.Decode(countModel); err != nil { + dbflex.Logger().Errorf("unablet to decode count. %s", sr.Err().Error()) + return 0 + } + return countModel.N + */ } -func (cr *Cursor) Fetch(out interface{}) error { +func (cr *Cursor) Fetch(out interface{}) dbflex.ICursor { if cr.Error() != nil { - return toolkit.Errorf("unable to fetch data. %s", cr.Error()) + cr.SetError(fmt.Errorf("unable to fetch data. %s", cr.Error())) + return cr } if neof := cr.cursor.Next(cr.conn.ctx); !neof { - return io.EOF + cr.SetError(io.EOF) + return cr } if err := cr.cursor.Decode(out); err != nil { - return toolkit.Errorf("unable to decode output. %s", err.Error()) + cr.SetError(fmt.Errorf("unable to decode output. %s", err.Error())) + return cr } - return nil + return cr } -func (cr *Cursor) Fetchs(result interface{}, n int) error { +func (cr *Cursor) Fetchs(result interface{}, n int) dbflex.ICursor { if cr.Error() != nil { - return toolkit.Errorf("unable to fetch data. %s", cr.Error()) + cr.SetError(fmt.Errorf("unable to fetch data. %s", cr.Error())) + return cr } - v := reflect.TypeOf(result).Elem().Elem() - ivs := reflect.MakeSlice(reflect.SliceOf(v), 0, 0) + v := reflect.ValueOf(result) + if v.Kind() != reflect.Ptr { + return cr.SetError(fmt.Errorf("result should be a pointer of slice")) + } + v = v.Elem() + if v.Kind() != reflect.Slice { + return cr.SetError(fmt.Errorf("result should be a pointer of slice")) + } + + sliceType := v.Type() + elemType := sliceType.Elem() + elemIsPtr := elemType.Kind() == reflect.Ptr + destBuffer := reflect.MakeSlice(sliceType, 1000, 1000) read := 0 - for { - if !cr.cursor.Next(cr.conn.ctx) { - break + used := 0 + for cr.cursor.Next(cr.conn.ctx) { + destItemValue := createPtrFromType(elemType) + destItem := destItemValue.Interface() + err := cr.cursor.Decode(destItem) + if err != nil { + cr.SetError(fmt.Errorf("unable to decode cursor data. %s", err.Error())) + return cr } - iv := reflect.New(v).Interface() - err := cr.cursor.Decode(iv) - if err != nil { - return fmt.Errorf("unable to decode cursor data. %s", err.Error()) + if elemIsPtr { + destBuffer.Index(read).Set(reflect.ValueOf(destItem)) + } else { + destBuffer.Index(read).Set(reflect.ValueOf(destItem).Elem()) } - ivs = reflect.Append(ivs, reflect.ValueOf(iv).Elem()) read++ + used++ + + if used == 1000 { + used = 0 + newLen := read + 1000 + biggerBuffer := reflect.MakeSlice(sliceType, newLen, newLen) + reflect.Copy(biggerBuffer, destBuffer) + destBuffer = biggerBuffer + } + if n != 0 && read == n { break } } - reflect.ValueOf(result).Elem().Set(ivs) - return nil -} - -/* -func (cr *Cursor) Reset() error { - panic("not implemented") -} - -func (cr *Cursor) Fetch(interface{}) error { - panic("not implemented") -} - -func (cr *Cursor) Fetchs(interface{}, int) error { - panic("not implemented") -} + if destBuffer.Len() != read { + lesseBuffer := reflect.MakeSlice(sliceType, read, read) + reflect.Copy(lesseBuffer, destBuffer) + v.Set(lesseBuffer) + } else { + v.Set(destBuffer) + } -func (cr *Cursor) CountAsync() <-chan int { - panic("not implemented") -} - -func (cr *Cursor) Error() error { - panic("not implemented") -} - -func (cr *Cursor) CloseAfterFetch() bool { - panic("not implemented") -} - -func (cr *Cursor) SetCountCommand(dbflex.ICommand) { - panic("not implemented") -} - -func (cr *Cursor) CountCommand() dbflex.ICommand { - panic("not implemented") -} - -func (cr *Cursor) Connection() dbflex.IConnection { - panic("not implemented") -} - -func (cr *Cursor) SetConnection(dbflex.IConnection) { - panic("not implemented") -} - -func (cr *Cursor) ConfigRef(key string, def interface{}, out interface{}) { - panic("not implemented") -} - -func (cr *Cursor) Set(key string, value interface{}) { - panic("not implemented") -} - -func (cr *Cursor) SetCloseAfterFetch() dbflex.ICursor { - panic("not implemented") -} - -func (cr *Cursor) AutoClose(time.Duration) dbflex.ICursor { - panic("not implemented") + return cr } -*/ diff --git a/filter_test.go b/filter_test.go new file mode 100644 index 0000000..128c4fa --- /dev/null +++ b/filter_test.go @@ -0,0 +1,23 @@ +package flexmgo_test + +import ( + "fmt" + "testing" + + "git.kanosolution.net/kano/dbflex" + "github.com/ariefdarmawan/flexmgo" + "github.com/sebarcode/codekit" +) + +func TestElemMatch(t *testing.T) { + queryFilter := dbflex.And( + dbflex.Eq("SafeCardType", "Safe"), + dbflex.Or( + dbflex.ElemMatch("Dimension", dbflex.Eq("Kind", "Company"), dbflex.Eq("Value", "Kano")), + dbflex.ElemMatch("Dimension", dbflex.Eq("Kind", "Project"), dbflex.Eq("Value", "Petrosea")), + )) + + q := new(flexmgo.Query) + qfm, _ := q.BuildFilter(queryFilter) + fmt.Println("filter", codekit.JsonString(qfm)) +} diff --git a/flexmgo.go b/flexmgo.go index 3012413..8b88c51 100644 --- a/flexmgo.go +++ b/flexmgo.go @@ -1,6 +1,6 @@ package flexmgo -import "git.eaciitapp.com/sebar/dbflex" +import "git.kanosolution.net/kano/dbflex" func init() { dbflex.RegisterDriver("mongodb", func(si *dbflex.ServerInfo) dbflex.IConnection { @@ -9,4 +9,11 @@ func init() { c.SetThis(c) return c }) + + dbflex.RegisterDriver("mongodb+srv", func(si *dbflex.ServerInfo) dbflex.IConnection { + c := new(Connection) + c.ServerInfo = *si + c.SetThis(c) + return c + }) } diff --git a/flexmgo_test.go b/flexmgo_test.go index 938e8de..19aead3 100644 --- a/flexmgo_test.go +++ b/flexmgo_test.go @@ -3,28 +3,36 @@ package flexmgo_test import ( "bufio" "bytes" - "errors" + "context" "fmt" "math" "strings" "testing" "time" - "git.eaciitapp.com/sebar/dbflex/orm" - _ "github.com/eaciit/flexmgo" - - "git.eaciitapp.com/sebar/dbflex" - "github.com/eaciit/toolkit" + "git.kanosolution.net/kano/dbflex/orm" + _ "github.com/ariefdarmawan/flexmgo" + logger "github.com/sebarcode/logger" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "git.kanosolution.net/kano/dbflex" + "github.com/sebarcode/codekit" + "github.com/smartystreets/goconvey/convey" cv "github.com/smartystreets/goconvey/convey" ) const ( connTxt = "mongodb://localhost:27017/dbapp" + host = "mongodb://localhost:27017" + dbname = "dbapp" ) func init() { fmt.Println("Debug level is activated") - toolkit.Logger().SetLevelStdOut(toolkit.DebugLevel, true) + logger.Logger().SetLevelStdOut(logger.DebugLevel, true) } func TestConnect(t *testing.T) { @@ -35,7 +43,7 @@ func TestConnect(t *testing.T) { }) } -var tablename = "testrecord" +var tablename = "test_arief" func TestConnectFail(t *testing.T) { cv.Convey("connect", t, func() { @@ -56,14 +64,14 @@ func TestSaveData(t *testing.T) { es := []error{} for i := 1; i <= 10; i++ { r := new(Record) - r.ID = toolkit.Sprintf("record-id-%d", i) - r.Title = "Title is " + toolkit.RandomString(32) - r.Age = toolkit.RandInt(10) + 18 - r.Salary = toolkit.RandFloat(8000, 4) + float64(5000) + r.ID = fmt.Sprintf("record-id-%d", i) + r.Title = "Title is " + codekit.RandomString(32) + r.Age = codekit.RandInt(10) + 18 + r.Salary = codekit.RandFloat(8000, 4) + float64(5000) r.DateJoin = time.Date(2000, 1, 1, 0, 0, 0, 0, - time.Now().Location()).Add(24 * time.Hour * time.Duration(toolkit.RandInt(1000))) + time.Now().Location()).Add(24 * time.Hour * time.Duration(codekit.RandInt(1000))) - if _, err := conn.Execute(cmd, toolkit.M{}.Set("data", r)); err != nil { + if _, err := conn.Execute(cmd, codekit.M{}.Set("data", r)); err != nil { es = append(es, err) break } @@ -99,46 +107,49 @@ func TestListData(t *testing.T) { }, "gt": { - dbflex.Gt("salary", 1000), + dbflex.Gt("Salary", 1000), func(r *Record) bool { return r.Salary > 1000 }, }, "gte": { - dbflex.Gte("salary", 1000), func(r *Record) bool { + dbflex.Gte("Salary", 1000), func(r *Record) bool { return r.Salary >= 1000 }, }, - "lt": {dbflex.Lt("age", 80), func(r *Record) bool { + "lt": {dbflex.Lt("Age", 80), func(r *Record) bool { return r.Age < 80 }, }, - "lte": {dbflex.Lte("age", 90), func(r *Record) bool { + "lte": {dbflex.Lte("Age", 90), func(r *Record) bool { return r.Age <= 90 }, }, - "range": {dbflex.Range("datejoin", - toolkit.String2Date("2000-01-01", "YYYY-mm-dd"), + "range": {dbflex.Range("DateJoin", + codekit.String2Date("2000-01-01", "YYYY-MM-DD"), time.Now()), func(r *Record) bool { - return r.DateJoin.After(toolkit.String2Date("2000-01-01", "YYYY-mm-dd")) && + return r.DateJoin.After(codekit.String2Date("2000-01-01", "YYYY-MM-DD")) && r.DateJoin.Before(time.Now()) }, }, } for key, sc := range scenarios { + if key != "range" { + continue + } cv.Convey("scenario "+key, t, func() { conn, err := connect() cv.So(err, cv.ShouldBeNil) defer conn.Close() - cv.Convey("fetchs", func() { + cv.Convey("fetch ", func() { cur := conn.Cursor(dbflex.From(tablename).Select().Where(sc.filter), nil) defer cur.Close() rs := []*Record{} - err := cur.Fetchs(&rs, 0) + err := cur.Fetchs(&rs, 0).Error() cv.So(err, cv.ShouldBeNil) cv.So(len(rs), cv.ShouldBeGreaterThan, 0) cv.So(sc.validator(rs[0]), cv.ShouldBeTrue) @@ -157,12 +168,13 @@ func TestUpdateData(t *testing.T) { r := new(Record) cmdget := dbflex.From(tablename).Select().Where(dbflex.Eq("_id", "record-id-3")) cur := conn.Cursor(cmdget, nil) - err := cur.Fetch(r) + err := cur.Fetch(r).Error() cv.So(err, cv.ShouldBeNil) cv.So(r.Title, cv.ShouldStartWith, "Title is ") cv.Convey("update data", func() { - _, err = conn.Execute(dbflex.From(tablename).Save(), toolkit.M{}.Set("data", r)) + cmd := dbflex.From(tablename).Update("title") + _, err = conn.Execute(cmd, codekit.M{}.Set("data", r)) cv.So(err, cv.ShouldBeNil) cv.Convey("vaidate", func() { @@ -175,37 +187,107 @@ func TestUpdateData(t *testing.T) { }) } -/* -func TestWatch(t *testing.T) { - cv.Convey("change stream", t, func() { - conn, err := connect() +type country struct { + ID string `bson:"_id" json:"_id" ecname:"_id"` + Title string +} + +type state struct { + ID string `bson:"_id" json:"_id" ecname:"_id"` + Title string + CountryID string +} + +var ( + countriesTableName = "countries" + stateTableName = "states" +) + +func TestMdbTrx(t *testing.T) { + cv.Convey("connect", t, func() { + conn, err := connectTrx() cv.So(err, cv.ShouldBeNil) defer conn.Close() - changed := make(chan toolkit.M) - cmd := dbflex.From(tablename).Command("watch") - _, err = conn.Execute(cmd, toolkit.M{}. - Set("fn", func(data toolkit.M) { - changed <- data - close(changed) - })) - cv.So(err, cv.ShouldBeNil) + cv.Convey("insert countries without trx", func() { + countries := []*country{ + {"SG", "Singapore"}, + {"ID", "Indonesia"}, + {"MY", "Malaysia"}, + {"IN", "India"}, + } - cv.Convey("validate", func() { - m := toolkit.M{} - cmdGet := dbflex.From(tablename).Select().Where(dbflex.Eq("_id", "record-id-5")) - err := conn.Cursor(cmdGet, nil).SetCloseAfterFetch().Fetch(&m) + err = nil + cmd := dbflex.From(countriesTableName).Save() + for _, country := range countries { + _, err = conn.Execute(cmd, codekit.M{}.Set("data", country)) + if err != nil { + break + } + } cv.So(err, cv.ShouldBeNil) - m.Set("title", "Test Change Stream") - _, err = conn.Execute(dbflex.From(tablename).Save(), toolkit.M{}.Set("data", m)) + cmd = dbflex.From(countriesTableName).Select() + cur := conn.Cursor(cmd, nil) + cv.So(cur.Error(), cv.ShouldBeNil) + defer cur.Close() + + ms := []codekit.M{} + cv.So(cur.Fetchs(&ms, 0).Error(), cv.ShouldBeNil) + cv.So(len(ms), cv.ShouldEqual, len(countries)) + + cv.Convey("insert state with trx", func() { + conn.Execute(dbflex.From(stateTableName).Delete(), nil) + + err = conn.BeginTx() + cv.So(err, cv.ShouldBeNil) + + states := []*state{ + {"SG", "Singapore", "SG"}, + {"JK", "Jakarta", "ID"}, + {"MB", "Mumbai", "IN"}, + } + cmd := dbflex.From(stateTableName).Save() + for _, state := range states { + _, err = conn.Execute(cmd, codekit.M{}.Set("data", state)) + if err != nil { + break + } + } + cv.So(err, cv.ShouldBeNil) + + commitErr := conn.Commit() + cv.So(commitErr, cv.ShouldBeNil) - changedData := <-changed - cv.So(changedData.GetString("title"), cv.ShouldEqual, "Test Change Stream") + cmd = dbflex.From(stateTableName).Select() + cur := conn.Cursor(cmd, nil) + cv.So(cur.Error(), cv.ShouldBeNil) + cur.Close() + ms := []codekit.M{} + cv.So(cur.Fetchs(&ms, 0).Error(), cv.ShouldBeNil) + cv.So(len(ms), cv.ShouldEqual, len(states)) + + cv.Convey("rollback", func() { + err = conn.BeginTx() + cv.So(err, cv.ShouldBeNil) + + cmd := dbflex.From(stateTableName).Insert() + conn.Execute(cmd, codekit.M{}.Set("data", &state{"JT", "Jawa Timur", "ID"})) + err = conn.RollBack() + cv.So(err, cv.ShouldBeNil) + + cmd = dbflex.From(stateTableName).Select() + cur := conn.Cursor(cmd, nil) + cv.So(cur.Error(), cv.ShouldBeNil) + cur.Close() + ms1 := []codekit.M{} + cv.So(cur.Fetchs(&ms1, 0).Error(), cv.ShouldBeNil) + cv.So(len(ms1), cv.ShouldEqual, len(states)) + }) + }) }) }) } -*/ func TestDeleteData(t *testing.T) { cv.Convey("connect", t, func() { @@ -235,7 +317,7 @@ func TestAggregateData(t *testing.T) { defer conn.Close() cv.Convey("aggregate", func() { - cmd := dbflex.From(tablename).Aggr(dbflex.NewAggrItem("salary", dbflex.AggrSum, "salary")) + cmd := dbflex.From(tablename).Aggr(dbflex.NewAggrItem("Salary", dbflex.AggrSum, "Salary")) cur := conn.Cursor(cmd, nil) cv.So(cur.Error(), cv.ShouldBeNil) defer cur.Close() @@ -247,7 +329,7 @@ func TestAggregateData(t *testing.T) { total := float64(0) for { r := new(Record) - if err := cur2.Fetch(r); err == nil { + if err := cur2.Fetch(r).Error(); err == nil { total += r.Salary } else { break @@ -258,7 +340,7 @@ func TestAggregateData(t *testing.T) { cur.Fetch(aggrModel) cv.So(math.Abs(aggrModel.Salary-total), cv.ShouldBeLessThan, 1) - toolkit.Logger().Debugf("total is: %v", total) + logger.Logger().Debugf("total is: %v", total) }) }) }) @@ -279,7 +361,7 @@ func TestDropTable(t *testing.T) { func TestGridFsUpdate(t *testing.T) { cv.Convey("preparing file", t, func() { - data := []byte(toolkit.RandomString(512)) + data := []byte(codekit.RandomString(512)) conn, _ := connect() defer conn.Close() @@ -287,28 +369,36 @@ func TestGridFsUpdate(t *testing.T) { buff := bytes.NewReader([]byte(data)) reader := bufio.NewReader(buff) - cmd := dbflex.From("fs").Command("GfsWrite") - metadata := toolkit.M{}.Set("data", "ini adalah meta") - _, err := conn.Execute(cmd, toolkit.M{}. + metadata1 := codekit.M{}.Set("data", "ini adalah meta1") + cmd := dbflex.From("fs").Command("gfswrite", codekit.M{}. Set("id", "doc1"). - Set("metadata", metadata). + Set("metadata", metadata1). + Set("source", reader)) + _, err := conn.Execute(cmd, nil) + cv.So(err, cv.ShouldBeNil) + + metadata2 := codekit.M{}.Set("data", "ini adalah meta2") + cmd = dbflex.From("fs").Command("gfswrite", codekit.M{}. + Set("id", "doc2"). + Set("metadata", metadata2). Set("source", reader)) + _, err = conn.Execute(cmd, nil) cv.So(err, cv.ShouldBeNil) cv.Convey("reading from grid", func() { var buff bytes.Buffer writer := bufio.NewWriter(&buff) - cmd := dbflex.From("fs").Command("gfsread") - _, err := conn.Execute(cmd, toolkit.M{}. + cmd := dbflex.From("fs").Command("gfsread", codekit.M{}. Set("id", "doc1"). Set("output", writer)) + _, err := conn.Execute(cmd, nil) cv.So(err, cv.ShouldBeNil) cv.So(string(data), cv.ShouldEqual, string(buff.Bytes())) cv.Convey("delete grid", func() { - cmd := dbflex.From("fs").Command("gfsdelete") - _, err := conn.Execute(cmd, toolkit.M{}.Set("id", "doc1")) + cmd := dbflex.From("fs").Command("gfsdelete", codekit.M{}.Set("id", "doc1")) + _, err := conn.Execute(cmd, nil) cv.So(err, cv.ShouldBeNil) }) }) @@ -316,6 +406,73 @@ func TestGridFsUpdate(t *testing.T) { }) } +func TestCheckIndex(t *testing.T) { + cv.Convey("connect and create index", t, func() { + conn, _ := connect() + defer conn.Close() + + e := conn.EnsureIndex(tablename, "record_salary_index", false, "Age", "-Salary") + cv.So(e, cv.ShouldBeNil) + + cv.Convey("validate", func() { + ctx := context.TODO() + mconn, _ := mongo.Connect(ctx, options.Client().ApplyURI(host)) + defer mconn.Disconnect(ctx) + + indexCursor, e := mconn.Database(dbname).Collection(tablename).Indexes().List(ctx) + cv.So(e, cv.ShouldBeNil) + + indexModels := []bson.M{} + indexCursor.All(ctx, &indexModels) + + found := false + indexOk := false + for _, index := range indexModels { + //cv.Println(codekit.JsonString(index)) + name, nameOk := index["name"].(string) + if nameOk && name == "record_salary_index" { + found = true + key := index["key"].(primitive.M) + indexOk = key["Age"].(int32) == 1 && key["Salary"].(int32) == -1 + } + } + + cv.So(found, cv.ShouldBeTrue) + cv.So(indexOk, cv.ShouldBeTrue) + + convey.Convey("reindex", func() { + e := conn.EnsureIndex(tablename, "record_salary_index", false, "Age", "-Salary") + cv.So(e, cv.ShouldBeNil) + + mconn.Database(dbname).Collection(tablename).Indexes().DropOne(ctx, "record_salary_index") + }) + }) + }) +} + +func TestSelectCommand(t *testing.T) { + convey.Convey("select command", t, func() { + convey.Convey("connect", func() { + c, e := connect() + convey.So(e, convey.ShouldBeNil) + defer c.Close() + + convey.Convey("select", func() { + cmd := dbflex.From("mytable").Command("command", codekit.M{"find": "mytable"}) + cur := c.Cursor(cmd, nil) + convey.So(cur.Error(), convey.ShouldBeNil) + defer cur.Close() + + res := []codekit.M{} + cur.Fetchs(&res, 0) + convey.So(cur.Error(), convey.ShouldBeNil) + convey.Printf("records returned: %d\n", len(res)) + convey.Printf("serialized: %s\n", res) + }) + }) + }) +} + /* TO DO - Command Cursor @@ -326,6 +483,20 @@ TO DO func connect() (dbflex.IConnection, error) { if conn, err := dbflex.NewConnectionFromURI(connTxt, nil); err == nil { if err = conn.Connect(); err == nil { + conn.SetFieldNameTag("json") + return conn, nil + } else { + return nil, err + } + } else { + return nil, err + } +} + +func connectTrx() (dbflex.IConnection, error) { + if conn, err := dbflex.NewConnectionFromURI("mongodb://localhost:27017/rsdb", nil); err == nil { + if err = conn.Connect(); err == nil { + conn.SetFieldNameTag("json") return conn, nil } else { return nil, err @@ -333,12 +504,11 @@ func connect() (dbflex.IConnection, error) { } else { return nil, err } - return nil, errors.New("not implemented yet") } type Record struct { - orm.DataModelBase `bson:"-" json:"-" ecname:"-"` - ID string `bson:"_id" json:"_id" ecname:"_id"` + orm.DataModelBase `bson:"-" json:"-"` + ID string `bson:"_id" json:"_id"` Title string Age int Salary float64 diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..822cd93 --- /dev/null +++ b/go.mod @@ -0,0 +1,36 @@ +module github.com/ariefdarmawan/flexmgo + +go 1.19 + +// replace git.kanosolution.net/kano/dbflex => ../dbflex + +require ( + git.kanosolution.net/kano/dbflex v1.3.10-0.20250929123842-faeaeb54926e + github.com/ariefdarmawan/serde v0.1.1 + github.com/sebarcode/codekit v0.1.2 + github.com/sebarcode/logger v0.1.1 + github.com/smartystreets/goconvey v1.7.2 + go.mongodb.org/mongo-driver v1.12.1 +) + +require ( + github.com/ariefdarmawan/reflector v0.0.3 // indirect + github.com/eaciit/toolkit v0.0.0-20210610161449-593d5fadf78e // indirect + github.com/golang/snappy v0.0.1 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect + github.com/jtolds/gls v4.20.0+incompatible // indirect + github.com/klauspost/compress v1.13.6 // indirect + github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect + github.com/smartystreets/assertions v1.2.0 // indirect + github.com/stretchr/testify v1.6.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect + golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect + golang.org/x/text v0.7.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..0094c4c --- /dev/null +++ b/go.sum @@ -0,0 +1,95 @@ +git.kanosolution.net/kano/dbflex v1.3.10-0.20250929123842-faeaeb54926e h1:ERrW3A9gryryblfQFh6P2dRC3PIMdhKWCynNi8P8Dwk= +git.kanosolution.net/kano/dbflex v1.3.10-0.20250929123842-faeaeb54926e/go.mod h1:lITYkX18Kv7GL/4cPFEkNevlcyqzhUTGegk5XQsDhfg= +github.com/ariefdarmawan/reflector v0.0.3 h1:qbfwP5iKMvdYBWRG2jZyoXRLPsGedJ1sNcC8bw+MhpY= +github.com/ariefdarmawan/reflector v0.0.3/go.mod h1:xxeY7n6iT0q5pK4j3s8l2xo1CvmQgS2oYtj0lsV4eo4= +github.com/ariefdarmawan/serde v0.1.1 h1:TIhTKKFfK4WfYcS/hW1nSD81Uqd1jkY9Xli8e/3h/MA= +github.com/ariefdarmawan/serde v0.1.1/go.mod h1:geSqYU2wYlGlmOePjLbU9S1A4FkkaQB59vuAnTeZclM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eaciit/toolkit v0.0.0-20210610161449-593d5fadf78e h1:2/wdIW25ZaPHfKQ5HvQUSAqeH9KAZQH1PlCMnrSJ9xE= +github.com/eaciit/toolkit v0.0.0-20210610161449-593d5fadf78e/go.mod h1:r4OKDNGrY6n6gCVqEFdld+JTfgdNgp78RPvZHr2B9jU= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sebarcode/codekit v0.1.0/go.mod h1:o54sVKGC7+M1leMkKhVXssOXwuHCylW5GJto0UiQJEg= +github.com/sebarcode/codekit v0.1.2 h1:ycQnlEo+YTbqXfRlp+K7J3ASQkDiu/3sWUm790Eexl4= +github.com/sebarcode/codekit v0.1.2/go.mod h1:CNHNFOALfsqgDmu+GIFSo4RAoCsECBCS84RmHbqj88I= +github.com/sebarcode/logger v0.1.1 h1:qy8Ip5UacaK4LFE0hN/HXUCIu3ZBe4uta8WHQ067jfQ= +github.com/sebarcode/logger v0.1.1/go.mod h1:dBcREBmgQLUN1kAAs5t9y7dN5RUp+OYEsk4rqUWuM5Q= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= +github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= +github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver v1.12.1 h1:nLkghSU8fQNaK7oUmDhQFsnrtcoNy7Z6LVFKsEecqgE= +go.mongodb.org/mongo-driver v1.12.1/go.mod h1:/rGBTebI3XYboVmgz+Wv3Bcbl3aD0QF9zl6kDDw18rQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helper.go b/helper.go new file mode 100644 index 0000000..e3d7ac2 --- /dev/null +++ b/helper.go @@ -0,0 +1,21 @@ +package flexmgo + +import "reflect" + +func createPtrFromType(t reflect.Type) reflect.Value { + isPtr := t.Kind() == reflect.Ptr + elemType := t + + if isPtr { + elemType = elemType.Elem() + } + + if elemType.Kind() == reflect.Map { + ptr := reflect.New(elemType) + m := reflect.MakeMap(elemType) + ptr.Elem().Set(m) + return ptr + } + + return reflect.New(elemType) +} diff --git a/query.go b/query.go index 6900b4c..e3eb9fe 100644 --- a/query.go +++ b/query.go @@ -1,21 +1,19 @@ package flexmgo import ( + "context" + "errors" "fmt" - "io" "strings" - "time" - "git.eaciitapp.com/sebar/dbflex" - df "git.eaciitapp.com/sebar/dbflex" - "github.com/eaciit/toolkit" - . "github.com/eaciit/toolkit" + "git.kanosolution.net/kano/dbflex" + df "git.kanosolution.net/kano/dbflex" + "github.com/ariefdarmawan/serde" + "github.com/sebarcode/codekit" + . "github.com/sebarcode/codekit" - "bufio" - - "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/gridfs" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -30,11 +28,13 @@ func (q *Query) BuildCommand() (interface{}, error) { func (q *Query) BuildFilter(f *df.Filter) (interface{}, error) { fm := M{} if f.Op == df.OpEq { - fm.Set(f.Field, M{}.Set("$eq", f.Value)) + fm.Set(f.Field, f.Value) } else if f.Op == df.OpNe { fm.Set(f.Field, M{}.Set("$ne", f.Value)) + } else if f.Op == "$text" { + fm.Set("$text", M{}.Set("$search", f.Value)) } else if f.Op == df.OpContains { - fs := f.Value.([]string) + fs := f.Value.([]interface{}) if len(fs) > 1 { bfs := []interface{}{} for _, ff := range fs { @@ -50,13 +50,13 @@ func (q *Query) BuildFilter(f *df.Filter) (interface{}, error) { Set("$regex", fmt.Sprintf(".*%s.*", fs[0])). Set("$options", "i")) } - } else if f.Op == df.OpStartWith { + } else if f.Op == df.OpStartsWith { fm.Set(f.Field, M{}. - Set("$regex", fmt.Sprintf("^%s.*$", f.Value)). + Set("$regex", fmt.Sprintf("^%s", f.Value)). Set("$options", "i")) - } else if f.Op == df.OpEndWith { + } else if f.Op == df.OpEndsWith { fm.Set(f.Field, M{}. - Set("$regex", fmt.Sprintf("^.*%s$", f.Value)). + Set("$regex", fmt.Sprintf("%s$", f.Value)). Set("$options", "i")) } else if f.Op == df.OpIn { fm.Set(f.Field, M{}.Set("$in", f.Value)) @@ -85,7 +85,6 @@ func (q *Query) BuildFilter(f *df.Filter) (interface{}, error) { bfs = append(bfs, bf) } } - fm.Set(string(f.Op), bfs) } else if f.Op == df.OpNot { bf, eb := q.BuildFilter(f.Items[0]) @@ -93,8 +92,27 @@ func (q *Query) BuildFilter(f *df.Filter) (interface{}, error) { field := f.Items[0].Field fm.Set(field, M{}.Set("$not", bf.(M).Get(field))) } + } else if f.Op == df.OpAll { + values, ok := f.Value.([]interface{}) + if !ok { + return nil, fmt.Errorf("fail to translate %s. %s", f.Op, JsonString(f)) + } + fm.Set(f.Field, M{}.Set("$all", values)) + } else if f.Op == df.OpElemMatch { + matchM := M{} + for _, item := range f.Items { + bf, eb := q.BuildFilter(item) + if eb != nil { + return nil, fmt.Errorf("error translate filter %s", JsonString(item)) + } + bfm := bf.(codekit.M) + for k, v := range bfm { + matchM.Set(k, v) + } + } + fm.Set(f.Field, M{}.Set("$elemMatch", matchM)) } else { - return nil, fmt.Errorf("Filter Op %s is not defined", f.Op) + return nil, fmt.Errorf("filter op %s is not defined", f.Op) } return fm, nil } @@ -107,18 +125,18 @@ func (q *Query) Cursor(m M) df.ICursor { tablename := q.Config(df.ConfigKeyTableName, "").(string) coll := conn.db.Collection(tablename) - parts := q.Config(df.ConfigKeyGroupedQueryItems, df.GroupedQueryItems{}).(df.GroupedQueryItems) + parts := q.Config(df.ConfigKeyGroupedQueryItems, df.QueryItems{}).(df.QueryItems) where := q.Config(df.ConfigKeyWhere, M{}).(M) hasWhere := where != nil aggrs, hasAggr := parts[df.QueryAggr] groupby, hasGroup := parts[df.QueryGroup] - //commandParts, hasCommand := parts[df.QueryCommand] commandParts, hasCommand := parts[df.QueryCommand] + // run simple aggregation if hasAggr { pipes := []M{} - items := aggrs[0].Value.([]*df.AggrItem) + items := aggrs.Value.([]*df.AggrItem) aggrExpression := M{} for _, item := range items { if item.Op == df.AggrCount { @@ -132,12 +150,10 @@ func (q *Query) Cursor(m M) df.ICursor { } else { groups := func() M { s := M{} - for _, v := range groupby { - gs := v.Value.([]string) - for _, g := range gs { - if strings.TrimSpace(g) != "" { - s.Set(strings.Replace(g, ".", "_", -1), "$"+g) - } + gs := groupby.Value.([]string) + for _, g := range gs { + if strings.TrimSpace(g) != "" { + s.Set(strings.Replace(g, ".", "_", -1), "$"+g) } } return s @@ -146,76 +162,139 @@ func (q *Query) Cursor(m M) df.ICursor { } if hasWhere { + //fmt.Println("filters:", codekit.JsonString(where)) pipes = append(pipes, M{}.Set("$match", where)) } pipes = append(pipes, M{}.Set("$group", aggrExpression)) - cur, err := coll.Aggregate(conn.ctx, pipes, new(options.AggregateOptions).SetAllowDiskUse(true)) + var cur *mongo.Cursor + err := wrapTx(conn, func(ctx mongo.SessionContext) error { + var err error + cur, err = coll.Aggregate(ctx, pipes, new(options.AggregateOptions).SetAllowDiskUse(true)) + return err + }) if err != nil { cursor.SetError(err) } else { cursor.cursor = cur cursor.conn = conn - cursor.countParm = toolkit.M{}. - Set("count", tablename). - Set("query", where) + if len(where) == 0 { + cursor.countParm = codekit.M{}. + Set("count", tablename) + } else { + cursor.countParm = codekit.M{}. + Set("count", tablename). + Set("query", where) + } } + + // run a specific command } else if hasCommand { - mCmd := commandParts[0].Value.(toolkit.M) - cmdObj, _ := mCmd["command"] - switch cmdObj.(type) { - case toolkit.M: - //cmdParm := cmdObj.(toolkit.M).Get("commandParm") - curCommand, err := conn.db.RunCommandCursor(conn.ctx, cmdObj) + cmdName := commandParts.Op + cmdValue := commandParts.Value + switch cmdName { + case "aggregate", "aggr", "pipe": + pipes := []codekit.M{} + if hasWhere && len(where) > 0 { + pipes = append(pipes, M{}.Set("$match", where)) + } + + pipeM := cmdValue + var ( + pipeMs []codekit.M + //ok bool + cur *mongo.Cursor + err error + ) + + serde.Serde(pipeM, &pipeMs) + if len(pipeMs) > 0 { + if _, has := pipeMs[0]["$text"]; has { + pipes = pipeMs + } else { + pipes = append(pipes, pipeMs...) + } + } + + if cur, err = coll.Aggregate(conn.ctx, pipes, new(options.AggregateOptions).SetAllowDiskUse(true)); err != nil { + cursor.SetError(err) + return cursor + } + + cursor.cursor = cur + cursor.conn = conn + cursor.countParm = nil + return cursor + + case "command": + var curCommand *mongo.Cursor + err := wrapTx(conn, func(ctx mongo.SessionContext) error { + var err error + curCommand, err = conn.db.RunCommandCursor(ctx, cmdValue) + return err + }) if err != nil { cursor.SetError(err) } else { cursor.cursor = curCommand + cursor.conn = conn + cursor.countParm = nil } return cursor default: - cursor.SetError(toolkit.Errorf("invalid command %v", cmdObj)) + cursor.SetError(fmt.Errorf("invalid command %v", cmdName)) return cursor } - /* - case "gfsfind": - b, err := gridfs.NewBucket(conn.db) - qry := q.db.GridFS(tablename).Find(where) - cursor.mgocursor = qry - cursor.mgoiter = qry.Iter() - case "pipe": - pipe, ok := m["pipe"] - if !ok { - cursor.SetError(toolkit.Errorf("invalid command, calling pipe without pipe data")) - return cursor - } - cursor.mgoiter = coll.Pipe(pipe).AllowDiskUse().Iter() - */ - cursor.SetError(fmt.Errorf("pipe and command is not yet applied")) - return cursor + // basic find } else { opt := options.Find() if items, ok := parts[df.QuerySelect]; ok { - fields := items[0].Value.([]string) - if len(fields) > 0 { - opt.SetProjection(fields) + if fields, ok := items.Value.([]string); ok { + if len(fields) > 0 { + projection := codekit.M{} + for _, field := range fields { + if strings.Contains(field, ":") { + alias := strings.Split(field, ":") + if len(alias) <= 1 { + cursor.SetError(errors.New("error on translating projection field " + field)) + return cursor + } + projection.Set(alias[0], alias[1]) + } else { + projection.Set(field, 1) + } + } + opt.SetProjection(projection) + } } } if items, ok := parts[df.QueryOrder]; ok { - sortKeys := items[0].Value.([]string) - opt.SetSort(sortKeys) + sortKeys := items.Value.([]string) + sortDoc := bson.D{} + for _, key := range sortKeys { + if key[0] == '-' { + sortDoc = append(sortDoc, bson.E{key[1:], -1}) + } else { + sortDoc = append(sortDoc, bson.E{key, 1}) + } + } + + if len(sortDoc) > 0 { + opt.SetSort(sortDoc) + } } if items, ok := parts[df.QuerySkip]; ok { - skip := items[0].Value.(int64) + skip := int64(items.Value.(int)) opt.SetSkip(skip) } if items, ok := parts[df.QueryTake]; ok { - take := items[0].Value.(int64) - opt.SetLimit(take) + if take, ok := items.Value.(int); ok { + opt.SetLimit(int64(take)) + } } var ( @@ -223,8 +302,11 @@ func (q *Query) Cursor(m M) df.ICursor { err error ) - qry, err = coll.Find(conn.ctx, where, opt) - //toolkit.Logger().Debugf("querying data. where:%v error:%v", where, err) + err = wrapTx(conn, func(ctx mongo.SessionContext) error { + var err error + qry, err = coll.Find(ctx, where, opt) + return err + }) if err != nil { cursor.SetError(err) @@ -232,7 +314,11 @@ func (q *Query) Cursor(m M) df.ICursor { } cursor.cursor = qry - cursor.countParm = toolkit.M{}.Set("count", tablename).Set("query", where) + if len(where) == 0 { + cursor.countParm = codekit.M{}.Set("count", tablename) + } else { + cursor.countParm = codekit.M{}.Set("count", tablename).Set("query", where) + } cursor.conn = conn } return cursor @@ -244,27 +330,38 @@ func (q *Query) Execute(m M) (interface{}, error) { coll := conn.db.Collection(tablename) data := m.Get("data") - parts := q.Config(df.ConfigKeyGroupedQueryItems, df.GroupedQueryItems{}).(df.GroupedQueryItems) + parts := q.Config(df.ConfigKeyGroupedQueryItems, df.QueryItems{}).(df.QueryItems) where := q.Config(df.ConfigKeyWhere, M{}).(M) hasWhere := where != nil ct := q.Config(df.ConfigKeyCommandType, "N/A") switch ct { case df.QueryInsert: - return coll.InsertOne(conn.ctx, data) + var res *mongo.InsertOneResult + dataM, _ := codekit.ToMTag(data, conn.FieldNameTag()) + err := wrapTx(conn, func(ctx mongo.SessionContext) error { + var err error + res, err = coll.InsertOne(ctx, dataM) + return err + }) + if err != nil { + return nil, err + } + dataM.Set("_id", res.InsertedID) + return res.InsertedID, nil case df.QueryUpdate: var err error if hasWhere { - //singleupdate := m.Get("singleupdate", true).(bool) - singleupdate := false + singleupdate := m.Get("singleupdate", false).(bool) + //singleupdate := false if !singleupdate { //-- get the field for update updateqi, _ := parts[df.QueryUpdate] - updatevals := updateqi[0].Value.([]string) + updatevals := updateqi.Value.([]string) - var dataM toolkit.M - dataM, err = ToM(data) + var dataM codekit.M + dataM, err = codekit.ToMTag(data, conn.FieldNameTag()) dataS := M{} if err != nil { return nil, err @@ -283,253 +380,71 @@ func (q *Query) Execute(m M) (interface{}, error) { dataS[k] = v } } - //updatedData := toolkit.M{}.Set("$set", dataS) - - _, err = coll.UpdateMany(conn.ctx, where, dataS, - new(options.UpdateOptions).SetUpsert(true)) + //updatedData := codekit.M{}.Set("$set", dataS) + + err = wrapTx(conn, func(ctx mongo.SessionContext) error { + _, err := coll.UpdateMany(ctx, where, + codekit.M{}.Set("$set", dataS), + new(options.UpdateOptions).SetUpsert(false)) + return err + }) } else { - _, err = coll.UpdateOne(conn.ctx, where, data, - new(options.UpdateOptions).SetUpsert(true)) + err = wrapTx(conn, func(ctx mongo.SessionContext) error { + _, err := coll.UpdateOne(ctx, where, + codekit.M{}.Set("$set", data), + new(options.UpdateOptions).SetUpsert(false)) + return err + }) } return nil, err } else { - return nil, toolkit.Errorf("update need to have where clause") + return nil, fmt.Errorf("update need to have where clause") } case df.QueryDelete: if hasWhere { - _, err := coll.DeleteMany(conn.ctx, where) + err := wrapTx(conn, func(ctx mongo.SessionContext) error { + _, err := coll.DeleteMany(ctx, where) + return err + }) return nil, err } else { - return nil, toolkit.Errorf("delete need to have where clause. For delete all data in a collection, please use DropTable instead of Delete") + return nil, fmt.Errorf("delete need to have where clause. To delete all data in a collection, please use DropTable instead of Delete") } case df.QuerySave: whereSave := M{} - datam, err := toolkit.ToM(data) + datam, err := codekit.ToMTag(data, conn.FieldNameTag()) if err != nil { - return nil, toolkit.Errorf("unable to deserialize data: %s", err.Error()) + return nil, fmt.Errorf("unable to deserialize data: %s", err.Error()) } if datam.Has("_id") { whereSave = M{}.Set("_id", datam.Get("_id")) } else { - return nil, toolkit.Error("_id field is required") + return nil, errors.New("_id field is required") } - _, err = coll.UpdateMany(conn.ctx, whereSave, - toolkit.M{}.Set("$set", datam), - new(options.UpdateOptions).SetUpsert(true)) + err = wrapTx(conn, func(ctx mongo.SessionContext) error { + _, err := coll.ReplaceOne(ctx, whereSave, datam, + new(options.ReplaceOptions).SetUpsert(true)) + return err + }) return nil, err case df.QueryCommand: - commands, ok := parts[df.QueryCommand] - if !ok { - return nil, toolkit.Errorf("No command") - } - - mCommand := commands[0].Value.(toolkit.M) - cmd, _ := mCommand["command"] - - switch cmd.(type) { - case string: - commandTxt := cmd.(string) - if commandTxt == "" { - return nil, toolkit.Errorf("No command") - } - - var ( - bucket *gridfs.Bucket - gfsBuffSize int32 - err error - ) - if strings.ToLower(commandTxt)[:3] == "gfs" { - gfsBuffSize = int32(m.Get("size", 1024).(int)) - bucketOpt := new(options.BucketOptions) - bucketOpt.SetChunkSizeBytes(gfsBuffSize) - bucketOpt.SetName(tablename) - bucket, err = gridfs.NewBucket(conn.db, bucketOpt) - if err != nil { - return nil, toolkit.Errorf("error prepare GridFS bucket. %s", err.Error()) - } - } - - switch strings.ToLower(commandTxt) { - case "gfswrite": - var reader io.Reader - gfsId, hasId := m["id"] - gfsMetadata, hasMetadata := m["metadata"] - gfsFileName := m.GetString("name") - reader = m.Get("source", nil).(io.Reader) - if reader == nil { - return nil, toolkit.Errorf("invalid reader") - } - - //-- check if file exist, delete if already exist - if hasId { - bucket.Delete(gfsId) - } - - if !hasMetadata { - gfsMetadata = toolkit.M{} - } - uploadOpt := new(options.UploadOptions) - uploadOpt.SetMetadata(gfsMetadata) - if gfsFileName == "" && hasId { - gfsFileName = gfsId.(string) - } - if gfsFileName == "" { - gfsFileName = toolkit.RandomString(32) - } - - var objId primitive.ObjectID - if hasId { - err = bucket.UploadFromStreamWithID(gfsId, gfsFileName, reader, uploadOpt) - } else { - objId, err = bucket.UploadFromStream(gfsFileName, reader, uploadOpt) - } - if err != nil { - return nil, toolkit.Errorf("error upload file to GridFS. %s", err.Error()) - } - return objId, nil - - case "gfsread": - gfsId, hasId := m["id"] - gfsFileName := m.GetString("name") - if gfsFileName == "" && hasId { - gfsFileName = gfsId.(string) - } - dest := m.Get("output", &bufio.Writer{}).(io.Writer) - var err error - - var ds *gridfs.DownloadStream - if hasId { - ds, err = bucket.OpenDownloadStream(gfsId) - } else { - ds, err = bucket.OpenDownloadStreamByName(gfsFileName) - } - defer ds.Close() - - if err != nil { - return nil, toolkit.Errorf("unable to open GFS %s-%s. %s", tablename, gfsFileName, err.Error()) - } - defer ds.Close() - - io.Copy(dest, ds) - return nil, nil - - case "gfsremove", "gfsdelete": - gfsId, hasId := m["id"] - - var err error - if hasId && gfsId != "" { - err = bucket.Delete(gfsId) - } - return nil, err - - case "gfstruncate": - err := bucket.Drop() - return nil, err - - case "watch": - watchFn := m.Get("fn", nil) - if watchFn == nil { - return nil, fmt.Errorf("watch need a func(toolkit.M)") - } - - opt := options.ChangeStream() - opt.SetBatchSize(1024) - opt.SetMaxAwaitTime(24 * time.Hour) - - toolkit.Logger().Debugf("prepare to wacth %s", tablename) - cs, err := coll.Watch(conn.ctx, []toolkit.M{}, opt) - if err != nil { - toolkit.Logger().Debugf("watch %s has error", tablename, err.Error()) - return nil, err - } - toolkit.Logger().Debugf("watch %s is currently running", tablename) - - go func() { - defer cs.Close(conn.ctx) - for cs.Next(conn.ctx) { - data := toolkit.M{} - if err = cs.Decode(&data); err != nil { - continue - } - - watchFn.(func(toolkit.M))(data) - } - }() - default: - return nil, toolkit.Errorf("Invalid command: %v", commandTxt) - } - - case toolkit.M: - cmdM := cmd.(toolkit.M) - sr := conn.db.RunCommand(conn.ctx, cmdM) - if sr.Err() != nil { - return nil, toolkit.Errorf("unablet to run command. %s. Command: %s", - sr.Err().Error(), toolkit.JsonString(cmdM)) - } - return sr, nil - - default: - return nil, toolkit.Errorf("Unknown command %v", cmd) - } - + return q.handleExecuteCommand(conn) } return nil, nil } -/* -func (q *Query) SetThis(q dbflex.IQuery) { - panic("not implemented") -} - -func (q *Query) This() dbflex.IQuery { - panic("not implemented") -} - -func (q *Query) BuildFilter(*dbflex.Filter) (interface{}, error) { - panic("not implemented") -} - -func (q *Query) BuildCommand() (interface{}, error) { - panic("not implemented") -} - -func (q *Query) Cursor(toolkit.M) dbflex.ICursor { - panic("not implemented") -} - -func (q *Query) Execute(toolkit.M) (interface{}, error) { - panic("not implemented") -} - -func (q *Query) SetConfig(string, interface{}) { - panic("not implemented") -} - -func (q *Query) SetConfigM(toolkit.M) { - panic("not implemented") -} - -func (q *Query) Config(string, interface{}) interface{} { - panic("not implemented") -} - -func (q *Query) ConfigRef(string, interface{}, interface{}) { - panic("not implemented") -} - -func (q *Query) DeleteConfig(...string) { - panic("not implemented") -} - -func (q *Query) Connection() dbflex.IConnection { - panic("not implemented") -} - -func (q *Query) SetConnection(dbflex.IConnection) { - panic("not implemented") +func wrapTx(conn *Connection, fn func(ctx mongo.SessionContext) error) error { + var err error + if conn.sess != nil { + err = mongo.WithSession(conn.ctx, conn.sess, func(sc mongo.SessionContext) error { + return fn(sc) + }) + } else { + err = fn(mongo.NewSessionContext(context.Background(), nil)) + } + return err } -*/