From a7bf01bc435fb72ed0a8f0630dc6cc83638b820e Mon Sep 17 00:00:00 2001 From: Keith Schaffer Date: Wed, 27 Feb 2019 09:36:07 -0500 Subject: [PATCH 1/8] return partial response from zk multi request --- zketcd.go | 52 ++++++++++++++++++++++------------------------------ 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/zketcd.go b/zketcd.go index d396ac9..e4d1b20 100644 --- a/zketcd.go +++ b/zketcd.go @@ -470,15 +470,23 @@ func (z *zkEtcd) Sync(xid Xid, op *SyncRequest) ZKResponse { func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { bs := make([]opBundle, len(mreq.Ops)) + mresp := &MultiResponse{ + Ops: make([]MultiResponseOp, len(mreq.Ops)), + DoneHeader: MultiHeader{Type: opMulti}, + } for i, op := range mreq.Ops { switch req := op.Op.(type) { case *CreateRequest: bs[i] = z.mkCreateTxnOp(req) + mresp.Ops[i].Header.Type = opCreate case *DeleteRequest: bs[i] = z.mkDeleteTxnOp(req) + mresp.Ops[i].Header.Type = opDelete case *SetDataRequest: bs[i] = z.mkSetDataTxnOp(req) + mresp.Ops[i].Header.Type = opSetData case *CheckVersionRequest: + mresp.Ops[i].Header.Type = opCheck bs[i] = z.mkCheckVersionPathTxnOp(req) default: panic(fmt.Sprintf("unknown multi %+v %T", op.Op, op.Op)) @@ -491,8 +499,15 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { } apply := func(s v3sync.STM) error { - for _, b := range bs { + for i, b := range bs { if err := b.apply(s); err != nil { + var ok bool + if mresp.Ops[i].Header.Err, ok = errorToErrCode[err]; !ok { + mresp.Ops[i].Header.Err = errAPIError + } + if mresp.DoneHeader.Err == 0 { + mresp.DoneHeader.Err = mresp.Ops[i].Header.Err + } return err } } @@ -500,52 +515,29 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { } reply := func(xid Xid, zxid ZXid) ZKResponse { - ops := make([]MultiResponseOp, len(bs)) for i, b := range bs { resp := b.reply(xid, zxid) - ops[i].Header = MultiHeader{Err: 0} switch r := resp.Resp.(type) { case *CreateResponse: - ops[i].Header.Type = opCreate - ops[i].String = r.Path + mresp.Ops[i].String = r.Path case *SetDataResponse: - ops[i].Header.Type = opSetData - ops[i].Stat = &r.Stat - case *DeleteResponse: - ops[i].Header.Type = opDelete - case *struct{}: - ops[i].Header.Type = opCheck - default: - panic(fmt.Sprintf("unknown multi %+v %T", resp, resp)) + mresp.Ops[i].Stat = &r.Stat } } - mresp := &MultiResponse{ - Ops: ops, - DoneHeader: MultiHeader{Type: opMulti}, - } return mkZKResp(xid, zxid, mresp) } - resp, err := z.doSTM(apply, prefetch...) + resp, _ := z.doSTM(apply, prefetch...) if resp == nil { - // txn aborted, possibly due to any API error - if _, ok := errorToErrCode[err]; !ok { - // aborted due to non-API error - return mkErr(err) - } zxid, zerr := z.incrementAndGetZxid() if zerr != nil { return mkErr(zerr) } - // zkdocker seems to always return API error... - zkresp := apiErrToZKErr(xid, zxid, err) - zkresp.Hdr.Err = errAPIError - return zkresp + return reply(xid, zxid) } - mresp := reply(xid, ZXid(resp.Header.Revision)) - glog.V(7).Infof("Multi(%v) = (zxid=%v); txnresp: %+v", *mreq, resp.Header.Revision, *resp) - return mresp + fmt.Printf("Multi(%v) = (zxid=%v); txnresp: %+v\n", *mreq, resp.Header.Revision, *resp) + return reply(xid, ZXid(resp.Header.Revision)) } func (z *zkEtcd) mkCheckVersionPathTxnOp(op *CheckVersionRequest) opBundle { From f3a11767c17e5995e9daa925d5ca82fc6d311844 Mon Sep 17 00:00:00 2001 From: Keith Schaffer Date: Wed, 27 Feb 2019 10:08:21 -0500 Subject: [PATCH 2/8] removed debug println --- zketcd.go | 1 - 1 file changed, 1 deletion(-) diff --git a/zketcd.go b/zketcd.go index e4d1b20..79a753e 100644 --- a/zketcd.go +++ b/zketcd.go @@ -536,7 +536,6 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { return reply(xid, zxid) } - fmt.Printf("Multi(%v) = (zxid=%v); txnresp: %+v\n", *mreq, resp.Header.Revision, *resp) return reply(xid, ZXid(resp.Header.Revision)) } From 63a1c2ec56aedd8ed97740cd2569a95265814dfd Mon Sep 17 00:00:00 2001 From: Keith Schaffer Date: Wed, 27 Feb 2019 12:41:15 -0500 Subject: [PATCH 3/8] zketcd: fixed bug causing hidden multi-op error zkResponse was hiding multi-op error in partial response, added new response with error function. Also, fixed test to expect correct errors instead of api errors Fixes #98 --- integration/integration_test.go | 12 ++++++------ zketcd.go | 6 +++++- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/integration/integration_test.go b/integration/integration_test.go index 0e2f686..3fbfdef 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -626,8 +626,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) { ops = []interface{}{ &zk.CreateRequest{Path: "/foo", Data: []byte("foo"), Acl: acl}, } - if _, err := c.Multi(ops...); err == nil || err.Error() != zetcd.ErrAPIError.Error() { - t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err) + if _, err := c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNodeExists.Error() { + t.Fatalf("expected %v, got %v", zetcd.ErrNodeExists, err) } // test create+delete on same key == no key ops = []interface{}{ @@ -653,8 +653,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) { &zk.CheckVersionRequest{Path: "/foo", Version: 2}, } _, err = c.Multi(ops...) - if err == nil || err.Error() != zetcd.ErrAPIError.Error() { - t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err) + if err == nil || err.Error() != zetcd.ErrBadVersion.Error() { + t.Fatalf("expected %v, got %v", zetcd.ErrBadVersion, err) } if _, s1, err = c.Get("/test1"); err == nil || err.Error() != zetcd.ErrNoNode.Error() { t.Fatalf("expected %v, got (%v,%v)", zetcd.ErrNoNode, s1, err) @@ -681,8 +681,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) { ops = []interface{}{ &zk.CheckVersionRequest{Path: "/missing-key", Version: 0}, } - if _, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrAPIError.Error() { - t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err) + if _, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNoNode.Error() { + t.Fatalf("expected %v, got %v", zetcd.ErrNoNode, err) } // test empty operation list if resp, err = c.Multi(); err != nil || len(resp) != 0 { diff --git a/zketcd.go b/zketcd.go index 79a753e..4a98ed2 100644 --- a/zketcd.go +++ b/zketcd.go @@ -524,7 +524,7 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { mresp.Ops[i].Stat = &r.Stat } } - return mkZKResp(xid, zxid, mresp) + return mkZKPartialResp(xid, zxid, mresp, mresp.DoneHeader.Err) } resp, _ := z.doSTM(apply, prefetch...) @@ -728,6 +728,10 @@ func mkZKResp(xid Xid, zxid ZXid, resp interface{}) ZKResponse { return ZKResponse{Hdr: &ResponseHeader{xid, zxid - 1, 0}, Resp: resp} } +func mkZKPartialResp(xid Xid, zxid ZXid, resp interface{}, err ErrCode) ZKResponse { + return ZKResponse{Hdr: &ResponseHeader{xid, zxid - 1, err}, Resp: resp} +} + // wrapErr is to pass back error info but still get the txn response func wrapErr(err *error, f func(s v3sync.STM) error) func(s v3sync.STM) error { return func(s v3sync.STM) error { From 1d32e195a71822d4bf05df19d6be4720ef97f3b2 Mon Sep 17 00:00:00 2001 From: Keith Schaffer Date: Thu, 28 Feb 2019 11:31:10 -0500 Subject: [PATCH 4/8] zetcd: fix encoding of multiop errors The current multiop implementation was not properly encoding a single error in a multiop request. If op 1 succeeds, but op 2 fails, then zookeeper should return two responses, one success and one failure of type -1. Fixes #98 --- constants.go | 1 + encode.go | 2 ++ integration/integration_test.go | 17 +++++++++++++++++ server.go | 2 +- zketcd.go | 11 +++-------- 5 files changed, 24 insertions(+), 9 deletions(-) diff --git a/constants.go b/constants.go index b6c6136..75e4180 100644 --- a/constants.go +++ b/constants.go @@ -23,6 +23,7 @@ const ( ) const ( + opError = -1 opCreate = 1 opDelete = 2 opExists = 3 diff --git a/encode.go b/encode.go index 348c5aa..26ca827 100644 --- a/encode.go +++ b/encode.go @@ -329,6 +329,8 @@ func (r *MultiResponse) Encode(buf []byte) (int, error) { n, err = encodePacketValue(buf[total:], reflect.ValueOf(op.String)) case opSetData: n, err = encodePacketValue(buf[total:], reflect.ValueOf(op.Stat)) + case opError: + n, err = encodePacketValue(buf[total:], reflect.ValueOf(&op.Header.Err)) } total += n if err != nil { diff --git a/integration/integration_test.go b/integration/integration_test.go index 3fbfdef..beffbb6 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -705,6 +705,23 @@ func testMultiOp(t *testing.T, c *zk.Conn) { if s1.Mzxid != s2.Mzxid { t.Fatalf("expected zxids in %+v to match %+v", *s1, *s2) } + // test partial success + ops = []interface{}{ + &zk.CheckVersionRequest{Path: "/test2", Version: 0}, + &zk.CreateRequest{Path: "/foo", Data: []byte("foo"), Acl: acl}, + } + if resp, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNodeExists.Error() { + t.Fatalf("expected %v, got %v", zetcd.ErrNodeExists, err) + } + if len(resp) != 2 { + t.Fatalf("expected %d results, got %d", 2, len(resp)) + } + if resp[0].Error != nil { + t.Fatalf("expected checkop error to be nil, got %v", resp[0].Error) + } + if resp[1].Error == nil || resp[1].Error.Error() != zetcd.ErrNodeExists.Error() { + t.Fatalf("expected createop error to be %v, got %v", zetcd.ErrNodeExists.Error(), resp[1].Error) + } } func runTest(t *testing.T, f func(*testing.T, *zk.Conn)) { diff --git a/server.go b/server.go index e079534..baba6af 100644 --- a/server.go +++ b/server.go @@ -76,7 +76,7 @@ func serveRequest(s Session, zke ZK, zkreq ZKRequest) error { } zkresp := DispatchZK(zke, zkreq.xid, zkreq.req) if zkresp.Err != nil { - glog.V(9).Infof("dispatch error", zkresp.Err) + glog.V(9).Infof("dispatch error %v", zkresp.Err) return zkresp.Err } if zkresp.Hdr.Err == 0 { diff --git a/zketcd.go b/zketcd.go index 4a98ed2..bc0c8c4 100644 --- a/zketcd.go +++ b/zketcd.go @@ -502,12 +502,10 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { for i, b := range bs { if err := b.apply(s); err != nil { var ok bool + mresp.Ops[i].Header.Type = opError if mresp.Ops[i].Header.Err, ok = errorToErrCode[err]; !ok { mresp.Ops[i].Header.Err = errAPIError } - if mresp.DoneHeader.Err == 0 { - mresp.DoneHeader.Err = mresp.Ops[i].Header.Err - } return err } } @@ -524,7 +522,7 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { mresp.Ops[i].Stat = &r.Stat } } - return mkZKPartialResp(xid, zxid, mresp, mresp.DoneHeader.Err) + return mkZKResp(xid, zxid, mresp) } resp, _ := z.doSTM(apply, prefetch...) @@ -536,6 +534,7 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { return reply(xid, zxid) } + glog.V(7).Infof("Multi(%v) = (zxid=%v); txnresp: %+v\n", *mreq, resp.Header.Revision, *resp) return reply(xid, ZXid(resp.Header.Revision)) } @@ -728,10 +727,6 @@ func mkZKResp(xid Xid, zxid ZXid, resp interface{}) ZKResponse { return ZKResponse{Hdr: &ResponseHeader{xid, zxid - 1, 0}, Resp: resp} } -func mkZKPartialResp(xid Xid, zxid ZXid, resp interface{}, err ErrCode) ZKResponse { - return ZKResponse{Hdr: &ResponseHeader{xid, zxid - 1, err}, Resp: resp} -} - // wrapErr is to pass back error info but still get the txn response func wrapErr(err *error, f func(s v3sync.STM) error) func(s v3sync.STM) error { return func(s v3sync.STM) error { From a46236afbb2b84b81933db961c8fab6ab623966e Mon Sep 17 00:00:00 2001 From: Keith Schaffer Date: Thu, 28 Feb 2019 16:26:06 -0500 Subject: [PATCH 5/8] zetcd: code consistency update moved mkCheckVersionPathTxnOp up 1 line to be consistent with other cases Fixes #98 --- zketcd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zketcd.go b/zketcd.go index bc0c8c4..fef0c66 100644 --- a/zketcd.go +++ b/zketcd.go @@ -486,8 +486,8 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { bs[i] = z.mkSetDataTxnOp(req) mresp.Ops[i].Header.Type = opSetData case *CheckVersionRequest: - mresp.Ops[i].Header.Type = opCheck bs[i] = z.mkCheckVersionPathTxnOp(req) + mresp.Ops[i].Header.Type = opCheck default: panic(fmt.Sprintf("unknown multi %+v %T", op.Op, op.Op)) } From 450b10d0fd6e527c8354e64f3c9905a11e5fa3d3 Mon Sep 17 00:00:00 2001 From: Keith Schaffer Date: Fri, 1 Mar 2019 12:46:17 -0500 Subject: [PATCH 6/8] testing: repair docker cross check tests The xchk with docker integration tests are broken. The docker files were attempting to download old versions of Kafka and drill that no longer existed on the servers. In addition the RUOK test was causing all tests to hang in the docker mode by causing a NPE Zookeeper and never completely starting up. --- integration/docker_test.go | 2 +- integration/drill/Dockerfile | 12 +++++++----- integration/integration_test.go | 21 -------------------- integration/integration_zetcd_test.go | 28 +++++++++++++++++++++++++++ integration/kafka/Dockerfile | 2 +- 5 files changed, 37 insertions(+), 28 deletions(-) create mode 100644 integration/integration_zetcd_test.go diff --git a/integration/docker_test.go b/integration/docker_test.go index 52be0c6..2185e39 100644 --- a/integration/docker_test.go +++ b/integration/docker_test.go @@ -53,7 +53,7 @@ func NewContainer(containerName, dockerFile string, ports []string) (*Container, } func newContainerFiles(cfg ContainerConfig) (c *Container, err error) { - dc, err := docker.NewClient("unix://var/run/docker.sock") + dc, err := docker.NewClient("unix:///var/run/docker.sock") if err != nil { return nil, err } diff --git a/integration/drill/Dockerfile b/integration/drill/Dockerfile index 4611246..db25fcb 100644 --- a/integration/drill/Dockerfile +++ b/integration/drill/Dockerfile @@ -1,12 +1,14 @@ FROM java:openjdk-8-jdk # needs jdk to submit sql queries!? +ENV DRILL_VERSION 1.15.0 + RUN mkdir -p /opt/drill && \ - wget -q -O - http://www-us.apache.org/dist/drill/drill-1.10.0/apache-drill-1.10.0.tar.gz | tar -zxvf - -C /opt/drill + wget -q -O - http://www-us.apache.org/dist/drill/drill-${DRILL_VERSION}/apache-drill-${DRILL_VERSION}.tar.gz | tar -zxvf - -C /opt/drill EXPOSE 8047 -ENV DRILL_HOME /opt/drill/apache-drill-1.10.0 +ENV DRILL_HOME /opt/drill/apache-drill-${DRILL_VERSION} ENV DRILL_LOG_DIR ${DRILL_HOME}/log/ ENV DRILL_LOG_PREFIX ${DRILL_LOG_PATH}/drill @@ -23,8 +25,8 @@ ENTRYPOINT ["java",\ "-Ddrill.exec.enable-epoll=false",\ "-XX:+CMSClassUnloadingEnabled",\ "-XX:+UseG1GC",\ - "-Dlog.path=/opt/drill/apache-drill-1.10.0/log/drillbit.log",\ - "-Dlog.query.path=/opt/drill/apache-drill-1.10.0/log/drillbit_queries.json",\ + "-Dlog.path=/opt/drill/apache-drill-${DRILL_VERSION}/log/drillbit.log",\ + "-Dlog.query.path=/opt/drill/apache-drill-${DRILL_VERSION}/log/drillbit_queries.json",\ "-cp",\ - "/opt/drill/apache-drill-1.10.0/conf:/opt/drill/apache-drill-1.10.0/jars/*:/opt/drill/apache-drill-1.10.0/jars/ext/*:/opt/drill/apache-drill-1.10.0/jars/3rdparty/*:/opt/drill/apache-drill-1.10.0/jars/classb/*",\ + "/opt/drill/apache-drill-${DRILL_VERSION}/conf:/opt/drill/apache-drill-${DRILL_VERSION}/jars/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/ext/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/3rdparty/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/classb/*",\ "org.apache.drill.exec.server.Drillbit"] diff --git a/integration/integration_test.go b/integration/integration_test.go index beffbb6..c2f11da 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -15,7 +15,6 @@ package integration import ( - "net" "testing" "time" @@ -557,26 +556,6 @@ func TestCreateInvalidPath(t *testing.T) { }) } -func TestRUOK(t *testing.T) { - zkclus := newZKCluster(t) - defer zkclus.Close(t) - - conn, err := net.Dial("tcp", zkclus.Addr()) - if err != nil { - t.Fatal(err) - } - if _, err := conn.Write([]byte("ruok")); err != nil { - t.Fatal(err) - } - buf := make([]byte, 4) - if _, err := conn.Read(buf); err != nil { - t.Fatal(err) - } - if string(buf) != "imok" { - t.Fatalf(`expected "imok", got %q`, string(buf)) - } -} - func TestMultiOp(t *testing.T) { runTest(t, testMultiOp) } func testMultiOp(t *testing.T, c *zk.Conn) { diff --git a/integration/integration_zetcd_test.go b/integration/integration_zetcd_test.go new file mode 100644 index 0000000..2c3b579 --- /dev/null +++ b/integration/integration_zetcd_test.go @@ -0,0 +1,28 @@ +// +build !zkdocker,!xchk + +package integration + +import ( + "net" + "testing" +) + +func TestRUOK(t *testing.T) { + zkclus := newZKCluster(t) + defer zkclus.Close(t) + + conn, err := net.Dial("tcp", zkclus.Addr()) + if err != nil { + t.Fatal(err) + } + if _, err := conn.Write([]byte("ruok")); err != nil { + t.Fatal(err) + } + buf := make([]byte, 4) + if _, err := conn.Read(buf); err != nil { + t.Fatal(err) + } + if string(buf) != "imok" { + t.Fatalf(`expected "imok", got %q`, string(buf)) + } +} diff --git a/integration/kafka/Dockerfile b/integration/kafka/Dockerfile index c8fce83..285a886 100644 --- a/integration/kafka/Dockerfile +++ b/integration/kafka/Dockerfile @@ -8,7 +8,7 @@ RUN apt-get install -y wget supervisor dnsutils RUN rm -rf /var/lib/apt/lists/*; apt-get clean ENV SCALA_VERSION 2.11 -ENV KAFKA_VERSION 0.11.0.0 +ENV KAFKA_VERSION 0.11.0.3 RUN wget -q http://www-us.apache.org/dist/kafka/"$KAFKA_VERSION"/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -O /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz RUN tar xfz /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -C /opt && rm /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && mv /opt/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION" /kafka # 9092 is kafka port From 289bdbcaacf0245f09f42f4772faaffec4fe70c7 Mon Sep 17 00:00:00 2001 From: Keith Schaffer Date: Fri, 1 Mar 2019 15:29:14 -0500 Subject: [PATCH 7/8] integration: implemented multi integration test Existing Multi() integration test just had panic(wut). Implemented the integration test and fixed one issue identified by it. The multi-op case was returning a different xid than zookeeper when an empty list was passed in to a multi-op --- encode.go | 2 ++ xchk/zk.go | 8 +++++++- zketcd.go | 7 +++++++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/encode.go b/encode.go index 26ca827..6324b31 100644 --- a/encode.go +++ b/encode.go @@ -373,6 +373,8 @@ func (r *MultiResponse) Decode(buf []byte) (int, error) { res.Stat = new(Stat) w = reflect.ValueOf(res.Stat) case opCheck, opDelete: + case opError: + w = reflect.ValueOf(&res.Header.Err) } if w.IsValid() { n, err := decodePacketValue(buf[total:], w) diff --git a/xchk/zk.go b/xchk/zk.go index a9b7bce..4011c6f 100644 --- a/xchk/zk.go +++ b/xchk/zk.go @@ -245,7 +245,13 @@ func (xchk *zkXchk) GetChildren2(xid zetcd.Xid, op *zetcd.GetChildren2Request) z return or } -func (xchk *zkXchk) Multi(xid zetcd.Xid, op *zetcd.MultiRequest) zetcd.ZKResponse { panic("wut") } +func (xchk *zkXchk) Multi(xid zetcd.Xid, op *zetcd.MultiRequest) zetcd.ZKResponse { + cf := func() zetcd.ZKResponse { return xchk.cZK.Multi(xid, op) } + of := func() zetcd.ZKResponse { return xchk.oZK.Multi(xid, op) } + cr, or, err := xchk.xchkResp(cf, of) + defer func() { xchk.reportErr(cr, or, err) }() + return or +} func (xchk *zkXchk) Close(xid zetcd.Xid, op *zetcd.CloseRequest) zetcd.ZKResponse { cf := func() zetcd.ZKResponse { return xchk.cZK.Close(xid, op) } diff --git a/zketcd.go b/zketcd.go index fef0c66..9f979c0 100644 --- a/zketcd.go +++ b/zketcd.go @@ -474,6 +474,13 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { Ops: make([]MultiResponseOp, len(mreq.Ops)), DoneHeader: MultiHeader{Type: opMulti}, } + if len(mreq.Ops) == 0 { + zxid, zerr := z.incrementAndGetZxid() + if zerr != nil { + return mkErr(zerr) + } + return mkZKResp(xid, zxid, mresp) + } for i, op := range mreq.Ops { switch req := op.Op.(type) { case *CreateRequest: From 69cebedb1cbf6e1cedd9450c16538bf488cf39eb Mon Sep 17 00:00:00 2001 From: Keith Schaffer Date: Fri, 1 Mar 2019 15:57:01 -0500 Subject: [PATCH 8/8] integration: fixed kafka test Kafka test was failing due to wrong ports and non-executable run file --- integration/kafka/Dockerfile | 1 + integration/kafka/kafka.chroot.properties | 3 +-- integration/kafka/kafka.server.properties | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/integration/kafka/Dockerfile b/integration/kafka/Dockerfile index 285a886..232c445 100644 --- a/integration/kafka/Dockerfile +++ b/integration/kafka/Dockerfile @@ -16,4 +16,5 @@ EXPOSE 9092 COPY kafka/ /kafka/config/ ADD kafka/run.sh /kafka/run.sh +RUN chmod uga+x /kafka/run.sh ENTRYPOINT [ "/bin/bash", "/kafka/run.sh" ] diff --git a/integration/kafka/kafka.chroot.properties b/integration/kafka/kafka.chroot.properties index 38e923b..c5736c7 100644 --- a/integration/kafka/kafka.chroot.properties +++ b/integration/kafka/kafka.chroot.properties @@ -15,8 +15,7 @@ transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 -# NOTE: assumes port 30001 for cross-checking configuration -zookeeper.connect=172.17.0.1:30001/kafka-chroot +zookeeper.connect=172.17.0.1:2181/kafka-chroot zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 auto.create.topics.enable=true diff --git a/integration/kafka/kafka.server.properties b/integration/kafka/kafka.server.properties index f8c1a0b..8b902f2 100644 --- a/integration/kafka/kafka.server.properties +++ b/integration/kafka/kafka.server.properties @@ -15,8 +15,7 @@ transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 -# NOTE: assumes port 30001 for cross-checking configuration -zookeeper.connect=172.17.0.1:30001 +zookeeper.connect=172.17.0.1:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 auto.create.topics.enable=true