diff --git a/pom.xml b/pom.xml
index f98071ce6..f8a193288 100644
--- a/pom.xml
+++ b/pom.xml
@@ -94,6 +94,7 @@
0.9.3
2.22
3.1.0
+ 2.12.0
1.9.5
4.0.37.Final
${netty.spark-2.11.version}
@@ -502,6 +503,24 @@
${json4s.version}
+
+ org.apache.curator
+ curator-framework
+ ${curator.version}
+
+
+
+ org.apache.curator
+ curator-recipes
+ ${curator.version}
+
+
+
+ org.apache.curator
+ curator-test
+ ${curator.version}
+
+
org.scala-lang
scala-compiler
diff --git a/server/pom.xml b/server/pom.xml
index e708964f1..191a2a351 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -148,6 +148,16 @@
json4s-jackson_${scala.binary.version}
+
+ org.apache.curator
+ curator-framework
+
+
+
+ org.apache.curator
+ curator-recipes
+
+
org.scala-lang
scala-library
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 32b3522d5..dd466c4e3 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -167,6 +167,20 @@ object LivyConf {
Entry("livy.server.thrift.delegation.token.max-lifetime", "7d")
val THRIFT_DELEGATION_TOKEN_RENEW_INTERVAL =
Entry("livy.server.thrift.delegation.token.renew-interval", "1d")
+ val THRIFT_SUPPORT_DYNAMIC_SERVICE_DISCOVERY =
+ Entry("livy.server.thrift.support.dynamic.service.discovery", false)
+ val THRIFT_ZOOKEEPER_NAMESPACE =
+ Entry("livy.server.thrift.zookeeper.namespace", "livy")
+ val THRIFT_ZOOKEEPER_QUORUM =
+ Entry("livy.server.thrift.zookeeper.quorum", "")
+ val THRIFT_ZOOKEEPER_CLIENT_PORT =
+ Entry("livy.server.thrift.zookeeper.client.port", "2181")
+ val THRIFT_ZOOKEEPER_SESSION_TIMEOUT =
+ Entry("livy.server.thrift.zookeeper.session.timeout", "1200000ms")
+ val THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES =
+ Entry("livy.server.thrift.zookeeper.connection.max.retries", 3)
+ val THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME =
+ Entry("livy.server.thrift.zookeeper.connection.basesleeptime", "1000ms")
/**
* Recovery mode of Livy. Possible values:
diff --git a/thriftserver/server/pom.xml b/thriftserver/server/pom.xml
index 4a0f267dd..6f5618e97 100644
--- a/thriftserver/server/pom.xml
+++ b/thriftserver/server/pom.xml
@@ -56,6 +56,21 @@
+
+ org.apache.hive
+ hive-common
+ ${hive.version}
+
+
+ org.apache.hive
+ *
+
+
+ org.eclipse.jetty
+ *
+
+
+
org.apache.livy
@@ -139,6 +154,11 @@
+
+ org.apache.curator
+ curator-test
+ test
+
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftDynamicServiceRegister.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftDynamicServiceRegister.scala
new file mode 100644
index 000000000..512953275
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftDynamicServiceRegister.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import java.nio.charset.Charset
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.hive.common.util.HiveVersionInfo
+import org.apache.hive.service.auth.HiveAuthConstants.AuthTypes
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.KeeperException
+
+import org.apache.livy.thriftserver.cli.ThriftCLIService
+import org.apache.livy.LivyConf
+import org.apache.livy.Logging
+
+class LivyThriftDynamicServiceRegister(server: LivyThriftServer, thriftCLIService: ThriftCLIService,
+ zookeeperManager: LivyThriftZookeeperManager)
+ extends ThriftService(classOf[LivyThriftDynamicServiceRegister].getName) with Logging {
+
+ override def start(): Unit = {
+ super.start()
+
+ if (server.livyConf.getBoolean(LivyConf.THRIFT_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+ addServerInstanceToZooKeeper()
+ }
+ }
+
+ private def addServerInstanceToZooKeeper(): Unit = {
+ val rootNamespace = server.livyConf.get(LivyConf.THRIFT_ZOOKEEPER_NAMESPACE)
+ val instanceURI = getServerInstanceURI(thriftCLIService)
+ val znodePath = getZnodePath(rootNamespace, instanceURI)
+ val publishConfigs = getPublishConfigs(server.livyConf)
+
+ val zookeeperClient = zookeeperManager.getClient()
+ createParent(zookeeperClient, rootNamespace)
+ createZnode(zookeeperClient, znodePath, publishConfigs)
+ }
+
+ private def createZnode(zookeeperClient: CuratorFramework, znodePath: String,
+ publishConfigs: Map[String, String]): Unit = {
+ try {
+ val znodeData = publishConfigs.map(_.productIterator.mkString("=")).mkString(";")
+ val znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"))
+ zookeeperClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+ .forPath(znodePath, znodeDataUTF8)
+ info(s"Created a znode ${znodePath} on ZooKeeper for Livy ThriftServer")
+ } catch {
+ case e: Exception =>
+ error("Unable to create a znode for this server instance", e)
+ throw e
+ }
+ }
+
+ private def createParent(zookeeperClient: CuratorFramework, rootNamespace: String): Unit = {
+ try {
+ zookeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+ .forPath(LivyThriftZookeeperManager.ZOOKEEPER_PATH_SEPARATOR + rootNamespace)
+ info(s"Created the root name space: $rootNamespace on ZooKeeper for Livy ThriftServer")
+ } catch {
+ case e: KeeperException =>
+ if (e.code ne KeeperException.Code.NODEEXISTS) {
+ error(s"Unable to create Livy ThriftServer " +
+ s"namespace: $rootNamespace on ZooKeeper", e)
+ throw e
+ }
+ }
+ }
+
+ private def getZnodePath(rootNamespace: String, instanceURI: String): String = {
+ LivyThriftZookeeperManager.ZOOKEEPER_PATH_SEPARATOR + rootNamespace +
+ LivyThriftZookeeperManager.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";" +
+ "version=" + HiveVersionInfo.getVersion + ";" + "sequence="
+ }
+
+ private def getServerInstanceURI(thriftCLIService: ThriftCLIService): String = {
+ return s"${getServerHostName()}:${getServerPortNumber()}"
+ }
+
+ private def getServerHostName(): String = {
+ if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress == null)) {
+ throw new Exception("Unable to get the server address; it hasn't been initialized yet.")
+ }
+ val thriftBindHost = server.livyConf.get(LivyConf.THRIFT_BIND_HOST)
+ if (thriftBindHost != null && !thriftBindHost.isEmpty) {
+ return thriftBindHost
+ } else {
+ return thriftCLIService.getServerIPAddress.getHostName
+ }
+ }
+
+ private def getServerPortNumber(): Int = {
+ if (thriftCLIService == null) {
+ throw new Exception("Unable to get the server address; it hasn't been initialized yet.")
+ }
+ return thriftCLIService.getPortNumber
+ }
+
+ private def getPublishConfigs(livyConf: LivyConf): Map[String, String] = {
+ val commonConfigs =
+ Map("hive.server2.thrift.bind.host" -> getServerHostName(),
+ "hive.server2.transport.mode" -> livyConf.get(LivyConf.THRIFT_TRANSPORT_MODE),
+ "hive.server2.authentication" -> livyConf.get(LivyConf.THRIFT_AUTHENTICATION),
+ "hive.server2.use.SSL" -> livyConf.getBoolean(LivyConf.THRIFT_USE_SSL).toString())
+
+ val transportConfigs = if (LivyThriftServer.isHTTPTransportMode(livyConf)) {
+ Map("hive.server2.thrift.http.port" -> getServerPortNumber().toString,
+ "hive.server2.thrift.http.path" -> livyConf.get(LivyConf.THRIFT_HTTP_PATH))
+ } else {
+ Map("hive.server2.thrift.port" -> getServerPortNumber().toString,
+ "hive.server2.thrift.sasl.qop" -> livyConf.get(LivyConf.THRIFT_SASL_QOP))
+ }
+
+ val isAuthKerberos =
+ livyConf.get(LivyConf.THRIFT_AUTHENTICATION).equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName)
+ val kerberosConfigs = if (isAuthKerberos) {
+ Map("hive.server2.authentication.kerberos.principal" ->
+ livyConf.get(LivyConf.AUTH_KERBEROS_PRINCIPAL))
+ } else {
+ Map()
+ }
+
+ commonConfigs ++ transportConfigs ++ kerberosConfigs
+ }
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala
index daf7b8266..08a06dd9f 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala
@@ -26,7 +26,9 @@ import org.apache.livy.server.AccessManager
import org.apache.livy.server.interactive.InteractiveSession
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.InteractiveSessionManager
-import org.apache.livy.thriftserver.cli.{ThriftBinaryCLIService, ThriftHttpCLIService}
+import org.apache.livy.thriftserver.cli.ThriftBinaryCLIService
+import org.apache.livy.thriftserver.cli.ThriftCLIService
+import org.apache.livy.thriftserver.cli.ThriftHttpCLIService
/**
* The main entry point for the Livy thrift server leveraging HiveServer2. Starts up a
@@ -127,6 +129,9 @@ class LivyThriftServer(
new ThriftBinaryCLIService(cliService, oomHook)
}
addService(thriftCLIService)
+ val zookeeperManager = new LivyThriftZookeeperManager(this)
+ addService(zookeeperManager)
+ addService(new LivyThriftDynamicServiceRegister(this, thriftCLIService, zookeeperManager))
super.init(livyConf)
}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftZookeeperManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftZookeeperManager.scala
new file mode 100644
index 000000000..7cf095299
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftZookeeperManager.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+
+import org.apache.livy.{LivyConf, Logging}
+
+object LivyThriftZookeeperManager {
+ val ZOOKEEPER_PATH_SEPARATOR = "/"
+}
+
+class LivyThriftZookeeperManager(server: LivyThriftServer)
+ extends ThriftService(classOf[LivyThriftZookeeperManager].getName) with Logging {
+
+ private var zookeeperClient: CuratorFramework = null
+
+ override def start(): Unit = {
+ val livyConf = server.livyConf
+ val quorum = livyConf.get(LivyConf.THRIFT_ZOOKEEPER_QUORUM).trim
+ if (!quorum.isEmpty) {
+ val port = livyConf.get(LivyConf.THRIFT_ZOOKEEPER_CLIENT_PORT)
+ val quorumServers = getQuorumServers(quorum, port)
+
+ val sessionTimeout = livyConf.getTimeAsMs(LivyConf.THRIFT_ZOOKEEPER_SESSION_TIMEOUT)
+ .asInstanceOf[Int]
+ val baseSleepTime = livyConf.getTimeAsMs(LivyConf.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME)
+ .asInstanceOf[Int]
+ val maxRetries = livyConf.getInt(LivyConf.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES)
+
+ zookeeperClient = CuratorFrameworkFactory.builder()
+ .connectString(quorumServers)
+ .sessionTimeoutMs(sessionTimeout)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+ .build()
+ zookeeperClient.start()
+ }
+
+ super.start()
+ }
+
+ private[thriftserver] def getQuorumServers(quorum: String, clientPort: String): String = {
+ val quorumServers = quorum.split(",").foldLeft(new StringBuilder()) { (sb, x) =>
+ sb.append(x)
+ if (!x.contains(":")) {
+ // if the quorum doesn't contain a port, add the configured port to quorum
+ sb.append(":").append(clientPort)
+ }
+ sb.append(",")
+ }
+
+ if (quorumServers.length > 0) {
+ quorumServers.substring(0, quorumServers.length - 1)
+ } else {
+ ""
+ }
+ }
+
+ def getClient(): CuratorFramework = {
+ if (zookeeperClient == null) {
+ throw new RuntimeException(s"Zookeeper client is not initialized, " +
+ s"please make sure ${LivyConf.THRIFT_ZOOKEEPER_QUORUM.key} is configured correctly.")
+ }
+ zookeeperClient
+ }
+}
diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
index 9a7823d2d..2b48a15a7 100644
--- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
+++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
@@ -36,6 +36,8 @@ object ServerMode extends Enumeration {
abstract class ThriftServerBaseTest extends FunSuite with BeforeAndAfterAll {
def mode: ServerMode.Value
def port: Int
+ def dynamicServiceDiscovery: Boolean = false
+ def zookeeperQuorum: String = ""
val THRIFT_SERVER_STARTUP_TIMEOUT = 30000 // ms
@@ -56,6 +58,8 @@ abstract class ThriftServerBaseTest extends FunSuite with BeforeAndAfterAll {
Class.forName(classOf[HiveDriver].getCanonicalName)
livyConf.set(LivyConf.THRIFT_TRANSPORT_MODE, mode.toString)
livyConf.set(LivyConf.THRIFT_SERVER_PORT, port)
+ livyConf.set(LivyConf.THRIFT_SUPPORT_DYNAMIC_SERVICE_DISCOVERY, dynamicServiceDiscovery)
+ livyConf.set(LivyConf.THRIFT_ZOOKEEPER_QUORUM, zookeeperQuorum)
// Set formatted Spark and Scala version into livy configuration, this will be used by
// session creation.
diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
index a3d9e882e..37c322213 100644
--- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
+++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
@@ -17,8 +17,11 @@
package org.apache.livy.thriftserver
+import java.io.File
import java.sql.{Date, SQLException, Statement}
+import org.apache.curator.test.TestingServer
+
import org.apache.livy.LivyConf
@@ -176,3 +179,30 @@ class HttpThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests
}
}
}
+
+class DynamicServiceDiscoveryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests {
+ override def mode: ServerMode.Value = ServerMode.binary
+ override def port: Int = 20000
+ override def dynamicServiceDiscovery: Boolean = true
+ override def zookeeperQuorum: String = "localhost:2282"
+
+ override def beforeAll(): Unit = {
+ val zookeeperServer = new TestingServer(2282, new File("/tmp"))
+ zookeeperServer.start()
+
+ super.beforeAll()
+ }
+
+ test("fetch different data types") {
+ val supportMap = hiveSupportEnabled(formattedSparkVersion._1, livyConf)
+ withJdbcConnection(s"jdbc:hive2://${zookeeperQuorum}/;" +
+ s"serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=livy") { c =>
+ val statement = c.createStatement()
+ try {
+ dataTypesTest(statement, supportMap)
+ } finally {
+ statement.close()
+ }
+ }
+ }
+}