From f06df4755a18f7da1442ad6abf8cbe1b335a9270 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 27 Oct 2019 18:36:54 +0100 Subject: [PATCH 01/11] Implement base kubernetes fetures. Signed-off-by: Aliaksandr Sasnouskikh --- conf/livy.conf.template | 34 +- .../java/org/apache/livy/rsc/RSCConf.java | 7 + .../org/apache/livy/rsc/driver/RSCDriver.java | 7 + server/pom.xml | 5 + .../main/scala/org/apache/livy/LivyConf.scala | 36 ++ .../org/apache/livy/server/LivyServer.scala | 13 +- .../interactive/InteractiveSession.scala | 10 +- .../org/apache/livy/utils/SparkApp.scala | 30 +- .../livy/utils/SparkKubernetesApp.scala | 463 ++++++++++++++++++ .../livy/utils/SparkKubernetesAppSpec.scala | 71 +++ 10 files changed, 666 insertions(+), 10 deletions(-) create mode 100644 server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala create mode 100644 server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala diff --git a/conf/livy.conf.template b/conf/livy.conf.template index de7c24823..e7b3aea0e 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -125,12 +125,29 @@ # cause session leakage, so we need to check session leakage. # How long to check livy session leakage # livy.server.yarn.app-leakage.check-timeout = 600s -# how often to check livy session leakage +# How often to check livy session leakage # livy.server.yarn.app-leakage.check-interval = 60s # How often Livy polls YARN to refresh YARN app state. # livy.server.yarn.poll-interval = 5s -# + +# If Livy can't find the Kubernetes app within this time, consider it lost. +# livy.server.kubernetes.app-lookup-timeout = 600s +# When the cluster is busy, we may fail to launch Kubernetes app in app-lookup-timeout, then it +# would cause session leakage, so we need to check session leakage. +# How long to check livy session leakage +# livy.server.kubernetes.app-leakage.check-timeout = 600s +# How often to check livy session leakage +# livy.server.kubernetes.app-leakage.check-interval = 60s + +# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods state, logs, description +# details, routes, etc...) +# livy.server.kubernetes.poll-interval = 15s + +# Comma-separated list of the Kubernetes namespaces to allow for applications creation. +# All namespaces are allowed if empty. +# livy.server.kubernetes.allowedNamespaces = + # Days to keep Livy server request logs. # livy.server.request-log-retain.days = 5 @@ -165,3 +182,16 @@ # livy.server.auth..class = # livy.server.auth..param. = # livy.server.auth..param. = + +# Manual authentication to KubeApiserver (by default configured with Kubernetes ServiceAccount +# if deployed to Kubernetes cluster as a Pod) +# Kubernetes oauth token file path +# livy.server.kubernetes.oauthTokenFile = +# Kubernetes oauth token string value +# livy.server.kubernetes.oauthTokenValue = +# Kubernetes CA cert file path +# livy.server.kubernetes.caCertFile = +# Kubernetes client key file path +# livy.server.kubernetes.clientKeyFile = +# Kubernetes client cert file path +# livy.server.kubernetes.clientCertFile = diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java index 4c45956d7..200f5a5f0 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java @@ -25,6 +25,7 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Properties; import javax.security.sasl.Sasl; @@ -151,6 +152,12 @@ public String findLocalAddress() throws IOException { return address.getCanonicalHostName(); } + public boolean isRunningOnKubernetes() { + return Optional.ofNullable(get("livy.spark.master")) + .filter(s -> s.startsWith("k8s")) + .isPresent(); + } + private static final Map configsWithAlternatives = Collections.unmodifiableMap(new HashMap() {{ put(RSCConf.Entry.CLIENT_IN_PROCESS.key, DepConf.CLIENT_IN_PROCESS); diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java index 0d8eec538..7616575d9 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java @@ -169,6 +169,13 @@ private void initializeServer() throws Exception { // on the cluster, it would be tricky to solve that problem in a generic way. livyConf.set(RPC_SERVER_ADDRESS, null); + // If we are running on Kubernetes, set RPC_SERVER_ADDRESS from "spark.driver.host" option, + // which is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep: + // line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc" + if (livyConf.isRunningOnKubernetes()) { + livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host")); + } + if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) { // Test flag is turned on so we will just infinite loop here. It should cause // timeout and we should still see yarn application being cleaned up. diff --git a/server/pom.xml b/server/pom.xml index 8e797f2b1..8eb11d311 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -84,6 +84,11 @@ metrics-healthchecks + + io.fabric8 + kubernetes-client + + javax.servlet javax.servlet-api diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 0a52d5166..ccf07284b 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -220,6 +220,33 @@ object LivyConf { // how often to check livy session leakage val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s") + // Kubernetes oauth token file path. + val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "") + // Kubernetes oauth token string value. + val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "") + // Kubernetes CA cert file path. + val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "") + // Kubernetes client key file path. + val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "") + // Kubernetes client cert file path. + val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "") + + // Comma-separated list of the Kubernetes namespaces to allow for applications creation. + // All namespaces are allowed if empty. + val KUBERNETES_ALLOWED_NAMESPACES = Entry("livy.server.kubernetes.allowedNamespaces", null) + + // If Livy can't find the Kubernetes app within this time, consider it lost. + val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s") + // How often Livy polls Kubernetes to refresh Kubernetes app state. + val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s") + + // How long to check livy session leakage. + val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT = + Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s") + // How often to check livy session leakage. + val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL = + Entry("livy.server.kubernetes.app-leakage.check-interval", "60s") + // Whether session timeout should be checked, by default it will be checked, which means inactive // session will be stopped after "livy.server.session.timeout" val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true) @@ -331,6 +358,15 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { /** Return true if spark master starts with yarn. */ def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn") + /** Return true if spark master starts with k8s. */ + def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s") + + /** Return Kubernetes namespace or all if not set. */ + def getKubernetesNamespaces(): Set[String] = + Option(get(KUBERNETES_ALLOWED_NAMESPACES)).filterNot(_.isEmpty) + .map(_.split(",").toSet) + .getOrElse(Set.empty) + /** Return the spark deploy mode Livy sessions should use. */ def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index b40a20e50..ce405d2a9 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore} import org.apache.livy.server.ui.UIServlet import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager} import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF +import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp} import org.apache.livy.utils.LivySparkUtils._ -import org.apache.livy.utils.SparkYarnApp class LivyServer extends Logging { @@ -140,10 +140,13 @@ class LivyServer extends Logging { testRecovery(livyConf) - // Initialize YarnClient ASAP to save time. + // Initialize YarnClient or KubernetesClient ASAP to save time. if (livyConf.isRunningOnYarn()) { SparkYarnApp.init(livyConf) Future { SparkYarnApp.yarnClient } + } else if (livyConf.isRunningOnKubernetes()) { + SparkKubernetesApp.init(livyConf) + Future { SparkKubernetesApp.kubernetesClient } } StateStore.init(livyConf) @@ -407,10 +410,10 @@ class LivyServer extends Logging { } private[livy] def testRecovery(livyConf: LivyConf): Unit = { - if (!livyConf.isRunningOnYarn()) { - // If recovery is turned on but we are not running on YARN, quit. + if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) { + // If recovery is turned on but we are not running on YARN or Kubernetes, quit. require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF, - "Session recovery requires YARN.") + "Session recovery requires YARN or Kubernetes.") } } } diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index cdedddabb..988b9b4e9 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -399,7 +399,13 @@ class InteractiveSession( app = mockApp.orElse { val driverProcess = client.flatMap { c => Option(c.getDriverProcess) } .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))) - driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)) } + driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) + .orElse { + if (livyConf.isRunningOnKubernetes()) { + // Create SparkKubernetesApp anyway to recover app monitoring on Livy server restart + Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) + } else None + } } if (client.isEmpty) { @@ -474,6 +480,8 @@ class InteractiveSession( transition(SessionState.ShuttingDown) sessionStore.remove(RECOVERY_SESSION_TYPE, id) client.foreach { _.stop(true) } + // We need to call #kill here explicitly to delete Interactive pods from the cluster + if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill()) } catch { case _: Exception => app.foreach { diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index 9afe28162..ac74e585d 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -71,13 +71,36 @@ object SparkApp { sparkConf ++ Map( SPARK_YARN_TAG_KEY -> mergedYarnTags, "spark.yarn.submit.waitAppCompletion" -> "false") + } else if (livyConf.isRunningOnKubernetes()) { + + // We don't allow to submit applications to the namespaces different from the configured + val kubernetesNamespaces = livyConf.getKubernetesNamespaces() + val targetNamespace = sparkConf.getOrElse("spark.kubernetes.namespace", + SparkKubernetesApp.kubernetesClient.getDefaultNamespace) + if (kubernetesNamespaces.nonEmpty && !kubernetesNamespaces.contains(targetNamespace)) { + throw new IllegalArgumentException( + s"Requested namespace $targetNamespace doesn't match the configured: " + + s"${kubernetesNamespaces.mkString(", ")}") + } + + import KubernetesConstants._ + sparkConf ++ Map( + "spark.kubernetes.namespace" -> targetNamespace, + // Mark Spark pods with the unique appTag label to be used for their discovery + s"spark.kubernetes.driver.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag, + s"spark.kubernetes.executor.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag, + // Mark Spark pods as created by Livy for the additional tracing + s"spark.kubernetes.driver.label.$CREATED_BY_ANNOTATION" -> "livy", + s"spark.kubernetes.executor.label.$CREATED_BY_ANNOTATION" -> "livy", + "spark.kubernetes.submission.waitAppCompletion" -> "false") } else { sparkConf } } /** - * Return a SparkApp object to control the underlying Spark application via YARN or spark-submit. + * Return a SparkApp object to control the underlying Spark application via YARN, Kubernetes + * or spark-submit. * * @param uniqueAppTag A tag that can uniquely identify the application. */ @@ -89,8 +112,11 @@ object SparkApp { listener: Option[SparkAppListener]): SparkApp = { if (livyConf.isRunningOnYarn()) { new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf) + } else if (livyConf.isRunningOnKubernetes()) { + new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf) } else { - require(process.isDefined, "process must not be None when Livy master is not YARN.") + require(process.isDefined, "process must not be None when Livy master is not YARN or " + + "Kubernetes.") new SparkProcApp(process.get, listener) } } diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala new file mode 100644 index 000000000..6c8fb0b10 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.utils + +import java.util.concurrent.TimeoutException + +import scala.annotation.tailrec +import scala.collection.mutable.ArrayBuffer +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client._ + +import org.apache.livy.{LivyConf, Logging, Utils} + +object SparkKubernetesApp extends Logging { + + private val RETRY_BACKOFF_MILLIS = 1000 + + lazy val kubernetesClient: LivyKubernetesClient = + KubernetesClientFactory.createKubernetesClient(livyConf) + + private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]() + + private val leakedAppsGCThread = new Thread() { + override def run(): Unit = { + while (true) { + if (!leakedAppTags.isEmpty) { + // kill the app if found it or remove it if exceeding a threshold + val leakedApps = leakedAppTags.entrySet().iterator() + val now = System.currentTimeMillis() + val apps = withRetry(kubernetesClient.getApplications()).groupBy(_.getApplicationTag) + while (leakedApps.hasNext) { + val leakedApp = leakedApps.next() + apps.get(leakedApp.getKey) match { + case Some(seq) => + seq.foreach(app => + if (withRetry(kubernetesClient.killApplication(app))) { + leakedApps.remove() + info(s"Killed leaked app with tag ${leakedApp.getKey}") + } + ) + case None if (leakedApp.getValue - now) > sessionLeakageCheckTimeout => + leakedApps.remove() + info(s"Remove leaked Kubernetes app tag ${leakedApp.getKey}") + } + } + } + Thread.sleep(sessionLeakageCheckInterval) + } + } + } + + private var livyConf: LivyConf = _ + + private var cacheLogSize: Int = _ + private var appLookupTimeout: FiniteDuration = _ + private var pollInterval: FiniteDuration = _ + + private var sessionLeakageCheckTimeout: Long = _ + private var sessionLeakageCheckInterval: Long = _ + + def init(livyConf: LivyConf): Unit = { + this.livyConf = livyConf + + cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE) + appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds + pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds + + sessionLeakageCheckInterval = + livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL) + sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT) + + leakedAppsGCThread.setDaemon(true) + leakedAppsGCThread.setName("LeakedAppsGCThread") + leakedAppsGCThread.start() + } + + // Returning T, throwing the exception on failure + @tailrec + private def withRetry[T](fn: => T, retries: Int = 3): T = + Try { fn } match { + case Success(x) => x + case _ if retries > 1 => + Thread.sleep(RETRY_BACKOFF_MILLIS) + withRetry(fn, retries - 1) + case Failure(e) => throw e + } +} + +class SparkKubernetesApp private[utils]( + appTag: String, + appIdOption: Option[String], + process: Option[LineBufferedProcess], + listener: Option[SparkAppListener], + livyConf: LivyConf, + kubernetesClient: => LivyKubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit tests + extends SparkApp with Logging { + + import SparkKubernetesApp._ + + private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String] + private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String] + + private var state: SparkApp.State = SparkApp.State.STARTING + private val appPromise: Promise[KubernetesApplication] = Promise() + + private val kubernetesAppMonitorThread = Utils + .startDaemonThread(s"kubernetesAppMonitorThread-$this") { + try { + val app = try { + getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow) + } catch { + case e: Exception => + appPromise.failure(e) + throw e + } + appPromise.success(app) + val appId = app.getApplicationId + + Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId") + listener.foreach(_.appIdKnown(appId)) + + while (isRunning) { + Clock.sleep(pollInterval.toMillis) + + val appReport = withRetry(kubernetesClient.getApplicationReport(app, cacheLogSize)) + kubernetesAppLog = appReport.getApplicationLog + kubernetesDiagnostics = appReport.getApplicationDiagnostics + changeState(mapKubernetesState(appReport.getApplicationState, appTag)) + } + debug(s"Application $appId is in state $state\nDiagnostics:\n${kubernetesDiagnostics.mkString("\n")}") + } catch { + case _: InterruptedException => + kubernetesDiagnostics = ArrayBuffer("Application stopped by user.") + changeState(SparkApp.State.KILLED) + case NonFatal(e) => + error("Error while refreshing Kubernetes state", e) + kubernetesDiagnostics = ArrayBuffer(e.getMessage) + changeState(SparkApp.State.FAILED) + } finally { + listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = None))) + } + } + + override def log(): IndexedSeq[String] = + ("stdout: " +: kubernetesAppLog) ++ + ("\nstderr: " +: (process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++ + process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String]))) ++ + ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics) + + override def kill(): Unit = + try { + withRetry { + kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout)) + } + } catch { + // We cannot kill the Kubernetes app without the appTag. + // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure. + // We don't want a stuck session that can't be deleted. Emit a warning and move on. + case _: TimeoutException | _: InterruptedException => + warn("Deleting a session while its Kubernetes application is not found.") + kubernetesAppMonitorThread.interrupt() + } finally { + process.foreach(_.destroy()) + } + + private def isRunning: Boolean = { + state != SparkApp.State.FAILED && + state != SparkApp.State.FINISHED && + state != SparkApp.State.KILLED + } + + private def changeState(newState: SparkApp.State.Value): Unit = { + if (state != newState) { + listener.foreach(_.stateChanged(state, newState)) + state = newState + } + } + + /** + * Find the corresponding KubernetesApplication from an application tag. + * + * @param appTag The application tag tagged on the target application. + * If the tag is not unique, it returns the first application it found. + * @return KubernetesApplication or the failure. + */ + @tailrec + private def getAppFromTag( + appTag: String, + pollInterval: Duration, + deadline: Deadline): KubernetesApplication = { + withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag))) + match { + case Some(app) => app + case None => + if (deadline.isOverdue) { + process.foreach(_.destroy()) + leakedAppTags.put(appTag, System.currentTimeMillis()) + throw new IllegalStateException(s"No Kubernetes application is found with tag" + + s" $appTag in ${livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT) / 1000}" + + " seconds. This may be because 1) spark-submit fail to submit application to " + + "Kubernetes; or 2) Kubernetes cluster doesn't have enough resources to start the " + + "application in time. Please check Livy log and Kubernetes log to know the details.") + } else { + Clock.sleep(pollInterval.toMillis) + getAppFromTag(appTag, pollInterval, deadline) + } + } + } + + private def mapKubernetesState( + kubernetesAppState: String, + appTag: String): SparkApp.State.Value = { + import KubernetesApplicationState._ + kubernetesAppState.toLowerCase match { + case PENDING | CONTAINER_CREATING => + SparkApp.State.STARTING + case RUNNING => + SparkApp.State.RUNNING + case COMPLETED | SUCCEEDED => + SparkApp.State.FINISHED + case FAILED | ERROR => + SparkApp.State.FAILED + case other => // any other combination is invalid, so FAIL the application. + error(s"Unknown Kubernetes state $other for app with tag $appTag.") + SparkApp.State.FAILED + } + } +} + +object KubernetesApplicationState { + val PENDING = "pending" + val CONTAINER_CREATING = "containercreating" + val RUNNING = "running" + val COMPLETED = "completed" + val SUCCEEDED = "succeeded" + val FAILED = "failed" + val ERROR = "error" +} + +object KubernetesConstants { + val REQUESTED_BY_ANNOTATION = "requested-by" + val CREATED_BY_ANNOTATION = "created-by" + + val SPARK_APP_ID_LABEL = "spark-app-selector" + val SPARK_APP_TAG_LABEL = "spark-app-tag" + val SPARK_ROLE_LABEL = "spark-role" + val SPARK_EXEC_ID_LABEL = "spark-exec-id" + + val SPARK_ROLE_DRIVER = "driver" + val SPARK_ROLE_EXECUTOR = "executor" +} + +class KubernetesApplication(driverPod: Pod) { + + import KubernetesConstants._ + + private val appTag = driverPod.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL) + private val appId = driverPod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL) + private val namespace = driverPod.getMetadata.getNamespace + + def getApplicationTag: String = appTag + + def getApplicationId: String = appId + + def getApplicationNamespace: String = namespace + + def getApplicationPod: Pod = driverPod +} + +private[utils] case class KubernetesAppReport( + driver: Option[Pod], + executors: Seq[Pod], + appLog: IndexedSeq[String]) { + + def getApplicationState: String = + driver.map(_.getStatus.getPhase.toLowerCase).getOrElse("unknown") + + def getApplicationLog: IndexedSeq[String] = appLog + + def getApplicationDiagnostics: IndexedSeq[String] = { + (Seq(driver) ++ executors.sortBy(_.getMetadata.getName).map(Some(_))) + .filter(_.nonEmpty) + .map(buildSparkPodDiagnosticsPrettyString) + .flatMap(_.split("\n")).toIndexedSeq + } + + private def buildSparkPodDiagnosticsPrettyString(podOption: Option[Pod]): String = { + import scala.collection.JavaConverters._ + def printMap(map: Map[_, _]): String = map.map { + case (key, value) => s"$key=$value" + }.mkString(", ") + + if (podOption.isEmpty) return "unknown" + val pod = podOption.get + + s"${pod.getMetadata.getName}.${pod.getMetadata.getNamespace}:" + + s"\n\tnode: ${pod.getSpec.getNodeName}" + + s"\n\thostname: ${pod.getSpec.getHostname}" + + s"\n\tpodIp: ${pod.getStatus.getPodIP}" + + s"\n\tstartTime: ${pod.getStatus.getStartTime}" + + s"\n\tphase: ${pod.getStatus.getPhase}" + + s"\n\treason: ${pod.getStatus.getReason}" + + s"\n\tmessage: ${pod.getStatus.getMessage}" + + s"\n\tlabels: ${printMap(pod.getMetadata.getLabels.asScala.toMap)}" + + s"\n\tcontainers:" + + s"\n\t\t${ + pod.getSpec.getContainers.asScala.map(container => + s"${container.getName}:" + + s"\n\t\t\timage: ${container.getImage}" + + s"\n\t\t\trequests: ${printMap(container.getResources.getRequests.asScala.toMap)}" + + s"\n\t\t\tlimits: ${printMap(container.getResources.getLimits.asScala.toMap)}" + + s"\n\t\t\tcommand: ${container.getCommand} ${container.getArgs}" + ).mkString("\n\t\t") + }" + + s"\n\tconditions:" + + s"\n\t\t${pod.getStatus.getConditions.asScala.mkString("\n\t\t")}" + } +} + +private[utils] class LivyKubernetesClient( + client: NamespacedKubernetesClient, namespaces: Set[String] = Set.empty) { + + import KubernetesConstants._ + import scala.collection.JavaConverters._ + + def getApplications( + labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER), + appTagLabel: String = SPARK_APP_TAG_LABEL, + appIdLabel: String = SPARK_APP_ID_LABEL + ): Seq[KubernetesApplication] = + Option(namespaces).filter(_.nonEmpty) + .map(_.map(client.inNamespace)) + .getOrElse(Seq(client.inAnyNamespace())) + .map(_.pods + .withLabels(labels.asJava) + .withLabel(appTagLabel) + .withLabel(appIdLabel) + .list.getItems.asScala.map(new KubernetesApplication(_))) + .reduce(_ ++ _) + + def killApplication(app: KubernetesApplication): Boolean = + client.inNamespace(app.getApplicationNamespace).pods.delete(app.getApplicationPod) + + def getApplicationReport( + app: KubernetesApplication, + cacheLogSize: Int, + appTagLabel: String = SPARK_APP_TAG_LABEL + ): KubernetesAppReport = { + val pods = client.inNamespace(app.getApplicationNamespace).pods + .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava) + .list.getItems.asScala + val driver = pods.find(isDriver) + val executors = pods.filter(isExecutor) + val appLog = getApplicationLog(app, cacheLogSize) + KubernetesAppReport(driver, executors, appLog) + } + + def getDefaultNamespace: String = client.getNamespace + + private def getApplicationLog( + app: KubernetesApplication, cacheLogSize: Int): IndexedSeq[String] = + Try( + client.inNamespace(app.getApplicationNamespace).pods + .withName(app.getApplicationPod.getMetadata.getName) + .tailingLines(cacheLogSize).getLog.split("\n").toIndexedSeq + ).getOrElse(IndexedSeq.empty) + + private def isDriver: Pod => Boolean = + _.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER + + private def isExecutor: Pod => Boolean = + _.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR + +} + +private[utils] object KubernetesClientFactory { + + import java.io.File + + import com.google.common.base.Charsets + import com.google.common.io.Files + import io.fabric8.kubernetes.client.ConfigBuilder + + def createKubernetesClient(livyConf: LivyConf): LivyKubernetesClient = { + val masterUrl = sparkMasterToKubernetesApi(livyConf.sparkMaster()) + + val oauthTokenFile = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE).toOption + val oauthTokenValue = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE).toOption + require(oauthTokenFile.isEmpty || oauthTokenValue.isEmpty, + "Cannot specify OAuth token through both " + + s"a file $oauthTokenFile and a value $oauthTokenValue.") + + val caCertFile = livyConf.get(LivyConf.KUBERNETES_CA_CERT_FILE).toOption + val clientKeyFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_KEY_FILE).toOption + val clientCertFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_CERT_FILE).toOption + + val config = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(masterUrl) + .withOption(oauthTokenValue) { + (token, configBuilder) => configBuilder.withOauthToken(token) + } + .withOption(oauthTokenFile) { + (file, configBuilder) => + configBuilder + .withOauthToken(Files.toString(new File(file), Charsets.UTF_8)) + } + .withOption(caCertFile) { + (file, configBuilder) => configBuilder.withCaCertFile(file) + } + .withOption(clientKeyFile) { + (file, configBuilder) => configBuilder.withClientKeyFile(file) + } + .withOption(clientCertFile) { + (file, configBuilder) => configBuilder.withClientCertFile(file) + } + .build() + new LivyKubernetesClient( + new DefaultKubernetesClient(config), livyConf.getKubernetesNamespaces()) + } + + private[utils] def sparkMasterToKubernetesApi(sparkMaster: String): String = { + val replaced = sparkMaster.replaceFirst("k8s://", "") + if (!replaced.startsWith("http")) s"https://$replaced" + else replaced + } + + implicit class OptionString(val string: String) extends AnyVal { + def toOption: Option[String] = + if (string == null || string.trim.isEmpty) None + else Some(string) + } + + private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder) + extends AnyVal { + + def withOption[T](option: Option[T]) + (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = + option.map(configurator(_, configBuilder)).getOrElse(configBuilder) + } + +} diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala new file mode 100644 index 000000000..5e668fb99 --- /dev/null +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.utils + +import java.util.Objects._ + +import io.fabric8.kubernetes.api.model._ +import org.mockito.Mockito.when +import org.scalatest.FunSpec +import org.scalatest.mock.MockitoSugar._ + +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} + +class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { + + describe("KubernetesAppReport") { + it("should return application state") { + val status = when(mock[PodStatus].getPhase).thenReturn("Status").getMock[PodStatus] + val driver = when(mock[Pod].getStatus).thenReturn(status).getMock[Pod] + assertResult("status") { + KubernetesAppReport(Some(driver), Seq.empty, IndexedSeq.empty).getApplicationState + } + assertResult("unknown") { + KubernetesAppReport(None, Seq.empty, IndexedSeq.empty).getApplicationState + } + } + } + + describe("KubernetesClientFactory") { + it("should build KubernetesApi url from LivyConf masterUrl") { + def actual(sparkMaster: String): String = + KubernetesClientFactory.sparkMasterToKubernetesApi(sparkMaster) + + val masterUrl = "kubernetes.default.svc:443" + + assertResult(s"https://local")(actual(s"https://local")) + assertResult(s"https://$masterUrl")(actual(s"k8s://$masterUrl")) + assertResult(s"http://$masterUrl")(actual(s"k8s://http://$masterUrl")) + assertResult(s"https://$masterUrl")(actual(s"k8s://https://$masterUrl")) + assertResult(s"http://$masterUrl")(actual(s"http://$masterUrl")) + assertResult(s"https://$masterUrl")(actual(s"https://$masterUrl")) + } + + it("should create KubernetesClient with default configs") { + assert(nonNull(KubernetesClientFactory.createKubernetesClient(new LivyConf(false)))) + } + + it("should throw IllegalArgumentException in both oauth file and token provided") { + val conf = new LivyConf(false) + .set(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE, "dummy_path") + .set(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE, "dummy_value") + intercept[IllegalArgumentException] { + KubernetesClientFactory.createKubernetesClient(conf) + } + } + } +} From 6e8d22615ae126f9919134a82132282486ee88f1 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 27 Oct 2019 18:44:19 +0100 Subject: [PATCH 02/11] Add kubernetes dependency version to pom.xml Signed-off-by: Aliaksandr Sasnouskikh --- pom.xml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pom.xml b/pom.xml index 5bcf1a3c8..cebe9c576 100644 --- a/pom.xml +++ b/pom.xml @@ -82,6 +82,7 @@ compile 2.2.3 ${spark.scala-2.11.version} + 4.1.1 3.0.0 1.9 4.5.3 @@ -298,6 +299,18 @@ ${metrics.version} + + io.fabric8 + kubernetes-client + ${kubernetes.client.version} + + + com.fasterxml.jackson.core + * + + + + io.netty netty-all From e39278729a74257d14712519e869557b4d4e97e5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 27 Oct 2019 19:16:30 +0100 Subject: [PATCH 03/11] Fix scalastyle Signed-off-by: Aliaksandr Sasnouskikh --- .../main/scala/org/apache/livy/utils/SparkKubernetesApp.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 6c8fb0b10..6366718b7 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -148,7 +148,8 @@ class SparkKubernetesApp private[utils]( kubernetesDiagnostics = appReport.getApplicationDiagnostics changeState(mapKubernetesState(appReport.getApplicationState, appTag)) } - debug(s"Application $appId is in state $state\nDiagnostics:\n${kubernetesDiagnostics.mkString("\n")}") + debug(s"Application $appId is in state $state\nDiagnostics:" + + s"\n${kubernetesDiagnostics.mkString("\n")}") } catch { case _: InterruptedException => kubernetesDiagnostics = ArrayBuffer("Application stopped by user.") From 176208ddd07568ac730e2eabbbf058720111c36e Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 27 Oct 2019 21:04:07 +0100 Subject: [PATCH 04/11] Added SparkKubernetesApp test, minor refactoring Signed-off-by: Aliaksandr Sasnouskikh --- .../livy/utils/SparkKubernetesApp.scala | 31 +++---- .../livy/utils/SparkKubernetesAppSpec.scala | 81 ++++++++++++++++++- 2 files changed, 91 insertions(+), 21 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 6366718b7..5b31a5352 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -39,6 +39,9 @@ object SparkKubernetesApp extends Logging { lazy val kubernetesClient: LivyKubernetesClient = KubernetesClientFactory.createKubernetesClient(livyConf) + private var livyConf: LivyConf = _ + private var sessionLeakageCheckTimeout: Long = _ + private var sessionLeakageCheckInterval: Long = _ private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]() private val leakedAppsGCThread = new Thread() { @@ -70,26 +73,11 @@ object SparkKubernetesApp extends Logging { } } - private var livyConf: LivyConf = _ - - private var cacheLogSize: Int = _ - private var appLookupTimeout: FiniteDuration = _ - private var pollInterval: FiniteDuration = _ - - private var sessionLeakageCheckTimeout: Long = _ - private var sessionLeakageCheckInterval: Long = _ - def init(livyConf: LivyConf): Unit = { this.livyConf = livyConf - - cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE) - appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds - pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds - sessionLeakageCheckInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL) sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT) - leakedAppsGCThread.setDaemon(true) leakedAppsGCThread.setName("LeakedAppsGCThread") leakedAppsGCThread.start() @@ -118,13 +106,18 @@ class SparkKubernetesApp private[utils]( import SparkKubernetesApp._ + private val appLookupTimeout = + livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds + private val cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE) + private val pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds + private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String] private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String] private var state: SparkApp.State = SparkApp.State.STARTING private val appPromise: Promise[KubernetesApplication] = Promise() - private val kubernetesAppMonitorThread = Utils + private[utils] val kubernetesAppMonitorThread = Utils .startDaemonThread(s"kubernetesAppMonitorThread-$this") { try { val app = try { @@ -141,12 +134,12 @@ class SparkKubernetesApp private[utils]( listener.foreach(_.appIdKnown(appId)) while (isRunning) { - Clock.sleep(pollInterval.toMillis) - val appReport = withRetry(kubernetesClient.getApplicationReport(app, cacheLogSize)) kubernetesAppLog = appReport.getApplicationLog kubernetesDiagnostics = appReport.getApplicationDiagnostics changeState(mapKubernetesState(appReport.getApplicationState, appTag)) + + Clock.sleep(pollInterval.toMillis) } debug(s"Application $appId is in state $state\nDiagnostics:" + s"\n${kubernetesDiagnostics.mkString("\n")}") @@ -179,7 +172,7 @@ class SparkKubernetesApp private[utils]( // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure. // We don't want a stuck session that can't be deleted. Emit a warning and move on. case _: TimeoutException | _: InterruptedException => - warn("Deleting a session while its Kubernetes application is not found.") + warn("Attempted to delete a session while its Kubernetes application is not found.") kubernetesAppMonitorThread.interrupt() } finally { process.foreach(_.destroy()) diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index 5e668fb99..c3c61d48f 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -14,19 +14,96 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.livy.utils import java.util.Objects._ +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.duration._ import io.fabric8.kubernetes.api.model._ -import org.mockito.Mockito.when +import org.mockito.Matchers.{eq => eqs, _} +import org.mockito.Mockito.{atLeast, verify, when} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.scalatest.FunSpec -import org.scalatest.mock.MockitoSugar._ +import org.scalatest.mock.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} +import org.apache.livy.utils.KubernetesApplicationState._ +import org.apache.livy.utils.SparkApp.State class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { + private def cleanupThread(t: Thread)(f: => Unit) = { + try { f } finally { t.interrupt() } + } + + private def mockSleep(ms: Long) = { + Thread.`yield`() + } + + describe("SparkKubernetesApp") { + val TEST_TIMEOUT = 30 seconds + val appId = "app_id" + val appTag = "app_tag" + val livyConf = new LivyConf(false) + livyConf.set(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT, "30s") + + it("should poll Kubernetes state and terminate") { + Clock.withSleepMethod(mockSleep) { + val mockApp = mock[KubernetesApplication] + when(mockApp.getApplicationId).thenReturn(appId) + when(mockApp.getApplicationTag).thenReturn(appTag) + val mockAppReport = mock[KubernetesAppReport] + when(mockAppReport.getApplicationLog).thenReturn(IndexedSeq("app", "log")) + when(mockAppReport.getApplicationDiagnostics).thenReturn(IndexedSeq("app", "diagnostics")) + val mockClient = mock[LivyKubernetesClient] + when(mockClient.getApplications(any(), any(), any())).thenReturn(Seq(mockApp)) + when(mockClient.getApplicationReport(eqs(mockApp), any(), any())).thenReturn(mockAppReport) + val mockListener = mock[SparkAppListener] + + // Simulate Kubernetes app state progression. + val applicationStateList = List( + PENDING, + CONTAINER_CREATING, + RUNNING, + COMPLETED + ) + val stateIndex = new AtomicInteger(0) + when(mockAppReport.getApplicationState).thenAnswer( + new Answer[String] { + override def answer(inv: InvocationOnMock): String = { + stateIndex.getAndIncrement() match { + case i if i < applicationStateList.size => + applicationStateList(i) + case _ => + applicationStateList.last + } + } + } + ) + + val app = new SparkKubernetesApp( + appTag, None, None, Some(mockListener), livyConf, mockClient) + + cleanupThread(app.kubernetesAppMonitorThread) { + app.kubernetesAppMonitorThread.join(TEST_TIMEOUT.toMillis) + assert(!app.kubernetesAppMonitorThread.isAlive, + "KubernetesAppMonitorThread should terminate after Kubernetes app is finished.") + verify(mockClient, atLeast(1)).getApplications(any(), anyString(), anyString()) + verify(mockClient, atLeast(1)) + .getApplicationReport(eqs(mockApp), anyInt(), anyString()) + verify(mockListener).appIdKnown(appId) + verify(mockListener).infoChanged(eqs(AppInfo())) + verify(mockListener).stateChanged(State.STARTING, State.RUNNING) + verify(mockListener).stateChanged(State.RUNNING, State.FINISHED) + } + } + } + } + describe("KubernetesAppReport") { it("should return application state") { val status = when(mock[PodStatus].getPhase).thenReturn("Status").getMock[PodStatus] From 2961663b6ca9e532cbc7c98676cfa6458ff6493f Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 27 Oct 2019 21:52:50 +0100 Subject: [PATCH 05/11] Minor cleanup Signed-off-by: Aliaksandr Sasnouskikh --- .../main/scala/org/apache/livy/utils/SparkKubernetesApp.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 5b31a5352..20bac86b2 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -253,7 +253,6 @@ object KubernetesApplicationState { } object KubernetesConstants { - val REQUESTED_BY_ANNOTATION = "requested-by" val CREATED_BY_ANNOTATION = "created-by" val SPARK_APP_ID_LABEL = "spark-app-selector" @@ -333,7 +332,7 @@ private[utils] case class KubernetesAppReport( } private[utils] class LivyKubernetesClient( - client: NamespacedKubernetesClient, namespaces: Set[String] = Set.empty) { + client: DefaultKubernetesClient, namespaces: Set[String] = Set.empty) { import KubernetesConstants._ import scala.collection.JavaConverters._ From dec48b1967d9f720a3f2465bbcafc438f1c83b39 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Mon, 28 Oct 2019 09:30:42 +0100 Subject: [PATCH 06/11] Add Kubernetes default namespace config Signed-off-by: Aliaksandr Sasnouskikh --- conf/livy.conf.template | 2 ++ .../main/scala/org/apache/livy/LivyConf.scala | 2 ++ .../apache/livy/utils/SparkKubernetesApp.scala | 16 +++++++++------- .../livy/utils/SparkKubernetesAppSpec.scala | 4 ++-- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index e7b3aea0e..55819e449 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -195,3 +195,5 @@ # livy.server.kubernetes.clientKeyFile = # Kubernetes client cert file path # livy.server.kubernetes.clientCertFile = +# Kubernetes client default namespace. +# livy.server.kubernetes.defaultNamespace = diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index ccf07284b..c6ad6e47e 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -230,6 +230,8 @@ object LivyConf { val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "") // Kubernetes client cert file path. val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "") + // Kubernetes client default namespace. + val KUBERNETES_DEFAULT_NAMESPACE = Entry("livy.server.kubernetes.defaultNamespace", "") // Comma-separated list of the Kubernetes namespaces to allow for applications creation. // All namespaces are allowed if empty. diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 20bac86b2..e151ab81f 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -407,26 +407,28 @@ private[utils] object KubernetesClientFactory { val caCertFile = livyConf.get(LivyConf.KUBERNETES_CA_CERT_FILE).toOption val clientKeyFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_KEY_FILE).toOption val clientCertFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_CERT_FILE).toOption + val clientNamespace = livyConf.get(LivyConf.KUBERNETES_DEFAULT_NAMESPACE).toOption val config = new ConfigBuilder() .withApiVersion("v1") .withMasterUrl(masterUrl) .withOption(oauthTokenValue) { - (token, configBuilder) => configBuilder.withOauthToken(token) + (token, builder) => builder.withOauthToken(token) } .withOption(oauthTokenFile) { - (file, configBuilder) => - configBuilder - .withOauthToken(Files.toString(new File(file), Charsets.UTF_8)) + (file, builder) => builder.withOauthToken(Files.toString(new File(file), Charsets.UTF_8)) } .withOption(caCertFile) { - (file, configBuilder) => configBuilder.withCaCertFile(file) + (file, builder) => builder.withCaCertFile(file) } .withOption(clientKeyFile) { - (file, configBuilder) => configBuilder.withClientKeyFile(file) + (file, builder) => builder.withClientKeyFile(file) } .withOption(clientCertFile) { - (file, configBuilder) => configBuilder.withClientCertFile(file) + (file, builder) => builder.withClientCertFile(file) + } + .withOption(clientNamespace) { + (namespace, builder) => builder.withNamespace(namespace) } .build() new LivyKubernetesClient( diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index c3c61d48f..4188289f1 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -118,7 +118,7 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { } describe("KubernetesClientFactory") { - it("should build KubernetesApi url from LivyConf masterUrl") { + it("should build KubernetesApi url from LivyConf master url") { def actual(sparkMaster: String): String = KubernetesClientFactory.sparkMasterToKubernetesApi(sparkMaster) @@ -136,7 +136,7 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { assert(nonNull(KubernetesClientFactory.createKubernetesClient(new LivyConf(false)))) } - it("should throw IllegalArgumentException in both oauth file and token provided") { + it("should throw IllegalArgumentException if both oauth file and token provided") { val conf = new LivyConf(false) .set(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE, "dummy_path") .set(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE, "dummy_value") From 02aee5bd109b115dd68f96966e297a7df3ef7497 Mon Sep 17 00:00:00 2001 From: jahstreet Date: Wed, 30 Oct 2019 22:24:18 +0100 Subject: [PATCH 07/11] Update livy.conf.template --- conf/livy.conf.template | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 55819e449..5adee0876 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -131,10 +131,10 @@ # How often Livy polls YARN to refresh YARN app state. # livy.server.yarn.poll-interval = 5s -# If Livy can't find the Kubernetes app within this time, consider it lost. +# If Livy can't find the Kubernetes app within this time, consider it lost # livy.server.kubernetes.app-lookup-timeout = 600s # When the cluster is busy, we may fail to launch Kubernetes app in app-lookup-timeout, then it -# would cause session leakage, so we need to check session leakage. +# would cause session leakage, so we need to check session leakage # How long to check livy session leakage # livy.server.kubernetes.app-leakage.check-timeout = 600s # How often to check livy session leakage @@ -145,7 +145,7 @@ # livy.server.kubernetes.poll-interval = 15s # Comma-separated list of the Kubernetes namespaces to allow for applications creation. -# All namespaces are allowed if empty. +# All namespaces are allowed if empty # livy.server.kubernetes.allowedNamespaces = # Days to keep Livy server request logs. @@ -195,5 +195,5 @@ # livy.server.kubernetes.clientKeyFile = # Kubernetes client cert file path # livy.server.kubernetes.clientCertFile = -# Kubernetes client default namespace. +# Kubernetes client default namespace # livy.server.kubernetes.defaultNamespace = From b521c3e3f1e541059b960b50ebda8e3e33b560ac Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sat, 2 Nov 2019 12:16:00 +0100 Subject: [PATCH 08/11] Minor refactoring Signed-off-by: Aliaksandr Sasnouskikh --- .../livy/utils/SparkKubernetesApp.scala | 58 +++++++++---------- .../livy/utils/SparkKubernetesAppSpec.scala | 5 +- 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index e151ab81f..95d840795 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -93,6 +93,33 @@ object SparkKubernetesApp extends Logging { withRetry(fn, retries - 1) case Failure(e) => throw e } + + private[utils] object KubernetesApplicationState { + val PENDING = "pending" + val RUNNING = "running" + val SUCCEEDED = "succeeded" + val FAILED = "failed" + } + + private[utils] def mapKubernetesState( + kubernetesAppState: String, + appTag: String): SparkApp.State.Value = { + import KubernetesApplicationState._ + kubernetesAppState.toLowerCase match { + case PENDING => + SparkApp.State.STARTING + case RUNNING => + SparkApp.State.RUNNING + case SUCCEEDED => + SparkApp.State.FINISHED + case FAILED => + SparkApp.State.FAILED + case other => // any other combination is invalid, so FAIL the application. + error(s"Unknown Kubernetes state $other for app with tag $appTag") + SparkApp.State.FAILED + } + } + } class SparkKubernetesApp private[utils]( @@ -130,7 +157,7 @@ class SparkKubernetesApp private[utils]( appPromise.success(app) val appId = app.getApplicationId - Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId") + Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appTag") listener.foreach(_.appIdKnown(appId)) while (isRunning) { @@ -153,6 +180,7 @@ class SparkKubernetesApp private[utils]( changeState(SparkApp.State.FAILED) } finally { listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = None))) + info(s"Finished monitoring application $appTag with state $state") } } @@ -222,34 +250,6 @@ class SparkKubernetesApp private[utils]( } } - private def mapKubernetesState( - kubernetesAppState: String, - appTag: String): SparkApp.State.Value = { - import KubernetesApplicationState._ - kubernetesAppState.toLowerCase match { - case PENDING | CONTAINER_CREATING => - SparkApp.State.STARTING - case RUNNING => - SparkApp.State.RUNNING - case COMPLETED | SUCCEEDED => - SparkApp.State.FINISHED - case FAILED | ERROR => - SparkApp.State.FAILED - case other => // any other combination is invalid, so FAIL the application. - error(s"Unknown Kubernetes state $other for app with tag $appTag.") - SparkApp.State.FAILED - } - } -} - -object KubernetesApplicationState { - val PENDING = "pending" - val CONTAINER_CREATING = "containercreating" - val RUNNING = "running" - val COMPLETED = "completed" - val SUCCEEDED = "succeeded" - val FAILED = "failed" - val ERROR = "error" } object KubernetesConstants { diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index 4188289f1..cbb10fc64 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -31,7 +31,6 @@ import org.scalatest.FunSpec import org.scalatest.mock.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} -import org.apache.livy.utils.KubernetesApplicationState._ import org.apache.livy.utils.SparkApp.State class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { @@ -65,11 +64,11 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { val mockListener = mock[SparkAppListener] // Simulate Kubernetes app state progression. + import SparkKubernetesApp.KubernetesApplicationState._ val applicationStateList = List( PENDING, - CONTAINER_CREATING, RUNNING, - COMPLETED + SUCCEEDED ) val stateIndex = new AtomicInteger(0) when(mockAppReport.getApplicationState).thenAnswer( From 085b7a69214de60e0621ec1977bcc8ca9d4b42b0 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sat, 2 Nov 2019 12:51:59 +0100 Subject: [PATCH 09/11] Remove appInfoChanged on finish monitoring app Signed-off-by: Aliaksandr Sasnouskikh --- .../main/scala/org/apache/livy/utils/SparkKubernetesApp.scala | 1 - .../scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala | 1 - 2 files changed, 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 95d840795..c8561c14e 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -179,7 +179,6 @@ class SparkKubernetesApp private[utils]( kubernetesDiagnostics = ArrayBuffer(e.getMessage) changeState(SparkApp.State.FAILED) } finally { - listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = None))) info(s"Finished monitoring application $appTag with state $state") } } diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index cbb10fc64..f6ea6eded 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -95,7 +95,6 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { verify(mockClient, atLeast(1)) .getApplicationReport(eqs(mockApp), anyInt(), anyString()) verify(mockListener).appIdKnown(appId) - verify(mockListener).infoChanged(eqs(AppInfo())) verify(mockListener).stateChanged(State.STARTING, State.RUNNING) verify(mockListener).stateChanged(State.RUNNING, State.FINISHED) } From 05cc3a95eb799973a7ef83f6471ed7a2d8c6b186 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Thu, 31 Oct 2019 17:51:17 +0100 Subject: [PATCH 10/11] Build Spark UI links when running on Kubernetes Signed-off-by: Aliaksandr Sasnouskikh --- conf/livy.conf.template | 5 + .../main/scala/org/apache/livy/LivyConf.scala | 6 + .../livy/utils/SparkKubernetesApp.scala | 82 ++++++---- .../livy/utils/SparkKubernetesAppSpec.scala | 146 +++++++++++++----- 4 files changed, 173 insertions(+), 66 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 5adee0876..9044097fb 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -153,6 +153,11 @@ # If the Livy Web UI should be included in the Livy Server. Enabled by default. # livy.ui.enabled = true +# Wether to display on Livy UI links to Spark UI when running on Kubernetes +# livy.ui.kubernetes.sparkui.enabled = false +# String format to use to build links to Spark UI to be displayed on Livy UI when running on +# Kubernetes. Use '%s' placeholder for the link url part to be replaced by Spark application ID +# livy.ui.kubernetes.sparkui.link-format = http://localhost/%s # Whether to enable Livy server access control, if it is true then all the income requests will # be checked if the requested user has permission. diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index c6ad6e47e..a67735cea 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -63,6 +63,12 @@ object LivyConf { val SERVER_BASE_PATH = Entry("livy.ui.basePath", "") val UI_ENABLED = Entry("livy.ui.enabled", true) + // Wether to display on Livy UI links to Spark UI when running on Kubernetes + val UI_KUBERNETES_SPARK_UI_ENABLED = Entry("livy.ui.kubernetes.sparkui.enabled", false) + // String format to use to build links to Spark UI to be displayed on Livy UI when running on + // Kubernetes. Use '%s' placeholder for the link url part to be replaced by Spark application ID + val UI_KUBERNETES_SPARK_UI_LINK_FORMAT = + Entry("livy.ui.kubernetes.sparkui.link-format", "http://localhost/%s") val REQUEST_HEADER_SIZE = Entry("livy.server.request-header.size", 131072) val RESPONSE_HEADER_SIZE = Entry("livy.server.response-header.size", 131072) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index c8561c14e..03e1006f3 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -160,19 +160,24 @@ class SparkKubernetesApp private[utils]( Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appTag") listener.foreach(_.appIdKnown(appId)) + var appInfo = AppInfo() while (isRunning) { val appReport = withRetry(kubernetesClient.getApplicationReport(app, cacheLogSize)) kubernetesAppLog = appReport.getApplicationLog kubernetesDiagnostics = appReport.getApplicationDiagnostics changeState(mapKubernetesState(appReport.getApplicationState, appTag)) - + val latestAppInfo = AppInfo(sparkUiUrl = appReport.getTrackingUrl) + if (appInfo != latestAppInfo) { + listener.foreach(_.infoChanged(latestAppInfo)) + appInfo = latestAppInfo + } Clock.sleep(pollInterval.toMillis) } debug(s"Application $appId is in state $state\nDiagnostics:" + s"\n${kubernetesDiagnostics.mkString("\n")}") } catch { case _: InterruptedException => - kubernetesDiagnostics = ArrayBuffer("Application stopped by user.") + kubernetesDiagnostics = ArrayBuffer("Application stopped by user") changeState(SparkApp.State.KILLED) case NonFatal(e) => error("Error while refreshing Kubernetes state", e) @@ -199,7 +204,7 @@ class SparkKubernetesApp private[utils]( // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure. // We don't want a stuck session that can't be deleted. Emit a warning and move on. case _: TimeoutException | _: InterruptedException => - warn("Attempted to delete a session while its Kubernetes application is not found.") + warn("Attempted to delete a session while its Kubernetes application is not found") kubernetesAppMonitorThread.interrupt() } finally { process.foreach(_.destroy()) @@ -241,7 +246,7 @@ class SparkKubernetesApp private[utils]( s" $appTag in ${livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT) / 1000}" + " seconds. This may be because 1) spark-submit fail to submit application to " + "Kubernetes; or 2) Kubernetes cluster doesn't have enough resources to start the " + - "application in time. Please check Livy log and Kubernetes log to know the details.") + "application in time. Please check Livy log and Kubernetes log to know the details") } else { Clock.sleep(pollInterval.toMillis) getAppFromTag(appTag, pollInterval, deadline) @@ -267,45 +272,65 @@ class KubernetesApplication(driverPod: Pod) { import KubernetesConstants._ - private val appTag = driverPod.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL) - private val appId = driverPod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL) - private val namespace = driverPod.getMetadata.getNamespace + def getApplicationTag: String = driverPod.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL) - def getApplicationTag: String = appTag + def getApplicationId: String = driverPod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL) - def getApplicationId: String = appId + def getApplicationNamespace: String = driverPod.getMetadata.getNamespace - def getApplicationNamespace: String = namespace + def getApplicationState: String = driverPod.getStatus.getPhase.toLowerCase def getApplicationPod: Pod = driverPod } private[utils] case class KubernetesAppReport( driver: Option[Pod], - executors: Seq[Pod], - appLog: IndexedSeq[String]) { + executors: Set[Pod], + appLog: IndexedSeq[String], + livyConf: LivyConf) { + + import SparkKubernetesApp._ + + private val UNKNOWN = "unknown" + private val APP = driver.map(new KubernetesApplication(_)) + + def getTrackingUrl: Option[String] = { + if (livyConf.getBoolean(LivyConf.UI_KUBERNETES_SPARK_UI_ENABLED) && isRunning) { + val format = livyConf.get(LivyConf.UI_KUBERNETES_SPARK_UI_LINK_FORMAT) + Some(String.format(format, getApplicationId)) + } else { + None + } + } - def getApplicationState: String = - driver.map(_.getStatus.getPhase.toLowerCase).getOrElse("unknown") + def getApplicationState: String = getOrDefault(_.getApplicationState) + + def getApplicationTag: String = getOrDefault(_.getApplicationTag) + + def getApplicationId: String = getOrDefault(_.getApplicationId) def getApplicationLog: IndexedSeq[String] = appLog def getApplicationDiagnostics: IndexedSeq[String] = { - (Seq(driver) ++ executors.sortBy(_.getMetadata.getName).map(Some(_))) - .filter(_.nonEmpty) + (Seq(driver) ++ executors.toSeq.sortBy(_.getMetadata.getName).map(Some(_))) + .flatten .map(buildSparkPodDiagnosticsPrettyString) - .flatMap(_.split("\n")).toIndexedSeq + .flatMap(_.split("\n")) + .toIndexedSeq } - private def buildSparkPodDiagnosticsPrettyString(podOption: Option[Pod]): String = { + private def isRunning: Boolean = + SparkApp.State.RUNNING == mapKubernetesState(getApplicationState, getApplicationTag) + + private def getOrDefault(extractor: KubernetesApplication => String): String = + APP.map(extractor).flatMap(Option(_)).getOrElse(UNKNOWN) + + private def buildSparkPodDiagnosticsPrettyString(pod: Pod): String = { import scala.collection.JavaConverters._ def printMap(map: Map[_, _]): String = map.map { case (key, value) => s"$key=$value" }.mkString(", ") - if (podOption.isEmpty) return "unknown" - val pod = podOption.get - s"${pod.getMetadata.getName}.${pod.getMetadata.getNamespace}:" + s"\n\tnode: ${pod.getSpec.getNodeName}" + s"\n\thostname: ${pod.getSpec.getHostname}" + @@ -331,17 +356,18 @@ private[utils] case class KubernetesAppReport( } private[utils] class LivyKubernetesClient( - client: DefaultKubernetesClient, namespaces: Set[String] = Set.empty) { + client: DefaultKubernetesClient, livyConf: LivyConf) { - import KubernetesConstants._ import scala.collection.JavaConverters._ + import KubernetesConstants._ + def getApplications( labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER), appTagLabel: String = SPARK_APP_TAG_LABEL, appIdLabel: String = SPARK_APP_ID_LABEL ): Seq[KubernetesApplication] = - Option(namespaces).filter(_.nonEmpty) + Option(livyConf.getKubernetesNamespaces()).filter(_.nonEmpty) .map(_.map(client.inNamespace)) .getOrElse(Seq(client.inAnyNamespace())) .map(_.pods @@ -361,11 +387,11 @@ private[utils] class LivyKubernetesClient( ): KubernetesAppReport = { val pods = client.inNamespace(app.getApplicationNamespace).pods .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava) - .list.getItems.asScala + .list.getItems.asScala.toSet val driver = pods.find(isDriver) val executors = pods.filter(isExecutor) val appLog = getApplicationLog(app, cacheLogSize) - KubernetesAppReport(driver, executors, appLog) + KubernetesAppReport(driver, executors, appLog, livyConf) } def getDefaultNamespace: String = client.getNamespace @@ -401,7 +427,7 @@ private[utils] object KubernetesClientFactory { val oauthTokenValue = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE).toOption require(oauthTokenFile.isEmpty || oauthTokenValue.isEmpty, "Cannot specify OAuth token through both " + - s"a file $oauthTokenFile and a value $oauthTokenValue.") + s"a file $oauthTokenFile and a value $oauthTokenValue") val caCertFile = livyConf.get(LivyConf.KUBERNETES_CA_CERT_FILE).toOption val clientKeyFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_KEY_FILE).toOption @@ -431,7 +457,7 @@ private[utils] object KubernetesClientFactory { } .build() new LivyKubernetesClient( - new DefaultKubernetesClient(config), livyConf.getKubernetesNamespaces()) + new DefaultKubernetesClient(config), livyConf) } private[utils] def sparkMasterToKubernetesApi(sparkMaster: String): String = { diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index f6ea6eded..891ded465 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -32,6 +32,7 @@ import org.scalatest.mock.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} import org.apache.livy.utils.SparkApp.State +import org.apache.livy.utils.SparkKubernetesApp.KubernetesApplicationState._ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { @@ -47,46 +48,55 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { val TEST_TIMEOUT = 30 seconds val appId = "app_id" val appTag = "app_tag" - val livyConf = new LivyConf(false) - livyConf.set(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT, "30s") + val trackingUrl = "tracking_url" - it("should poll Kubernetes state and terminate") { - Clock.withSleepMethod(mockSleep) { - val mockApp = mock[KubernetesApplication] - when(mockApp.getApplicationId).thenReturn(appId) - when(mockApp.getApplicationTag).thenReturn(appTag) - val mockAppReport = mock[KubernetesAppReport] - when(mockAppReport.getApplicationLog).thenReturn(IndexedSeq("app", "log")) - when(mockAppReport.getApplicationDiagnostics).thenReturn(IndexedSeq("app", "diagnostics")) - val mockClient = mock[LivyKubernetesClient] - when(mockClient.getApplications(any(), any(), any())).thenReturn(Seq(mockApp)) - when(mockClient.getApplicationReport(eqs(mockApp), any(), any())).thenReturn(mockAppReport) - val mockListener = mock[SparkAppListener] + def initMockApp: KubernetesApplication = { + val mockApp = mock[KubernetesApplication] + when(mockApp.getApplicationId).thenReturn(appId) + when(mockApp.getApplicationTag).thenReturn(appTag) + mockApp + } - // Simulate Kubernetes app state progression. - import SparkKubernetesApp.KubernetesApplicationState._ - val applicationStateList = List( - PENDING, - RUNNING, - SUCCEEDED - ) - val stateIndex = new AtomicInteger(0) - when(mockAppReport.getApplicationState).thenAnswer( - new Answer[String] { - override def answer(inv: InvocationOnMock): String = { - stateIndex.getAndIncrement() match { - case i if i < applicationStateList.size => - applicationStateList(i) - case _ => - applicationStateList.last - } + def initMockClient(mockApp: KubernetesApplication): LivyKubernetesClient = { + val mockAppReport = mock[KubernetesAppReport] + when(mockAppReport.getApplicationLog).thenReturn(IndexedSeq("app", "log")) + when(mockAppReport.getApplicationDiagnostics).thenReturn(IndexedSeq("app", "diagnostics")) + when(mockAppReport.getTrackingUrl).thenReturn(Some(trackingUrl)) + val mockClient = mock[LivyKubernetesClient] + when(mockClient.getApplications(any(), any(), any())).thenReturn(Seq(mockApp)) + when(mockClient.getApplicationReport(eqs(mockApp), any(), any())).thenReturn(mockAppReport) + + // Simulate Kubernetes app state progression. + val applicationStateList = List( + PENDING, + RUNNING, + SUCCEEDED + ) + val stateIndex = new AtomicInteger(0) + when(mockAppReport.getApplicationState).thenAnswer( + new Answer[String] { + override def answer(inv: InvocationOnMock): String = { + stateIndex.getAndIncrement() match { + case i if i < applicationStateList.size => + applicationStateList(i) + case _ => + applicationStateList.last } } - ) + } + ) + mockClient + } + it("should poll Kubernetes state and terminate") { + val livyConf = new LivyConf(false) + livyConf.set(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT, "30s") + Clock.withSleepMethod(mockSleep) { + val mockListener = mock[SparkAppListener] + val mockApp = initMockApp + val mockClient = initMockClient(mockApp) val app = new SparkKubernetesApp( appTag, None, None, Some(mockListener), livyConf, mockClient) - cleanupThread(app.kubernetesAppMonitorThread) { app.kubernetesAppMonitorThread.join(TEST_TIMEOUT.toMillis) assert(!app.kubernetesAppMonitorThread.isAlive, @@ -100,19 +110,79 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { } } } + + it("should build sparkUiUrl and update AppInfo") { + val livyConf = new LivyConf(false) + livyConf.set(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT, "30s") + Clock.withSleepMethod(mockSleep) { + val mockListener = mock[SparkAppListener] + val mockApp = initMockApp + val mockClient = initMockClient(mockApp) + val app = new SparkKubernetesApp( + appTag, None, None, Some(mockListener), livyConf, mockClient) + cleanupThread(app.kubernetesAppMonitorThread) { + app.kubernetesAppMonitorThread.join(TEST_TIMEOUT.toMillis) + assert(!app.kubernetesAppMonitorThread.isAlive, + "KubernetesAppMonitorThread should terminate after Kubernetes app is finished.") + verify(mockListener) + .infoChanged(eqs(AppInfo(sparkUiUrl = Some(trackingUrl)))) + verify(mockListener).infoChanged(eqs(AppInfo())) + } + } + } } describe("KubernetesAppReport") { + + import scala.collection.JavaConverters._ + import KubernetesConstants._ + + def driverMock(state: String): Pod = { + val status = when(mock[PodStatus].getPhase).thenReturn(state).getMock[PodStatus] + when(mock[Pod].getStatus).thenReturn(status).getMock[Pod] + } + + def witLabels(pod: Pod, labelMap: Map[String, String]): Pod = { + val metaWithLabel = when(mock[ObjectMeta].getLabels).thenReturn(labelMap.asJava) + .getMock[ObjectMeta] + when(pod.getMetadata).thenReturn(metaWithLabel).getMock[Pod] + } + + def report(driver: Option[Pod], livyConf: LivyConf): KubernetesAppReport = + KubernetesAppReport(driver, Set.empty, IndexedSeq.empty, livyConf) + it("should return application state") { - val status = when(mock[PodStatus].getPhase).thenReturn("Status").getMock[PodStatus] - val driver = when(mock[Pod].getStatus).thenReturn(status).getMock[Pod] - assertResult("status") { - KubernetesAppReport(Some(driver), Seq.empty, IndexedSeq.empty).getApplicationState + val livyConf = new LivyConf(false) + val driver = driverMock("State") + assertResult("state") { + KubernetesAppReport(Some(driver), Set.empty, IndexedSeq.empty, livyConf).getApplicationState } assertResult("unknown") { - KubernetesAppReport(None, Seq.empty, IndexedSeq.empty).getApplicationState + KubernetesAppReport(None, Set.empty, IndexedSeq.empty, livyConf).getApplicationState } } + + it("should return Spark UI url") { + + def test(expected: Option[String], driver: Option[Pod], livyConf: LivyConf): Unit = + assertResult(expected) { + report(driver, livyConf).getTrackingUrl + } + + val appId = "app_id" + val livyConf = new LivyConf(false) + livyConf.set(LivyConf.UI_KUBERNETES_SPARK_UI_ENABLED, true) + livyConf.set(LivyConf.UI_KUBERNETES_SPARK_UI_LINK_FORMAT, "https://cluser.com/spark/%s") + + test(Some(s"https://cluser.com/spark/$appId"), + Some(witLabels(driverMock("Running"), Map(SPARK_APP_ID_LABEL -> appId))), livyConf) + test(Some(s"https://cluser.com/spark/unknown"), + Some(witLabels(driverMock("Running"), Map.empty)), livyConf) + test(None, Some(witLabels(driverMock("State"), Map.empty)), livyConf) + test(None, None, livyConf) + test(None, Some(witLabels(driverMock("Running"), Map(SPARK_APP_ID_LABEL -> appId))), + new LivyConf(false)) + } } describe("KubernetesClientFactory") { From 4050c0dc1959ad6a0b401f70e96830f35d9c5f80 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sat, 2 Nov 2019 12:55:09 +0100 Subject: [PATCH 11/11] Fix appInfoChanged after rebase Signed-off-by: Aliaksandr Sasnouskikh --- .../scala/org/apache/livy/utils/SparkKubernetesApp.scala | 1 + .../scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 03e1006f3..dcb98c52d 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -184,6 +184,7 @@ class SparkKubernetesApp private[utils]( kubernetesDiagnostics = ArrayBuffer(e.getMessage) changeState(SparkApp.State.FAILED) } finally { + listener.foreach(_.infoChanged(AppInfo())) info(s"Finished monitoring application $appTag with state $state") } } diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index 891ded465..f84da4874 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -105,6 +105,7 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { verify(mockClient, atLeast(1)) .getApplicationReport(eqs(mockApp), anyInt(), anyString()) verify(mockListener).appIdKnown(appId) + verify(mockListener).infoChanged(AppInfo()) verify(mockListener).stateChanged(State.STARTING, State.RUNNING) verify(mockListener).stateChanged(State.RUNNING, State.FINISHED) } @@ -125,8 +126,8 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { assert(!app.kubernetesAppMonitorThread.isAlive, "KubernetesAppMonitorThread should terminate after Kubernetes app is finished.") verify(mockListener) - .infoChanged(eqs(AppInfo(sparkUiUrl = Some(trackingUrl)))) - verify(mockListener).infoChanged(eqs(AppInfo())) + .infoChanged(AppInfo(sparkUiUrl = Some(trackingUrl))) + verify(mockListener).infoChanged(AppInfo()) } } }