diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index 7566971c3..eeed31856 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -151,17 +151,39 @@
# cause session leakage, so we need to check session leakage.
# How long to check livy session leakage
# livy.server.yarn.app-leakage.check-timeout = 600s
-# how often to check livy session leakage
+# How often to check livy session leakage
# livy.server.yarn.app-leakage.check-interval = 60s
# How often Livy polls YARN to refresh YARN app state.
# livy.server.yarn.poll-interval = 5s
-#
+
+# If Livy can't find the Kubernetes app within this time, consider it lost
+# livy.server.kubernetes.app-lookup-timeout = 600s
+# When the cluster is busy, we may fail to launch Kubernetes app in app-lookup-timeout, then it
+# would cause session leakage, so we need to check session leakage
+# How long to check livy session leakage
+# livy.server.kubernetes.app-leakage.check-timeout = 600s
+# How often to check livy session leakage
+# livy.server.kubernetes.app-leakage.check-interval = 60s
+
+# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods state, logs, description
+# details, routes, etc...)
+# livy.server.kubernetes.poll-interval = 15s
+
+# Comma-separated list of the Kubernetes namespaces to allow for applications creation.
+# All namespaces are allowed if empty
+# livy.server.kubernetes.allowedNamespaces =
+
# Days to keep Livy server request logs.
# livy.server.request-log-retain.days = 5
# If the Livy Web UI should be included in the Livy Server. Enabled by default.
# livy.ui.enabled = true
+# Whether to display links to the Spark UI on the Livy UI when running on Kubernetes
+# livy.ui.kubernetes.sparkui.enabled = false
+# String format to use to build links to Spark UI to be displayed on Livy UI when running on
+# Kubernetes. Use '%s' placeholder for the link url part to be replaced by Spark application ID
+# livy.ui.kubernetes.sparkui.link-format = http://localhost/%s
# Whether to enable Livy server access control, if it is true then all the income requests will
# be checked if the requested user has permission.
@@ -195,3 +217,18 @@
# Enable to allow custom classpath by proxy user in cluster mode
# The below configuration parameter is disabled by default.
# livy.server.session.allow-custom-classpath = true
+
+# Manual authentication to KubeApiserver (by default configured with Kubernetes ServiceAccount
+# if deployed to Kubernetes cluster as a Pod)
+# Kubernetes oauth token file path
+# livy.server.kubernetes.oauthTokenFile =
+# Kubernetes oauth token string value
+# livy.server.kubernetes.oauthTokenValue =
+# Kubernetes CA cert file path
+# livy.server.kubernetes.caCertFile =
+# Kubernetes client key file path
+# livy.server.kubernetes.clientKeyFile =
+# Kubernetes client cert file path
+# livy.server.kubernetes.clientCertFile =
+# Kubernetes client default namespace
+# livy.server.kubernetes.defaultNamespace =
diff --git a/pom.xml b/pom.xml
index 23561962d..fa97cdffd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@
2.4.5
2.4.5
${spark.scala-2.11.version}
+ 4.6.4
3.0.0
1.9
4.5.13
@@ -318,6 +319,18 @@
${metrics.version}
+
+ io.fabric8
+ kubernetes-client
+ ${kubernetes.client.version}
+
+
+ com.fasterxml.jackson.core
+ *
+
+
+
+
io.netty
netty-all
@@ -1188,6 +1201,8 @@
https://archive.apache.org/dist/spark/spark-${spark.version}/${spark.bin.name}.tgz
+ 4.9.2
+ 2.10.3
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
index 4c45956d7..200f5a5f0 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
@@ -25,6 +25,7 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import javax.security.sasl.Sasl;
@@ -151,6 +152,12 @@ public String findLocalAddress() throws IOException {
return address.getCanonicalHostName();
}
+  /**
+   * Reports whether the Spark master configured under "livy.spark.master" targets Kubernetes.
+   *
+   * @return true if the value is set and starts with "k8s"; false if it is unset or differs.
+   */
+  public boolean isRunningOnKubernetes() {
+    return Optional.ofNullable(get("livy.spark.master"))
+      .map(master -> master.startsWith("k8s"))
+      .orElse(false);
+  }
+
private static final Map configsWithAlternatives
= Collections.unmodifiableMap(new HashMap() {{
put(RSCConf.Entry.CLIENT_IN_PROCESS.key, DepConf.CLIENT_IN_PROCESS);
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
index b93c5cc71..be609afe2 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
@@ -169,6 +169,13 @@ private void initializeServer() throws Exception {
// on the cluster, it would be tricky to solve that problem in a generic way.
livyConf.set(RPC_SERVER_ADDRESS, null);
+ // If we are running on Kubernetes, set RPC_SERVER_ADDRESS from "spark.driver.host" option,
+ // which is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep:
+ // line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
+ if (livyConf.isRunningOnKubernetes()) {
+ livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host"));
+ }
+
if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) {
// Test flag is turned on so we will just infinite loop here. It should cause
// timeout and we should still see yarn application being cleaned up.
diff --git a/server/pom.xml b/server/pom.xml
index c2a8ef4f1..f9c296e51 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -84,6 +84,11 @@
metrics-healthchecks
+
+ io.fabric8
+ kubernetes-client
+
+
javax.servlet
javax.servlet-api
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 31b687259..c8dc53ab5 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -63,6 +63,12 @@ object LivyConf {
val SERVER_BASE_PATH = Entry("livy.ui.basePath", "")
val UI_ENABLED = Entry("livy.ui.enabled", true)
+ // Whether to display links to the Spark UI on the Livy UI when running on Kubernetes
+ val UI_KUBERNETES_SPARK_UI_ENABLED = Entry("livy.ui.kubernetes.sparkui.enabled", false)
+ // String format to use to build links to Spark UI to be displayed on Livy UI when running on
+ // Kubernetes. Use '%s' placeholder for the link url part to be replaced by Spark application ID
+ val UI_KUBERNETES_SPARK_UI_LINK_FORMAT =
+ Entry("livy.ui.kubernetes.sparkui.link-format", "http://localhost/%s")
val REQUEST_HEADER_SIZE = Entry("livy.server.request-header.size", 131072)
val RESPONSE_HEADER_SIZE = Entry("livy.server.response-header.size", 131072)
@@ -252,6 +258,35 @@ object LivyConf {
// how often to check livy session leakage
val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s")
+ // Kubernetes oauth token file path.
+ val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "")
+ // Kubernetes oauth token string value.
+ val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "")
+ // Kubernetes CA cert file path.
+ val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "")
+ // Kubernetes client key file path.
+ val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "")
+ // Kubernetes client cert file path.
+ val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "")
+ // Kubernetes client default namespace.
+ val KUBERNETES_DEFAULT_NAMESPACE = Entry("livy.server.kubernetes.defaultNamespace", "")
+
+ // Comma-separated list of the Kubernetes namespaces to allow for applications creation.
+ // All namespaces are allowed if empty.
+ val KUBERNETES_ALLOWED_NAMESPACES = Entry("livy.server.kubernetes.allowedNamespaces", null)
+
+ // If Livy can't find the Kubernetes app within this time, consider it lost.
+ val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
+ // How often Livy polls Kubernetes to refresh Kubernetes app state.
+ val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")
+
+ // How long to check livy session leakage.
+ val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT =
+ Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s")
+ // How often to check livy session leakage.
+ val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL =
+ Entry("livy.server.kubernetes.app-leakage.check-interval", "60s")
+
// Whether session timeout should be checked, by default it will be checked, which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
@@ -365,6 +400,15 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
/** Return true if spark master starts with yarn. */
def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn")
+  /** Whether the configured Spark master targets Kubernetes, i.e. its value begins with "k8s". */
+  def isRunningOnKubernetes(): Boolean = {
+    val master = sparkMaster()
+    master.startsWith("k8s")
+  }
+
+  /**
+   * Return the Kubernetes namespaces configured in "livy.server.kubernetes.allowedNamespaces".
+   *
+   * The value is split on commas. Whitespace around every entry is stripped and blank entries
+   * are dropped, so "a, b" and "a,b," both yield Set("a", "b") instead of entries such as " b"
+   * that can never equal a real namespace name. An empty set is returned when the option is
+   * unset or contains no usable entry.
+   */
+  def getKubernetesNamespaces(): Set[String] =
+    Option(get(KUBERNETES_ALLOWED_NAMESPACES))
+      .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
+      .getOrElse(Set.empty)
+
/** Return the spark deploy mode Livy sessions should use. */
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)
diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
index 3e715bdf1..67da94d29 100644
--- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala
+++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
@@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManag
import org.apache.livy.server.ui.UIServlet
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
+import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp}
import org.apache.livy.utils.LivySparkUtils._
-import org.apache.livy.utils.SparkYarnApp
class LivyServer extends Logging {
@@ -142,10 +142,13 @@ class LivyServer extends Logging {
testRecovery(livyConf)
- // Initialize YarnClient ASAP to save time.
+ // Initialize YarnClient or KubernetesClient ASAP to save time.
if (livyConf.isRunningOnYarn()) {
SparkYarnApp.init(livyConf)
Future { SparkYarnApp.yarnClient }
+ } else if (livyConf.isRunningOnKubernetes()) {
+ SparkKubernetesApp.init(livyConf)
+ Future { SparkKubernetesApp.kubernetesClient }
}
if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") {
@@ -415,10 +418,10 @@ class LivyServer extends Logging {
}
private[livy] def testRecovery(livyConf: LivyConf): Unit = {
- if (!livyConf.isRunningOnYarn()) {
- // If recovery is turned on but we are not running on YARN, quit.
+ if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) {
+ // If recovery is turned on but we are not running on YARN or Kubernetes, quit.
require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF,
- "Session recovery requires YARN.")
+ "Session recovery requires YARN or Kubernetes.")
}
}
}
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index bf2db0c09..af069c3ce 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -458,7 +458,9 @@ class InteractiveSession(
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
- if (livyConf.isRunningOnYarn() || driverProcess.isDefined) {
+ if (livyConf.isRunningOnYarn() || driverProcess.isDefined
+ // Create SparkKubernetesApp anyway to recover app monitoring on Livy server restart
+ || livyConf.isRunningOnKubernetes()) {
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else {
None
@@ -540,6 +542,8 @@ class InteractiveSession(
transition(SessionState.ShuttingDown)
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
client.foreach { _.stop(true) }
+ // We need to call #kill here explicitly to delete Interactive pods from the cluster
+ if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill())
} catch {
case _: Exception =>
app.foreach {
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
index 9afe28162..ffe7eecc7 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
@@ -71,13 +71,36 @@ object SparkApp {
sparkConf ++ Map(
SPARK_YARN_TAG_KEY -> mergedYarnTags,
"spark.yarn.submit.waitAppCompletion" -> "false")
+ } else if (livyConf.isRunningOnKubernetes()) {
+
+ // We don't allow to submit applications to the namespaces different from the configured
+ val kubernetesNamespaces = livyConf.getKubernetesNamespaces()
+ val targetNamespace = sparkConf.getOrElse("spark.kubernetes.namespace",
+ SparkKubernetesApp.kubernetesClient.getDefaultNamespace)
+ if (kubernetesNamespaces.nonEmpty && !kubernetesNamespaces.contains(targetNamespace)) {
+ throw new IllegalArgumentException(
+ s"Requested namespace $targetNamespace doesn't match the configured: " +
+ kubernetesNamespaces.mkString(", "))
+ }
+
+ import KubernetesConstants._
+ sparkConf ++ Map(
+ "spark.kubernetes.namespace" -> targetNamespace,
+ // Mark Spark pods with the unique appTag label to be used for their discovery
+ s"spark.kubernetes.driver.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag,
+ s"spark.kubernetes.executor.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag,
+ // Mark Spark pods as created by Livy for the additional tracing
+ s"spark.kubernetes.driver.label.$CREATED_BY_ANNOTATION" -> "livy",
+ s"spark.kubernetes.executor.label.$CREATED_BY_ANNOTATION" -> "livy",
+ "spark.kubernetes.submission.waitAppCompletion" -> "false")
} else {
sparkConf
}
}
/**
- * Return a SparkApp object to control the underlying Spark application via YARN or spark-submit.
+ * Return a SparkApp object to control the underlying Spark application via YARN, Kubernetes
+ * or spark-submit.
*
* @param uniqueAppTag A tag that can uniquely identify the application.
*/
@@ -89,8 +112,11 @@ object SparkApp {
listener: Option[SparkAppListener]): SparkApp = {
if (livyConf.isRunningOnYarn()) {
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
+ } else if (livyConf.isRunningOnKubernetes()) {
+ new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf)
} else {
- require(process.isDefined, "process must not be None when Livy master is not YARN.")
+ require(process.isDefined, "process must not be None when Livy master is not YARN or " +
+ "Kubernetes.")
new SparkProcApp(process.get, listener)
}
}
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
new file mode 100644
index 000000000..a69803a6f
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.utils
+
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client._
+
+import org.apache.livy.{LivyConf, Logging, Utils}
+
+object SparkKubernetesApp extends Logging {
+
+  // Shared client, built on first access from the LivyConf stored by `init`.
+  lazy val kubernetesClient: LivyKubernetesClient =
+    KubernetesClientFactory.createKubernetesClient(livyConf)
+
+  // Pause between two attempts made by `withRetry`.
+  private val RETRY_BACKOFF_MILLIS = 1000
+  // Tag of every app whose lookup timed out, mapped to the time (epoch millis) it was recorded.
+  private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
+
+  // Endless loop that, every `sessionLeakageCheckInterval` ms, kills leaked apps which have
+  // shown up in Kubernetes and forgets tags that stayed absent for longer than
+  // `sessionLeakageCheckTimeout` ms.
+  private val leakedAppsGCThread = new Thread() {
+    override def run(): Unit = {
+      while (true) {
+        if (!leakedAppTags.isEmpty) {
+          // kill the app if found it or remove it if exceeding a threshold
+          val leakedApps = leakedAppTags.entrySet().iterator()
+          val now = System.currentTimeMillis()
+          try {
+            val apps = withRetry(kubernetesClient.getApplications()).groupBy(_.getApplicationTag)
+            while (leakedApps.hasNext) {
+              val leakedApp = leakedApps.next()
+              apps.get(leakedApp.getKey) match {
+                case Some(seq) =>
+                  // Try to kill every app carrying the tag (foldLeft never short-circuits) and
+                  // drop the tag once, only when all of them are gone. Iterator#remove may be
+                  // called at most once per next(), so it must not run once per killed app.
+                  val allKilled = seq.foldLeft(true) { (killedSoFar, app) =>
+                    val killed = withRetry(kubernetesClient.killApplication(app))
+                    if (killed) {
+                      info(s"Killed leaked app with tag ${leakedApp.getKey}")
+                    } else {
+                      warn(s"Leaked app with tag ${leakedApp.getKey} haven't been killed")
+                    }
+                    killedSoFar && killed
+                  }
+                  if (allKilled) leakedApps.remove()
+                // Age of the entry is "now - recorded time"; the reverse is always negative.
+                case None if (now - leakedApp.getValue) > sessionLeakageCheckTimeout =>
+                  leakedApps.remove()
+                  warn(s"Leaked app with tag ${leakedApp.getKey} doesn't exist")
+                case None =>
+                  // Not visible yet and still within the timeout: look again on the next round.
+                  // Without this case a MatchError would abort the whole round.
+              }
+            }
+          } catch {
+            case e: KubernetesClientException =>
+              error("Kubernetes client failure", e)
+            case NonFatal(e) =>
+              error("Failed to remove leaked apps", e)
+          }
+        }
+        Thread.sleep(sessionLeakageCheckInterval)
+      }
+    }
+  }
+
+  private var livyConf: LivyConf = _
+  private var sessionLeakageCheckTimeout: Long = _
+  private var sessionLeakageCheckInterval: Long = _
+
+  /**
+   * Store the configuration, read the leakage check interval and timeout from it and start the
+   * leaked-apps GC daemon thread. Must run before `kubernetesClient` is first accessed, since
+   * the client is built from the stored configuration.
+   *
+   * @param livyConf Livy configuration to take the Kubernetes settings from.
+   */
+  def init(livyConf: LivyConf): Unit = {
+    this.livyConf = livyConf
+    sessionLeakageCheckInterval =
+      livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL)
+    sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT)
+    leakedAppsGCThread.setDaemon(true)
+    leakedAppsGCThread.setName("LeakedAppsGCThread")
+    leakedAppsGCThread.start()
+  }
+
+  /**
+   * Evaluate `fn`, trying again after RETRY_BACKOFF_MILLIS when it throws a non-fatal exception.
+   *
+   * @param fn expression to evaluate; re-evaluated on every attempt.
+   * @param retries total number of attempts (3 by default).
+   * @return the value of the first successful attempt.
+   * @throws Throwable the exception of the last attempt once all attempts have failed.
+   */
+  // Returning T, throwing the exception on failure
+  @tailrec
+  private def withRetry[T](fn: => T, retries: Int = 3): T = {
+    Try { fn } match {
+      case Success(x) => x
+      case _ if retries > 1 =>
+        Thread.sleep(RETRY_BACKOFF_MILLIS)
+        withRetry(fn, retries - 1)
+      case Failure(e) => throw e
+    }
+  }
+
+  /**
+   * Translate a Kubernetes state name (matched case-insensitively) into a SparkApp state.
+   *
+   * @param kubernetesAppState state name, compared against KubernetesApplicationState.
+   * @param appTag tag of the application, used only in the error log message.
+   * @return STARTING, RUNNING, FINISHED or FAILED; any unrecognised name maps to FAILED.
+   */
+  private[utils] def mapKubernetesState(
+      kubernetesAppState: String,
+      appTag: String): SparkApp.State.Value = {
+    import KubernetesApplicationState._
+    kubernetesAppState.toLowerCase match {
+      case PENDING =>
+        SparkApp.State.STARTING
+      case RUNNING =>
+        SparkApp.State.RUNNING
+      case SUCCEEDED =>
+        SparkApp.State.FINISHED
+      case FAILED =>
+        SparkApp.State.FAILED
+      case other => // any other combination is invalid, so FAIL the application.
+        error(s"Unknown Kubernetes state $other for app with tag $appTag")
+        SparkApp.State.FAILED
+    }
+  }
+
+  // Lower-case state names recognised by `mapKubernetesState`.
+  private[utils] object KubernetesApplicationState {
+    val PENDING = "pending"
+    val RUNNING = "running"
+    val SUCCEEDED = "succeeded"
+    val FAILED = "failed"
+  }
+}
+
+/**
+ * Tracks one Spark application running on Kubernetes and exposes it through the SparkApp API.
+ *
+ * Construction starts a daemon thread which first looks the application up by its tag and then
+ * polls its report every poll interval until a terminal state (FAILED, FINISHED, KILLED) is
+ * reached, forwarding state and info changes to the listener.
+ *
+ * @param appTag tag used to find the application among the ones returned by the client.
+ * @param appIdOption not read anywhere in this class.
+ * @param process locally launched process, if any; its output is included in `log()` and it is
+ *                destroyed on `kill()` or when the lookup times out.
+ * @param listener receives the app id, state changes and info changes.
+ * @param livyConf source of the lookup timeout, poll interval and log size settings.
+ * @param kubernetesClient client used for every Kubernetes call; overridable for unit tests.
+ */
+class SparkKubernetesApp private[utils](
+    appTag: String,
+    appIdOption: Option[String],
+    process: Option[LineBufferedProcess],
+    listener: Option[SparkAppListener],
+    livyConf: LivyConf,
+    // For unit tests
+    kubernetesClient: => LivyKubernetesClient = SparkKubernetesApp.kubernetesClient)
+  extends SparkApp with Logging {
+
+  import SparkKubernetesApp._
+
+  private val appLookupTimeout =
+    livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds
+  private val cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)
+  private val pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds
+
+  private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String]
+  private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
+
+  private var state: SparkApp.State = SparkApp.State.STARTING
+  // Completed by the monitor thread: success with the found app, failure if the lookup failed.
+  private val appPromise: Promise[KubernetesApplication] = Promise()
+
+  private[utils] val kubernetesAppMonitorThread = Utils
+    .startDaemonThread(s"kubernetesAppMonitorThread-$this") {
+    try {
+      val app = try {
+        getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
+      } catch {
+        case e: Exception =>
+          // Let a pending kill() stop waiting, then handle the failure in the outer catch.
+          appPromise.failure(e)
+          throw e
+      }
+      appPromise.success(app)
+      val appId = app.getApplicationId
+
+      Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appTag")
+      listener.foreach(_.appIdKnown(appId))
+
+      var appInfo = AppInfo()
+      while (isRunning) {
+        val appReport = withRetry(kubernetesClient.getApplicationReport(app, cacheLogSize))
+        kubernetesAppLog = appReport.getApplicationLog
+        kubernetesDiagnostics = appReport.getApplicationDiagnostics
+        changeState(mapKubernetesState(appReport.getApplicationState, appTag))
+        val latestAppInfo = AppInfo(sparkUiUrl = appReport.getTrackingUrl)
+        // Notify the listener only when the info actually changed.
+        if (appInfo != latestAppInfo) {
+          listener.foreach(_.infoChanged(latestAppInfo))
+          appInfo = latestAppInfo
+        }
+        Clock.sleep(pollInterval.toMillis)
+      }
+      debug(s"Application $appId is in state $state\nDiagnostics:" +
+        s"\n${kubernetesDiagnostics.mkString("\n")}")
+    } catch {
+      case _: InterruptedException =>
+        kubernetesDiagnostics = ArrayBuffer("Application stopped by user")
+        changeState(SparkApp.State.KILLED)
+      case NonFatal(e) =>
+        error("Couldn't refresh Kubernetes state", e)
+        kubernetesDiagnostics = ArrayBuffer(e.getMessage)
+        changeState(SparkApp.State.FAILED)
+    } finally {
+      // Reset the published info once monitoring ends.
+      listener.foreach(_.infoChanged(AppInfo()))
+      info(s"Finished monitoring application $appTag with state $state")
+    }
+  }
+
+  /**
+   * Return the collected output: the application log fetched from Kubernetes, the input and
+   * error lines of the local process (if any) and the latest Kubernetes diagnostics, each
+   * section preceded by a header line.
+   */
+  override def log(): IndexedSeq[String] = {
+    ("stdout: " +: kubernetesAppLog) ++
+      ("\nstderr: " +: (process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++
+        process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String]))) ++
+      ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics)
+  }
+
+  /**
+   * Kill the application and destroy the local process.
+   *
+   * Waits at most the lookup timeout for the application to be found. If the wait times out
+   * or is interrupted, a warning is logged and the monitor thread is interrupted instead.
+   * Only the kill request itself is retried; a failed lookup or a kill request that keeps
+   * failing propagates its exception to the caller.
+   */
+  override def kill(): Unit = {
+    try {
+      // Resolve the application once, outside of withRetry: retrying the wait would multiply
+      // the time a caller is blocked when the application never shows up.
+      val app = Await.result(appPromise.future, appLookupTimeout)
+      withRetry(kubernetesClient.killApplication(app))
+    } catch {
+      // We cannot kill the Kubernetes app without the appTag.
+      // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure.
+      // We don't want a stuck session that can't be deleted. Emit a warning and move on.
+      case _: TimeoutException | _: InterruptedException =>
+        warn("Attempted to delete a session while its Kubernetes application is not found")
+        kubernetesAppMonitorThread.interrupt()
+    } finally {
+      process.foreach(_.destroy())
+    }
+  }
+
+  // True until a terminal state (FAILED, FINISHED or KILLED) has been recorded.
+  private def isRunning: Boolean = {
+    state != SparkApp.State.FAILED &&
+      state != SparkApp.State.FINISHED &&
+      state != SparkApp.State.KILLED
+  }
+
+  // Record the new state and tell the listener, but only on an actual transition.
+  private def changeState(newState: SparkApp.State.Value): Unit = {
+    if (state != newState) {
+      listener.foreach(_.stateChanged(state, newState))
+      state = newState
+    }
+  }
+
+  /**
+   * Find the corresponding KubernetesApplication from an application tag.
+   *
+   * Polls the client every `pollInterval` until an application whose tag equals `appTag` is
+   * returned or the deadline has passed. On timeout the local process is destroyed and the tag
+   * is registered as leaked so the GC thread can clean the application up later.
+   *
+   * @param appTag The application tag tagged on the target application.
+   *               If the tag is not unique, it returns the first application it found.
+   * @param pollInterval pause between two lookups.
+   * @param deadline point in time after which the lookup gives up.
+   * @return KubernetesApplication or the failure.
+   * @throws IllegalStateException if no application was found before the deadline.
+   */
+  @tailrec
+  private def getAppFromTag(
+      appTag: String,
+      pollInterval: Duration,
+      deadline: Deadline): KubernetesApplication = {
+    // The tag is a single string, so compare for equality; a substring test could pick up an
+    // unrelated application whose tag merely contains this one.
+    withRetry(kubernetesClient.getApplications().find(_.getApplicationTag == appTag)) match {
+      case Some(app) => app
+      case None =>
+        if (deadline.isOverdue) {
+          process.foreach(_.destroy())
+          leakedAppTags.put(appTag, System.currentTimeMillis())
+          throw new IllegalStateException("No Kubernetes application is found with tag" +
+            s" $appTag in ${livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT) / 1000}" +
+            " seconds. This may be because 1) spark-submit fail to submit application to " +
+            "Kubernetes; or 2) Kubernetes cluster doesn't have enough resources to start the " +
+            "application in time. Please check Livy log and Kubernetes log to know the details")
+        } else {
+          Clock.sleep(pollInterval.toMillis)
+          getAppFromTag(appTag, pollInterval, deadline)
+        }
+    }
+  }
+}
+
+/** Pod label names and label values shared by the Kubernetes integration code. */
+object KubernetesConstants {
+  // Values of SPARK_ROLE_LABEL that tell driver pods and executor pods apart.
+  val SPARK_ROLE_DRIVER: String = "driver"
+  val SPARK_ROLE_EXECUTOR: String = "executor"
+
+  // Pod labels: application id, Livy application tag, pod role and executor id.
+  val SPARK_APP_ID_LABEL: String = "spark-app-selector"
+  val SPARK_APP_TAG_LABEL: String = "spark-app-tag"
+  val SPARK_ROLE_LABEL: String = "spark-role"
+  val SPARK_EXEC_ID_LABEL: String = "spark-exec-id"
+
+  // Despite the name this is applied as a pod label, with the value "livy".
+  val CREATED_BY_ANNOTATION: String = "created-by"
+}
+
+/**
+ * View of a Spark application backed by its driver pod.
+ *
+ * @param driverPod pod the tag, id, namespace and state are read from.
+ */
+class KubernetesApplication(driverPod: Pod) {
+
+  import KubernetesConstants._
+
+  // Value of the given label on the driver pod (whatever the label map returns for the key).
+  private def label(name: String): String = driverPod.getMetadata.getLabels.get(name)
+
+  /** Value of the application tag label on the driver pod. */
+  def getApplicationTag: String = label(SPARK_APP_TAG_LABEL)
+
+  /** Value of the application id label on the driver pod. */
+  def getApplicationId: String = label(SPARK_APP_ID_LABEL)
+
+  /** Namespace the driver pod lives in. */
+  def getApplicationNamespace: String = {
+    val metadata = driverPod.getMetadata
+    metadata.getNamespace
+  }
+
+  /** Phase of the driver pod, lower-cased. */
+  def getApplicationState: String = {
+    val phase = driverPod.getStatus.getPhase
+    phase.toLowerCase
+  }
+
+  /** The driver pod itself. */
+  def getApplicationPod: Pod = driverPod
+}
+
+/**
+ * Snapshot of an application: its driver pod (if found), executor pods and driver log.
+ *
+ * @param driver driver pod, or None when no pod with the driver role was found.
+ * @param executors executor pods of the application.
+ * @param appLog lines of the application log.
+ * @param livyConf configuration holding the Spark UI link settings.
+ */
+private[utils] case class KubernetesAppReport(
+    driver: Option[Pod],
+    executors: Set[Pod],
+    appLog: IndexedSeq[String],
+    livyConf: LivyConf) {
+
+  import SparkKubernetesApp._
+
+  // Placeholder returned when there is no driver pod or the requested value is null.
+  private val UNKNOWN = "unknown"
+  private val APP = driver.map(new KubernetesApplication(_))
+
+  /**
+   * Link to the Spark UI, built by substituting the application id into the configured link
+   * format. None unless the Kubernetes Spark UI links are enabled and the app is running.
+   */
+  def getTrackingUrl: Option[String] = {
+    if (livyConf.getBoolean(LivyConf.UI_KUBERNETES_SPARK_UI_ENABLED) && isRunning) {
+      val format = livyConf.get(LivyConf.UI_KUBERNETES_SPARK_UI_LINK_FORMAT)
+      Some(String.format(format, getApplicationId))
+    } else {
+      None
+    }
+  }
+
+  def getApplicationState: String = getOrDefault(_.getApplicationState)
+
+  def getApplicationTag: String = getOrDefault(_.getApplicationTag)
+
+  def getApplicationId: String = getOrDefault(_.getApplicationId)
+
+  def getApplicationLog: IndexedSeq[String] = appLog
+
+  /**
+   * Human-readable description of the driver pod followed by the executor pods sorted by name,
+   * returned as individual lines.
+   */
+  def getApplicationDiagnostics: IndexedSeq[String] = {
+    (driver.toSeq ++ executors.toSeq.sortBy(_.getMetadata.getName))
+      .map(buildSparkPodDiagnosticsPrettyString)
+      .flatMap(_.split("\n"))
+      .toIndexedSeq
+  }
+
+  private def isRunning: Boolean =
+    SparkApp.State.RUNNING == mapKubernetesState(getApplicationState, getApplicationTag)
+
+  // Apply the extractor to the driver application; UNKNOWN when absent or when it yields null.
+  private def getOrDefault(extractor: KubernetesApplication => String): String =
+    APP.map(extractor).flatMap(Option(_)).getOrElse(UNKNOWN)
+
+  // Render one pod as a multi-line, tab-indented description. Optional parts of the pod
+  // (labels, container resources with their requests and limits, conditions) may be missing;
+  // they are rendered as empty instead of failing with a NullPointerException, which would
+  // otherwise mark a healthy application as failed during monitoring.
+  private def buildSparkPodDiagnosticsPrettyString(pod: Pod): String = {
+    import scala.collection.JavaConverters._
+
+    // "k1=v1, k2=v2" for the given map; an absent map gives an empty string.
+    def printMap[K, V](map: Option[java.util.Map[K, V]]): String = {
+      map.map(_.asScala.toMap).getOrElse(Map.empty[K, V]).map {
+        case (key, value) => s"$key=$value"
+      }.mkString(", ")
+    }
+
+    val containers = pod.getSpec.getContainers.asScala.map { container =>
+      val resources = Option(container.getResources)
+      val requests = resources.flatMap(r => Option(r.getRequests))
+      val limits = resources.flatMap(r => Option(r.getLimits))
+      s"${container.getName}:" +
+        s"\n\t\t\timage: ${container.getImage}" +
+        s"\n\t\t\trequests: ${printMap(requests)}" +
+        s"\n\t\t\tlimits: ${printMap(limits)}" +
+        s"\n\t\t\tcommand: ${container.getCommand} ${container.getArgs}"
+    }.mkString("\n\t\t")
+
+    val conditions = Option(pod.getStatus.getConditions)
+      .map(_.asScala.mkString("\n\t\t"))
+      .getOrElse("")
+
+    s"${pod.getMetadata.getName}.${pod.getMetadata.getNamespace}:" +
+      s"\n\tnode: ${pod.getSpec.getNodeName}" +
+      s"\n\thostname: ${pod.getSpec.getHostname}" +
+      s"\n\tpodIp: ${pod.getStatus.getPodIP}" +
+      s"\n\tstartTime: ${pod.getStatus.getStartTime}" +
+      s"\n\tphase: ${pod.getStatus.getPhase}" +
+      s"\n\treason: ${pod.getStatus.getReason}" +
+      s"\n\tmessage: ${pod.getStatus.getMessage}" +
+      s"\n\tlabels: ${printMap(Option(pod.getMetadata.getLabels))}" +
+      s"\n\tcontainers:" +
+      s"\n\t\t$containers" +
+      s"\n\tconditions:" +
+      s"\n\t\t$conditions"
+  }
+}
+
+/**
+ * Thin wrapper around the fabric8 client offering the operations Livy needs: listing, killing
+ * and reporting on Spark applications.
+ *
+ * @param client underlying Kubernetes client.
+ * @param livyConf configuration providing the namespaces to search in.
+ */
+private[utils] class LivyKubernetesClient(
+    client: DefaultKubernetesClient, livyConf: LivyConf) {
+
+  import scala.collection.JavaConverters._
+
+  import KubernetesConstants._
+
+  /**
+   * List the applications whose pods carry all of `labels` plus both the tag and the id label.
+   * The search covers the configured namespaces, or every namespace when none is configured.
+   *
+   * @param labels label key/value pairs a pod must match (by default: the driver role).
+   * @param appTagLabel name of a label that must be present on the pod.
+   * @param appIdLabel name of another label that must be present on the pod.
+   * @return one KubernetesApplication per matching pod.
+   */
+  def getApplications(
+      labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER),
+      appTagLabel: String = SPARK_APP_TAG_LABEL,
+      appIdLabel: String = SPARK_APP_ID_LABEL): Seq[KubernetesApplication] = {
+    val scopedClients = Option(livyConf.getKubernetesNamespaces()).filter(_.nonEmpty) match {
+      case Some(namespaces) => namespaces.toSeq.map(ns => client.inNamespace(ns))
+      case None => Seq(client.inAnyNamespace())
+    }
+    scopedClients.flatMap { scoped =>
+      scoped.pods
+        .withLabels(labels.asJava)
+        .withLabel(appTagLabel)
+        .withLabel(appIdLabel)
+        .list.getItems.asScala.map(new KubernetesApplication(_))
+    }
+  }
+
+  /**
+   * Delete the application's pod from its namespace.
+   *
+   * @return the result reported by the underlying delete call.
+   */
+  def killApplication(app: KubernetesApplication): Boolean = {
+    val namespaced = client.inNamespace(app.getApplicationNamespace)
+    namespaced.pods.delete(app.getApplicationPod)
+  }
+
+  /**
+   * Collect the current pods of the application together with the tail of its driver log.
+   *
+   * @param app application to report on.
+   * @param cacheLogSize number of trailing log lines to fetch.
+   * @param appTagLabel label whose value must equal the application's tag.
+   */
+  def getApplicationReport(
+      app: KubernetesApplication,
+      cacheLogSize: Int,
+      appTagLabel: String = SPARK_APP_TAG_LABEL): KubernetesAppReport = {
+    val selector = Map(appTagLabel -> app.getApplicationTag).asJava
+    val pods = client.inNamespace(app.getApplicationNamespace).pods
+      .withLabels(selector)
+      .list.getItems.asScala.toSet
+    KubernetesAppReport(
+      pods.find(isDriver),
+      pods.filter(isExecutor),
+      getApplicationLog(app, cacheLogSize),
+      livyConf)
+  }
+
+  // Best effort: any failure while fetching the log results in an empty log.
+  private def getApplicationLog(
+      app: KubernetesApplication, cacheLogSize: Int): IndexedSeq[String] = {
+    Try {
+      val namespaced = client.inNamespace(app.getApplicationNamespace)
+      namespaced.pods
+        .withName(app.getApplicationPod.getMetadata.getName)
+        .tailingLines(cacheLogSize).getLog.split("\n").toIndexedSeq
+    }.getOrElse(IndexedSeq.empty)
+  }
+
+  // Predicate matching pods whose role label equals the given role.
+  private def hasRole(role: String): Pod => Boolean =
+    pod => pod.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == role
+
+  private def isDriver: Pod => Boolean = hasRole(SPARK_ROLE_DRIVER)
+
+  private def isExecutor: Pod => Boolean = hasRole(SPARK_ROLE_EXECUTOR)
+
+  /** Namespace the underlying client is configured with. */
+  def getDefaultNamespace: String = client.getNamespace
+}
+
+/** Builds the Kubernetes client used by Livy from the "livy.server.kubernetes.*" options. */
+private[utils] object KubernetesClientFactory {
+
+  import java.io.File
+
+  import com.google.common.base.Charsets
+  import com.google.common.io.Files
+  import io.fabric8.kubernetes.client.ConfigBuilder
+
+  /**
+   * Create a client for the API server derived from the configured Spark master URL.
+   *
+   * Each of the OAuth token (value or file), CA certificate file, client key file, client
+   * certificate file and default namespace is applied only when its option is non-blank.
+   *
+   * @param livyConf configuration to read the Spark master and Kubernetes options from.
+   * @throws IllegalArgumentException if both an OAuth token file and an OAuth token value
+   *                                  are configured.
+   */
+  def createKubernetesClient(livyConf: LivyConf): LivyKubernetesClient = {
+    val masterUrl = sparkMasterToKubernetesApi(livyConf.sparkMaster())
+
+    val oauthTokenFile = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE).toOption
+    val oauthTokenValue = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE).toOption
+    // Name the conflicting options instead of printing their values: the token is a secret
+    // and must not end up in an exception message or a log file.
+    require(oauthTokenFile.isEmpty || oauthTokenValue.isEmpty,
+      "Cannot specify OAuth token through both " +
+        s"a file ${LivyConf.KUBERNETES_OAUTH_TOKEN_FILE.key} and " +
+        s"a value ${LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE.key}")
+
+    val caCertFile = livyConf.get(LivyConf.KUBERNETES_CA_CERT_FILE).toOption
+    val clientKeyFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_KEY_FILE).toOption
+    val clientCertFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_CERT_FILE).toOption
+    val clientNamespace = livyConf.get(LivyConf.KUBERNETES_DEFAULT_NAMESPACE).toOption
+
+    val config = new ConfigBuilder()
+      .withApiVersion("v1")
+      .withMasterUrl(masterUrl)
+      .withOption(oauthTokenValue) {
+        (token, builder) => builder.withOauthToken(token)
+      }
+      .withOption(oauthTokenFile) {
+        // Strip surrounding whitespace: a trailing newline in the file is not part of the token.
+        (file, builder) =>
+          builder.withOauthToken(Files.toString(new File(file), Charsets.UTF_8).trim)
+      }
+      .withOption(caCertFile) {
+        (file, builder) => builder.withCaCertFile(file)
+      }
+      .withOption(clientKeyFile) {
+        (file, builder) => builder.withClientKeyFile(file)
+      }
+      .withOption(clientCertFile) {
+        (file, builder) => builder.withClientCertFile(file)
+      }
+      .withOption(clientNamespace) {
+        (namespace, builder) => builder.withNamespace(namespace)
+      }
+      .build()
+    new LivyKubernetesClient(
+      new DefaultKubernetesClient(config), livyConf)
+  }
+
+  /**
+   * Turn a Spark master URL into an API server URL: the first "k8s://" is removed and
+   * "https://" is prepended unless the remainder already starts with "http".
+   */
+  private[utils] def sparkMasterToKubernetesApi(sparkMaster: String): String = {
+    val replaced = sparkMaster.replaceFirst("k8s://", "")
+    if (!replaced.startsWith("http")) s"https://$replaced"
+    else replaced
+  }
+
+  /** Adds `toOption`, which maps null and blank strings to None. */
+  implicit class OptionString(val string: String) extends AnyVal {
+    def toOption: Option[String] = {
+      if (string == null || string.trim.isEmpty) None
+      else Some(string)
+    }
+  }
+
+  // Adds `withOption`, which applies a configurator to the builder only for a defined option.
+  private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
+    extends AnyVal {
+
+    def withOption[T](option: Option[T])
+        (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = {
+      option.map(configurator(_, configBuilder)).getOrElse(configBuilder)
+    }
+  }
+}
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
new file mode 100644
index 000000000..064adb485
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.utils
+
+import java.util.Objects._
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.duration._
+
+import io.fabric8.kubernetes.api.model._
+import org.mockito.Matchers.{eq => eqs, _}
+import org.mockito.Mockito.{atLeast, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.FunSpec
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.utils.SparkApp.State
+import org.apache.livy.utils.SparkKubernetesApp.KubernetesApplicationState._
+
+class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite {
+
+  // Runs `f` and then always interrupts `t`, even when `f` throws.
+  private def cleanupThread(t: Thread)(f: => Unit) = {
+    try { f } finally { t.interrupt() }
+  }
+
+  // Sleep stand-in passed to Clock.withSleepMethod: ignores `ms` and only yields the thread.
+  private def mockSleep(ms: Long) = {
+    Thread.`yield`()
+  }
+
+  describe("SparkKubernetesApp") {
+    val TEST_TIMEOUT = 30.seconds
+    val appId = "app_id"
+    val appTag = "app_tag"
+    val trackingUrl = "tracking_url"
+
+    // Mock KubernetesApplication that reports the fixed appId and appTag.
+    def initMockApp: KubernetesApplication = {
+      val mockApp = mock[KubernetesApplication]
+      when(mockApp.getApplicationId).thenReturn(appId)
+      when(mockApp.getApplicationTag).thenReturn(appTag)
+      mockApp
+    }
+
+    // Mock client returning `mockApp` and a report whose state goes PENDING, RUNNING, SUCCEEDED.
+    def initMockClient(mockApp: KubernetesApplication): LivyKubernetesClient = {
+      val mockAppReport = mock[KubernetesAppReport]
+      when(mockAppReport.getApplicationLog).thenReturn(IndexedSeq("app", "log"))
+      when(mockAppReport.getApplicationDiagnostics).thenReturn(IndexedSeq("app", "diagnostics"))
+      when(mockAppReport.getTrackingUrl).thenReturn(Some(trackingUrl))
+      val mockClient = mock[LivyKubernetesClient]
+      when(mockClient.getApplications(any(), any(), any())).thenReturn(Seq(mockApp))
+      when(mockClient.getApplicationReport(eqs(mockApp), any(), any())).thenReturn(mockAppReport)
+
+      // Simulate Kubernetes app state progression.
+      // Every getApplicationState call advances one step; the final state repeats afterwards.
+      val applicationStateList = List(PENDING, RUNNING, SUCCEEDED)
+      val stateIndex = new AtomicInteger(0)
+      when(mockAppReport.getApplicationState).thenAnswer(
+        new Answer[String] {
+          override def answer(inv: InvocationOnMock): String = {
+            val i = stateIndex.getAndIncrement()
+            applicationStateList.lift(i).getOrElse(applicationStateList.last)
+          }
+        }
+      )
+      mockClient
+    }
+
+    it("should poll Kubernetes state and terminate") {
+      val livyConf = new LivyConf(false)
+      livyConf.set(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT, "30s")
+      Clock.withSleepMethod(mockSleep) {
+        val mockListener = mock[SparkAppListener]
+        val mockApp = initMockApp
+        val mockClient = initMockClient(mockApp)
+        val app = new SparkKubernetesApp(
+          appTag, None, None, Some(mockListener), livyConf, mockClient)
+        cleanupThread(app.kubernetesAppMonitorThread) {
+          app.kubernetesAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+          assert(!app.kubernetesAppMonitorThread.isAlive,
+            "KubernetesAppMonitorThread should terminate after Kubernetes app is finished")
+          verify(mockClient, atLeast(1)).getApplications(any(), anyString(), anyString())
+          verify(mockClient, atLeast(1))
+            .getApplicationReport(eqs(mockApp), anyInt(), anyString())
+          verify(mockListener).appIdKnown(appId)
+          verify(mockListener).infoChanged(AppInfo())
+          verify(mockListener).stateChanged(State.STARTING, State.RUNNING)
+          verify(mockListener).stateChanged(State.RUNNING, State.FINISHED)
+        }
+      }
+    }
+
+    it("should build sparkUiUrl and update AppInfo") {
+      val livyConf = new LivyConf(false)
+      livyConf.set(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT, "30s")
+      Clock.withSleepMethod(mockSleep) {
+        val mockListener = mock[SparkAppListener]
+        val mockApp = initMockApp
+        val mockClient = initMockClient(mockApp)
+        val app = new SparkKubernetesApp(
+          appTag, None, None, Some(mockListener), livyConf, mockClient)
+        cleanupThread(app.kubernetesAppMonitorThread) {
+          app.kubernetesAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+          assert(!app.kubernetesAppMonitorThread.isAlive,
+            "KubernetesAppMonitorThread should terminate after Kubernetes app is finished.")
+          verify(mockListener)
+            .infoChanged(AppInfo(sparkUiUrl = Some(trackingUrl)))
+          verify(mockListener).infoChanged(AppInfo())
+        }
+      }
+    }
+  }
+
+  describe("KubernetesAppReport") {
+
+    import scala.collection.JavaConverters._
+    import KubernetesConstants._
+
+    // Mock driver Pod whose status reports `state` as its phase.
+    def driverMock(state: String): Pod = {
+      val status = when(mock[PodStatus].getPhase).thenReturn(state).getMock[PodStatus]
+      when(mock[Pod].getStatus).thenReturn(status).getMock[Pod]
+    }
+
+    // Stubs `pod`'s metadata so its labels are `labelMap`, then returns that same mock.
+    def withLabels(pod: Pod, labelMap: Map[String, String]): Pod = {
+      val metaWithLabel = when(mock[ObjectMeta].getLabels).thenReturn(labelMap.asJava)
+        .getMock[ObjectMeta]
+      when(pod.getMetadata).thenReturn(metaWithLabel).getMock[Pod]
+    }
+
+    // Builds a report for `driver`, leaving the remaining collection inputs empty.
+    def report(driver: Option[Pod], livyConf: LivyConf): KubernetesAppReport =
+      KubernetesAppReport(driver, Set.empty, IndexedSeq.empty, livyConf)
+
+    it("should return application state") {
+      val livyConf = new LivyConf(false)
+      val driver = driverMock("State")
+      assertResult("state") {
+        report(Some(driver), livyConf).getApplicationState
+      }
+      assertResult("unknown") {
+        report(None, livyConf).getApplicationState
+      }
+    }
+
+    it("should return Spark UI url") {
+
+      def test(expected: Option[String], driver: Option[Pod], livyConf: LivyConf): Unit =
+        assertResult(expected) {
+          report(driver, livyConf).getTrackingUrl
+        }
+
+      val appId = "app_id"
+      val livyConf = new LivyConf(false)
+      livyConf.set(LivyConf.UI_KUBERNETES_SPARK_UI_ENABLED, true)
+      livyConf.set(LivyConf.UI_KUBERNETES_SPARK_UI_LINK_FORMAT, "https://cluser.com/spark/%s")
+
+      test(Some(s"https://cluser.com/spark/$appId"),
+        Some(withLabels(driverMock("Running"), Map(SPARK_APP_ID_LABEL -> appId))), livyConf)
+      test(Some("https://cluser.com/spark/unknown"),
+        Some(withLabels(driverMock("Running"), Map.empty)), livyConf)
+      test(None, Some(withLabels(driverMock("State"), Map.empty)), livyConf)
+      test(None, None, livyConf)
+      test(None, Some(withLabels(driverMock("Running"), Map(SPARK_APP_ID_LABEL -> appId))),
+        new LivyConf(false))
+    }
+  }
+
+  describe("KubernetesClientFactory") {
+    it("should build KubernetesApi url from LivyConf master url") {
+      def actual(sparkMaster: String): String =
+        KubernetesClientFactory.sparkMasterToKubernetesApi(sparkMaster)
+
+      val masterUrl = "kubernetes.default.svc:443"
+
+      assertResult("https://local")(actual("https://local"))
+      assertResult(s"https://$masterUrl")(actual(s"k8s://$masterUrl"))
+      assertResult(s"http://$masterUrl")(actual(s"k8s://http://$masterUrl"))
+      assertResult(s"https://$masterUrl")(actual(s"k8s://https://$masterUrl"))
+      assertResult(s"http://$masterUrl")(actual(s"http://$masterUrl"))
+      assertResult(s"https://$masterUrl")(actual(s"https://$masterUrl"))
+    }
+
+    it("should create KubernetesClient with default configs") {
+      assert(nonNull(KubernetesClientFactory.createKubernetesClient(new LivyConf(false))))
+    }
+
+    it("should throw IllegalArgumentException if both oauth file and token provided") {
+      val conf = new LivyConf(false)
+        .set(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE, "dummy_path")
+        .set(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE, "dummy_value")
+      intercept[IllegalArgumentException] {
+        KubernetesClientFactory.createKubernetesClient(conf)
+      }
+    }
+  }
+}