diff --git a/api/src/main/java/org/apache/livy/LivyClient.java b/api/src/main/java/org/apache/livy/LivyClient.java index fc03a1f7a..6f47063d9 100644 --- a/api/src/main/java/org/apache/livy/LivyClient.java +++ b/api/src/main/java/org/apache/livy/LivyClient.java @@ -107,4 +107,11 @@ public interface LivyClient { */ Future addFile(URI uri); + /** + * Get Livy Server URI. + * The URI is propagated from LivyClientBuilder when the LivyClient is created. + * + * @return A future with Livy Server URI + */ + Future getServerUri(); } diff --git a/api/src/main/java/org/apache/livy/LivyClientBuilder.java b/api/src/main/java/org/apache/livy/LivyClientBuilder.java index 5bbd17025..94aaf8cda 100644 --- a/api/src/main/java/org/apache/livy/LivyClientBuilder.java +++ b/api/src/main/java/org/apache/livy/LivyClientBuilder.java @@ -81,6 +81,14 @@ public LivyClientBuilder(boolean loadDefaults) throws IOException { } } + /** + * Set Livy Server URI. + * The URI can be set manually or obtained from LivyDiscoveryManager + * ({@code livy.zookeeper.url} should be configured). 
+ * + * @param uri Livy Server URI + * @return this builder + */ public LivyClientBuilder setURI(URI uri) { config.setProperty(LIVY_URI_KEY, uri.toString()); return this; diff --git a/api/src/test/java/org/apache/livy/TestClientFactory.java b/api/src/test/java/org/apache/livy/TestClientFactory.java index 89edeec5e..9bdd119da 100644 --- a/api/src/test/java/org/apache/livy/TestClientFactory.java +++ b/api/src/test/java/org/apache/livy/TestClientFactory.java @@ -81,6 +81,11 @@ public Future addFile(URI uri) { throw new UnsupportedOperationException(); } + @Override + public Future getServerUri() { + throw new UnsupportedOperationException(); + } + } } diff --git a/client-http/pom.xml b/client-http/pom.xml index 8401950c3..16e98e63e 100644 --- a/client-http/pom.xml +++ b/client-http/pom.xml @@ -78,6 +78,11 @@ httpmime 4.5.1 + + io.netty + netty-all + ${netty.version} + org.apache.livy diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index f40148f94..53dbc0bbf 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -30,6 +30,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; + import org.apache.livy.Job; import org.apache.livy.JobHandle; import org.apache.livy.LivyClient; @@ -47,12 +50,14 @@ class HttpClient implements LivyClient { private final int sessionId; private final ScheduledExecutorService executor; private final Serializer serializer; + private final Promise serverUriPromise; private boolean stopped; HttpClient(URI uri, HttpConf httpConf) { this.config = httpConf; this.stopped = false; + this.serverUriPromise = ImmediateEventExecutor.INSTANCE.newPromise(); // If the given URI looks like it refers to an existing session, then try to 
connect to // an existing session. Note this means that any Spark configuration in httpConf will be @@ -77,6 +82,7 @@ class HttpClient implements LivyClient { ClientMessage create = new CreateClientRequest(sessionConf); this.conn = new LivyConnection(uri, httpConf); this.sessionId = conn.post(create, SessionInfo.class, "/").id; + serverUriPromise.setSuccess(uri); } } catch (Exception e) { throw propagate(e); @@ -145,6 +151,11 @@ public Future addFile(URI uri) { return addResource("add-file", uri); } + @Override + public Future getServerUri() { + return serverUriPromise; + } + private Future uploadResource(final File file, final String command, final String paramName) { Callable task = new Callable() { @Override diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala index f53d9f5b4..e55bc0d9b 100644 --- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala +++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala @@ -98,6 +98,10 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni testJob(false) } + withClient("should get Livy Server URI") { + assume(Option(client.getServerUri.get()).isDefined) + } + withClient("should propagate errors from jobs") { val errorMessage = "This job throws an error." val (jobId, handle) = runJob(false, { id => Seq( diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 2590e870e..f1550502b 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -100,8 +100,9 @@ # off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions. # recovery: Livy persists session info to the state store. When Livy restarts, it recovers # previous sessions from the state store. -# Must set livy.server.recovery.state-store and livy.server.recovery.state-store.url to -# configure the state store. 
+# Must set livy.server.recovery.state-store to the desired state store (filesystem or zookeeper). +# Set livy.server.recovery.state-store.url for the filesystem state store +# or livy.zookeeper.url for the zookeeper state store. # livy.server.recovery.mode = off # Where Livy should store state to for recovery. Possible values: @@ -112,8 +113,22 @@ # For filesystem state store, the path of the state store directory. Please don't use a filesystem # that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///. -# For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 # livy.server.recovery.state-store.url = +# For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 +# livy.zookeeper.url = + +# Livy Server discovery +# ZooKeeper quorum URLs, e.g. host1:port1,host2:port2 +# livy.zookeeper.url = +# Name of base Livy znode. Default: livy +# livy.zookeeper.namespace = livy +# Name of Livy Server znode. Uses livy.zookeeper.namespace as parent. +# By default, the full path to znode is /livy/server.uri +# livy.server.zookeeper.namespace = server.uri +# Number of attempts to establish the connection to ZooKeeper quorum +# livy.server.zookeeper.connection.max.retries = 3 +# Sleep time (in milliseconds) between connection retries to ZooKeeper quorum +# livy.server.zookeeper.connection.retry.interval.ms = 500 # If Livy can't find the yarn app within this time, consider it lost. 
# livy.server.yarn.app-lookup-timeout = 120s diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index 77d45c71f..413c8c51a 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -192,6 +192,7 @@ public void onFailure(Throwable error) throws Exception { return promise; } + @Override public Future getServerUri() { return serverUriPromise; } diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala index ca7199a99..d0521f74a 100644 --- a/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala +++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala @@ -142,6 +142,13 @@ class LivyScalaClient(livyJavaClient: LivyClient) { */ def addFile(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addFile(uri)).poll() + /** + * Get Livy Server URI. 
+ * + * @return A future with Livy Server URI + */ + def getServerUri: Future[URI] = new PollingContainer(livyJavaClient.getServerUri).poll() + private class PollingContainer[T] private[livy] (jFuture: JFuture[T]) extends Runnable { private val initialDelay = 1 diff --git a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala index a716f5856..fe7675439 100644 --- a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala +++ b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala @@ -117,6 +117,13 @@ class ScalaClientTest extends FunSuite ScalaClientTestUtils.assertTestPassed(sFuture, "test file") } + test("test get uri") { + configureClient(true) + val getUriFuture = client.getServerUri + val uri = Await.result(getUriFuture, ScalaClientTestUtils.Timeout second) + assert(Option(uri).isDefined) + } + test("test add jar") { configureClient(true) val jar = File.createTempFile("test", ".resource") diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 32b3522d5..b74844027 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -173,7 +173,9 @@ object LivyConf { * off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions. * recovery: Livy persists session info to the state store. When Livy restarts, it recovers * previous sessions from the state store. - * Must set livy.server.recovery.state-store and livy.server.recovery.state-store.url to + * Must set livy.server.recovery.state-store to needed state store (filesystem or zookeeper) + * Set livy.server.recovery.state-store.url for filesystem state store + * or livy.zookeeper.url for zookeeper state store. * configure the state store. 
*/ val RECOVERY_MODE = Entry("livy.server.recovery.mode", "off") @@ -187,10 +189,31 @@ object LivyConf { /** * For filesystem state store, the path of the state store directory. Please don't use a * filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///. - * For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 + * For zookeeper, use livy.zookeeper.url. */ val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "") + /** + * ZooKeeper address which will be used for Livy Server discovery + * and the ZooKeeper state store. e.g. host1:port1,host2:port2 + */ + val LIVY_ZOOKEEPER_URL = Entry("livy.zookeeper.url", null) + + // Name of base Livy znode. + val LIVY_ZOOKEEPER_NAMESPACE = Entry("livy.zookeeper.namespace", "livy") + + // Number of attempts to establish the connection to ZooKeeper quorum. + val LIVY_ZOOKEEPER_CONNECTION_MAX_RETRIES = + Entry("livy.server.zookeeper.connection.max.retries", 3) + + // Sleep time (in milliseconds) between connection retries to ZooKeeper quorum. + val LIVY_ZOOKEEPER_CONNECTION_RETRY_INTERVAL = + Entry("livy.server.zookeeper.connection.retry.interval.ms", 500) + + // Name of Livy Server znode. Uses LIVY_ZOOKEEPER_NAMESPACE as parent. + // By default, the full path to znode is /livy/server.uri + val LIVY_SERVER_ZOOKEEPER_NAMESPACE = Entry("livy.server.zookeeper.namespace", "server.uri") + // Livy will cache the max no of logs specified. 0 means don't cache the logs. 
val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 60f3961dc..cee7d91a8 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -18,15 +18,16 @@ package org.apache.livy.server import java.io.{BufferedInputStream, InputStream} -import java.net.InetAddress -import java.util.concurrent._ +import java.net.{InetAddress, URI} import java.util.EnumSet +import java.util.concurrent._ import javax.servlet._ import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future +import org.apache.curator.framework.CuratorFramework import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation} import org.apache.hadoop.security.authentication.server._ import org.eclipse.jetty.servlet.FilterHolder @@ -37,6 +38,7 @@ import org.scalatra.servlet.{MultipartConfig, ServletApiImplicits} import org.apache.livy._ import org.apache.livy.server.batch.BatchSessionServlet +import org.apache.livy.server.discovery.LivyDiscoveryManager import org.apache.livy.server.interactive.InteractiveSessionServlet import org.apache.livy.server.recovery.{SessionStore, StateStore} import org.apache.livy.server.ui.UIServlet @@ -72,6 +74,8 @@ class LivyServer extends Logging { maxFileSize = Some(livyConf.getLong(LivyConf.FILE_UPLOAD_MAX_SIZE)) ).toMultipartConfigElement + setServerUri(livyConf) + // Make sure the `spark-submit` program exists, otherwise much of livy won't work. 
testSparkHome(livyConf) @@ -385,6 +389,25 @@ class LivyServer extends Logging { } } + private[livy] def setServerUri(livyConf: LivyConf, + mockCuratorClient: Option[CuratorFramework] = None): Unit = { + if (Option(livyConf.get(LIVY_ZOOKEEPER_URL)).isDefined) { + val discoveryManager = LivyDiscoveryManager(livyConf, mockCuratorClient) + val host = resolvedSeverHost(livyConf) + val uri = new URI(s"http://$host:${livyConf.getInt(LivyConf.SERVER_PORT)}") + discoveryManager.setServerUri(uri) + } + } + + private def resolvedSeverHost(livyConf: LivyConf) = { + val host = livyConf.get(LivyConf.SERVER_HOST) + if (host.equals(LivyConf.SERVER_HOST.dflt.toString)) { + InetAddress.getLocalHost.getHostAddress + } else { + host + } + } + private[livy] def testRecovery(livyConf: LivyConf): Unit = { if (!livyConf.isRunningOnYarn()) { // If recovery is turned on but we are not running on YARN, quit. diff --git a/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala b/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala new file mode 100644 index 000000000..e509bf5e6 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/discovery/JsonMapper.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.server.discovery + +import scala.reflect.{classTag, ClassTag} + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.livy.sessions.SessionKindModule + +protected[server] trait JsonMapper { + protected val mapper = new ObjectMapper() + .registerModule(DefaultScalaModule) + .registerModule(new SessionKindModule()) + + def serializeToBytes(value: Object): Array[Byte] = mapper.writeValueAsBytes(value) + + def deserialize[T: ClassTag](json: Array[Byte]): T = + mapper.readValue(json, classTag[T].runtimeClass.asInstanceOf[Class[T]]) +} diff --git a/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala new file mode 100644 index 000000000..38bf1a034 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/discovery/LivyDiscoveryManager.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.livy.server.discovery + +import java.net.URI + +import org.apache.curator.framework.CuratorFramework + +import org.apache.livy.LivyConf + +/** + * Livy Server Discovery manager. + * Stores information about Livy Server location in ZooKeeper. + * The address will be stored in + * "/{@code LIVY_ZOOKEEPER_NAMESPACE}/{@code LIVY_SERVER_ZOOKEEPER_NAMESPACE}" znode + * By default, the full path to znode is /livy/server.uri. + * Need to set {@code livy.zookeeper.url} to be able to get information from ZooKeeper. + * + * @param livyConf - Livy configurations + * @param mockCuratorClient - used for testing + */ +class LivyDiscoveryManager(val livyConf: LivyConf, + val mockCuratorClient: Option[CuratorFramework] = None) + extends ZooKeeperManager { + + private val LIVY_SERVER_URI_KEY = livyConf.get(LivyConf.LIVY_SERVER_ZOOKEEPER_NAMESPACE) + + /** + * Save Livy Server URI to ZooKeeper. + * @param address - URI address of Livy Server + */ + def setServerUri(address: URI): Unit = { + setData(LIVY_SERVER_URI_KEY, address) + } + + /** + * Get Livy Server URI from ZooKeeper. + * @return Livy Server URI + */ + def getServerUri(): URI = { + getData[URI](LIVY_SERVER_URI_KEY).getOrElse(URI.create("")) + } +} + +object LivyDiscoveryManager { + + def apply(livyConf: LivyConf, + mockCuratorClient: Option[CuratorFramework] = None): LivyDiscoveryManager = { + new LivyDiscoveryManager(livyConf, mockCuratorClient) + } +} diff --git a/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala new file mode 100644 index 000000000..e87fc42d1 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/discovery/ZooKeeperManager.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.server.discovery + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} +import org.apache.curator.framework.api.UnhandledErrorListener +import org.apache.curator.retry.RetryNTimes +import org.apache.zookeeper.KeeperException.NoNodeException + +import org.apache.livy.{LivyConf, Logging} + +trait ZooKeeperManager extends JsonMapper with Logging { + + val livyConf: LivyConf + val mockCuratorClient: Option[CuratorFramework] + + private val zkAddress = livyConf.get(LivyConf.LIVY_ZOOKEEPER_URL) + require(Option(zkAddress).isDefined, s"Please configure ${LivyConf.LIVY_ZOOKEEPER_URL.key}.") + private val zkKeyPrefix = livyConf.get(LivyConf.LIVY_ZOOKEEPER_NAMESPACE) + private val maxRetries = livyConf.getInt(LivyConf.LIVY_ZOOKEEPER_CONNECTION_MAX_RETRIES) + private val sleepTime = livyConf.getInt(LivyConf.LIVY_ZOOKEEPER_CONNECTION_RETRY_INTERVAL) + private val retryPolicy = new RetryNTimes(maxRetries, sleepTime) + + private val curatorClient = mockCuratorClient.getOrElse { + CuratorFrameworkFactory.newClient(zkAddress, retryPolicy) + } + + Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { + override def run(): Unit = { + curatorClient.close() + } + })) + + curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener { + def 
unhandledError(message: String, e: Throwable): Unit = { + error(s"Fatal Zookeeper error. Shutting down Livy server.") + System.exit(1) + } + }) + curatorClient.start() + // TODO Make sure ZK path has proper secure permissions so that other users cannot read its + // contents. + + def setData(key: String, value: Object): Unit = { + val prefixedKey = prefixKey(key) + val data = serializeToBytes(value) + if (exist(prefixedKey)) { + curatorClient.setData().forPath(prefixedKey, data) + } else { + curatorClient.create().creatingParentsIfNeeded().forPath(prefixedKey, data) + } + } + + def getData[T: ClassTag](key: String): Option[T] = { + val prefixedKey = prefixKey(key) + if (exist(prefixedKey)) { + Option(deserialize[T](curatorClient.getData().forPath(prefixedKey))) + } else { + None + } + } + + def getChildren(key: String): Seq[String] = { + val prefixedKey = prefixKey(key) + if (exist(prefixedKey)) { + curatorClient.getChildren.forPath(prefixedKey).asScala + } else { + Seq.empty[String] + } + } + + def delete(key: String): Unit = { + try { + curatorClient.delete().guaranteed().forPath(prefixKey(key)) + } catch { + case _: NoNodeException => + } + } + + def exist(key: String): Boolean = { + Option(curatorClient.checkExists().forPath(key)).isDefined + } + + private def prefixKey(key: String) = s"/$zkKeyPrefix/$key" +} diff --git a/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala index df9a712a8..3e5475293 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala @@ -30,7 +30,7 @@ class BlackholeStateStore(livyConf: LivyConf) extends StateStore(livyConf) { def get[T: ClassTag](key: String): Option[T] = None - def getChildren(key: String): Seq[String] = List.empty[String] + def getChildrenNodes(key: String): Seq[String] = List.empty[String] 
def remove(key: String): Unit = {} } diff --git a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala index d5f8f3dc6..d33f9fc57 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala @@ -113,7 +113,7 @@ class FileSystemStateStore( } } - override def getChildren(key: String): Seq[String] = { + override def getChildrenNodes(key: String): Seq[String] = { try { fileContext.util.listStatus(absPath(key)).map(_.getPath.getName) } catch { diff --git a/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala index 04292957c..136b9a674 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala @@ -54,7 +54,7 @@ class SessionStore( * Return all sessions stored in the store with specified session type. 
*/ def getAllSessions[T <: RecoveryMetadata : ClassTag](sessionType: String): Seq[Try[T]] = { - store.getChildren(sessionPath(sessionType)) + store.getChildrenNodes(sessionPath(sessionType)) .flatMap { c => Try(c.toInt).toOption } // Ignore all non numerical keys .flatMap { id => val p = sessionPath(sessionType, id) diff --git a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala index a6c3275d7..046db93c7 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala @@ -16,26 +16,12 @@ */ package org.apache.livy.server.recovery -import scala.reflect.{classTag, ClassTag} - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule +import scala.reflect.ClassTag import org.apache.livy.{LivyConf, Logging} -import org.apache.livy.sessions.SessionKindModule +import org.apache.livy.server.discovery.JsonMapper import org.apache.livy.sessions.SessionManager._ -protected trait JsonMapper { - protected val mapper = new ObjectMapper() - .registerModule(DefaultScalaModule) - .registerModule(new SessionKindModule()) - - def serializeToBytes(value: Object): Array[Byte] = mapper.writeValueAsBytes(value) - - def deserialize[T: ClassTag](json: Array[Byte]): T = - mapper.readValue(json, classTag[T].runtimeClass.asInstanceOf[Class[T]]) -} - /** * Interface of a key-value pair storage for state storage. * It's responsible for de/serialization and retrieving/storing object. @@ -63,7 +49,7 @@ abstract class StateStore(livyConf: LivyConf) extends JsonMapper { * @return List of names of the direct children of the key. * Empty list if the key doesn't exist or have no child. */ - def getChildren(key: String): Seq[String] + def getChildrenNodes(key: String): Seq[String] /** * Remove the key from this state store. 
Does not throw if the key doesn't exist. diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala index ec6b9df18..a042a0c2e 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala @@ -16,103 +16,44 @@ */ package org.apache.livy.server.recovery -import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import scala.util.Try -import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} -import org.apache.curator.framework.api.UnhandledErrorListener -import org.apache.curator.retry.RetryNTimes -import org.apache.zookeeper.KeeperException.NoNodeException +import org.apache.curator.framework.CuratorFramework import org.apache.livy.{LivyConf, Logging} -import org.apache.livy.LivyConf.Entry - -object ZooKeeperStateStore { - val ZK_KEY_PREFIX_CONF = Entry("livy.server.recovery.zk-state-store.key-prefix", "livy") - val ZK_RETRY_CONF = Entry("livy.server.recovery.zk-state-store.retry-policy", "5,100") -} - -class ZooKeeperStateStore( - livyConf: LivyConf, - mockCuratorClient: Option[CuratorFramework] = None) // For testing - extends StateStore(livyConf) with Logging { - - import ZooKeeperStateStore._ +import org.apache.livy.server.discovery.ZooKeeperManager + +/** + * Implementation for Livy State Store which uses Zookeeper as backend storage. + * Set {@code livy.server.recovery.mode} to ``recovery`` + * and {@code livy.server.recovery.state-store} to ``zookeeper`` to enable ZooKeeper state store. + * Also need to set {@code livy.zookeeper.url} to be able to get information from ZooKeeper. 
+ * + * @param livyConf + * @param mockCuratorClient + */ +class ZooKeeperStateStore(val livyConf: LivyConf, + val mockCuratorClient: Option[CuratorFramework] = None) // For testing + extends StateStore(livyConf) with ZooKeeperManager { // Constructor defined for StateStore factory to new this class using reflection. def this(livyConf: LivyConf) { this(livyConf, None) } - private val zkAddress = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL) - require(!zkAddress.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.") - private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF) - private val retryValue = livyConf.get(ZK_RETRY_CONF) - // a regex to match patterns like "m, n" where m and n both are integer values - private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r - private[recovery] val retryPolicy = retryValue match { - case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt) - case _ => throw new IllegalArgumentException( - s"$ZK_KEY_PREFIX_CONF contains bad value: $retryValue. " + - "Correct format is ,. e.g. 5,100") - } - - private val curatorClient = mockCuratorClient.getOrElse { - CuratorFrameworkFactory.newClient(zkAddress, retryPolicy) - } - - Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { - override def run(): Unit = { - curatorClient.close() - } - })) - - curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener { - def unhandledError(message: String, e: Throwable): Unit = { - error(s"Fatal Zookeeper error. Shutting down Livy server.") - System.exit(1) - } - }) - curatorClient.start() - // TODO Make sure ZK path has proper secure permissions so that other users cannot read its - // contents. 
- override def set(key: String, value: Object): Unit = { - val prefixedKey = prefixKey(key) - val data = serializeToBytes(value) - if (curatorClient.checkExists().forPath(prefixedKey) == null) { - curatorClient.create().creatingParentsIfNeeded().forPath(prefixedKey, data) - } else { - curatorClient.setData().forPath(prefixedKey, data) - } + setData(key, value) } override def get[T: ClassTag](key: String): Option[T] = { - val prefixedKey = prefixKey(key) - if (curatorClient.checkExists().forPath(prefixedKey) == null) { - None - } else { - Option(deserialize[T](curatorClient.getData().forPath(prefixedKey))) - } + getData[T](key) } - override def getChildren(key: String): Seq[String] = { - val prefixedKey = prefixKey(key) - if (curatorClient.checkExists().forPath(prefixedKey) == null) { - Seq.empty[String] - } else { - curatorClient.getChildren.forPath(prefixedKey).asScala - } + override def getChildrenNodes(key: String): Seq[String] = { + getChildren(key) } override def remove(key: String): Unit = { - try { - curatorClient.delete().guaranteed().forPath(prefixKey(key)) - } catch { - case _: NoNodeException => - } + delete(key) } - - private def prefixKey(key: String) = s"/$zkKeyPrefix/$key" } diff --git a/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala new file mode 100644 index 000000000..2c2887972 --- /dev/null +++ b/server/src/test/scala/org/apache/livy/server/discovery/LivyDiscoveryManagerSpec.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+package org.apache.livy.server.discovery
+
+import java.net.{InetAddress, URI}
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.api.{ExistsBuilder, GetDataBuilder, SetDataBuilder, UnhandledErrorListener}
+import org.apache.curator.framework.listen.Listenable
+import org.apache.zookeeper.data.Stat
+import org.mockito.Mockito.{never, verify, when}
+import org.scalatest.FunSpec
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.LivyConf.LIVY_ZOOKEEPER_URL
+import org.apache.livy.server.LivyServer
+
+/**
+ * Unit tests for [[LivyDiscoveryManager]] and for `LivyServer.setServerUri`, run against a
+ * Mockito mock of `CuratorFramework` so that no real ZooKeeper connection is made.
+ */
+class LivyDiscoveryManagerSpec extends FunSpec with LivyBaseUnitTestSuite
+  with JsonMapper {
+  describe("DiscoveryManagerSpec") {
+    /** Bundles the manager under test with the mocked Curator client it was built on. */
+    case class TestFixture(discoveryManager: LivyDiscoveryManager, curatorClient: CuratorFramework)
+
+    val conf = new LivyConf()
+    conf.set(LIVY_ZOOKEEPER_URL, "host")
+    val key = conf.get(LivyConf.LIVY_SERVER_ZOOKEEPER_NAMESPACE)
+    // Path that every stubbed/verified builder `forPath` call in this spec is matched against.
+    val prefixedKey = s"/livy/$key"
+    val ipAddress = InetAddress.getLocalHost.getHostAddress
+    // URI built from the local host address and the port read from `conf`.
+    val testAddress = new URI(s"http://${ipAddress}:${conf.getInt(LivyConf.SERVER_PORT)}")
+    // Byte payload the tests expect to be written for `testAddress`.
+    val testData: Array[Byte] = serializeToBytes(testAddress)
+
+    /**
+     * Runs `testBody` with a fresh fixture: a mocked `CuratorFramework` whose
+     * `getUnhandledErrorListenable()` is stubbed, and a `LivyDiscoveryManager` built on it.
+     *
+     * @param testBody test code receiving the fixture
+     * @return whatever `testBody` returns
+     */
+    def withMock[R](testBody: TestFixture => R): R = {
+      val curatorClient = mock[CuratorFramework]
+      when(curatorClient.getUnhandledErrorListenable())
+        .thenReturn(mock[Listenable[UnhandledErrorListener]])
+      val discoveryManager = LivyDiscoveryManager(conf, Some(curatorClient))
+      testBody(TestFixture(discoveryManager, curatorClient))
+    }
+
+    /**
+     * Stubs `curatorClient.checkExists()` with a mocked `ExistsBuilder`.
+     *
+     * @param curatorClient mocked client to stub
+     * @param exists when true, `forPath(prefixedKey)` returns a mocked `Stat`; when false the
+     *               call is left unstubbed, so the mock yields Mockito's default (`null`)
+     */
+    def mockExistsBuilder(curatorClient: CuratorFramework, exists: Boolean): Unit = {
+      val existsBuilder = mock[ExistsBuilder]
+      when(curatorClient.checkExists()).thenReturn(existsBuilder)
+      if (exists) {
+        when(existsBuilder.forPath(prefixedKey)).thenReturn(mock[Stat])
+      }
+    }
+
+    // Title names the method actually exercised below (setServerUri).
+    it("setServerUri should use curatorClient") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, exists = true)
+
+        val setDataBuilder = mock[SetDataBuilder]
+        when(f.curatorClient.setData()).thenReturn(setDataBuilder)
+
+        f.discoveryManager.setServerUri(testAddress)
+
+        verify(f.curatorClient).start()
+        verify(setDataBuilder).forPath(prefixedKey, testData)
+      }
+    }
+
+    it("getServerUri should use curatorClient") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, exists = true)
+        val getDataBuilder = mock[GetDataBuilder]
+        when(f.curatorClient.getData()).thenReturn(getDataBuilder)
+        when(getDataBuilder.forPath(prefixedKey)).thenReturn(testData)
+
+        // Only the interaction with the client is checked; the returned value is not asserted.
+        f.discoveryManager.getServerUri()
+
+        verify(f.curatorClient).start()
+        verify(getDataBuilder).forPath(prefixedKey)
+      }
+    }
+
+    it("Livy Server should use DiscoveryManager") {
+      withMock { f =>
+        val livyConf = new LivyConf()
+        livyConf.set(LIVY_ZOOKEEPER_URL, "host:port")
+        val s = new LivyServer()
+
+        mockExistsBuilder(f.curatorClient, exists = true)
+
+        val setDataBuilder = mock[SetDataBuilder]
+        when(f.curatorClient.setData()).thenReturn(setDataBuilder)
+
+        // Only the mocked client from the fixture is handed to the server here.
+        s.setServerUri(livyConf, Some(f.curatorClient))
+        verify(setDataBuilder).forPath(prefixedKey, testData)
+      }
+    }
+
+    it("Livy Server should skip DiscoveryManager if ZooKeeper url isn't defined") {
+      withMock { f =>
+        // LIVY_ZOOKEEPER_URL is deliberately not set on this configuration.
+        val livyConf = new LivyConf()
+        val s = new LivyServer()
+
+        s.setServerUri(livyConf, Some(f.curatorClient))
+        verify(f.curatorClient, never).setData()
+      }
+    }
+  }
+
+}
diff --git a/server/src/test/scala/org/apache/livy/server/discovery/../recovery/BlackholeStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala index
8ee448f5e..fee8a72e5 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala @@ -36,8 +36,8 @@ class BlackholeStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { v shouldBe None } - it("getChildren should return empty list") { - val c = stateStore.getChildren("") + it("getChildrenNodes should return empty list") { + val c = stateStore.getChildrenNodes("") c shouldBe empty } diff --git a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala index 4758c854f..1dc084ee5 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala @@ -141,7 +141,7 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { stateStore.get[String]("key") shouldBe None } - it("getChildren should list file") { + it("getChildrenNodes should list file") { val parentPath = "path" def makeFileStatus(name: String): FileStatus = { val fs = new FileStatus() @@ -157,7 +157,7 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { when(fileContext.util()).thenReturn(util) val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) - stateStore.getChildren(parentPath) should contain theSameElementsAs children + stateStore.getChildrenNodes(parentPath) should contain theSameElementsAs children } def getChildrenErrorTest(error: Exception): Unit = { @@ -169,14 +169,14 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { when(fileContext.util()).thenReturn(util) val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) - stateStore.getChildren(parentPath) shouldBe empty + stateStore.getChildrenNodes(parentPath) shouldBe empty } - 
it("getChildren should return empty list if the key doesn't exist") { + it("getChildrenNodes should return empty list if the key doesn't exist") { getChildrenErrorTest(new IOException("Unit test")) } - it("getChildren should return empty list if key doesn't exist") { + it("getChildrenNodes should return empty list if key doesn't exist") { getChildrenErrorTest(new FileNotFoundException("Unit test")) } diff --git a/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala index 5eeb2cfc3..3baa8e9cc 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala @@ -56,7 +56,7 @@ class SessionStoreSpec extends FunSpec with LivyBaseUnitTestSuite { ) val stateStore = mock[StateStore] val sessionStore = new SessionStore(conf, stateStore) - when(stateStore.getChildren(sessionPath)) + when(stateStore.getChildrenNodes(sessionPath)) .thenReturn((validMetadata ++ corruptedMetadata).keys.toList) validMetadata.foreach { case (id, m) => @@ -78,7 +78,7 @@ class SessionStoreSpec extends FunSpec with LivyBaseUnitTestSuite { it("should not throw if the state store is empty") { val stateStore = mock[StateStore] val sessionStore = new SessionStore(conf, stateStore) - when(stateStore.getChildren(sessionPath)).thenReturn(Seq.empty) + when(stateStore.getChildrenNodes(sessionPath)).thenReturn(Seq.empty) val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType) s.filter(_.isSuccess) shouldBe empty diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala index 88e530f62..217e166c7 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ 
b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.livy.server.recovery import scala.collection.JavaConverters._ @@ -34,7 +33,7 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { describe("ZooKeeperStateStore") { case class TestFixture(stateStore: ZooKeeperStateStore, curatorClient: CuratorFramework) val conf = new LivyConf() - conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host") + conf.set(LivyConf.LIVY_ZOOKEEPER_URL, "host") val key = "key" val prefixedKey = s"/livy/$key" @@ -54,17 +53,6 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { } } - it("should throw on bad config") { - withMock { f => - val conf = new LivyConf() - intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) } - - conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host") - conf.set(ZooKeeperStateStore.ZK_RETRY_CONF, "bad") - intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) } - } - } - it("set should use curatorClient") { withMock { f => mockExistsBuilder(f.curatorClient, true) @@ -95,16 +83,6 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { } } - it("get should retrieve retry policy configs") { - conf.set(org.apache.livy.server.recovery.ZooKeeperStateStore.ZK_RETRY_CONF, "11,77") - withMock { f => - mockExistsBuilder(f.curatorClient, true) - - f.stateStore.retryPolicy should not be null - f.stateStore.retryPolicy.getN shouldBe 11 - } - } - it("get should retrieve data from curatorClient") { withMock { f => mockExistsBuilder(f.curatorClient, true) @@ -131,7 +109,7 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { } } - it("getChildren should use curatorClient") { + it("getChildrenNodes should use curatorClient") { withMock { f => mockExistsBuilder(f.curatorClient, true) @@ -140,18 
+118,18 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { val children = List("abc", "def") when(getChildrenBuilder.forPath(prefixedKey)).thenReturn(children.asJava) - val c = f.stateStore.getChildren("key") + val c = f.stateStore.getChildrenNodes("key") verify(f.curatorClient).start() c shouldBe children } } - it("getChildren should return empty list if key doesn't exist") { + it("getChildrenNodes should return empty list if key doesn't exist") { withMock { f => mockExistsBuilder(f.curatorClient, false) - val c = f.stateStore.getChildren("key") + val c = f.stateStore.getChildrenNodes("key") verify(f.curatorClient).start() c shouldBe empty