diff --git a/conf/livy.conf.template b/conf/livy.conf.template index aedc63293..2d5710f24 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -148,12 +148,29 @@ # cause session leakage, so we need to check session leakage. # How long to check livy session leakage # livy.server.yarn.app-leakage.check-timeout = 600s -# how often to check livy session leakage +# How often to check livy session leakage # livy.server.yarn.app-leakage.check-interval = 60s # How often Livy polls YARN to refresh YARN app state. # livy.server.yarn.poll-interval = 5s -# + +# If Livy can't find the Kubernetes app within this time, consider it lost +# livy.server.kubernetes.app-lookup-timeout = 600s +# When the cluster is busy, we may fail to launch Kubernetes app in app-lookup-timeout, then it +# would cause session leakage, so we need to check session leakage +# How long to check livy session leakage +# livy.server.kubernetes.app-leakage.check-timeout = 600s +# How often to check livy session leakage +# livy.server.kubernetes.app-leakage.check-interval = 60s + +# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods state, logs, description +# details, routes, etc...) +# livy.server.kubernetes.poll-interval = 15s + +# Comma-separated list of the Kubernetes namespaces to allow for applications creation. +# All namespaces are allowed if empty +# livy.server.kubernetes.allowedNamespaces = + # Days to keep Livy server request logs. # livy.server.request-log-retain.days = 5 @@ -189,6 +206,21 @@ # livy.server.auth..param. = # livy.server.auth..param. 
= +# Manual authentication to KubeApiserver (by default configured with Kubernetes ServiceAccount +# if deployed to Kubernetes cluster as a Pod) +# Kubernetes oauth token file path +# livy.server.kubernetes.oauthTokenFile = +# Kubernetes oauth token string value +# livy.server.kubernetes.oauthTokenValue = +# Kubernetes CA cert file path +# livy.server.kubernetes.caCertFile = +# Kubernetes client key file path +# livy.server.kubernetes.clientKeyFile = +# Kubernetes client cert file path +# livy.server.kubernetes.clientCertFile = +# Kubernetes client default namespace +# livy.server.kubernetes.defaultNamespace = + # Enable to allow custom classpath by proxy user in cluster mode # The below configuration parameter is disabled by default. # livy.server.session.allow-custom-classpath = true diff --git a/pom.xml b/pom.xml index 46fcd51aa..fb37203fb 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,7 @@ 2.4.5 2.4.5 ${spark.scala-2.11.version} + 4.6.4 3.0.0 1.9 4.5.13 @@ -319,6 +320,18 @@ ${metrics.version} + + io.fabric8 + kubernetes-client + ${kubernetes.client.version} + + + com.fasterxml.jackson.core + * + + + + io.netty netty-all @@ -1165,6 +1178,8 @@ https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz spark-3.0.0-bin-hadoop2.7 + 4.9.2 + 2.10.3 diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java index 4c45956d7..200f5a5f0 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java @@ -25,6 +25,7 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Properties; import javax.security.sasl.Sasl; @@ -151,6 +152,12 @@ public String findLocalAddress() throws IOException { return address.getCanonicalHostName(); } + public boolean isRunningOnKubernetes() { + return Optional.ofNullable(get("livy.spark.master")) + .filter(s -> s.startsWith("k8s")) + 
.isPresent(); + } + private static final Map configsWithAlternatives = Collections.unmodifiableMap(new HashMap() {{ put(RSCConf.Entry.CLIENT_IN_PROCESS.key, DepConf.CLIENT_IN_PROCESS); diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java index b93c5cc71..be609afe2 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java @@ -169,6 +169,13 @@ private void initializeServer() throws Exception { // on the cluster, it would be tricky to solve that problem in a generic way. livyConf.set(RPC_SERVER_ADDRESS, null); + // If we are running on Kubernetes, set RPC_SERVER_ADDRESS from "spark.driver.host" option, + // which is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep: + // line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc" + if (livyConf.isRunningOnKubernetes()) { + livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host")); + } + if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) { // Test flag is turned on so we will just infinite loop here. It should cause // timeout and we should still see yarn application being cleaned up. diff --git a/server/pom.xml b/server/pom.xml index 90a3e5e34..2ee1b3858 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -84,6 +84,11 @@ metrics-healthchecks + + io.fabric8 + kubernetes-client + + javax.servlet javax.servlet-api diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 51179e138..76bbbdb6b 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -251,6 +251,35 @@ object LivyConf { // how often to check livy session leakage val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s") + // Kubernetes oauth token file path. 
+ val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "") + // Kubernetes oauth token string value. + val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "") + // Kubernetes CA cert file path. + val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "") + // Kubernetes client key file path. + val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "") + // Kubernetes client cert file path. + val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "") + // Kubernetes client default namespace. + val KUBERNETES_DEFAULT_NAMESPACE = Entry("livy.server.kubernetes.defaultNamespace", "") + + // Comma-separated list of the Kubernetes namespaces to allow for applications creation. + // All namespaces are allowed if empty. + val KUBERNETES_ALLOWED_NAMESPACES = Entry("livy.server.kubernetes.allowedNamespaces", null) + + // If Livy can't find the Kubernetes app within this time, consider it lost. + val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s") + // How often Livy polls Kubernetes to refresh Kubernetes app state. + val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s") + + // How long to check livy session leakage. + val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT = + Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s") + // How often to check livy session leakage. + val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL = + Entry("livy.server.kubernetes.app-leakage.check-interval", "60s") + // Whether session timeout should be checked, by default it will be checked, which means inactive // session will be stopped after "livy.server.session.timeout" val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true) @@ -364,6 +393,15 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { /** Return true if spark master starts with yarn. 
*/
   def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn")
 
+  /** Return true if spark master starts with k8s. */
+  def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s")
+
+  /** Return the allowed Kubernetes namespaces; an empty set means every namespace is allowed. */
+  def getKubernetesNamespaces(): Set[String] =
+    Option(get(KUBERNETES_ALLOWED_NAMESPACES)).filterNot(_.isEmpty)
+      .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet) // tolerate "a, b" and stray commas
+      .getOrElse(Set.empty)
+
   /** Return the spark deploy mode Livy sessions should use. */
   def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)
diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
index 3e715bdf1..67da94d29 100644
--- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala
+++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
@@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManag
 import org.apache.livy.server.ui.UIServlet
 import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
 import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
+import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp}
 import org.apache.livy.utils.LivySparkUtils._
-import org.apache.livy.utils.SparkYarnApp
 
 class LivyServer extends Logging {
 
@@ -142,10 +142,13 @@ class LivyServer extends Logging {
 
     testRecovery(livyConf)
 
-    // Initialize YarnClient ASAP to save time.
+    // Initialize YarnClient or KubernetesClient ASAP to save time.
if (livyConf.isRunningOnYarn()) { SparkYarnApp.init(livyConf) Future { SparkYarnApp.yarnClient } + } else if (livyConf.isRunningOnKubernetes()) { + SparkKubernetesApp.init(livyConf) + Future { SparkKubernetesApp.kubernetesClient } } if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") { @@ -415,10 +418,10 @@ class LivyServer extends Logging { } private[livy] def testRecovery(livyConf: LivyConf): Unit = { - if (!livyConf.isRunningOnYarn()) { - // If recovery is turned on but we are not running on YARN, quit. + if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) { + // If recovery is turned on but we are not running on YARN or Kubernetes, quit. require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF, - "Session recovery requires YARN.") + "Session recovery requires YARN or Kubernetes.") } } } diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 59a593f68..e101227ef 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -458,7 +458,9 @@ class InteractiveSession( val driverProcess = client.flatMap { c => Option(c.getDriverProcess) } .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))) - if (livyConf.isRunningOnYarn() || driverProcess.isDefined) { + if (livyConf.isRunningOnYarn() || driverProcess.isDefined + // Create SparkKubernetesApp anyway to recover app monitoring on Livy server restart + || livyConf.isRunningOnKubernetes()) { Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) } else { None @@ -540,6 +542,8 @@ class InteractiveSession( transition(SessionState.ShuttingDown) sessionStore.remove(RECOVERY_SESSION_TYPE, id) client.foreach { _.stop(true) } + // We need to call #kill here explicitly to delete Interactive 
pods from the cluster + if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill()) } catch { case _: Exception => app.foreach { diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index 9afe28162..ffe7eecc7 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -71,13 +71,36 @@ object SparkApp { sparkConf ++ Map( SPARK_YARN_TAG_KEY -> mergedYarnTags, "spark.yarn.submit.waitAppCompletion" -> "false") + } else if (livyConf.isRunningOnKubernetes()) { + + // We don't allow to submit applications to the namespaces different from the configured + val kubernetesNamespaces = livyConf.getKubernetesNamespaces() + val targetNamespace = sparkConf.getOrElse("spark.kubernetes.namespace", + SparkKubernetesApp.kubernetesClient.getDefaultNamespace) + if (kubernetesNamespaces.nonEmpty && !kubernetesNamespaces.contains(targetNamespace)) { + throw new IllegalArgumentException( + s"Requested namespace $targetNamespace doesn't match the configured: " + + kubernetesNamespaces.mkString(", ")) + } + + import KubernetesConstants._ + sparkConf ++ Map( + "spark.kubernetes.namespace" -> targetNamespace, + // Mark Spark pods with the unique appTag label to be used for their discovery + s"spark.kubernetes.driver.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag, + s"spark.kubernetes.executor.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag, + // Mark Spark pods as created by Livy for the additional tracing + s"spark.kubernetes.driver.label.$CREATED_BY_ANNOTATION" -> "livy", + s"spark.kubernetes.executor.label.$CREATED_BY_ANNOTATION" -> "livy", + "spark.kubernetes.submission.waitAppCompletion" -> "false") } else { sparkConf } } /** - * Return a SparkApp object to control the underlying Spark application via YARN or spark-submit. + * Return a SparkApp object to control the underlying Spark application via YARN, Kubernetes + * or spark-submit. 
* * @param uniqueAppTag A tag that can uniquely identify the application. */ @@ -89,8 +112,11 @@ object SparkApp { listener: Option[SparkAppListener]): SparkApp = { if (livyConf.isRunningOnYarn()) { new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf) + } else if (livyConf.isRunningOnKubernetes()) { + new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf) } else { - require(process.isDefined, "process must not be None when Livy master is not YARN.") + require(process.isDefined, "process must not be None when Livy master is not YARN or " + + "Kubernetes.") new SparkProcApp(process.get, listener) } } diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala new file mode 100644 index 000000000..6091705cb --- /dev/null +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.livy.utils
+
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client._
+
+import org.apache.livy.{LivyConf, Logging, Utils}
+
+/**
+ * Shared state and helpers for Spark applications tracked through Kubernetes: the lazily
+ * created client, the leaked-application sweeper thread and the pod-phase to
+ * SparkApp.State mapping.
+ */
+object SparkKubernetesApp extends Logging {
+
+  // Built on first access from the LivyConf stored by init().
+  lazy val kubernetesClient: LivyKubernetesClient =
+    KubernetesClientFactory.createKubernetesClient(livyConf)
+
+  private val RETRY_BACKOFF_MILLIS = 1000
+  // appTag -> time (ms since epoch) at which the lookup for that tag gave up.
+  private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
+
+  // Sweeps leakedAppTags every sessionLeakageCheckInterval ms: kills tagged apps that showed
+  // up after their lookup timed out and forgets tags that never appeared within the timeout.
+  private val leakedAppsGCThread = new Thread() {
+    override def run(): Unit = {
+      while (true) {
+        if (!leakedAppTags.isEmpty) {
+          // kill the app if found it or remove it if exceeding a threshold
+          val leakedApps = leakedAppTags.entrySet().iterator()
+          val now = System.currentTimeMillis()
+          try {
+            val apps = withRetry(kubernetesClient.getApplications()).groupBy(_.getApplicationTag)
+            while (leakedApps.hasNext) {
+              val leakedApp = leakedApps.next()
+              apps.get(leakedApp.getKey) match {
+                case Some(seq) =>
+                  // Try to kill every pod carrying the tag, then drop the tag at most once:
+                  // Iterator.remove() may only be called once per next().
+                  val killed = seq.map(app => withRetry(kubernetesClient.killApplication(app)))
+                  if (killed.forall(identity)) {
+                    leakedApps.remove()
+                    info(s"Killed leaked app with tag ${leakedApp.getKey}")
+                  } else {
+                    warn(s"Leaked app with tag ${leakedApp.getKey} haven't been killed")
+                  }
+                // The stored value is the time the tag was recorded, so elapsed = now - value.
+                case None if (now - leakedApp.getValue) > sessionLeakageCheckTimeout =>
+                  leakedApps.remove()
+                  warn(s"Leaked app with tag ${leakedApp.getKey} doesn't exist")
+                case None =>
+                  // Not found yet and still within the timeout: look again on the next sweep.
+              }
+            }
+          } catch {
+            case e: KubernetesClientException =>
+              error("Kubernetes client failure", e)
+            case NonFatal(e) =>
+              error("Failed to remove leaked apps", e)
+          }
+        }
+        Thread.sleep(sessionLeakageCheckInterval)
+      }
+    }
+  }
+
+  private var livyConf: LivyConf = _
+  private var sessionLeakageCheckTimeout: Long = _
+  private var sessionLeakageCheckInterval: Long = _
+
+  /**
+   * Stores the configuration, reads the leakage-check interval and timeout from it and
+   * starts the leaked-apps sweeper as a daemon thread.
+   */
+  def init(livyConf: LivyConf): Unit = {
+    this.livyConf = livyConf
+    sessionLeakageCheckInterval =
+      livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL)
+    sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT)
+    leakedAppsGCThread.setDaemon(true)
+    leakedAppsGCThread.setName("LeakedAppsGCThread")
+    leakedAppsGCThread.start()
+  }
+
+  // Evaluates fn up to `retries` times, sleeping RETRY_BACKOFF_MILLIS between attempts.
+  // Returning T, throwing the exception on failure
+  @tailrec
+  private def withRetry[T](fn: => T, retries: Int = 3): T = {
+    Try { fn } match {
+      case Success(x) => x
+      case _ if retries > 1 =>
+        Thread.sleep(RETRY_BACKOFF_MILLIS)
+        withRetry(fn, retries - 1)
+      case Failure(e) => throw e
+    }
+  }
+
+  /**
+   * Maps a pod phase (compared case-insensitively) to the matching SparkApp state.
+   * Any phase other than pending, running, succeeded or failed is logged and mapped to FAILED.
+   *
+   * @param kubernetesAppState pod phase as reported for the application.
+   * @param appTag application tag, used only in the error message.
+   */
+  private[utils] def mapKubernetesState(
+      kubernetesAppState: String,
+      appTag: String): SparkApp.State.Value = {
+    import KubernetesApplicationState._
+    kubernetesAppState.toLowerCase match {
+      case PENDING =>
+        SparkApp.State.STARTING
+      case RUNNING =>
+        SparkApp.State.RUNNING
+      case SUCCEEDED =>
+        SparkApp.State.FINISHED
+      case FAILED =>
+        SparkApp.State.FAILED
+      case other => // any other combination is invalid, so FAIL the application.
+        error(s"Unknown Kubernetes state $other for app with tag $appTag")
+        SparkApp.State.FAILED
+    }
+  }
+
+  // Lower-case pod phases recognised by mapKubernetesState.
+  private[utils] object KubernetesApplicationState {
+    val PENDING = "pending"
+    val RUNNING = "running"
+    val SUCCEEDED = "succeeded"
+    val FAILED = "failed"
+  }
+}
+
+/**
+ * SparkApp implementation that locates the application's driver pod by its tag and then polls
+ * the cluster for state, log and diagnostics on a daemon monitor thread.
+ *
+ * @param appTag tag used to find the application's driver pod.
+ * @param appIdOption application id, if already known.
+ * @param process spark-submit process, if there is one; its output is included in log().
+ * @param listener callback notified when the app id becomes known and on state changes.
+ * @param livyConf source of the lookup timeout, poll interval and log size settings.
+ * @param kubernetesClient client used for all cluster calls; overridable for unit tests.
+ */
+class SparkKubernetesApp private[utils](
+    appTag: String,
+    appIdOption: Option[String],
+    process: Option[LineBufferedProcess],
+    listener: Option[SparkAppListener],
+    livyConf: LivyConf,
+    // For unit tests
+    kubernetesClient: => LivyKubernetesClient = SparkKubernetesApp.kubernetesClient)
+  extends SparkApp with Logging {
+
+  import SparkKubernetesApp._
+
+  private val appLookupTimeout =
+    livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds
+  private val cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)
+  private val pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds
+
+  private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String]
+  private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
+
+  private var state: SparkApp.State = SparkApp.State.STARTING
+  // Completed by the monitor thread with the located application, or failed if lookup fails.
+  private val appPromise: Promise[KubernetesApplication] = Promise()
+
+  private[utils] val kubernetesAppMonitorThread = Utils
+    .startDaemonThread(s"kubernetesAppMonitorThread-$this") {
+    try {
+      val app = try {
+        getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
+      } catch {
+        case e: Exception =>
+          // Publish the failure so that kill() does not wait for the full lookup timeout.
+          appPromise.failure(e)
+          throw e
+      }
+      appPromise.success(app)
+      val appId = app.getApplicationId
+
+      Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appTag")
+      listener.foreach(_.appIdKnown(appId))
+
+      // Poll until the application reaches FAILED, FINISHED or KILLED.
+      while (isRunning) {
+        val appReport = withRetry(kubernetesClient.getApplicationReport(app, cacheLogSize))
+        kubernetesAppLog = appReport.getApplicationLog
+        kubernetesDiagnostics = appReport.getApplicationDiagnostics
+        changeState(mapKubernetesState(appReport.getApplicationState, appTag))
+
+        Clock.sleep(pollInterval.toMillis)
+      }
+      debug(s"Application $appId is in state $state\nDiagnostics:" +
+        s"\n${kubernetesDiagnostics.mkString("\n")}")
+    } catch {
+      case _: InterruptedException =>
+        kubernetesDiagnostics = ArrayBuffer("Application stopped by user.")
+        changeState(SparkApp.State.KILLED)
+      case NonFatal(e) =>
+        error("Couldn't refresh Kubernetes state", e)
+        // getMessage may be null; fall back to toString so diagnostics never hold a null line.
+        kubernetesDiagnostics = ArrayBuffer(Option(e.getMessage).getOrElse(e.toString))
+        changeState(SparkApp.State.FAILED)
+    } finally {
+      info(s"Finished monitoring application $appTag with state $state")
+    }
+  }
+
+  /**
+   * Return the last polled pod log, followed by the output of the spark-submit process
+   * (if any) and the last polled Kubernetes diagnostics.
+   */
+  override def log(): IndexedSeq[String] = {
+    ("stdout: " +: kubernetesAppLog) ++
+      ("\nstderr: " +: (process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++
+        process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String]))) ++
+      ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics)
+  }
+
+  /**
+   * Kill the application once it has been located (waiting up to the lookup timeout) and
+   * always destroy the spark-submit process, if there is one.
+   */
+  override def kill(): Unit = {
+    try {
+      withRetry {
+        kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout))
+      }
+    } catch {
+      // We cannot kill the Kubernetes app without the appTag.
+      // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure.
+      // We don't want a stuck session that can't be deleted. Emit a warning and move on.
+      case _: TimeoutException | _: InterruptedException =>
+        warn("Attempted to delete a session while its Kubernetes application is not found.")
+        kubernetesAppMonitorThread.interrupt()
+    } finally {
+      process.foreach(_.destroy())
+    }
+  }
+
+  // True until the application reaches one of the terminal states.
+  private def isRunning: Boolean = {
+    state != SparkApp.State.FAILED &&
+      state != SparkApp.State.FINISHED &&
+      state != SparkApp.State.KILLED
+  }
+
+  // Notifies the listener (with the old and the new state) only on an actual transition.
+  private def changeState(newState: SparkApp.State.Value): Unit = {
+    if (state != newState) {
+      listener.foreach(_.stateChanged(state, newState))
+      state = newState
+    }
+  }
+
+  /**
+   * Find the corresponding KubernetesApplication from an application tag, polling every
+   * pollInterval until the deadline. On timeout the spark-submit process is destroyed and
+   * the tag is recorded in leakedAppTags for the leaked-apps sweeper.
+   *
+   * @param appTag The application tag tagged on the target application. The tag must match
+   *               exactly. If the tag is not unique, it returns the first application it found.
+   * @param pollInterval How long to sleep between two lookups.
+   * @param deadline Point in time after which the lookup gives up.
+   * @return KubernetesApplication or the failure.
+   */
+  @tailrec
+  private def getAppFromTag(
+      appTag: String,
+      pollInterval: Duration,
+      deadline: Deadline): KubernetesApplication = {
+    // Exact comparison: a substring match could pick up another app whose tag contains this one.
+    withRetry(kubernetesClient.getApplications().find(_.getApplicationTag == appTag)) match {
+      case Some(app) => app
+      case None =>
+        if (deadline.isOverdue) {
+          process.foreach(_.destroy())
+          leakedAppTags.put(appTag, System.currentTimeMillis())
+          throw new IllegalStateException("No Kubernetes application is found with tag" +
+            s" $appTag in ${livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT) / 1000}" +
+            " seconds. This may be because 1) spark-submit fail to submit application to " +
+            "Kubernetes; or 2) Kubernetes cluster doesn't have enough resources to start the " +
+            "application in time. Please check Livy log and Kubernetes log to know the details.")
+        } else {
+          Clock.sleep(pollInterval.toMillis)
+          getAppFromTag(appTag, pollInterval, deadline)
+        }
+    }
+  }
+}
+
+/** Label names and label values used to mark and discover Spark pods. */
+object KubernetesConstants {
+  val CREATED_BY_ANNOTATION = "created-by"
+
+  val SPARK_APP_ID_LABEL = "spark-app-selector"
+  val SPARK_APP_TAG_LABEL = "spark-app-tag"
+  val SPARK_ROLE_LABEL = "spark-role"
+  val SPARK_EXEC_ID_LABEL = "spark-exec-id"
+
+  val SPARK_ROLE_DRIVER = "driver"
+  val SPARK_ROLE_EXECUTOR = "executor"
+}
+
+/**
+ * View of a Spark application backed by its driver pod. Tag and id are read from the pod's
+ * labels with Map.get, so either is null when the pod does not carry that label.
+ */
+class KubernetesApplication(driverPod: Pod) {
+
+  import KubernetesConstants._
+
+  private val appTag = driverPod.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL)
+  private val appId = driverPod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)
+  private val namespace = driverPod.getMetadata.getNamespace
+
+  def getApplicationTag: String = appTag
+
+  def getApplicationId: String = appId
+
+  def getApplicationNamespace: String = namespace
+
+  def getApplicationPod: Pod = driverPod
+}
+
+/**
+ * Snapshot of an application: its driver pod (if found), its executor pods and the tail of
+ * the driver log.
+ */
+private[utils] case class KubernetesAppReport(
+    driver: Option[Pod],
+    executors: Seq[Pod],
+    appLog: IndexedSeq[String]) {
+
+  /** Lower-cased phase of the driver pod, or "unknown" without a driver, status or phase. */
+  def getApplicationState: String = {
+    driver
+      .flatMap(pod => Option(pod.getStatus))
+      .flatMap(status => Option(status.getPhase))
+      .map(_.toLowerCase)
+      .getOrElse("unknown")
+  }
+
+  def getApplicationLog: IndexedSeq[String] = appLog
+
+  /**
+   * Human-readable description of the driver (when present) followed by the executors sorted
+   * by pod name, split into individual lines.
+   */
+  def getApplicationDiagnostics: IndexedSeq[String] = {
+    (Seq(driver) ++ executors.sortBy(_.getMetadata.getName).map(Some(_)))
+      .filter(_.nonEmpty)
+      .map(buildSparkPodDiagnosticsPrettyString)
+      .flatMap(_.split("\n")).toIndexedSeq
+  }
+
+  // Renders one pod as a tab-indented multi-line string; "unknown" when no pod is given.
+  private def buildSparkPodDiagnosticsPrettyString(podOption: Option[Pod]): String = {
+    import scala.collection.JavaConverters._
+    // Renders a Java map as "k1=v1, k2=v2"; an absent map renders as an empty string.
+    def printMap[K, V](map: Option[java.util.Map[K, V]]): String = {
+      map.map(_.asScala.toMap).getOrElse(Map.empty[K, V])
+        .map { case (key, value) => s"$key=$value" }
+        .mkString(", ")
+    }
+
+    podOption.fold("unknown") { pod =>
+      // Resources, requests and limits are wrapped in Option so that a container declaring
+      // none of them is printed with empty values instead of failing with an NPE.
+      val containers = pod.getSpec.getContainers.asScala.map { container =>
+        val resources = Option(container.getResources)
+        s"${container.getName}:" +
+          s"\n\t\t\timage: ${container.getImage}" +
+          s"\n\t\t\trequests: ${printMap(resources.flatMap(r => Option(r.getRequests)))}" +
+          s"\n\t\t\tlimits: ${printMap(resources.flatMap(r => Option(r.getLimits)))}" +
+          s"\n\t\t\tcommand: ${container.getCommand} ${container.getArgs}"
+      }.mkString("\n\t\t")
+
+      s"${pod.getMetadata.getName}.${pod.getMetadata.getNamespace}:" +
+        s"\n\tnode: ${pod.getSpec.getNodeName}" +
+        s"\n\thostname: ${pod.getSpec.getHostname}" +
+        s"\n\tpodIp: ${pod.getStatus.getPodIP}" +
+        s"\n\tstartTime: ${pod.getStatus.getStartTime}" +
+        s"\n\tphase: ${pod.getStatus.getPhase}" +
+        s"\n\treason: ${pod.getStatus.getReason}" +
+        s"\n\tmessage: ${pod.getStatus.getMessage}" +
+        s"\n\tlabels: ${printMap(Option(pod.getMetadata.getLabels))}" +
+        s"\n\tcontainers:" +
+        s"\n\t\t$containers" +
+        s"\n\tconditions:" +
+        s"\n\t\t${pod.getStatus.getConditions.asScala.mkString("\n\t\t")}"
+    }
+  }
+}
+
+/**
+ * Thin wrapper around the fabric8 client exposing the operations Livy needs.
+ *
+ * @param client underlying Kubernetes client.
+ * @param namespaces namespaces searched for applications; all namespaces when empty.
+ */
+private[utils] class LivyKubernetesClient(
+    client: DefaultKubernetesClient, namespaces: Set[String] = Set.empty) {
+
+  import KubernetesConstants._
+  import scala.collection.JavaConverters._
+
+  /**
+   * List the pods that match `labels` (driver pods by default) and carry both the app tag
+   * label and the app id label, in each configured namespace or in any namespace when none
+   * is configured.
+   */
+  def getApplications(
+      labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER),
+      appTagLabel: String = SPARK_APP_TAG_LABEL,
+      appIdLabel: String = SPARK_APP_ID_LABEL): Seq[KubernetesApplication] = {
+    Option(namespaces).filter(_.nonEmpty)
+      .map(_.map(client.inNamespace))
+      .getOrElse(Seq(client.inAnyNamespace()))
+      .map(_.pods
+        .withLabels(labels.asJava)
+        .withLabel(appTagLabel)
+        .withLabel(appIdLabel)
+        .list.getItems.asScala.map(new KubernetesApplication(_)))
+      .reduce(_ ++ _)
+  }
+
+  /** Delete the application's driver pod and return the result reported by the client. */
+  def killApplication(app: KubernetesApplication): Boolean = {
+    client.inNamespace(app.getApplicationNamespace).pods.delete(app.getApplicationPod)
+  }
+
+  /**
+   * Collect the pods labelled with the application's tag in its namespace, split them into
+   * driver and executors and attach the tail of the driver log.
+   */
+  def getApplicationReport(
+      app: KubernetesApplication,
+      cacheLogSize: Int,
+      appTagLabel: String = SPARK_APP_TAG_LABEL): KubernetesAppReport = {
+    val pods = client.inNamespace(app.getApplicationNamespace).pods
+      .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava)
+      .list.getItems.asScala
+    val driver = pods.find(isDriver)
+    val executors = pods.filter(isExecutor)
+    val appLog = getApplicationLog(app, cacheLogSize)
+    KubernetesAppReport(driver, executors, appLog)
+  }
+
+  // Best effort: any failure while reading the driver log yields an empty log.
+  private def getApplicationLog(
+      app: KubernetesApplication, cacheLogSize: Int): IndexedSeq[String] = {
+    Try(
+      client.inNamespace(app.getApplicationNamespace).pods
+        .withName(app.getApplicationPod.getMetadata.getName)
+        .tailingLines(cacheLogSize).getLog.split("\n").toIndexedSeq
+    ).getOrElse(IndexedSeq.empty)
+  }
+
+  private def isDriver: Pod => Boolean = {
+    _.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER
+  }
+
+  private def isExecutor: Pod => Boolean = {
+    _.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR
+  }
+
+  def getDefaultNamespace: String = client.getNamespace
+}
+
+/** Builds a LivyKubernetesClient from the Kubernetes related LivyConf entries. */
+private[utils] object KubernetesClientFactory {
+
+  import java.io.File
+
+  import com.google.common.base.Charsets
+  import com.google.common.io.Files
+  import io.fabric8.kubernetes.client.ConfigBuilder
+
+  /**
+   * Create a client for the API server derived from the Spark master URL, applying only the
+   * authentication and namespace options that are set to a non-blank value.
+   */
+  def createKubernetesClient(livyConf: LivyConf): LivyKubernetesClient = {
+    val masterUrl =
sparkMasterToKubernetesApi(livyConf.sparkMaster())
+
+    val oauthTokenFile = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE).toOption
+    val oauthTokenValue = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE).toOption
+    // Name the conflicting options instead of printing their values: the token is a secret
+    // and must not end up in exception messages or logs.
+    require(oauthTokenFile.isEmpty || oauthTokenValue.isEmpty,
+      "Cannot specify OAuth token through both a file " +
+        "(livy.server.kubernetes.oauthTokenFile) and a value " +
+        "(livy.server.kubernetes.oauthTokenValue).")
+
+    val caCertFile = livyConf.get(LivyConf.KUBERNETES_CA_CERT_FILE).toOption
+    val clientKeyFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_KEY_FILE).toOption
+    val clientCertFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_CERT_FILE).toOption
+    val clientNamespace = livyConf.get(LivyConf.KUBERNETES_DEFAULT_NAMESPACE).toOption
+
+    // Each withOption step leaves the builder untouched when its option is not set.
+    val config = new ConfigBuilder()
+      .withApiVersion("v1")
+      .withMasterUrl(masterUrl)
+      .withOption(oauthTokenValue) {
+        (token, builder) => builder.withOauthToken(token)
+      }
+      .withOption(oauthTokenFile) {
+        (file, builder) => builder.withOauthToken(Files.toString(new File(file), Charsets.UTF_8))
+      }
+      .withOption(caCertFile) {
+        (file, builder) => builder.withCaCertFile(file)
+      }
+      .withOption(clientKeyFile) {
+        (file, builder) => builder.withClientKeyFile(file)
+      }
+      .withOption(clientCertFile) {
+        (file, builder) => builder.withClientCertFile(file)
+      }
+      .withOption(clientNamespace) {
+        (namespace, builder) => builder.withNamespace(namespace)
+      }
+      .build()
+    new LivyKubernetesClient(
+      new DefaultKubernetesClient(config), livyConf.getKubernetesNamespaces())
+  }
+
+  /**
+   * Turn a Spark master URL into a Kubernetes API URL: a leading "k8s://" is dropped and
+   * "https://" is prepended unless the remainder already starts with "http".
+   */
+  private[utils] def sparkMasterToKubernetesApi(sparkMaster: String): String = {
+    // stripPrefix only touches the start of the string and does not treat it as a regex.
+    val replaced = sparkMaster.stripPrefix("k8s://")
+    if (!replaced.startsWith("http")) s"https://$replaced"
+    else replaced
+  }
+
+  /** Adds toOption to String: None for null or blank strings, Some(string) otherwise. */
+  implicit class OptionString(val string: String) extends AnyVal {
+    def toOption: Option[String] = {
+      if (string == null || string.trim.isEmpty) None
+      else Some(string)
+    }
+  }
+
+  // Adds withOption to ConfigBuilder so that optional settings can be chained fluently.
+  private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
+    extends AnyVal {
+
+    // Applies configurator to the builder when option is defined, else returns the builder.
+    def withOption[T](option: Option[T])
+      (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = {
+      option.map(configurator(_, configBuilder)).getOrElse(configBuilder)
+    }
+  }
+}
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
new file mode 100644
index 000000000..87fb36afb
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.utils
+
+import java.util.Objects._
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.duration._
+
+import io.fabric8.kubernetes.api.model._
+import org.mockito.Matchers.{eq => eqs, _}
+import org.mockito.Mockito.{atLeast, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.FunSpec
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.utils.SparkApp.State
+
+/** Unit tests for SparkKubernetesApp, KubernetesAppReport and KubernetesClientFactory. */
+class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite {
+
+  // Runs f and always interrupts t afterwards so that a monitor thread never outlives a test.
+  private def cleanupThread(t: Thread)(f: => Unit) = {
+    try { f } finally { t.interrupt() }
+  }
+
+  // Replacement for Clock's sleep: yields instead of sleeping to keep the test fast.
+  private def mockSleep(ms: Long) = {
+    Thread.`yield`()
+  }
+
+  describe("SparkKubernetesApp") {
+    // Dot notation: the postfix form needs scala.language.postfixOps, which is not imported.
+    val TEST_TIMEOUT = 30.seconds
+    val appId = "app_id"
+    val appTag = "app_tag"
+    val livyConf = new LivyConf(false)
+    livyConf.set(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT, "30s")
+
+    it("should poll Kubernetes state and terminate") {
+      Clock.withSleepMethod(mockSleep) {
+        val mockApp = mock[KubernetesApplication]
+        when(mockApp.getApplicationId).thenReturn(appId)
+        when(mockApp.getApplicationTag).thenReturn(appTag)
+        val mockAppReport = mock[KubernetesAppReport]
+        when(mockAppReport.getApplicationLog).thenReturn(IndexedSeq("app", "log"))
+        when(mockAppReport.getApplicationDiagnostics).thenReturn(IndexedSeq("app", "diagnostics"))
+        val mockClient = mock[LivyKubernetesClient]
+        when(mockClient.getApplications(any(), any(), any())).thenReturn(Seq(mockApp))
+        when(mockClient.getApplicationReport(eqs(mockApp), any(), any())).thenReturn(mockAppReport)
+        val mockListener = mock[SparkAppListener]
+
+        // Simulate Kubernetes app state progression.
+        import SparkKubernetesApp.KubernetesApplicationState._
+        val applicationStateList = List(
+          PENDING,
+          RUNNING,
+          SUCCEEDED
+        )
+        // Each call returns the next state; once exhausted, the last state is repeated.
+        val stateIndex = new AtomicInteger(0)
+        when(mockAppReport.getApplicationState).thenAnswer(
+          new Answer[String] {
+            override def answer(inv: InvocationOnMock): String = {
+              stateIndex.getAndIncrement() match {
+                case i if i < applicationStateList.size =>
+                  applicationStateList(i)
+                case _ =>
+                  applicationStateList.last
+              }
+            }
+          }
+        )
+
+        val app = new SparkKubernetesApp(
+          appTag, None, None, Some(mockListener), livyConf, mockClient)
+
+        cleanupThread(app.kubernetesAppMonitorThread) {
+          app.kubernetesAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+          assert(!app.kubernetesAppMonitorThread.isAlive,
+            "KubernetesAppMonitorThread should terminate after Kubernetes app is finished")
+          verify(mockClient, atLeast(1)).getApplications(any(), anyString(), anyString())
+          verify(mockClient, atLeast(1))
+            .getApplicationReport(eqs(mockApp), anyInt(), anyString())
+          verify(mockListener).appIdKnown(appId)
+          verify(mockListener).stateChanged(State.STARTING, State.RUNNING)
+          verify(mockListener).stateChanged(State.RUNNING, State.FINISHED)
+        }
+      }
+    }
+  }
+
+  describe("KubernetesAppReport") {
+    it("should return application state") {
+      // The phase is expected back lower-cased; a report without a driver yields "unknown".
+      val status = when(mock[PodStatus].getPhase).thenReturn("Status").getMock[PodStatus]
+      val driver = when(mock[Pod].getStatus).thenReturn(status).getMock[Pod]
+      assertResult("status") {
+        KubernetesAppReport(Some(driver), Seq.empty, IndexedSeq.empty).getApplicationState
+      }
+      assertResult("unknown") {
+        KubernetesAppReport(None, Seq.empty, IndexedSeq.empty).getApplicationState
+      }
+    }
+  }
+
+  describe("KubernetesClientFactory") {
+    it("should build KubernetesApi url from LivyConf master url") {
+      def actual(sparkMaster: String): String =
+        KubernetesClientFactory.sparkMasterToKubernetesApi(sparkMaster)
+
+      val masterUrl = "kubernetes.default.svc:443"
+
+      assertResult(s"https://local")(actual(s"https://local"))
+      assertResult(s"https://$masterUrl")(actual(s"k8s://$masterUrl"))
+      assertResult(s"http://$masterUrl")(actual(s"k8s://http://$masterUrl"))
+      assertResult(s"https://$masterUrl")(actual(s"k8s://https://$masterUrl"))
+      assertResult(s"http://$masterUrl")(actual(s"http://$masterUrl"))
+      assertResult(s"https://$masterUrl")(actual(s"https://$masterUrl"))
+    }
+
+    it("should create KubernetesClient with default configs") {
+      assert(nonNull(KubernetesClientFactory.createKubernetesClient(new LivyConf(false))))
+    }
+
+    it("should throw IllegalArgumentException if both oauth file and token provided") {
+      val conf = new LivyConf(false)
+        .set(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE, "dummy_path")
+        .set(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE, "dummy_value")
+      intercept[IllegalArgumentException] {
+        KubernetesClientFactory.createKubernetesClient(conf)
+      }
+    }
+  }
+}