From 18254ffb0ba7f6afbc1e6b589efc08defbf6cbbf Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Wed, 31 Jul 2019 20:16:12 +0300 Subject: [PATCH 01/26] [LIVY-616] Livy Server discovery --- .../main/scala/org/apache/livy/LivyConf.scala | 10 +- .../org/apache/livy/server/LivyServer.scala | 16 ++- .../server/discovery/DiscoveryManager.scala | 44 ++++++ .../livy/server/discovery/JsonMapper.scala | 34 +++++ .../server/discovery/ZooKeeperManager.scala | 126 ++++++++++++++++++ .../livy/server/recovery/StateStore.scala | 20 +-- .../server/recovery/ZooKeeperStateStore.scala | 87 ++---------- .../discovery/DiscoveryManagerSpec.scala | 113 ++++++++++++++++ .../recovery/ZooKeeperStateStoreSpec.scala | 19 ++- 9 files changed, 360 insertions(+), 109 deletions(-) create mode 100644 server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala create mode 100644 server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala create mode 100644 server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala create mode 100644 server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 32b3522d5..028480aa7 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -22,9 +22,7 @@ import java.lang.{Boolean => JBoolean, Long => JLong} import java.util.{Map => JMap} import scala.collection.JavaConverters._ - import org.apache.hadoop.conf.Configuration - import org.apache.livy.client.common.ClientConf import org.apache.livy.client.common.ClientConf.ConfEntry import org.apache.livy.client.common.ClientConf.DeprecatedConf @@ -187,10 +185,16 @@ object LivyConf { /** * For filesystem state store, the path of the state store directory. Please don't use a * filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///. - * For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 + * For zookeeper, use LIVY_ZOOKEEPER_URLS */ val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "") + /** + * ZooKeeper address witch will be used for Livy Server discovery + * and Zookeeper state store. e.g. host1:port1,host2:port2 + */ + val LIVY_ZOOKEEPER_URL = Entry("livy.server.zookeeper.url", null) + // Livy will cache the max no of logs specified. 0 means don't cache the logs. val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 60f3961dc..f16218557 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -21,12 +21,13 @@ import java.io.{BufferedInputStream, InputStream} import java.net.InetAddress import java.util.concurrent._ import java.util.EnumSet + import javax.servlet._ +import org.apache.curator.framework.CuratorFramework import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future - import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation} import org.apache.hadoop.security.authentication.server._ import org.eclipse.jetty.servlet.FilterHolder @@ -34,9 +35,9 @@ import org.scalatra.{NotFound, ScalatraServlet} import org.scalatra.metrics.MetricsBootstrap import org.scalatra.metrics.MetricsSupportExtensions._ import org.scalatra.servlet.{MultipartConfig, ServletApiImplicits} - import org.apache.livy._ import org.apache.livy.server.batch.BatchSessionServlet +import org.apache.livy.server.discovery.DiscoveryManager import org.apache.livy.server.interactive.InteractiveSessionServlet import org.apache.livy.server.recovery.{SessionStore, StateStore} import org.apache.livy.server.ui.UIServlet @@ -72,6 +73,8 @@ class LivyServer extends Logging { maxFileSize = Some(livyConf.getLong(LivyConf.FILE_UPLOAD_MAX_SIZE)) ).toMultipartConfigElement + setServerAddress(livyConf) + // Make sure the `spark-submit` program exists, otherwise much of livy won't work. testSparkHome(livyConf) @@ -385,6 +388,15 @@ class LivyServer extends Logging { } } + private[livy] def setServerAddress(livyConf: LivyConf, address: Option[String] = None, + mockCuratorClient: Option[CuratorFramework] = None): Unit = { + if (Option(livyConf.get(LIVY_ZOOKEEPER_URL)).isDefined) { + val discoveryManager = DiscoveryManager(livyConf, mockCuratorClient) + val host = InetAddress.getLocalHost.getHostName + discoveryManager.setAddress(address.getOrElse(s"$host:${livyConf.getInt(LivyConf.SERVER_PORT)}")) + } + } + private[livy] def testRecovery(livyConf: LivyConf): Unit = { if (!livyConf.isRunningOnYarn()) { // If recovery is turned on but we are not running on YARN, quit. diff --git a/server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala new file mode 100644 index 000000000..320a4ab58 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.server.discovery + +import org.apache.curator.framework.CuratorFramework +import org.apache.livy.LivyConf + +class DiscoveryManager(livyConf: LivyConf, + mockCuratorClient: Option[CuratorFramework] = None) + extends ZooKeeperManager(livyConf, mockCuratorClient) { + + import DiscoveryManager._ + + def setAddress(address: String): Unit = { + setData(LIVY_SERVER_URL_KEY, address) + } + + def getAddress(): String = { + getData[String](LIVY_SERVER_URL_KEY).getOrElse("") + } +} + +object DiscoveryManager { + val LIVY_SERVER_URL_KEY = "server.url" + + def apply(livyConf: LivyConf, + mockCuratorClient: Option[CuratorFramework] = None): DiscoveryManager = { + new DiscoveryManager(livyConf, mockCuratorClient) + } +} diff --git a/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala b/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala new file mode 100644 index 000000000..f5dc26ec6 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.server.discovery + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.livy.sessions.SessionKindModule + +import scala.reflect.{ClassTag, classTag} + +protected[server] trait JsonMapper { + protected val mapper = new ObjectMapper() + .registerModule(DefaultScalaModule) + .registerModule(new SessionKindModule()) + + def serializeToBytes(value: Object): Array[Byte] = mapper.writeValueAsBytes(value) + + def deserialize[T: ClassTag](json: Array[Byte]): T = + mapper.readValue(json, classTag[T].runtimeClass.asInstanceOf[Class[T]]) +} diff --git a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala new file mode 100644 index 000000000..2c8b2ad84 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.server.discovery + +import org.apache.curator.framework.api.UnhandledErrorListener +import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} +import org.apache.curator.retry.RetryNTimes +import org.apache.livy.LivyConf.Entry +import org.apache.livy.{LivyConf, Logging} +import org.apache.zookeeper.KeeperException.NoNodeException + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + + +class ZooKeeperManager( + livyConf: LivyConf, + mockCuratorClient: Option[CuratorFramework] = None) // For testing + extends JsonMapper with Logging { + + import ZooKeeperManager._ + + // Constructor defined for StateStore factory to new this class using reflection. + def this(livyConf: LivyConf) { + this(livyConf, None) + } + + private val zkAddress = livyConf.get(LivyConf.LIVY_ZOOKEEPER_URL) + require(Option(zkAddress).isDefined, s"Please config ${LivyConf.LIVY_ZOOKEEPER_URL.key}.") + private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF) + private val retryValue = livyConf.get(ZK_RETRY_CONF) + // a regex to match patterns like "m, n" where m and n both are integer values + private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r + private[server] val retryPolicy = retryValue match { + case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt) + case _ => throw new IllegalArgumentException( + s"$ZK_KEY_PREFIX_CONF contains bad value: $retryValue. " + + "Correct format is ,. e.g. 5,100") + } + + private val curatorClient = mockCuratorClient.getOrElse { + CuratorFrameworkFactory.newClient(zkAddress, retryPolicy) + } + + Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { + override def run(): Unit = { + curatorClient.close() + } + })) + + curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener { + def unhandledError(message: String, e: Throwable): Unit = { + error(s"Fatal Zookeeper error. Shutting down Livy server.") + System.exit(1) + } + }) + curatorClient.start() + // TODO Make sure ZK path has proper secure permissions so that other users cannot read its + // contents. + + def setData(key: String, value: Object): Unit = { + val prefixedKey = prefixKey(key) + val data = serializeToBytes(value) + if (exist(prefixedKey)) { + curatorClient.setData().forPath(prefixedKey, data) + } else { + curatorClient.create().creatingParentsIfNeeded().forPath(prefixedKey, data) + } + } + + def getData[T: ClassTag](key: String): Option[T] = { + val prefixedKey = prefixKey(key) + if (exist(prefixedKey)) { + Option(deserialize[T](curatorClient.getData().forPath(prefixedKey))) + } else { + None + } + } + + def getChildren(key: String): Seq[String] = { + val prefixedKey = prefixKey(key) + if (exist(prefixedKey)) { + curatorClient.getChildren.forPath(prefixedKey).asScala + } else { + Seq.empty[String] + } + } + + def delete(key: String): Unit = { + try { + curatorClient.delete().guaranteed().forPath(prefixKey(key)) + } catch { + case _: NoNodeException => + } + } + + def exist(key: String): Boolean = { + Option(curatorClient.checkExists().forPath(key)).isDefined + } + + private def prefixKey(key: String) = s"/$zkKeyPrefix/$key" +} + +object ZooKeeperManager { + val ZK_KEY_PREFIX_CONF = Entry("livy.server.zookeeper.key-prefix", "livy") + val ZK_RETRY_CONF = Entry("livy.server.zookeeper.retry-policy", "5,100") + + def apply(livyConf: LivyConf, + mockCuratorClient: Option[CuratorFramework] = None): ZooKeeperManager = { + new ZooKeeperManager(livyConf, mockCuratorClient) + } +} diff --git a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala index a6c3275d7..6447b71ed 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala @@ -16,25 +16,11 @@ */ package org.apache.livy.server.recovery -import scala.reflect.{classTag, ClassTag} - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule - -import org.apache.livy.{LivyConf, Logging} -import org.apache.livy.sessions.SessionKindModule +import org.apache.livy.server.discovery.JsonMapper import org.apache.livy.sessions.SessionManager._ +import org.apache.livy.{LivyConf, Logging} -protected trait JsonMapper { - protected val mapper = new ObjectMapper() - .registerModule(DefaultScalaModule) - .registerModule(new SessionKindModule()) - - def serializeToBytes(value: Object): Array[Byte] = mapper.writeValueAsBytes(value) - - def deserialize[T: ClassTag](json: Array[Byte]): T = - mapper.readValue(json, classTag[T].runtimeClass.asInstanceOf[Class[T]]) -} +import scala.reflect.ClassTag /** * Interface of a key-value pair storage for state storage. diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala index ec6b9df18..0c318c1f1 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala @@ -16,103 +16,36 @@ */ package org.apache.livy.server.recovery -import scala.collection.JavaConverters._ -import scala.reflect.ClassTag -import scala.util.Try - -import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} -import org.apache.curator.framework.api.UnhandledErrorListener -import org.apache.curator.retry.RetryNTimes -import org.apache.zookeeper.KeeperException.NoNodeException - +import org.apache.curator.framework.CuratorFramework +import org.apache.livy.server.discovery.ZooKeeperManager import org.apache.livy.{LivyConf, Logging} -import org.apache.livy.LivyConf.Entry -object ZooKeeperStateStore { - val ZK_KEY_PREFIX_CONF = Entry("livy.server.recovery.zk-state-store.key-prefix", "livy") - val ZK_RETRY_CONF = Entry("livy.server.recovery.zk-state-store.retry-policy", "5,100") -} +import scala.reflect.ClassTag -class ZooKeeperStateStore( - livyConf: LivyConf, - mockCuratorClient: Option[CuratorFramework] = None) // For testing +class ZooKeeperStateStore(livyConf: LivyConf, + mockCuratorClient: Option[CuratorFramework] = None) // For testing extends StateStore(livyConf) with Logging { - import ZooKeeperStateStore._ + val zooKeeperManager = ZooKeeperManager(livyConf, mockCuratorClient) // Constructor defined for StateStore factory to new this class using reflection. def this(livyConf: LivyConf) { this(livyConf, None) } - private val zkAddress = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL) - require(!zkAddress.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.") - private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF) - private val retryValue = livyConf.get(ZK_RETRY_CONF) - // a regex to match patterns like "m, n" where m and n both are integer values - private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r - private[recovery] val retryPolicy = retryValue match { - case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt) - case _ => throw new IllegalArgumentException( - s"$ZK_KEY_PREFIX_CONF contains bad value: $retryValue. " + - "Correct format is ,. e.g. 5,100") - } - - private val curatorClient = mockCuratorClient.getOrElse { - CuratorFrameworkFactory.newClient(zkAddress, retryPolicy) - } - - Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { - override def run(): Unit = { - curatorClient.close() - } - })) - - curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener { - def unhandledError(message: String, e: Throwable): Unit = { - error(s"Fatal Zookeeper error. Shutting down Livy server.") - System.exit(1) - } - }) - curatorClient.start() - // TODO Make sure ZK path has proper secure permissions so that other users cannot read its - // contents. - override def set(key: String, value: Object): Unit = { - val prefixedKey = prefixKey(key) - val data = serializeToBytes(value) - if (curatorClient.checkExists().forPath(prefixedKey) == null) { - curatorClient.create().creatingParentsIfNeeded().forPath(prefixedKey, data) - } else { - curatorClient.setData().forPath(prefixedKey, data) - } + zooKeeperManager.setData(key, value) } override def get[T: ClassTag](key: String): Option[T] = { - val prefixedKey = prefixKey(key) - if (curatorClient.checkExists().forPath(prefixedKey) == null) { - None - } else { - Option(deserialize[T](curatorClient.getData().forPath(prefixedKey))) - } + zooKeeperManager.getData[T](key) } override def getChildren(key: String): Seq[String] = { - val prefixedKey = prefixKey(key) - if (curatorClient.checkExists().forPath(prefixedKey) == null) { - Seq.empty[String] - } else { - curatorClient.getChildren.forPath(prefixedKey).asScala - } + zooKeeperManager.getChildren(key) } override def remove(key: String): Unit = { - try { - curatorClient.delete().guaranteed().forPath(prefixKey(key)) - } catch { - case _: NoNodeException => - } + zooKeeperManager.delete(key) } - - private def prefixKey(key: String) = s"/$zkKeyPrefix/$key" } diff --git a/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala b/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala new file mode 100644 index 000000000..bee90e4c3 --- /dev/null +++ b/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.server.discovery + +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.api.{ExistsBuilder, GetDataBuilder, SetDataBuilder, UnhandledErrorListener} +import org.apache.curator.framework.listen.Listenable +import org.apache.livy.LivyConf.LIVY_ZOOKEEPER_URL +import org.apache.livy.server.LivyServer +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} +import org.apache.zookeeper.data.Stat +import org.mockito.Mockito.{never, verify, when} +import org.scalatest.FunSpec +import org.scalatest.mock.MockitoSugar.mock + + +class DiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite + with JsonMapper { + describe("DiscoveryManagerSpec") { + case class TestFixture(discoveryManager: DiscoveryManager, curatorClient: CuratorFramework) + val conf = new LivyConf() + conf.set(LivyConf.LIVY_ZOOKEEPER_URL, "host") + val key = DiscoveryManager.LIVY_SERVER_URL_KEY + val prefixedKey = s"/livy/$key" + val testAddress = s"0.0.0.0:${conf.getInt(LivyConf.SERVER_PORT)}" + val testData: Array[Byte] = serializeToBytes(testAddress) + + def withMock[R](testBody: TestFixture => R): R = { + val curatorClient = mock[CuratorFramework] + when(curatorClient.getUnhandledErrorListenable()) + .thenReturn(mock[Listenable[UnhandledErrorListener]]) + val discoveryManager = DiscoveryManager(conf, Some(curatorClient)) + testBody(TestFixture(discoveryManager, curatorClient)) + } + + def mockExistsBuilder(curatorClient: CuratorFramework, exists: Boolean): Unit = { + val existsBuilder = mock[ExistsBuilder] + when(curatorClient.checkExists()).thenReturn(existsBuilder) + if (exists) { + when(existsBuilder.forPath(prefixedKey)).thenReturn(mock[Stat]) + } + } + + it("setAddress should use curatorClient") { + withMock { f => + mockExistsBuilder(f.curatorClient, exists = true) + + val setDataBuilder = mock[SetDataBuilder] + when(f.curatorClient.setData()).thenReturn(setDataBuilder) + + f.discoveryManager.setAddress(testAddress) + + verify(f.curatorClient).start() + verify(setDataBuilder).forPath(prefixedKey, testData) + } + } + + it("getAddress should use curatorClient") { + withMock { f => + mockExistsBuilder(f.curatorClient, exists = true) + val getDataBuilder = mock[GetDataBuilder] + when(f.curatorClient.getData()).thenReturn(getDataBuilder) + when(getDataBuilder.forPath(prefixedKey)).thenReturn(testData) + + f.discoveryManager.getAddress() + + verify(f.curatorClient).start() + verify(getDataBuilder).forPath(prefixedKey) + } + } + + it("Livy Server should use DiscoveryManager") { + withMock { f => + val livyConf = new LivyConf() + livyConf.set(LIVY_ZOOKEEPER_URL, "host:port") + val s = new LivyServer() + + mockExistsBuilder(f.curatorClient, exists = true) + + val setDataBuilder = mock[SetDataBuilder] + when(f.curatorClient.setData()).thenReturn(setDataBuilder) + + s.setServerAddress(livyConf, Some(testAddress), Some(f.curatorClient)) + verify(setDataBuilder).forPath(prefixedKey, testData) + } + } + + it("Livy Server should skip DiscoveryManager if ZooKeeper url isn't defined") { + withMock { f => + val livyConf = new LivyConf() + val s = new LivyServer() + + s.setServerAddress(livyConf, Some(testAddress), Some(f.curatorClient)) + verify(f.curatorClient, never).setData() + } + } + } + +} diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala index 88e530f62..8f7eba09e 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala @@ -18,23 +18,22 @@ package org.apache.livy.server.recovery import scala.collection.JavaConverters._ - import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.api._ import org.apache.curator.framework.listen.Listenable +import org.apache.livy.server.discovery.ZooKeeperManager import org.apache.zookeeper.data.Stat import org.mockito.Mockito._ import org.scalatest.FunSpec import org.scalatest.Matchers._ import org.scalatest.mock.MockitoSugar.mock - import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { describe("ZooKeeperStateStore") { case class TestFixture(stateStore: ZooKeeperStateStore, curatorClient: CuratorFramework) val conf = new LivyConf() - conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host") + conf.set(LivyConf.LIVY_ZOOKEEPER_URL, "host") val key = "key" val prefixedKey = s"/livy/$key" @@ -57,11 +56,11 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { it("should throw on bad config") { withMock { f => val conf = new LivyConf() - intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) } + intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf, Some(f.curatorClient)) } - conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host") - conf.set(ZooKeeperStateStore.ZK_RETRY_CONF, "bad") - intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) } + conf.set(LivyConf.LIVY_ZOOKEEPER_URL, "host") + conf.set(ZooKeeperManager.ZK_RETRY_CONF, "bad") + intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf, Some(f.curatorClient)) } } } @@ -96,12 +95,12 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { } it("get should retrieve retry policy configs") { - conf.set(org.apache.livy.server.recovery.ZooKeeperStateStore.ZK_RETRY_CONF, "11,77") + conf.set(ZooKeeperManager.ZK_RETRY_CONF, "11,77") withMock { f => mockExistsBuilder(f.curatorClient, true) - f.stateStore.retryPolicy should not be null - f.stateStore.retryPolicy.getN shouldBe 11 + f.stateStore.zooKeeperManager.retryPolicy should not be null + f.stateStore.zooKeeperManager.retryPolicy.getN shouldBe 11 } } From 65260520f38ec927bfe6509a57419066b1fabdbd Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Wed, 31 Jul 2019 21:55:11 +0300 Subject: [PATCH 02/26] scalastyle import order and line lengths --- server/src/main/scala/org/apache/livy/LivyConf.scala | 2 +- .../src/main/scala/org/apache/livy/server/LivyServer.scala | 7 ++++--- .../org/apache/livy/server/discovery/JsonMapper.scala | 3 +-- .../apache/livy/server/discovery/ZooKeeperManager.scala | 6 ++---- .../scala/org/apache/livy/server/recovery/StateStore.scala | 3 +-- .../livy/server/discovery/DiscoveryManagerSpec.scala | 2 +- .../livy/server/recovery/ZooKeeperStateStoreSpec.scala | 4 ++-- 7 files changed, 12 insertions(+), 15 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 028480aa7..196799ad0 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -21,11 +21,11 @@ import java.io.File import java.lang.{Boolean => JBoolean, Long => JLong} import java.util.{Map => JMap} -import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.livy.client.common.ClientConf import org.apache.livy.client.common.ClientConf.ConfEntry import org.apache.livy.client.common.ClientConf.DeprecatedConf +import scala.collection.JavaConverters._ object LivyConf { diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index f16218557..33df89dc6 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -19,10 +19,10 @@ package org.apache.livy.server import java.io.{BufferedInputStream, InputStream} import java.net.InetAddress -import java.util.concurrent._ import java.util.EnumSet - +import java.util.concurrent._ import javax.servlet._ + import org.apache.curator.framework.CuratorFramework import scala.collection.JavaConverters._ @@ -393,7 +393,8 @@ class LivyServer extends Logging { if (Option(livyConf.get(LIVY_ZOOKEEPER_URL)).isDefined) { val discoveryManager = DiscoveryManager(livyConf, mockCuratorClient) val host = InetAddress.getLocalHost.getHostName - discoveryManager.setAddress(address.getOrElse(s"$host:${livyConf.getInt(LivyConf.SERVER_PORT)}")) + discoveryManager.setAddress( + address.getOrElse(s"$host:${livyConf.getInt(LivyConf.SERVER_PORT)}")) } } diff --git a/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala b/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala index f5dc26ec6..2d2f1a76b 100644 --- a/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala +++ b/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala @@ -19,8 +19,7 @@ package org.apache.livy.server.discovery import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.livy.sessions.SessionKindModule - -import scala.reflect.{ClassTag, classTag} +import scala.reflect.{classTag, ClassTag} protected[server] trait JsonMapper { protected val mapper = new ObjectMapper() diff --git a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala index 2c8b2ad84..c368cc5ed 100644 --- a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala +++ b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala @@ -16,17 +16,15 @@ */ package org.apache.livy.server.discovery -import org.apache.curator.framework.api.UnhandledErrorListener import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} +import org.apache.curator.framework.api.UnhandledErrorListener import org.apache.curator.retry.RetryNTimes -import org.apache.livy.LivyConf.Entry import org.apache.livy.{LivyConf, Logging} +import org.apache.livy.LivyConf.Entry import org.apache.zookeeper.KeeperException.NoNodeException - import scala.collection.JavaConverters._ import scala.reflect.ClassTag - class ZooKeeperManager( livyConf: LivyConf, mockCuratorClient: Option[CuratorFramework] = None) // For testing diff --git a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala index 6447b71ed..7012bff72 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala @@ -16,10 +16,9 @@ */ package org.apache.livy.server.recovery +import org.apache.livy.{LivyConf, Logging} import org.apache.livy.server.discovery.JsonMapper import org.apache.livy.sessions.SessionManager._ -import org.apache.livy.{LivyConf, Logging} - import scala.reflect.ClassTag /** diff --git a/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala b/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala index bee90e4c3..5c5647505 100644 --- a/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala @@ -19,9 +19,9 @@ package org.apache.livy.server.discovery import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.api.{ExistsBuilder, GetDataBuilder, SetDataBuilder, UnhandledErrorListener} import org.apache.curator.framework.listen.Listenable +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} import org.apache.livy.LivyConf.LIVY_ZOOKEEPER_URL import org.apache.livy.server.LivyServer -import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} import org.apache.zookeeper.data.Stat import org.mockito.Mockito.{never, verify, when} import org.scalatest.FunSpec diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala index 8f7eba09e..597ef30bf 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala @@ -17,17 +17,17 @@ package org.apache.livy.server.recovery -import scala.collection.JavaConverters._ import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.api._ import org.apache.curator.framework.listen.Listenable +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} import org.apache.livy.server.discovery.ZooKeeperManager import org.apache.zookeeper.data.Stat import org.mockito.Mockito._ import org.scalatest.FunSpec import org.scalatest.Matchers._ import org.scalatest.mock.MockitoSugar.mock -import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} +import scala.collection.JavaConverters._ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { describe("ZooKeeperStateStore") { From 5f30372b4e6b42c87e287d15f53d7cb4079d6c03 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Thu, 1 Aug 2019 13:10:10 +0300 Subject: [PATCH 03/26] reorganized imports to fit project scalastyle requirements --- server/src/main/scala/org/apache/livy/LivyConf.scala | 4 +++- .../main/scala/org/apache/livy/server/LivyServer.scala | 5 +++-- .../apache/livy/server/discovery/DiscoveryManager.scala | 1 + .../org/apache/livy/server/discovery/JsonMapper.scala | 4 +++- .../apache/livy/server/discovery/ZooKeeperManager.scala | 8 +++++--- .../org/apache/livy/server/recovery/StateStore.scala | 3 ++- .../livy/server/recovery/ZooKeeperStateStore.scala | 6 ++++-- .../livy/server/discovery/DiscoveryManagerSpec.scala | 6 +++--- .../livy/server/recovery/ZooKeeperStateStoreSpec.scala | 9 +++++---- 9 files changed, 29 insertions(+), 17 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 196799ad0..81fc07553 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -21,11 +21,13 @@ import java.io.File import java.lang.{Boolean => JBoolean, Long => JLong} import java.util.{Map => JMap} +import scala.collection.JavaConverters._ + import org.apache.hadoop.conf.Configuration + import org.apache.livy.client.common.ClientConf import org.apache.livy.client.common.ClientConf.ConfEntry import org.apache.livy.client.common.ClientConf.DeprecatedConf -import scala.collection.JavaConverters._ object LivyConf { diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 33df89dc6..7727e3ba3 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -23,11 +23,11 @@ import java.util.EnumSet import java.util.concurrent._ import javax.servlet._ -import org.apache.curator.framework.CuratorFramework - import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future + +import org.apache.curator.framework.CuratorFramework import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation} import org.apache.hadoop.security.authentication.server._ import org.eclipse.jetty.servlet.FilterHolder @@ -35,6 +35,7 @@ import org.scalatra.{NotFound, ScalatraServlet} import org.scalatra.metrics.MetricsBootstrap import org.scalatra.metrics.MetricsSupportExtensions._ import org.scalatra.servlet.{MultipartConfig, ServletApiImplicits} + import org.apache.livy._ import org.apache.livy.server.batch.BatchSessionServlet import org.apache.livy.server.discovery.DiscoveryManager diff --git a/server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala index 320a4ab58..6e75fb43e 100644 --- a/server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala +++ b/server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala @@ -17,6 +17,7 @@ package org.apache.livy.server.discovery import org.apache.curator.framework.CuratorFramework + import org.apache.livy.LivyConf class DiscoveryManager(livyConf: LivyConf, diff --git a/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala b/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala index 2d2f1a76b..e509bf5e6 100644 --- a/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala +++ b/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala @@ -16,10 +16,12 @@ */ package org.apache.livy.server.discovery +import scala.reflect.{classTag, ClassTag} + import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule + import org.apache.livy.sessions.SessionKindModule -import scala.reflect.{classTag, ClassTag} protected[server] trait JsonMapper { protected val mapper = new ObjectMapper() diff --git a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala index c368cc5ed..e9c6747f9 100644 --- a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala +++ b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala @@ -16,14 +16,16 @@ */ package org.apache.livy.server.discovery +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} import org.apache.curator.framework.api.UnhandledErrorListener import org.apache.curator.retry.RetryNTimes +import org.apache.zookeeper.KeeperException.NoNodeException + import org.apache.livy.{LivyConf, Logging} import org.apache.livy.LivyConf.Entry -import org.apache.zookeeper.KeeperException.NoNodeException -import scala.collection.JavaConverters._ -import scala.reflect.ClassTag class ZooKeeperManager( livyConf: LivyConf, diff --git a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala index 7012bff72..d338a1cc1 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala @@ -16,10 +16,11 @@ */ package org.apache.livy.server.recovery +import scala.reflect.ClassTag + import org.apache.livy.{LivyConf, Logging} import org.apache.livy.server.discovery.JsonMapper import org.apache.livy.sessions.SessionManager._ -import scala.reflect.ClassTag /** * Interface of a key-value pair storage for state storage. diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala index 0c318c1f1..ac237d52a 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala @@ -16,11 +16,13 @@ */ package org.apache.livy.server.recovery +import scala.reflect.ClassTag + import org.apache.curator.framework.CuratorFramework -import org.apache.livy.server.discovery.ZooKeeperManager + import org.apache.livy.{LivyConf, Logging} +import org.apache.livy.server.discovery.ZooKeeperManager -import scala.reflect.ClassTag class ZooKeeperStateStore(livyConf: LivyConf, mockCuratorClient: Option[CuratorFramework] = None) // For testing diff --git a/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala b/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala index 5c5647505..68b6f2984 100644 --- a/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala @@ -19,14 +19,14 @@ package org.apache.livy.server.discovery import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.api.{ExistsBuilder, GetDataBuilder, SetDataBuilder, UnhandledErrorListener} import org.apache.curator.framework.listen.Listenable -import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} -import org.apache.livy.LivyConf.LIVY_ZOOKEEPER_URL -import org.apache.livy.server.LivyServer import org.apache.zookeeper.data.Stat import org.mockito.Mockito.{never, verify, when} import org.scalatest.FunSpec import org.scalatest.mock.MockitoSugar.mock +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} +import org.apache.livy.LivyConf.LIVY_ZOOKEEPER_URL +import org.apache.livy.server.LivyServer class DiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite with JsonMapper { diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala index 597ef30bf..b3cad37b6 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala @@ -14,20 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.livy.server.recovery +import scala.collection.JavaConverters._ + import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.api._ import org.apache.curator.framework.listen.Listenable -import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} -import org.apache.livy.server.discovery.ZooKeeperManager import org.apache.zookeeper.data.Stat import org.mockito.Mockito._ import org.scalatest.FunSpec import org.scalatest.Matchers._ import org.scalatest.mock.MockitoSugar.mock -import scala.collection.JavaConverters._ + +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} +import org.apache.livy.server.discovery.ZooKeeperManager class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { describe("ZooKeeperStateStore") { From 9731aae21687bbcc429acdfe208da4fcfbf2d955 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Thu, 1 Aug 2019 13:31:15 +0300 Subject: [PATCH 04/26] Trigger, flaky tests From 64dabaff220f9c8e368021474c575e4b34dca53f Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Thu, 1 Aug 2019 19:15:56 +0300 Subject: [PATCH 05/26] added Java/Scala API, scaladocs, small refactoring --- .../main/java/org/apache/livy/LivyClient.java | 7 ++ .../org/apache/livy/LivyClientBuilder.java | 8 +++ .../org/apache/livy/TestClientFactory.java | 5 ++ client-http/pom.xml | 5 ++ .../apache/livy/client/http/HttpClient.java | 10 +++ .../livy/client/http/HttpClientSpec.scala | 4 ++ conf/livy.conf.template | 5 +- .../java/org/apache/livy/rsc/RSCClient.java | 1 + .../livy/scalaapi/LivyScalaClient.scala | 7 ++ .../livy/scalaapi/ScalaClientTest.scala | 7 ++ .../main/scala/org/apache/livy/LivyConf.scala | 6 +- .../org/apache/livy/server/LivyServer.scala | 17 ++--- .../server/discovery/DiscoveryManager.scala | 45 ------------- .../discovery/LivyDiscoveryManager.scala | 66 +++++++++++++++++++ .../server/recovery/ZooKeeperStateStore.scala | 10 ++- ...c.scala => LivyDiscoveryManagerSpec.scala} | 22 ++++--- 16 files changed, 157 insertions(+), 68 deletions(-) delete mode 100644 server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala create mode 100644 server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala rename server/src/test/scala/org/apache/livy/server/discovery/{DiscoveryManagerSpec.scala => LivyDiscoveryManagerSpec.scala} (83%) diff --git a/api/src/main/java/org/apache/livy/LivyClient.java b/api/src/main/java/org/apache/livy/LivyClient.java index fc03a1f7a..6f47063d9 100644 --- a/api/src/main/java/org/apache/livy/LivyClient.java +++ b/api/src/main/java/org/apache/livy/LivyClient.java @@ -107,4 +107,11 @@ public interface LivyClient { */ Future addFile(URI uri); + /** + * Get Livy Server URI. + * URI will be propagated from LivyClientBuilder during creating LivyClient. + * + * @return A future with Livy Server URI + */ + Future getServerUri(); } diff --git a/api/src/main/java/org/apache/livy/LivyClientBuilder.java b/api/src/main/java/org/apache/livy/LivyClientBuilder.java index 5bbd17025..3081ce22f 100644 --- a/api/src/main/java/org/apache/livy/LivyClientBuilder.java +++ b/api/src/main/java/org/apache/livy/LivyClientBuilder.java @@ -81,6 +81,14 @@ public LivyClientBuilder(boolean loadDefaults) throws IOException { } } + /** + * Set Livy Server URI. + * This is possible to set it manually or get URI from LivyDiscoveryManager + * ({@code livy.server.zookeeper.url} should be configured). + * + * @param uri Livy Server URI + * @return this builder + */ public LivyClientBuilder setURI(URI uri) { config.setProperty(LIVY_URI_KEY, uri.toString()); return this; diff --git a/api/src/test/java/org/apache/livy/TestClientFactory.java b/api/src/test/java/org/apache/livy/TestClientFactory.java index 89edeec5e..9bdd119da 100644 --- a/api/src/test/java/org/apache/livy/TestClientFactory.java +++ b/api/src/test/java/org/apache/livy/TestClientFactory.java @@ -81,6 +81,11 @@ public Future addFile(URI uri) { throw new UnsupportedOperationException(); } + @Override + public Future getServerUri() { + throw new UnsupportedOperationException(); + } + } } diff --git a/client-http/pom.xml b/client-http/pom.xml index 8401950c3..16e98e63e 100644 --- a/client-http/pom.xml +++ b/client-http/pom.xml @@ -78,6 +78,11 @@ httpmime 4.5.1 + + io.netty + netty-all + ${netty.version} + org.apache.livy diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index f40148f94..b59961af0 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -17,6 +17,8 @@ package org.apache.livy.client.http; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; import java.io.File; import java.net.URI; import java.nio.ByteBuffer; @@ -47,12 +49,14 @@ class HttpClient implements LivyClient { private final int sessionId; private final ScheduledExecutorService executor; private final Serializer serializer; + private final Promise serverUriPromise; private boolean stopped; HttpClient(URI uri, HttpConf httpConf) { this.config = httpConf; this.stopped = false; + this.serverUriPromise = ImmediateEventExecutor.INSTANCE.newPromise(); // If the given URI looks like it refers to an existing session, then try to connect to // an existing session. Note this means that any Spark configuration in httpConf will be @@ -77,6 +81,7 @@ class HttpClient implements LivyClient { ClientMessage create = new CreateClientRequest(sessionConf); this.conn = new LivyConnection(uri, httpConf); this.sessionId = conn.post(create, SessionInfo.class, "/").id; + serverUriPromise.setSuccess(uri); } } catch (Exception e) { throw propagate(e); @@ -145,6 +150,11 @@ public Future addFile(URI uri) { return addResource("add-file", uri); } + @Override + public Future getServerUri() { + return serverUriPromise; + } + private Future uploadResource(final File file, final String command, final String paramName) { Callable task = new Callable() { @Override diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala index f53d9f5b4..e55bc0d9b 100644 --- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala +++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala @@ -98,6 +98,10 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni testJob(false) } + withClient("should get Livy Server URI") { + assume(Option(client.getServerUri.get()).isDefined) + } + withClient("should propagate errors from jobs") { val errorMessage = "This job throws an error." val (jobId, handle) = runJob(false, { id => Seq( diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 2590e870e..d00917be7 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -100,8 +100,9 @@ # off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions. # recovery: Livy persists session info to the state store. When Livy restarts, it recovers # previous sessions from the state store. -# Must set livy.server.recovery.state-store and livy.server.recovery.state-store.url to -# configure the state store. +# Must set livy.server.recovery.state-store to needed state store (filesystem or zookeeper) +# Set livy.server.recovery.state-store.url for filesystem state store +# or livy.server.zookeeper.url for zookeeper state store. # livy.server.recovery.mode = off # Where Livy should store state to for recovery. Possible values: diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index 77d45c71f..413c8c51a 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -192,6 +192,7 @@ public void onFailure(Throwable error) throws Exception { return promise; } + @Override public Future getServerUri() { return serverUriPromise; } diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala index ca7199a99..d0521f74a 100644 --- a/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala +++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala @@ -142,6 +142,13 @@ class LivyScalaClient(livyJavaClient: LivyClient) { */ def addFile(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addFile(uri)).poll() + /** + * Get Livy Server URI. + * + * @return A future with Livy Server URI + */ + def getServerUri: Future[URI] = new PollingContainer(livyJavaClient.getServerUri).poll() + private class PollingContainer[T] private[livy] (jFuture: JFuture[T]) extends Runnable { private val initialDelay = 1 diff --git a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala index a716f5856..fe7675439 100644 --- a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala +++ b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala @@ -117,6 +117,13 @@ class ScalaClientTest extends FunSuite ScalaClientTestUtils.assertTestPassed(sFuture, "test file") } + test("test get uri") { + configureClient(true) + val getUriFuture = client.getServerUri + val uri = Await.result(getUriFuture, ScalaClientTestUtils.Timeout second) + assert(Option(uri).isDefined) + } + test("test add jar") { configureClient(true) val jar = File.createTempFile("test", ".resource") diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 81fc07553..e16291122 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -173,7 +173,9 @@ object LivyConf { * off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions. * recovery: Livy persists session info to the state store. When Livy restarts, it recovers * previous sessions from the state store. - * Must set livy.server.recovery.state-store and livy.server.recovery.state-store.url to + * Must set livy.server.recovery.state-store to needed state store (filesystem or zookeeper) + * Set livy.server.recovery.state-store.url for filesystem state store + * or livy.server.zookeeper.url for zookeeper state store. * configure the state store. */ val RECOVERY_MODE = Entry("livy.server.recovery.mode", "off") @@ -187,7 +189,7 @@ object LivyConf { /** * For filesystem state store, the path of the state store directory. Please don't use a * filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///. - * For zookeeper, use LIVY_ZOOKEEPER_URLS + * For zookeeper, use livy.server.zookeeper.url. */ val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "") diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 7727e3ba3..32765065e 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -18,7 +18,7 @@ package org.apache.livy.server import java.io.{BufferedInputStream, InputStream} -import java.net.InetAddress +import java.net.{InetAddress, URI} import java.util.EnumSet import java.util.concurrent._ import javax.servlet._ @@ -38,7 +38,7 @@ import org.scalatra.servlet.{MultipartConfig, ServletApiImplicits} import org.apache.livy._ import org.apache.livy.server.batch.BatchSessionServlet -import org.apache.livy.server.discovery.DiscoveryManager +import org.apache.livy.server.discovery.LivyDiscoveryManager import org.apache.livy.server.interactive.InteractiveSessionServlet import org.apache.livy.server.recovery.{SessionStore, StateStore} import org.apache.livy.server.ui.UIServlet @@ -74,7 +74,7 @@ class LivyServer extends Logging { maxFileSize = Some(livyConf.getLong(LivyConf.FILE_UPLOAD_MAX_SIZE)) ).toMultipartConfigElement - setServerAddress(livyConf) + setServerUri(livyConf) // Make sure the `spark-submit` program exists, otherwise much of livy won't work. testSparkHome(livyConf) @@ -389,13 +389,14 @@ class LivyServer extends Logging { } } - private[livy] def setServerAddress(livyConf: LivyConf, address: Option[String] = None, - mockCuratorClient: Option[CuratorFramework] = None): Unit = { + private[livy] def setServerUri(livyConf: LivyConf, address: Option[URI] = None, + mockCuratorClient: Option[CuratorFramework] = None): Unit = { if (Option(livyConf.get(LIVY_ZOOKEEPER_URL)).isDefined) { - val discoveryManager = DiscoveryManager(livyConf, mockCuratorClient) + val discoveryManager = LivyDiscoveryManager(livyConf, mockCuratorClient) val host = InetAddress.getLocalHost.getHostName - discoveryManager.setAddress( - address.getOrElse(s"$host:${livyConf.getInt(LivyConf.SERVER_PORT)}")) + val localUri = new URI(s"http://$host:${livyConf.getInt(LivyConf.SERVER_PORT)}") + val severUri = address.getOrElse(localUri) + discoveryManager.setServerUri(severUri) } } diff --git a/server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala deleted file mode 100644 index 6e75fb43e..000000000 --- a/server/src/main/scala/org/apache/livy/server/discovery/DiscoveryManager.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.livy.server.discovery - -import org.apache.curator.framework.CuratorFramework - -import org.apache.livy.LivyConf - -class DiscoveryManager(livyConf: LivyConf, - mockCuratorClient: Option[CuratorFramework] = None) - extends ZooKeeperManager(livyConf, mockCuratorClient) { - - import DiscoveryManager._ - - def setAddress(address: String): Unit = { - setData(LIVY_SERVER_URL_KEY, address) - } - - def getAddress(): String = { - getData[String](LIVY_SERVER_URL_KEY).getOrElse("") - } -} - -object DiscoveryManager { - val LIVY_SERVER_URL_KEY = "server.url" - - def apply(livyConf: LivyConf, - mockCuratorClient: Option[CuratorFramework] = None): DiscoveryManager = { - new DiscoveryManager(livyConf, mockCuratorClient) - } -} diff --git a/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala new file mode 100644 index 000000000..ea31341e4 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.server.discovery + +import java.net.URI + +import org.apache.curator.framework.CuratorFramework + +import org.apache.livy.LivyConf + +/** + * Livy Server Discovery manager. + * Stores information about Livy Server location in ZooKeeper. + * The address will be stored in + * "/{@code ZK_KEY_PREFIX_CONF}/{@code LIVY_SERVER_URI_KEY}" znode by default + * e.g. "/livy/server.uri". + * Need to set {@code livy.server.zookeeper.url} to be able to get information from ZooKeeper. + * + * @param livyConf - Livy configurations + * @param mockCuratorClient - used for testing + */ +class LivyDiscoveryManager(livyConf: LivyConf, + mockCuratorClient: Option[CuratorFramework] = None) + extends ZooKeeperManager(livyConf, mockCuratorClient) { + + import LivyDiscoveryManager._ + + /** + * Save Livy Server URI to ZooKeeper. + * @param address - URI address of Livy Server + */ + def setServerUri(address: URI): Unit = { + setData(LIVY_SERVER_URI_KEY, address) + } + + /** + * Get Livy Server URI from ZooKeeper. + * @return Livy Server URI + */ + def getServerUri(): URI = { + getData[URI](LIVY_SERVER_URI_KEY).getOrElse(URI.create("")) + } +} + +object LivyDiscoveryManager { + val LIVY_SERVER_URI_KEY = "server.uri" + + def apply(livyConf: LivyConf, + mockCuratorClient: Option[CuratorFramework] = None): LivyDiscoveryManager = { + new LivyDiscoveryManager(livyConf, mockCuratorClient) + } +} diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala index ac237d52a..1cc47f6e8 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala @@ -23,7 +23,15 @@ import org.apache.curator.framework.CuratorFramework import org.apache.livy.{LivyConf, Logging} import org.apache.livy.server.discovery.ZooKeeperManager - +/** + * Implementation for Livy State Store which uses Zookeeper as backend storage. + * Set {@code livy.server.recovery.mode} to ``recovery`` + * and {@code livy.server.recovery.state-store} to ``zookeeper`` to enable ZooKeeper state store. + * Also need to set {@code livy.server.zookeeper.url} to be able to get information from ZooKeeper. + * + * @param livyConf + * @param mockCuratorClient + */ class ZooKeeperStateStore(livyConf: LivyConf, mockCuratorClient: Option[CuratorFramework] = None) // For testing extends StateStore(livyConf) with Logging { diff --git a/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala similarity index 83% rename from server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala rename to server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala index 68b6f2984..33a9c2337 100644 --- a/server/src/test/scala/org/apache/livy/server/discovery/DiscoveryManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala @@ -16,6 +16,8 @@ */ package org.apache.livy.server.discovery +import java.net.URI + import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.api.{ExistsBuilder, GetDataBuilder, SetDataBuilder, UnhandledErrorListener} import org.apache.curator.framework.listen.Listenable @@ -28,22 +30,22 @@ import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} import org.apache.livy.LivyConf.LIVY_ZOOKEEPER_URL import org.apache.livy.server.LivyServer -class DiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite +class LivyDiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite with JsonMapper { describe("DiscoveryManagerSpec") { - case class TestFixture(discoveryManager: DiscoveryManager, curatorClient: CuratorFramework) + case class TestFixture(discoveryManager: LivyDiscoveryManager, curatorClient: CuratorFramework) val conf = new LivyConf() conf.set(LivyConf.LIVY_ZOOKEEPER_URL, "host") - val key = DiscoveryManager.LIVY_SERVER_URL_KEY + val key = LivyDiscoveryManager.LIVY_SERVER_URI_KEY val prefixedKey = s"/livy/$key" - val testAddress = s"0.0.0.0:${conf.getInt(LivyConf.SERVER_PORT)}" + val testAddress = new URI(s"http://0.0.0.0:${conf.getInt(LivyConf.SERVER_PORT)}") val testData: Array[Byte] = serializeToBytes(testAddress) def withMock[R](testBody: TestFixture => R): R = { val curatorClient = mock[CuratorFramework] when(curatorClient.getUnhandledErrorListenable()) .thenReturn(mock[Listenable[UnhandledErrorListener]]) - val discoveryManager = DiscoveryManager(conf, Some(curatorClient)) + val discoveryManager = LivyDiscoveryManager(conf, Some(curatorClient)) testBody(TestFixture(discoveryManager, curatorClient)) } @@ -62,21 +64,21 @@ class DiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite val setDataBuilder = mock[SetDataBuilder] when(f.curatorClient.setData()).thenReturn(setDataBuilder) - f.discoveryManager.setAddress(testAddress) + f.discoveryManager.setServerUri(testAddress) verify(f.curatorClient).start() verify(setDataBuilder).forPath(prefixedKey, testData) } } - it("getAddress should use curatorClient") { + it("getServerUri should use curatorClient") { withMock { f => mockExistsBuilder(f.curatorClient, exists = true) val getDataBuilder = mock[GetDataBuilder] when(f.curatorClient.getData()).thenReturn(getDataBuilder) when(getDataBuilder.forPath(prefixedKey)).thenReturn(testData) - f.discoveryManager.getAddress() + f.discoveryManager.getServerUri() verify(f.curatorClient).start() verify(getDataBuilder).forPath(prefixedKey) @@ -94,7 +96,7 @@ class DiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite val setDataBuilder = mock[SetDataBuilder] when(f.curatorClient.setData()).thenReturn(setDataBuilder) - s.setServerAddress(livyConf, Some(testAddress), Some(f.curatorClient)) + s.setServerUri(livyConf, Some(testAddress), Some(f.curatorClient)) verify(setDataBuilder).forPath(prefixedKey, testData) } } @@ -104,7 +106,7 @@ class DiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite val livyConf = new LivyConf() val s = new LivyServer() - s.setServerAddress(livyConf, Some(testAddress), Some(f.curatorClient)) + s.setServerUri(livyConf, Some(testAddress), Some(f.curatorClient)) verify(f.curatorClient, never).setData() } } From a7fd7d885a8aaedb44e8a16bcc5916c6c8b4e94e Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Thu, 1 Aug 2019 19:23:42 +0300 Subject: [PATCH 06/26] scalastyle --- .../main/java/org/apache/livy/client/http/HttpClient.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index b59961af0..53dbc0bbf 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -17,8 +17,6 @@ package org.apache.livy.client.http; -import io.netty.util.concurrent.ImmediateEventExecutor; -import io.netty.util.concurrent.Promise; import java.io.File; import java.net.URI; import java.nio.ByteBuffer; @@ -32,6 +30,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; + import org.apache.livy.Job; import org.apache.livy.JobHandle; import org.apache.livy.LivyClient; From aacfd9f8ffafd4a0be5603a9d671cae8c794eecd Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Thu, 1 Aug 2019 20:51:33 +0300 Subject: [PATCH 07/26] Trigger, flaky tests From d693ea3d5d45e799d4285f660b6b11f9ce05b639 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Thu, 1 Aug 2019 21:54:31 +0300 Subject: [PATCH 08/26] Trigger, flaky tests From 729beda40db50a27e86d0f60f4a279994b415ffc Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Thu, 1 Aug 2019 22:31:19 +0300 Subject: [PATCH 09/26] Trigger, flaky tests From 27a7ffcb0568c815749d674d6f7ff84cc70340fe Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Thu, 1 Aug 2019 23:28:32 +0300 Subject: [PATCH 10/26] Trigger, flaky tests From 104f66b81b462eaffa23914bac4ef14d74aeefd3 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Fri, 2 Aug 2019 09:59:59 +0300 Subject: [PATCH 11/26] Trigger, flaky tests From 5baf435cf82ad47d77026211920cfebf3270287e Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Fri, 2 Aug 2019 12:59:44 +0300 Subject: [PATCH 12/26] make more configurable --- conf/livy.conf.template | 18 +++++++++++++-- .../main/scala/org/apache/livy/LivyConf.scala | 21 +++++++++++++++--- .../discovery/LivyDiscoveryManager.scala | 3 +-- .../server/discovery/ZooKeeperManager.scala | 19 ++++------------ .../discovery/LivyDiscoveryManagerSpec.scala | 2 +- .../recovery/ZooKeeperStateStoreSpec.scala | 22 ------------------- 6 files changed, 40 insertions(+), 45 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index d00917be7..219f67ffc 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -102,7 +102,7 @@ # previous sessions from the state store. # Must set livy.server.recovery.state-store to needed state store (filesystem or zookeeper) # Set livy.server.recovery.state-store.url for filesystem state store -# or livy.server.zookeeper.url for zookeeper state store. +# or livy.zookeeper.url for zookeeper state store. # livy.server.recovery.mode = off # Where Livy should store state to for recovery. Possible values: @@ -113,8 +113,22 @@ # For filesystem state store, the path of the state store directory. Please don't use a filesystem # that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///. -# For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 # livy.server.recovery.state-store.url = +# For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 +# livy.zookeeper.url = + +# Livy Server discovery +# ZooKeeper quorum URLs, e.g. host1:port1,host2:port2 +# livy.zookeeper.url = +# Name of base Livy zknode. Default livy +# livy.zookeeper.namespace = livy +# Name of Livy Server zknode. Uses livy.zookeeper.namespace as parent. +# By default will be /livy/server.uri +# livy.server.zookeeper.namespace = server.uri +# Number of trials to establish the connection to ZooKeeper quorum +# livy.server.zookeeper.connection.max.retries = 3 +# Sleep time between connection retries to ZooKeeper quorum +# livy.server.zookeeper.connection.retry.interval.ms = 500 # If Livy can't find the yarn app within this time, consider it lost. # livy.server.yarn.app-lookup-timeout = 120s diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index e16291122..a0a7e845a 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -175,7 +175,7 @@ object LivyConf { * previous sessions from the state store. * Must set livy.server.recovery.state-store to needed state store (filesystem or zookeeper) * Set livy.server.recovery.state-store.url for filesystem state store - * or livy.server.zookeeper.url for zookeeper state store. + * or livy.zookeeper.url for zookeeper state store. * configure the state store. */ val RECOVERY_MODE = Entry("livy.server.recovery.mode", "off") @@ -189,7 +189,7 @@ object LivyConf { /** * For filesystem state store, the path of the state store directory. Please don't use a * filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///. - * For zookeeper, use livy.server.zookeeper.url. + * For zookeeper, use livy.zookeeper.url. */ val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "") @@ -197,7 +197,22 @@ object LivyConf { * ZooKeeper address witch will be used for Livy Server discovery * and Zookeeper state store. e.g. host1:port1,host2:port2 */ - val LIVY_ZOOKEEPER_URL = Entry("livy.server.zookeeper.url", null) + val LIVY_ZOOKEEPER_URL = Entry("livy.zookeeper.url", null) + + // Name of base Livy zknode. + val LIVY_ZOOKEEPER_NAMESPACE = Entry("livy.zookeeper.namespace", "livy") + + // Number of trials to establish the connection to ZooKeeper quorum. + val LIVY_ZOOKEEPER_CONNECTION_MAX_RETRIES = + Entry("livy.server.zookeeper.connection.max.retries", 3) + + // Sleep time between connection retries to ZooKeeper quorum. + val LIVY_ZOOKEEPER_CONNECTION_RETRY_INTERVAL = + Entry("livy.server.zookeeper.connection.retry.interval.ms", 500) + + // Name of Livy Server zknode. Uses LIVY_ZOOKEEPER_NAMESPACE as parent. + // By default will be /livy/server.uri + val LIVY_SERVER_ZOOKEEPER_NAMESPACE = Entry("livy.server.zookeeper.namespace", "server.uri") // Livy will cache the max no of logs specified. 0 means don't cache the logs. val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200) diff --git a/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala index ea31341e4..c66686f17 100644 --- a/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala +++ b/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala @@ -37,7 +37,7 @@ class LivyDiscoveryManager(livyConf: LivyConf, mockCuratorClient: Option[CuratorFramework] = None) extends ZooKeeperManager(livyConf, mockCuratorClient) { - import LivyDiscoveryManager._ + private val LIVY_SERVER_URI_KEY = livyConf.get(LivyConf.LIVY_SERVER_ZOOKEEPER_NAMESPACE) /** * Save Livy Server URI to ZooKeeper. @@ -57,7 +57,6 @@ class LivyDiscoveryManager(livyConf: LivyConf, } object LivyDiscoveryManager { - val LIVY_SERVER_URI_KEY = "server.uri" def apply(livyConf: LivyConf, mockCuratorClient: Option[CuratorFramework] = None): LivyDiscoveryManager = { diff --git a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala index e9c6747f9..f8a11edbd 100644 --- a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala +++ b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala @@ -25,15 +25,12 @@ import org.apache.curator.retry.RetryNTimes import org.apache.zookeeper.KeeperException.NoNodeException import org.apache.livy.{LivyConf, Logging} -import org.apache.livy.LivyConf.Entry class ZooKeeperManager( livyConf: LivyConf, mockCuratorClient: Option[CuratorFramework] = None) // For testing extends JsonMapper with Logging { - import ZooKeeperManager._ - // Constructor defined for StateStore factory to new this class using reflection. def this(livyConf: LivyConf) { this(livyConf, None) @@ -41,16 +38,10 @@ class ZooKeeperManager( private val zkAddress = livyConf.get(LivyConf.LIVY_ZOOKEEPER_URL) require(Option(zkAddress).isDefined, s"Please config ${LivyConf.LIVY_ZOOKEEPER_URL.key}.") - private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF) - private val retryValue = livyConf.get(ZK_RETRY_CONF) - // a regex to match patterns like "m, n" where m and n both are integer values - private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r - private[server] val retryPolicy = retryValue match { - case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt) - case _ => throw new IllegalArgumentException( - s"$ZK_KEY_PREFIX_CONF contains bad value: $retryValue. " + - "Correct format is ,. e.g. 5,100") - } + private val zkKeyPrefix = livyConf.get(LivyConf.LIVY_ZOOKEEPER_NAMESPACE) + private val maxRetries = livyConf.getInt(LivyConf.LIVY_ZOOKEEPER_CONNECTION_MAX_RETRIES) + private val sleepTime = livyConf.getInt(LivyConf.LIVY_ZOOKEEPER_CONNECTION_RETRY_INTERVAL) + private val retryPolicy = new RetryNTimes(maxRetries, sleepTime) private val curatorClient = mockCuratorClient.getOrElse { CuratorFrameworkFactory.newClient(zkAddress, retryPolicy) @@ -116,8 +107,6 @@ class ZooKeeperManager( } object ZooKeeperManager { - val ZK_KEY_PREFIX_CONF = Entry("livy.server.zookeeper.key-prefix", "livy") - val ZK_RETRY_CONF = Entry("livy.server.zookeeper.retry-policy", "5,100") def apply(livyConf: LivyConf, mockCuratorClient: Option[CuratorFramework] = None): ZooKeeperManager = { diff --git a/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala index 33a9c2337..69791761c 100644 --- a/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala @@ -36,7 +36,7 @@ class LivyDiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite case class TestFixture(discoveryManager: LivyDiscoveryManager, curatorClient: CuratorFramework) val conf = new LivyConf() conf.set(LivyConf.LIVY_ZOOKEEPER_URL, "host") - val key = LivyDiscoveryManager.LIVY_SERVER_URI_KEY + val key = conf.get(LivyConf.LIVY_SERVER_ZOOKEEPER_NAMESPACE) val prefixedKey = s"/livy/$key" val testAddress = new URI(s"http://0.0.0.0:${conf.getInt(LivyConf.SERVER_PORT)}") val testData: Array[Byte] = serializeToBytes(testAddress) diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala index b3cad37b6..9a76e087e 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala @@ -28,7 +28,6 @@ import org.scalatest.Matchers._ import org.scalatest.mock.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} -import org.apache.livy.server.discovery.ZooKeeperManager class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { describe("ZooKeeperStateStore") { @@ -54,17 +53,6 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { } } - it("should throw on bad config") { - withMock { f => - val conf = new LivyConf() - intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf, Some(f.curatorClient)) } - - conf.set(LivyConf.LIVY_ZOOKEEPER_URL, "host") - conf.set(ZooKeeperManager.ZK_RETRY_CONF, "bad") - intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf, Some(f.curatorClient)) } - } - } - it("set should use curatorClient") { withMock { f => mockExistsBuilder(f.curatorClient, true) @@ -95,16 +83,6 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { } } - it("get should retrieve retry policy configs") { - conf.set(ZooKeeperManager.ZK_RETRY_CONF, "11,77") - withMock { f => - mockExistsBuilder(f.curatorClient, true) - - f.stateStore.zooKeeperManager.retryPolicy should not be null - f.stateStore.zooKeeperManager.retryPolicy.getN shouldBe 11 - } - } - it("get should retrieve data from curatorClient") { withMock { f => mockExistsBuilder(f.curatorClient, true) From 5d7ea289f41328f572adf3375e4b583881378acc Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Fri, 2 Aug 2019 13:04:41 +0300 Subject: [PATCH 13/26] fix typo --- conf/livy.conf.template | 4 ++-- server/src/main/scala/org/apache/livy/LivyConf.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 219f67ffc..4060e1b10 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -120,9 +120,9 @@ # Livy Server discovery # ZooKeeper quorum URLs, e.g. host1:port1,host2:port2 # livy.zookeeper.url = -# Name of base Livy zknode. Default livy +# Name of base Livy znode. Default livy # livy.zookeeper.namespace = livy -# Name of Livy Server zknode. Uses livy.zookeeper.namespace as parent. +# Name of Livy Server znode. Uses livy.zookeeper.namespace as parent. # By default will be /livy/server.uri # livy.server.zookeeper.namespace = server.uri # Number of trials to establish the connection to ZooKeeper quorum diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index a0a7e845a..62509b06f 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -199,7 +199,7 @@ object LivyConf { */ val LIVY_ZOOKEEPER_URL = Entry("livy.zookeeper.url", null) - // Name of base Livy zknode. + // Name of base Livy znode. val LIVY_ZOOKEEPER_NAMESPACE = Entry("livy.zookeeper.namespace", "livy") // Number of trials to establish the connection to ZooKeeper quorum. @@ -210,7 +210,7 @@ object LivyConf { val LIVY_ZOOKEEPER_CONNECTION_RETRY_INTERVAL = Entry("livy.server.zookeeper.connection.retry.interval.ms", 500) - // Name of Livy Server zknode. Uses LIVY_ZOOKEEPER_NAMESPACE as parent. + // Name of Livy Server znode. Uses LIVY_ZOOKEEPER_NAMESPACE as parent. // By default will be /livy/server.uri val LIVY_SERVER_ZOOKEEPER_NAMESPACE = Entry("livy.server.zookeeper.namespace", "server.uri") From 728deb93517a8a7e1bdc4df5d03abd0b5541c7c1 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Fri, 2 Aug 2019 14:26:17 +0300 Subject: [PATCH 14/26] Trigger, flaky tests From a71c0efab3f4ce0e4de9e9ba0490e6275c349f57 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Fri, 2 Aug 2019 18:10:42 +0300 Subject: [PATCH 15/26] honor livy.server.host --- .../org/apache/livy/server/LivyServer.scala | 18 +++++++++++++----- .../discovery/LivyDiscoveryManagerSpec.scala | 4 ++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 32765065e..d9f4e9fcc 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -389,14 +389,22 @@ class LivyServer extends Logging { } } - private[livy] def setServerUri(livyConf: LivyConf, address: Option[URI] = None, + private[livy] def setServerUri(livyConf: LivyConf, mockCuratorClient: Option[CuratorFramework] = None): Unit = { if (Option(livyConf.get(LIVY_ZOOKEEPER_URL)).isDefined) { val discoveryManager = LivyDiscoveryManager(livyConf, mockCuratorClient) - val host = InetAddress.getLocalHost.getHostName - val localUri = new URI(s"http://$host:${livyConf.getInt(LivyConf.SERVER_PORT)}") - val severUri = address.getOrElse(localUri) - discoveryManager.setServerUri(severUri) + val host = resolvedSeverHost(livyConf) + val uri = new URI(s"http://$host:${livyConf.getInt(LivyConf.SERVER_PORT)}") + discoveryManager.setServerUri(uri) + } + } + + private[livy] def resolvedSeverHost(livyConf: LivyConf) = { + val host = livyConf.get(LivyConf.SERVER_HOST) + if (host.equals(livyConf.get(LivyConf.SERVER_HOST.dflt.toString))) { + InetAddress.getLocalHost.getHostName + } else { + host } } diff --git a/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala index 69791761c..070055f98 100644 --- a/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala @@ -96,7 +96,7 @@ class LivyDiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite val setDataBuilder = mock[SetDataBuilder] when(f.curatorClient.setData()).thenReturn(setDataBuilder) - s.setServerUri(livyConf, Some(testAddress), Some(f.curatorClient)) + s.setServerUri(livyConf, Some(f.curatorClient)) verify(setDataBuilder).forPath(prefixedKey, testData) } } @@ -106,7 +106,7 @@ class LivyDiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite val livyConf = new LivyConf() val s = new LivyServer() - s.setServerUri(livyConf, Some(testAddress), Some(f.curatorClient)) + s.setServerUri(livyConf, Some(f.curatorClient)) verify(f.curatorClient, never).setData() } } From 1d0add4289ce46e0329d3edb0257805114ed3e59 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Fri, 2 Aug 2019 18:29:42 +0300 Subject: [PATCH 16/26] remove package access --- server/src/main/scala/org/apache/livy/server/LivyServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index d9f4e9fcc..8704e71b4 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -399,7 +399,7 @@ class LivyServer extends Logging { } } - private[livy] def resolvedSeverHost(livyConf: LivyConf) = { + private def resolvedSeverHost(livyConf: LivyConf) = { val host = livyConf.get(LivyConf.SERVER_HOST) if (host.equals(livyConf.get(LivyConf.SERVER_HOST.dflt.toString))) { InetAddress.getLocalHost.getHostName From 7e570109da7cd47fff9f6f3442e9e4beee3cc629 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Wed, 14 Aug 2019 11:03:55 +0300 Subject: [PATCH 17/26] update scaladoc --- api/src/main/java/org/apache/livy/LivyClientBuilder.java | 2 +- conf/livy.conf.template | 2 +- server/src/main/scala/org/apache/livy/LivyConf.scala | 2 +- .../apache/livy/server/discovery/LivyDiscoveryManager.scala | 6 +++--- .../apache/livy/server/recovery/ZooKeeperStateStore.scala | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/api/src/main/java/org/apache/livy/LivyClientBuilder.java b/api/src/main/java/org/apache/livy/LivyClientBuilder.java index 3081ce22f..94aaf8cda 100644 --- a/api/src/main/java/org/apache/livy/LivyClientBuilder.java +++ b/api/src/main/java/org/apache/livy/LivyClientBuilder.java @@ -84,7 +84,7 @@ public LivyClientBuilder(boolean loadDefaults) throws IOException { /** * Set Livy Server URI. * This is possible to set it manually or get URI from LivyDiscoveryManager - * ({@code livy.server.zookeeper.url} should be configured). + * ({@code livy.zookeeper.url} should be configured). * * @param uri Livy Server URI * @return this builder diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 4060e1b10..f1550502b 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -123,7 +123,7 @@ # Name of base Livy znode. Default livy # livy.zookeeper.namespace = livy # Name of Livy Server znode. Uses livy.zookeeper.namespace as parent. -# By default will be /livy/server.uri +# By default, the full path to znode is /livy/server.uri # livy.server.zookeeper.namespace = server.uri # Number of trials to establish the connection to ZooKeeper quorum # livy.server.zookeeper.connection.max.retries = 3 diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 62509b06f..b74844027 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -211,7 +211,7 @@ object LivyConf { Entry("livy.server.zookeeper.connection.retry.interval.ms", 500) // Name of Livy Server znode. Uses LIVY_ZOOKEEPER_NAMESPACE as parent. - // By default will be /livy/server.uri + // By default, the full path to znode is /livy/server.uri val LIVY_SERVER_ZOOKEEPER_NAMESPACE = Entry("livy.server.zookeeper.namespace", "server.uri") // Livy will cache the max no of logs specified. 0 means don't cache the logs. diff --git a/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala index c66686f17..af62d3526 100644 --- a/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala +++ b/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala @@ -26,9 +26,9 @@ import org.apache.livy.LivyConf * Livy Server Discovery manager. * Stores information about Livy Server location in ZooKeeper. * The address will be stored in - * "/{@code ZK_KEY_PREFIX_CONF}/{@code LIVY_SERVER_URI_KEY}" znode by default - * e.g. "/livy/server.uri". - * Need to set {@code livy.server.zookeeper.url} to be able to get information from ZooKeeper. + * "/{@code LIVY_ZOOKEEPER_NAMESPACE}/{@code LIVY_SERVER_ZOOKEEPER_NAMESPACE}" znode + * By default, the full path to znode is /livy/server.uri. + * Need to set {@code livy.zookeeper.url} to be able to get information from ZooKeeper. * * @param livyConf - Livy configurations * @param mockCuratorClient - used for testing diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala index 1cc47f6e8..79785d6d5 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala @@ -27,7 +27,7 @@ import org.apache.livy.server.discovery.ZooKeeperManager * Implementation for Livy State Store which uses Zookeeper as backend storage. * Set {@code livy.server.recovery.mode} to ``recovery`` * and {@code livy.server.recovery.state-store} to ``zookeeper`` to enable ZooKeeper state store. - * Also need to set {@code livy.server.zookeeper.url} to be able to get information from ZooKeeper. + * Also need to set {@code livy.zookeeper.url} to be able to get information from ZooKeeper. * * @param livyConf * @param mockCuratorClient From 627e4551daa81d248e0f02c4a27fa419efd52892 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Wed, 14 Aug 2019 11:54:30 +0300 Subject: [PATCH 18/26] Trigger, flaky tests From df5a3955d8a871fb5601e224b00fc843466ac6cb Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Wed, 14 Aug 2019 12:25:45 +0300 Subject: [PATCH 19/26] refactored, changed class ZooKeeperManager to trait to be able to add as mixin it instead of creating as field --- .../discovery/LivyDiscoveryManager.scala | 6 +++--- .../server/discovery/ZooKeeperManager.scala | 19 +++---------------- .../server/recovery/BlackholeStateStore.scala | 2 +- .../recovery/FileSystemStateStore.scala | 2 +- .../livy/server/recovery/SessionStore.scala | 2 +- .../livy/server/recovery/StateStore.scala | 2 +- .../server/recovery/ZooKeeperStateStore.scala | 18 ++++++++---------- .../recovery/BlackholeStateStoreSpec.scala | 4 ++-- .../recovery/FileSystemStateStoreSpec.scala | 10 +++++----- .../server/recovery/SessionStoreSpec.scala | 4 ++-- .../recovery/ZooKeeperStateStoreSpec.scala | 8 ++++---- 11 files changed, 31 insertions(+), 46 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala index af62d3526..38bf1a034 100644 --- a/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala +++ b/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala @@ -33,9 +33,9 @@ import org.apache.livy.LivyConf * @param livyConf - Livy configurations * @param mockCuratorClient - used for testing */ -class LivyDiscoveryManager(livyConf: LivyConf, - mockCuratorClient: Option[CuratorFramework] = None) - extends ZooKeeperManager(livyConf, mockCuratorClient) { +class LivyDiscoveryManager(val livyConf: LivyConf, + val mockCuratorClient: Option[CuratorFramework] = None) + extends ZooKeeperManager { private val LIVY_SERVER_URI_KEY = livyConf.get(LivyConf.LIVY_SERVER_ZOOKEEPER_NAMESPACE) diff --git a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala index f8a11edbd..dc44230cb 100644 --- a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala +++ b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala @@ -26,15 +26,10 @@ import org.apache.zookeeper.KeeperException.NoNodeException import org.apache.livy.{LivyConf, Logging} -class ZooKeeperManager( - livyConf: LivyConf, - mockCuratorClient: Option[CuratorFramework] = None) // For testing - extends JsonMapper with Logging { +trait ZooKeeperManager extends JsonMapper with Logging { - // Constructor defined for StateStore factory to new this class using reflection. - def this(livyConf: LivyConf) { - this(livyConf, None) - } + val livyConf: LivyConf + val mockCuratorClient: Option[CuratorFramework] private val zkAddress = livyConf.get(LivyConf.LIVY_ZOOKEEPER_URL) require(Option(zkAddress).isDefined, s"Please config ${LivyConf.LIVY_ZOOKEEPER_URL.key}.") @@ -105,11 +100,3 @@ class ZooKeeperManager( private def prefixKey(key: String) = s"/$zkKeyPrefix/$key" } - -object ZooKeeperManager { - - def apply(livyConf: LivyConf, - mockCuratorClient: Option[CuratorFramework] = None): ZooKeeperManager = { - new ZooKeeperManager(livyConf, mockCuratorClient) - } -} diff --git a/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala index df9a712a8..3e5475293 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala @@ -30,7 +30,7 @@ class BlackholeStateStore(livyConf: LivyConf) extends StateStore(livyConf) { def get[T: ClassTag](key: String): Option[T] = None - def getChildren(key: String): Seq[String] = List.empty[String] + def getChildrenNodes(key: String): Seq[String] = List.empty[String] def remove(key: String): Unit = {} } diff --git a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala index d5f8f3dc6..d33f9fc57 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala @@ -113,7 +113,7 @@ class FileSystemStateStore( } } - override def getChildren(key: String): Seq[String] = { + override def getChildrenNodes(key: String): Seq[String] = { try { fileContext.util.listStatus(absPath(key)).map(_.getPath.getName) } catch { diff --git a/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala index 04292957c..136b9a674 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala @@ -54,7 +54,7 @@ class SessionStore( * Return all sessions stored in the store with specified session type. */ def getAllSessions[T <: RecoveryMetadata : ClassTag](sessionType: String): Seq[Try[T]] = { - store.getChildren(sessionPath(sessionType)) + store.getChildrenNodes(sessionPath(sessionType)) .flatMap { c => Try(c.toInt).toOption } // Ignore all non numerical keys .flatMap { id => val p = sessionPath(sessionType, id) diff --git a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala index d338a1cc1..046db93c7 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala @@ -49,7 +49,7 @@ abstract class StateStore(livyConf: LivyConf) extends JsonMapper { * @return List of names of the direct children of the key. * Empty list if the key doesn't exist or have no child. */ - def getChildren(key: String): Seq[String] + def getChildrenNodes(key: String): Seq[String] /** * Remove the key from this state store. Does not throw if the key doesn't exist. diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala index 79785d6d5..46b940919 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala @@ -32,11 +32,9 @@ import org.apache.livy.server.discovery.ZooKeeperManager * @param livyConf * @param mockCuratorClient */ -class ZooKeeperStateStore(livyConf: LivyConf, - mockCuratorClient: Option[CuratorFramework] = None) // For testing - extends StateStore(livyConf) with Logging { - - val zooKeeperManager = ZooKeeperManager(livyConf, mockCuratorClient) +class ZooKeeperStateStore(val livyConf: LivyConf, + val mockCuratorClient: Option[CuratorFramework] = None) // For testing + extends StateStore(livyConf) with ZooKeeperManager with Logging { // Constructor defined for StateStore factory to new this class using reflection. def this(livyConf: LivyConf) { @@ -44,18 +42,18 @@ class ZooKeeperStateStore(livyConf: LivyConf, } override def set(key: String, value: Object): Unit = { - zooKeeperManager.setData(key, value) + setData(key, value) } override def get[T: ClassTag](key: String): Option[T] = { - zooKeeperManager.getData[T](key) + getData[T](key) } - override def getChildren(key: String): Seq[String] = { - zooKeeperManager.getChildren(key) + override def getChildrenNodes(key: String): Seq[String] = { + getChildren(key) } override def remove(key: String): Unit = { - zooKeeperManager.delete(key) + delete(key) } } diff --git a/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala index 8ee448f5e..fee8a72e5 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala @@ -36,8 +36,8 @@ class BlackholeStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { v shouldBe None } - it("getChildren should return empty list") { - val c = stateStore.getChildren("") + it("getChildrenNodes should return empty list") { + val c = stateStore.getChildrenNodes("") c shouldBe empty } diff --git a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala index 4758c854f..1dc084ee5 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala @@ -141,7 +141,7 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { stateStore.get[String]("key") shouldBe None } - it("getChildren should list file") { + it("getChildrenNodes should list file") { val parentPath = "path" def makeFileStatus(name: String): FileStatus = { val fs = new FileStatus() @@ -157,7 +157,7 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { when(fileContext.util()).thenReturn(util) val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) - stateStore.getChildren(parentPath) should contain theSameElementsAs children + stateStore.getChildrenNodes(parentPath) should contain theSameElementsAs children } def getChildrenErrorTest(error: Exception): Unit = { @@ -169,14 +169,14 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { when(fileContext.util()).thenReturn(util) val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) - stateStore.getChildren(parentPath) shouldBe empty + stateStore.getChildrenNodes(parentPath) shouldBe empty } - it("getChildren should return empty list if the key doesn't exist") { + it("getChildrenNodes should return empty list if the key doesn't exist") { getChildrenErrorTest(new IOException("Unit test")) } - it("getChildren should return empty list if key doesn't exist") { + it("getChildrenNodes should return empty list if key doesn't exist") { getChildrenErrorTest(new FileNotFoundException("Unit test")) } diff --git a/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala index 5eeb2cfc3..3baa8e9cc 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala @@ -56,7 +56,7 @@ class SessionStoreSpec extends FunSpec with LivyBaseUnitTestSuite { ) val stateStore = mock[StateStore] val sessionStore = new SessionStore(conf, stateStore) - when(stateStore.getChildren(sessionPath)) + when(stateStore.getChildrenNodes(sessionPath)) .thenReturn((validMetadata ++ corruptedMetadata).keys.toList) validMetadata.foreach { case (id, m) => @@ -78,7 +78,7 @@ class SessionStoreSpec extends FunSpec with LivyBaseUnitTestSuite { it("should not throw if the state store is empty") { val stateStore = mock[StateStore] val sessionStore = new SessionStore(conf, stateStore) - when(stateStore.getChildren(sessionPath)).thenReturn(Seq.empty) + when(stateStore.getChildrenNodes(sessionPath)).thenReturn(Seq.empty) val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType) s.filter(_.isSuccess) shouldBe empty diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala index 9a76e087e..217e166c7 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala @@ -109,7 +109,7 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { } } - it("getChildren should use curatorClient") { + it("getChildrenNodes should use curatorClient") { withMock { f => mockExistsBuilder(f.curatorClient, true) @@ -118,18 +118,18 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { val children = List("abc", "def") when(getChildrenBuilder.forPath(prefixedKey)).thenReturn(children.asJava) - val c = f.stateStore.getChildren("key") + val c = f.stateStore.getChildrenNodes("key") verify(f.curatorClient).start() c shouldBe children } } - it("getChildren should return empty list if key doesn't exist") { + it("getChildrenNodes should return empty list if key doesn't exist") { withMock { f => mockExistsBuilder(f.curatorClient, false) - val c = f.stateStore.getChildren("key") + val c = f.stateStore.getChildrenNodes("key") verify(f.curatorClient).start() c shouldBe empty From 18c10eda2c15b8ae05e9fc8a02126b7874a4fd9e Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Wed, 14 Aug 2019 12:34:54 +0300 Subject: [PATCH 20/26] Logging already mixed to ZooKeeperManager --- .../org/apache/livy/server/recovery/ZooKeeperStateStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala index 46b940919..a042a0c2e 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala @@ -34,7 +34,7 @@ import org.apache.livy.server.discovery.ZooKeeperManager */ class ZooKeeperStateStore(val livyConf: LivyConf, val mockCuratorClient: Option[CuratorFramework] = None) // For testing - extends StateStore(livyConf) with ZooKeeperManager with Logging { + extends StateStore(livyConf) with ZooKeeperManager { // Constructor defined for StateStore factory to new this class using reflection. def this(livyConf: LivyConf) { From 7d655fc4d37fda75f7e7844512bd4d8f9fc97706 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Mon, 2 Sep 2019 12:19:46 +0300 Subject: [PATCH 21/26] fixed problem with host resolving --- server/src/main/scala/org/apache/livy/server/LivyServer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 8704e71b4..cee7d91a8 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -401,8 +401,8 @@ class LivyServer extends Logging { private def resolvedSeverHost(livyConf: LivyConf) = { val host = livyConf.get(LivyConf.SERVER_HOST) - if (host.equals(livyConf.get(LivyConf.SERVER_HOST.dflt.toString))) { - InetAddress.getLocalHost.getHostName + if (host.equals(LivyConf.SERVER_HOST.dflt.toString)) { + InetAddress.getLocalHost.getHostAddress } else { host } From e8dc3508bc531f3f75b377f01169dc52c50577d2 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Mon, 2 Sep 2019 13:15:14 +0300 Subject: [PATCH 22/26] fixed test --- .../livy/server/discovery/LivyDiscoveryManagerSpec.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala index 070055f98..8dc7eb27c 100644 --- a/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala @@ -16,7 +16,7 @@ */ package org.apache.livy.server.discovery -import java.net.URI +import java.net.{InetAddress, URI} import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.api.{ExistsBuilder, GetDataBuilder, SetDataBuilder, UnhandledErrorListener} @@ -25,7 +25,6 @@ import org.apache.zookeeper.data.Stat import org.mockito.Mockito.{never, verify, when} import org.scalatest.FunSpec import org.scalatest.mock.MockitoSugar.mock - import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} import org.apache.livy.LivyConf.LIVY_ZOOKEEPER_URL import org.apache.livy.server.LivyServer @@ -38,7 +37,7 @@ class LivyDiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite conf.set(LivyConf.LIVY_ZOOKEEPER_URL, "host") val key = conf.get(LivyConf.LIVY_SERVER_ZOOKEEPER_NAMESPACE) val prefixedKey = s"/livy/$key" - val testAddress = new URI(s"http://0.0.0.0:${conf.getInt(LivyConf.SERVER_PORT)}") + val testAddress = new URI(s"http://${InetAddress.getLocalHost.getHostAddress}:${conf.getInt(LivyConf.SERVER_PORT)}") val testData: Array[Byte] = serializeToBytes(testAddress) def withMock[R](testBody: TestFixture => R): R = { From 5e88b2db0a31a21ca3a604de9abbcac74077e616 Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Mon, 2 Sep 2019 13:15:35 +0300 Subject: [PATCH 23/26] log message improved --- .../org/apache/livy/server/discovery/ZooKeeperManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala index dc44230cb..e87fc42d1 100644 --- a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala +++ b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala @@ -32,7 +32,7 @@ trait ZooKeeperManager extends JsonMapper with Logging { val mockCuratorClient: Option[CuratorFramework] private val zkAddress = livyConf.get(LivyConf.LIVY_ZOOKEEPER_URL) - require(Option(zkAddress).isDefined, s"Please config ${LivyConf.LIVY_ZOOKEEPER_URL.key}.") + require(Option(zkAddress).isDefined, s"Please configure ${LivyConf.LIVY_ZOOKEEPER_URL.key}.") private val zkKeyPrefix = livyConf.get(LivyConf.LIVY_ZOOKEEPER_NAMESPACE) private val maxRetries = livyConf.getInt(LivyConf.LIVY_ZOOKEEPER_CONNECTION_MAX_RETRIES) private val sleepTime = livyConf.getInt(LivyConf.LIVY_ZOOKEEPER_CONNECTION_RETRY_INTERVAL) From 47d88520ebb77c40f4026213e646b692c462160b Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Mon, 2 Sep 2019 13:58:34 +0300 Subject: [PATCH 24/26] fix scalastyle --- .../livy/server/discovery/LivyDiscoveryManagerSpec.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala index 8dc7eb27c..c4a67d0cc 100644 --- a/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala @@ -37,7 +37,8 @@ class LivyDiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite conf.set(LivyConf.LIVY_ZOOKEEPER_URL, "host") val key = conf.get(LivyConf.LIVY_SERVER_ZOOKEEPER_NAMESPACE) val prefixedKey = s"/livy/$key" - val testAddress = new URI(s"http://${InetAddress.getLocalHost.getHostAddress}:${conf.getInt(LivyConf.SERVER_PORT)}") + val ipAddress = InetAddress.getLocalHost.getHostAddress + val testAddress = new URI(s"http://${ipAddress}:${conf.getInt(LivyConf.SERVER_PORT)}") val testData: Array[Byte] = serializeToBytes(testAddress) def withMock[R](testBody: TestFixture => R): R = { From 1250fea414d8f603f5f388b63cbdf87691c800ad Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Mon, 2 Sep 2019 13:59:15 +0300 Subject: [PATCH 25/26] fix scalastyle --- .../apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala index c4a67d0cc..2c2887972 100644 --- a/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala @@ -25,6 +25,7 @@ import org.apache.zookeeper.data.Stat import org.mockito.Mockito.{never, verify, when} import org.scalatest.FunSpec import org.scalatest.mock.MockitoSugar.mock + import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} import org.apache.livy.LivyConf.LIVY_ZOOKEEPER_URL import org.apache.livy.server.LivyServer From 5c47b096f71e30198c1eff54232d8ae165e026cf Mon Sep 17 00:00:00 2001 From: Oleksandr Shevchenko Date: Mon, 2 Sep 2019 14:47:07 +0300 Subject: [PATCH 26/26] Trigger, flaky