From 758acb05a82f73caf68e57611c524302e84948a7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 27 Oct 2019 18:36:54 +0100 Subject: [PATCH 01/16] Implement base kubernetes features. Signed-off-by: Aliaksandr Sasnouskikh --- conf/livy.conf.template | 34 +- .../java/org/apache/livy/rsc/RSCConf.java | 7 + .../org/apache/livy/rsc/driver/RSCDriver.java | 7 + server/pom.xml | 5 + .../main/scala/org/apache/livy/LivyConf.scala | 36 ++ .../org/apache/livy/server/LivyServer.scala | 13 +- .../interactive/InteractiveSession.scala | 5 + .../org/apache/livy/utils/SparkApp.scala | 30 +- .../livy/utils/SparkKubernetesApp.scala | 463 ++++++++++++++++++ .../livy/utils/SparkKubernetesAppSpec.scala | 71 +++ 10 files changed, 662 insertions(+), 9 deletions(-) create mode 100644 server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala create mode 100644 server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 456bec749..da809abb5 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -145,12 +145,29 @@ # cause session leakage, so we need to check session leakage. # How long to check livy session leakage # livy.server.yarn.app-leakage.check-timeout = 600s -# how often to check livy session leakage +# How often to check livy session leakage # livy.server.yarn.app-leakage.check-interval = 60s # How often Livy polls YARN to refresh YARN app state. # livy.server.yarn.poll-interval = 5s -# + +# If Livy can't find the Kubernetes app within this time, consider it lost. +# livy.server.kubernetes.app-lookup-timeout = 600s +# When the cluster is busy, we may fail to launch Kubernetes app in app-lookup-timeout, then it +# would cause session leakage, so we need to check session leakage. 
+# How long to check livy session leakage +# livy.server.kubernetes.app-leakage.check-timeout = 600s +# How often to check livy session leakage +# livy.server.kubernetes.app-leakage.check-interval = 60s + +# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods state, logs, description +# details, routes, etc...) +# livy.server.kubernetes.poll-interval = 15s + +# Comma-separated list of the Kubernetes namespaces to allow for applications creation. +# All namespaces are allowed if empty. +# livy.server.kubernetes.allowedNamespaces = + # Days to keep Livy server request logs. # livy.server.request-log-retain.days = 5 @@ -185,3 +202,16 @@ # livy.server.auth..class = # livy.server.auth..param. = # livy.server.auth..param. = + +# Manual authentication to KubeApiserver (by default configured with Kubernetes ServiceAccount +# if deployed to Kubernetes cluster as a Pod) +# Kubernetes oauth token file path +# livy.server.kubernetes.oauthTokenFile = +# Kubernetes oauth token string value +# livy.server.kubernetes.oauthTokenValue = +# Kubernetes CA cert file path +# livy.server.kubernetes.caCertFile = +# Kubernetes client key file path +# livy.server.kubernetes.clientKeyFile = +# Kubernetes client cert file path +# livy.server.kubernetes.clientCertFile = diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java index 4c45956d7..200f5a5f0 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java @@ -25,6 +25,7 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Properties; import javax.security.sasl.Sasl; @@ -151,6 +152,12 @@ public String findLocalAddress() throws IOException { return address.getCanonicalHostName(); } + public boolean isRunningOnKubernetes() { + return Optional.ofNullable(get("livy.spark.master")) + .filter(s -> s.startsWith("k8s")) + .isPresent(); 
+ } + private static final Map configsWithAlternatives = Collections.unmodifiableMap(new HashMap() {{ put(RSCConf.Entry.CLIENT_IN_PROCESS.key, DepConf.CLIENT_IN_PROCESS); diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java index a8f31f79d..25f49a6ef 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java @@ -169,6 +169,13 @@ private void initializeServer() throws Exception { // on the cluster, it would be tricky to solve that problem in a generic way. livyConf.set(RPC_SERVER_ADDRESS, null); + // If we are running on Kubernetes, set RPC_SERVER_ADDRESS from "spark.driver.host" option, + // which is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep: + // line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc" + if (livyConf.isRunningOnKubernetes()) { + livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host")); + } + if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) { // Test flag is turned on so we will just infinite loop here. It should cause // timeout and we should still see yarn application being cleaned up. diff --git a/server/pom.xml b/server/pom.xml index 188158b03..2fcafabaf 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -84,6 +84,11 @@ metrics-healthchecks + + io.fabric8 + kubernetes-client + + javax.servlet javax.servlet-api diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 1b06fdebd..4c4dff0c5 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -249,6 +249,33 @@ object LivyConf { // how often to check livy session leakage val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s") + // Kubernetes oauth token file path. 
+ val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "") + // Kubernetes oauth token string value. + val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "") + // Kubernetes CA cert file path. + val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "") + // Kubernetes client key file path. + val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "") + // Kubernetes client cert file path. + val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "") + + // Comma-separated list of the Kubernetes namespaces to allow for applications creation. + // All namespaces are allowed if empty. + val KUBERNETES_ALLOWED_NAMESPACES = Entry("livy.server.kubernetes.allowedNamespaces", null) + + // If Livy can't find the Kubernetes app within this time, consider it lost. + val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s") + // How often Livy polls Kubernetes to refresh Kubernetes app state. + val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s") + + // How long to check livy session leakage. + val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT = + Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s") + // How often to check livy session leakage. + val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL = + Entry("livy.server.kubernetes.app-leakage.check-interval", "60s") + // Whether session timeout should be checked, by default it will be checked, which means inactive // session will be stopped after "livy.server.session.timeout" val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true) @@ -360,6 +387,15 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { /** Return true if spark master starts with yarn. */ def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn") + /** Return true if spark master starts with k8s. 
*/ + def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s") + + /** Return Kubernetes namespace or all if not set. */ + def getKubernetesNamespaces(): Set[String] = + Option(get(KUBERNETES_ALLOWED_NAMESPACES)).filterNot(_.isEmpty) + .map(_.split(",").toSet) + .getOrElse(Set.empty) + /** Return the spark deploy mode Livy sessions should use. */ def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 3e715bdf1..67da94d29 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManag import org.apache.livy.server.ui.UIServlet import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager} import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF +import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp} import org.apache.livy.utils.LivySparkUtils._ -import org.apache.livy.utils.SparkYarnApp class LivyServer extends Logging { @@ -142,10 +142,13 @@ class LivyServer extends Logging { testRecovery(livyConf) - // Initialize YarnClient ASAP to save time. + // Initialize YarnClient or KubernetesClient ASAP to save time. if (livyConf.isRunningOnYarn()) { SparkYarnApp.init(livyConf) Future { SparkYarnApp.yarnClient } + } else if (livyConf.isRunningOnKubernetes()) { + SparkKubernetesApp.init(livyConf) + Future { SparkKubernetesApp.kubernetesClient } } if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") { @@ -415,10 +418,10 @@ class LivyServer extends Logging { } private[livy] def testRecovery(livyConf: LivyConf): Unit = { - if (!livyConf.isRunningOnYarn()) { - // If recovery is turned on but we are not running on YARN, quit. 
+ if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) { + // If recovery is turned on but we are not running on YARN or Kubernetes, quit. require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF, - "Session recovery requires YARN.") + "Session recovery requires YARN or Kubernetes.") } } } diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index c4c273acd..a1030d461 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -404,6 +404,9 @@ class InteractiveSession( if (livyConf.isRunningOnYarn() || driverProcess.isDefined) { Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) + } else if (livyConf.isRunningOnKubernetes()) { + // Create SparkKubernetesApp anyway to recover app monitoring on Livy server restart + Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) } else { None } @@ -481,6 +484,8 @@ class InteractiveSession( transition(SessionState.ShuttingDown) sessionStore.remove(RECOVERY_SESSION_TYPE, id) client.foreach { _.stop(true) } + // We need to call #kill here explicitly to delete Interactive pods from the cluster + if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill()) } catch { case _: Exception => app.foreach { diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index 9afe28162..ac74e585d 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -71,13 +71,36 @@ object SparkApp { sparkConf ++ Map( SPARK_YARN_TAG_KEY -> mergedYarnTags, "spark.yarn.submit.waitAppCompletion" -> "false") + } else if (livyConf.isRunningOnKubernetes()) { + + // We don't allow to 
submit applications to the namespaces different from the configured + val kubernetesNamespaces = livyConf.getKubernetesNamespaces() + val targetNamespace = sparkConf.getOrElse("spark.kubernetes.namespace", + SparkKubernetesApp.kubernetesClient.getDefaultNamespace) + if (kubernetesNamespaces.nonEmpty && !kubernetesNamespaces.contains(targetNamespace)) { + throw new IllegalArgumentException( + s"Requested namespace $targetNamespace doesn't match the configured: " + + s"${kubernetesNamespaces.mkString(", ")}") + } + + import KubernetesConstants._ + sparkConf ++ Map( + "spark.kubernetes.namespace" -> targetNamespace, + // Mark Spark pods with the unique appTag label to be used for their discovery + s"spark.kubernetes.driver.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag, + s"spark.kubernetes.executor.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag, + // Mark Spark pods as created by Livy for the additional tracing + s"spark.kubernetes.driver.label.$CREATED_BY_ANNOTATION" -> "livy", + s"spark.kubernetes.executor.label.$CREATED_BY_ANNOTATION" -> "livy", + "spark.kubernetes.submission.waitAppCompletion" -> "false") } else { sparkConf } } /** - * Return a SparkApp object to control the underlying Spark application via YARN or spark-submit. + * Return a SparkApp object to control the underlying Spark application via YARN, Kubernetes + * or spark-submit. * * @param uniqueAppTag A tag that can uniquely identify the application. 
*/ @@ -89,8 +112,11 @@ object SparkApp { listener: Option[SparkAppListener]): SparkApp = { if (livyConf.isRunningOnYarn()) { new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf) + } else if (livyConf.isRunningOnKubernetes()) { + new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf) } else { - require(process.isDefined, "process must not be None when Livy master is not YARN.") + require(process.isDefined, "process must not be None when Livy master is not YARN or " + + "Kubernetes.") new SparkProcApp(process.get, listener) } } diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala new file mode 100644 index 000000000..6c8fb0b10 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.utils + +import java.util.concurrent.TimeoutException + +import scala.annotation.tailrec +import scala.collection.mutable.ArrayBuffer +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client._ + +import org.apache.livy.{LivyConf, Logging, Utils} + +object SparkKubernetesApp extends Logging { + + private val RETRY_BACKOFF_MILLIS = 1000 + + lazy val kubernetesClient: LivyKubernetesClient = + KubernetesClientFactory.createKubernetesClient(livyConf) + + private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]() + + private val leakedAppsGCThread = new Thread() { + override def run(): Unit = { + while (true) { + if (!leakedAppTags.isEmpty) { + // kill the app if found it or remove it if exceeding a threshold + val leakedApps = leakedAppTags.entrySet().iterator() + val now = System.currentTimeMillis() + val apps = withRetry(kubernetesClient.getApplications()).groupBy(_.getApplicationTag) + while (leakedApps.hasNext) { + val leakedApp = leakedApps.next() + apps.get(leakedApp.getKey) match { + case Some(seq) => + seq.foreach(app => + if (withRetry(kubernetesClient.killApplication(app))) { + leakedApps.remove() + info(s"Killed leaked app with tag ${leakedApp.getKey}") + } + ) + case None if (leakedApp.getValue - now) > sessionLeakageCheckTimeout => + leakedApps.remove() + info(s"Remove leaked Kubernetes app tag ${leakedApp.getKey}") + } + } + } + Thread.sleep(sessionLeakageCheckInterval) + } + } + } + + private var livyConf: LivyConf = _ + + private var cacheLogSize: Int = _ + private var appLookupTimeout: FiniteDuration = _ + private var pollInterval: FiniteDuration = _ + + private var sessionLeakageCheckTimeout: Long = _ + private var sessionLeakageCheckInterval: Long = _ + + def init(livyConf: LivyConf): Unit = { + 
this.livyConf = livyConf + + cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE) + appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds + pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds + + sessionLeakageCheckInterval = + livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL) + sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT) + + leakedAppsGCThread.setDaemon(true) + leakedAppsGCThread.setName("LeakedAppsGCThread") + leakedAppsGCThread.start() + } + + // Returning T, throwing the exception on failure + @tailrec + private def withRetry[T](fn: => T, retries: Int = 3): T = + Try { fn } match { + case Success(x) => x + case _ if retries > 1 => + Thread.sleep(RETRY_BACKOFF_MILLIS) + withRetry(fn, retries - 1) + case Failure(e) => throw e + } +} + +class SparkKubernetesApp private[utils]( + appTag: String, + appIdOption: Option[String], + process: Option[LineBufferedProcess], + listener: Option[SparkAppListener], + livyConf: LivyConf, + kubernetesClient: => LivyKubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit tests + extends SparkApp with Logging { + + import SparkKubernetesApp._ + + private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String] + private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String] + + private var state: SparkApp.State = SparkApp.State.STARTING + private val appPromise: Promise[KubernetesApplication] = Promise() + + private val kubernetesAppMonitorThread = Utils + .startDaemonThread(s"kubernetesAppMonitorThread-$this") { + try { + val app = try { + getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow) + } catch { + case e: Exception => + appPromise.failure(e) + throw e + } + appPromise.success(app) + val appId = app.getApplicationId + + Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId") + listener.foreach(_.appIdKnown(appId)) + + 
while (isRunning) { + Clock.sleep(pollInterval.toMillis) + + val appReport = withRetry(kubernetesClient.getApplicationReport(app, cacheLogSize)) + kubernetesAppLog = appReport.getApplicationLog + kubernetesDiagnostics = appReport.getApplicationDiagnostics + changeState(mapKubernetesState(appReport.getApplicationState, appTag)) + } + debug(s"Application $appId is in state $state\nDiagnostics:\n${kubernetesDiagnostics.mkString("\n")}") + } catch { + case _: InterruptedException => + kubernetesDiagnostics = ArrayBuffer("Application stopped by user.") + changeState(SparkApp.State.KILLED) + case NonFatal(e) => + error("Error while refreshing Kubernetes state", e) + kubernetesDiagnostics = ArrayBuffer(e.getMessage) + changeState(SparkApp.State.FAILED) + } finally { + listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = None))) + } + } + + override def log(): IndexedSeq[String] = + ("stdout: " +: kubernetesAppLog) ++ + ("\nstderr: " +: (process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++ + process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String]))) ++ + ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics) + + override def kill(): Unit = + try { + withRetry { + kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout)) + } + } catch { + // We cannot kill the Kubernetes app without the appTag. + // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure. + // We don't want a stuck session that can't be deleted. Emit a warning and move on. 
+ case _: TimeoutException | _: InterruptedException => + warn("Deleting a session while its Kubernetes application is not found.") + kubernetesAppMonitorThread.interrupt() + } finally { + process.foreach(_.destroy()) + } + + private def isRunning: Boolean = { + state != SparkApp.State.FAILED && + state != SparkApp.State.FINISHED && + state != SparkApp.State.KILLED + } + + private def changeState(newState: SparkApp.State.Value): Unit = { + if (state != newState) { + listener.foreach(_.stateChanged(state, newState)) + state = newState + } + } + + /** + * Find the corresponding KubernetesApplication from an application tag. + * + * @param appTag The application tag tagged on the target application. + * If the tag is not unique, it returns the first application it found. + * @return KubernetesApplication or the failure. + */ + @tailrec + private def getAppFromTag( + appTag: String, + pollInterval: Duration, + deadline: Deadline): KubernetesApplication = { + withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag))) + match { + case Some(app) => app + case None => + if (deadline.isOverdue) { + process.foreach(_.destroy()) + leakedAppTags.put(appTag, System.currentTimeMillis()) + throw new IllegalStateException(s"No Kubernetes application is found with tag" + + s" $appTag in ${livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT) / 1000}" + + " seconds. This may be because 1) spark-submit fail to submit application to " + + "Kubernetes; or 2) Kubernetes cluster doesn't have enough resources to start the " + + "application in time. 
Please check Livy log and Kubernetes log to know the details.") + } else { + Clock.sleep(pollInterval.toMillis) + getAppFromTag(appTag, pollInterval, deadline) + } + } + } + + private def mapKubernetesState( + kubernetesAppState: String, + appTag: String): SparkApp.State.Value = { + import KubernetesApplicationState._ + kubernetesAppState.toLowerCase match { + case PENDING | CONTAINER_CREATING => + SparkApp.State.STARTING + case RUNNING => + SparkApp.State.RUNNING + case COMPLETED | SUCCEEDED => + SparkApp.State.FINISHED + case FAILED | ERROR => + SparkApp.State.FAILED + case other => // any other combination is invalid, so FAIL the application. + error(s"Unknown Kubernetes state $other for app with tag $appTag.") + SparkApp.State.FAILED + } + } +} + +object KubernetesApplicationState { + val PENDING = "pending" + val CONTAINER_CREATING = "containercreating" + val RUNNING = "running" + val COMPLETED = "completed" + val SUCCEEDED = "succeeded" + val FAILED = "failed" + val ERROR = "error" +} + +object KubernetesConstants { + val REQUESTED_BY_ANNOTATION = "requested-by" + val CREATED_BY_ANNOTATION = "created-by" + + val SPARK_APP_ID_LABEL = "spark-app-selector" + val SPARK_APP_TAG_LABEL = "spark-app-tag" + val SPARK_ROLE_LABEL = "spark-role" + val SPARK_EXEC_ID_LABEL = "spark-exec-id" + + val SPARK_ROLE_DRIVER = "driver" + val SPARK_ROLE_EXECUTOR = "executor" +} + +class KubernetesApplication(driverPod: Pod) { + + import KubernetesConstants._ + + private val appTag = driverPod.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL) + private val appId = driverPod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL) + private val namespace = driverPod.getMetadata.getNamespace + + def getApplicationTag: String = appTag + + def getApplicationId: String = appId + + def getApplicationNamespace: String = namespace + + def getApplicationPod: Pod = driverPod +} + +private[utils] case class KubernetesAppReport( + driver: Option[Pod], + executors: Seq[Pod], + appLog: IndexedSeq[String]) { + + 
def getApplicationState: String = + driver.map(_.getStatus.getPhase.toLowerCase).getOrElse("unknown") + + def getApplicationLog: IndexedSeq[String] = appLog + + def getApplicationDiagnostics: IndexedSeq[String] = { + (Seq(driver) ++ executors.sortBy(_.getMetadata.getName).map(Some(_))) + .filter(_.nonEmpty) + .map(buildSparkPodDiagnosticsPrettyString) + .flatMap(_.split("\n")).toIndexedSeq + } + + private def buildSparkPodDiagnosticsPrettyString(podOption: Option[Pod]): String = { + import scala.collection.JavaConverters._ + def printMap(map: Map[_, _]): String = map.map { + case (key, value) => s"$key=$value" + }.mkString(", ") + + if (podOption.isEmpty) return "unknown" + val pod = podOption.get + + s"${pod.getMetadata.getName}.${pod.getMetadata.getNamespace}:" + + s"\n\tnode: ${pod.getSpec.getNodeName}" + + s"\n\thostname: ${pod.getSpec.getHostname}" + + s"\n\tpodIp: ${pod.getStatus.getPodIP}" + + s"\n\tstartTime: ${pod.getStatus.getStartTime}" + + s"\n\tphase: ${pod.getStatus.getPhase}" + + s"\n\treason: ${pod.getStatus.getReason}" + + s"\n\tmessage: ${pod.getStatus.getMessage}" + + s"\n\tlabels: ${printMap(pod.getMetadata.getLabels.asScala.toMap)}" + + s"\n\tcontainers:" + + s"\n\t\t${ + pod.getSpec.getContainers.asScala.map(container => + s"${container.getName}:" + + s"\n\t\t\timage: ${container.getImage}" + + s"\n\t\t\trequests: ${printMap(container.getResources.getRequests.asScala.toMap)}" + + s"\n\t\t\tlimits: ${printMap(container.getResources.getLimits.asScala.toMap)}" + + s"\n\t\t\tcommand: ${container.getCommand} ${container.getArgs}" + ).mkString("\n\t\t") + }" + + s"\n\tconditions:" + + s"\n\t\t${pod.getStatus.getConditions.asScala.mkString("\n\t\t")}" + } +} + +private[utils] class LivyKubernetesClient( + client: NamespacedKubernetesClient, namespaces: Set[String] = Set.empty) { + + import KubernetesConstants._ + import scala.collection.JavaConverters._ + + def getApplications( + labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> 
SPARK_ROLE_DRIVER), + appTagLabel: String = SPARK_APP_TAG_LABEL, + appIdLabel: String = SPARK_APP_ID_LABEL + ): Seq[KubernetesApplication] = + Option(namespaces).filter(_.nonEmpty) + .map(_.map(client.inNamespace)) + .getOrElse(Seq(client.inAnyNamespace())) + .map(_.pods + .withLabels(labels.asJava) + .withLabel(appTagLabel) + .withLabel(appIdLabel) + .list.getItems.asScala.map(new KubernetesApplication(_))) + .reduce(_ ++ _) + + def killApplication(app: KubernetesApplication): Boolean = + client.inNamespace(app.getApplicationNamespace).pods.delete(app.getApplicationPod) + + def getApplicationReport( + app: KubernetesApplication, + cacheLogSize: Int, + appTagLabel: String = SPARK_APP_TAG_LABEL + ): KubernetesAppReport = { + val pods = client.inNamespace(app.getApplicationNamespace).pods + .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava) + .list.getItems.asScala + val driver = pods.find(isDriver) + val executors = pods.filter(isExecutor) + val appLog = getApplicationLog(app, cacheLogSize) + KubernetesAppReport(driver, executors, appLog) + } + + def getDefaultNamespace: String = client.getNamespace + + private def getApplicationLog( + app: KubernetesApplication, cacheLogSize: Int): IndexedSeq[String] = + Try( + client.inNamespace(app.getApplicationNamespace).pods + .withName(app.getApplicationPod.getMetadata.getName) + .tailingLines(cacheLogSize).getLog.split("\n").toIndexedSeq + ).getOrElse(IndexedSeq.empty) + + private def isDriver: Pod => Boolean = + _.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER + + private def isExecutor: Pod => Boolean = + _.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR + +} + +private[utils] object KubernetesClientFactory { + + import java.io.File + + import com.google.common.base.Charsets + import com.google.common.io.Files + import io.fabric8.kubernetes.client.ConfigBuilder + + def createKubernetesClient(livyConf: LivyConf): LivyKubernetesClient = { + val masterUrl = 
sparkMasterToKubernetesApi(livyConf.sparkMaster()) + + val oauthTokenFile = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE).toOption + val oauthTokenValue = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE).toOption + require(oauthTokenFile.isEmpty || oauthTokenValue.isEmpty, + "Cannot specify OAuth token through both " + + s"a file $oauthTokenFile and a value $oauthTokenValue.") + + val caCertFile = livyConf.get(LivyConf.KUBERNETES_CA_CERT_FILE).toOption + val clientKeyFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_KEY_FILE).toOption + val clientCertFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_CERT_FILE).toOption + + val config = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(masterUrl) + .withOption(oauthTokenValue) { + (token, configBuilder) => configBuilder.withOauthToken(token) + } + .withOption(oauthTokenFile) { + (file, configBuilder) => + configBuilder + .withOauthToken(Files.toString(new File(file), Charsets.UTF_8)) + } + .withOption(caCertFile) { + (file, configBuilder) => configBuilder.withCaCertFile(file) + } + .withOption(clientKeyFile) { + (file, configBuilder) => configBuilder.withClientKeyFile(file) + } + .withOption(clientCertFile) { + (file, configBuilder) => configBuilder.withClientCertFile(file) + } + .build() + new LivyKubernetesClient( + new DefaultKubernetesClient(config), livyConf.getKubernetesNamespaces()) + } + + private[utils] def sparkMasterToKubernetesApi(sparkMaster: String): String = { + val replaced = sparkMaster.replaceFirst("k8s://", "") + if (!replaced.startsWith("http")) s"https://$replaced" + else replaced + } + + implicit class OptionString(val string: String) extends AnyVal { + def toOption: Option[String] = + if (string == null || string.trim.isEmpty) None + else Some(string) + } + + private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder) + extends AnyVal { + + def withOption[T](option: Option[T]) + (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = + 
option.map(configurator(_, configBuilder)).getOrElse(configBuilder) + } + +} diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala new file mode 100644 index 000000000..5e668fb99 --- /dev/null +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.livy.utils + +import java.util.Objects._ + +import io.fabric8.kubernetes.api.model._ +import org.mockito.Mockito.when +import org.scalatest.FunSpec +import org.scalatest.mock.MockitoSugar._ + +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} + +class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { + + describe("KubernetesAppReport") { + it("should return application state") { + val status = when(mock[PodStatus].getPhase).thenReturn("Status").getMock[PodStatus] + val driver = when(mock[Pod].getStatus).thenReturn(status).getMock[Pod] + assertResult("status") { + KubernetesAppReport(Some(driver), Seq.empty, IndexedSeq.empty).getApplicationState + } + assertResult("unknown") { + KubernetesAppReport(None, Seq.empty, IndexedSeq.empty).getApplicationState + } + } + } + + describe("KubernetesClientFactory") { + it("should build KubernetesApi url from LivyConf masterUrl") { + def actual(sparkMaster: String): String = + KubernetesClientFactory.sparkMasterToKubernetesApi(sparkMaster) + + val masterUrl = "kubernetes.default.svc:443" + + assertResult(s"https://local")(actual(s"https://local")) + assertResult(s"https://$masterUrl")(actual(s"k8s://$masterUrl")) + assertResult(s"http://$masterUrl")(actual(s"k8s://http://$masterUrl")) + assertResult(s"https://$masterUrl")(actual(s"k8s://https://$masterUrl")) + assertResult(s"http://$masterUrl")(actual(s"http://$masterUrl")) + assertResult(s"https://$masterUrl")(actual(s"https://$masterUrl")) + } + + it("should create KubernetesClient with default configs") { + assert(nonNull(KubernetesClientFactory.createKubernetesClient(new LivyConf(false)))) + } + + it("should throw IllegalArgumentException in both oauth file and token provided") { + val conf = new LivyConf(false) + .set(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE, "dummy_path") + .set(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE, "dummy_value") + intercept[IllegalArgumentException] { + KubernetesClientFactory.createKubernetesClient(conf) + 
} + } + } +} From 0f256ee0b4f1fa7fe9b90b00da4fd14df6452952 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 27 Oct 2019 18:44:19 +0100 Subject: [PATCH 02/16] Add kubernetes dependency version to pom.xml Signed-off-by: Aliaksandr Sasnouskikh --- pom.xml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pom.xml b/pom.xml index d2e535a70..5998e2e10 100644 --- a/pom.xml +++ b/pom.xml @@ -84,6 +84,7 @@ 2.4.5 2.4.5 ${spark.scala-2.11.version} + 4.1.1 3.0.0 1.9 4.5.3 @@ -316,6 +317,18 @@ ${metrics.version} + + io.fabric8 + kubernetes-client + ${kubernetes.client.version} + + + com.fasterxml.jackson.core + * + + + + io.netty netty-all From 20638be4cb5c780fa3f3e8b45fdde569a01d60f8 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 27 Oct 2019 19:16:30 +0100 Subject: [PATCH 03/16] Fix scalastyle Signed-off-by: Aliaksandr Sasnouskikh --- .../main/scala/org/apache/livy/utils/SparkKubernetesApp.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 6c8fb0b10..6366718b7 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -148,7 +148,8 @@ class SparkKubernetesApp private[utils]( kubernetesDiagnostics = appReport.getApplicationDiagnostics changeState(mapKubernetesState(appReport.getApplicationState, appTag)) } - debug(s"Application $appId is in state $state\nDiagnostics:\n${kubernetesDiagnostics.mkString("\n")}") + debug(s"Application $appId is in state $state\nDiagnostics:" + + s"\n${kubernetesDiagnostics.mkString("\n")}") } catch { case _: InterruptedException => kubernetesDiagnostics = ArrayBuffer("Application stopped by user.") From a89064cacff989c2fca257aa3b3ff1d64d07d54b Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 27 Oct 2019 21:04:07 +0100 
Subject: [PATCH 04/16] Added SparkKubernetesApp test, minor refactoring Signed-off-by: Aliaksandr Sasnouskikh --- .../livy/utils/SparkKubernetesApp.scala | 31 +++---- .../livy/utils/SparkKubernetesAppSpec.scala | 81 ++++++++++++++++++- 2 files changed, 91 insertions(+), 21 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 6366718b7..5b31a5352 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -39,6 +39,9 @@ object SparkKubernetesApp extends Logging { lazy val kubernetesClient: LivyKubernetesClient = KubernetesClientFactory.createKubernetesClient(livyConf) + private var livyConf: LivyConf = _ + private var sessionLeakageCheckTimeout: Long = _ + private var sessionLeakageCheckInterval: Long = _ private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]() private val leakedAppsGCThread = new Thread() { @@ -70,26 +73,11 @@ object SparkKubernetesApp extends Logging { } } - private var livyConf: LivyConf = _ - - private var cacheLogSize: Int = _ - private var appLookupTimeout: FiniteDuration = _ - private var pollInterval: FiniteDuration = _ - - private var sessionLeakageCheckTimeout: Long = _ - private var sessionLeakageCheckInterval: Long = _ - def init(livyConf: LivyConf): Unit = { this.livyConf = livyConf - - cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE) - appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds - pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds - sessionLeakageCheckInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL) sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT) - leakedAppsGCThread.setDaemon(true) 
leakedAppsGCThread.setName("LeakedAppsGCThread") leakedAppsGCThread.start() @@ -118,13 +106,18 @@ class SparkKubernetesApp private[utils]( import SparkKubernetesApp._ + private val appLookupTimeout = + livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds + private val cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE) + private val pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds + private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String] private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String] private var state: SparkApp.State = SparkApp.State.STARTING private val appPromise: Promise[KubernetesApplication] = Promise() - private val kubernetesAppMonitorThread = Utils + private[utils] val kubernetesAppMonitorThread = Utils .startDaemonThread(s"kubernetesAppMonitorThread-$this") { try { val app = try { @@ -141,12 +134,12 @@ class SparkKubernetesApp private[utils]( listener.foreach(_.appIdKnown(appId)) while (isRunning) { - Clock.sleep(pollInterval.toMillis) - val appReport = withRetry(kubernetesClient.getApplicationReport(app, cacheLogSize)) kubernetesAppLog = appReport.getApplicationLog kubernetesDiagnostics = appReport.getApplicationDiagnostics changeState(mapKubernetesState(appReport.getApplicationState, appTag)) + + Clock.sleep(pollInterval.toMillis) } debug(s"Application $appId is in state $state\nDiagnostics:" + s"\n${kubernetesDiagnostics.mkString("\n")}") @@ -179,7 +172,7 @@ class SparkKubernetesApp private[utils]( // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure. // We don't want a stuck session that can't be deleted. Emit a warning and move on. 
case _: TimeoutException | _: InterruptedException => - warn("Deleting a session while its Kubernetes application is not found.") + warn("Attempted to delete a session while its Kubernetes application is not found.") kubernetesAppMonitorThread.interrupt() } finally { process.foreach(_.destroy()) diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index 5e668fb99..c3c61d48f 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -14,19 +14,96 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.livy.utils import java.util.Objects._ +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.duration._ import io.fabric8.kubernetes.api.model._ -import org.mockito.Mockito.when +import org.mockito.Matchers.{eq => eqs, _} +import org.mockito.Mockito.{atLeast, verify, when} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.scalatest.FunSpec -import org.scalatest.mock.MockitoSugar._ +import org.scalatest.mock.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} +import org.apache.livy.utils.KubernetesApplicationState._ +import org.apache.livy.utils.SparkApp.State class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { + private def cleanupThread(t: Thread)(f: => Unit) = { + try { f } finally { t.interrupt() } + } + + private def mockSleep(ms: Long) = { + Thread.`yield`() + } + + describe("SparkKubernetesApp") { + val TEST_TIMEOUT = 30 seconds + val appId = "app_id" + val appTag = "app_tag" + val livyConf = new LivyConf(false) + livyConf.set(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT, "30s") + + it("should poll Kubernetes state and terminate") { + Clock.withSleepMethod(mockSleep) { + 
val mockApp = mock[KubernetesApplication] + when(mockApp.getApplicationId).thenReturn(appId) + when(mockApp.getApplicationTag).thenReturn(appTag) + val mockAppReport = mock[KubernetesAppReport] + when(mockAppReport.getApplicationLog).thenReturn(IndexedSeq("app", "log")) + when(mockAppReport.getApplicationDiagnostics).thenReturn(IndexedSeq("app", "diagnostics")) + val mockClient = mock[LivyKubernetesClient] + when(mockClient.getApplications(any(), any(), any())).thenReturn(Seq(mockApp)) + when(mockClient.getApplicationReport(eqs(mockApp), any(), any())).thenReturn(mockAppReport) + val mockListener = mock[SparkAppListener] + + // Simulate Kubernetes app state progression. + val applicationStateList = List( + PENDING, + CONTAINER_CREATING, + RUNNING, + COMPLETED + ) + val stateIndex = new AtomicInteger(0) + when(mockAppReport.getApplicationState).thenAnswer( + new Answer[String] { + override def answer(inv: InvocationOnMock): String = { + stateIndex.getAndIncrement() match { + case i if i < applicationStateList.size => + applicationStateList(i) + case _ => + applicationStateList.last + } + } + } + ) + + val app = new SparkKubernetesApp( + appTag, None, None, Some(mockListener), livyConf, mockClient) + + cleanupThread(app.kubernetesAppMonitorThread) { + app.kubernetesAppMonitorThread.join(TEST_TIMEOUT.toMillis) + assert(!app.kubernetesAppMonitorThread.isAlive, + "KubernetesAppMonitorThread should terminate after Kubernetes app is finished.") + verify(mockClient, atLeast(1)).getApplications(any(), anyString(), anyString()) + verify(mockClient, atLeast(1)) + .getApplicationReport(eqs(mockApp), anyInt(), anyString()) + verify(mockListener).appIdKnown(appId) + verify(mockListener).infoChanged(eqs(AppInfo())) + verify(mockListener).stateChanged(State.STARTING, State.RUNNING) + verify(mockListener).stateChanged(State.RUNNING, State.FINISHED) + } + } + } + } + describe("KubernetesAppReport") { it("should return application state") { val status = 
when(mock[PodStatus].getPhase).thenReturn("Status").getMock[PodStatus] From d45cdbf6ff6b7824e91779454c3678d5ccf06e95 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 27 Oct 2019 21:52:50 +0100 Subject: [PATCH 05/16] Minor cleanup Signed-off-by: Aliaksandr Sasnouskikh --- .../main/scala/org/apache/livy/utils/SparkKubernetesApp.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 5b31a5352..20bac86b2 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -253,7 +253,6 @@ object KubernetesApplicationState { } object KubernetesConstants { - val REQUESTED_BY_ANNOTATION = "requested-by" val CREATED_BY_ANNOTATION = "created-by" val SPARK_APP_ID_LABEL = "spark-app-selector" @@ -333,7 +332,7 @@ private[utils] case class KubernetesAppReport( } private[utils] class LivyKubernetesClient( - client: NamespacedKubernetesClient, namespaces: Set[String] = Set.empty) { + client: DefaultKubernetesClient, namespaces: Set[String] = Set.empty) { import KubernetesConstants._ import scala.collection.JavaConverters._ From 8f8be888636aae5663447eb358d706a8922d1b56 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Mon, 28 Oct 2019 09:30:42 +0100 Subject: [PATCH 06/16] Add Kubernetes default namespace config Signed-off-by: Aliaksandr Sasnouskikh --- conf/livy.conf.template | 2 ++ .../main/scala/org/apache/livy/LivyConf.scala | 2 ++ .../apache/livy/utils/SparkKubernetesApp.scala | 16 +++++++++------- .../livy/utils/SparkKubernetesAppSpec.scala | 4 ++-- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index da809abb5..d8c14265d 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -215,3 +215,5 @@ # 
livy.server.kubernetes.clientKeyFile = # Kubernetes client cert file path # livy.server.kubernetes.clientCertFile = +# Kubernetes client default namespace. +# livy.server.kubernetes.defaultNamespace = diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 4c4dff0c5..20d1ab7b6 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -259,6 +259,8 @@ object LivyConf { val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "") // Kubernetes client cert file path. val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "") + // Kubernetes client default namespace. + val KUBERNETES_DEFAULT_NAMESPACE = Entry("livy.server.kubernetes.defaultNamespace", "") // Comma-separated list of the Kubernetes namespaces to allow for applications creation. // All namespaces are allowed if empty. diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 20bac86b2..e151ab81f 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -407,26 +407,28 @@ private[utils] object KubernetesClientFactory { val caCertFile = livyConf.get(LivyConf.KUBERNETES_CA_CERT_FILE).toOption val clientKeyFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_KEY_FILE).toOption val clientCertFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_CERT_FILE).toOption + val clientNamespace = livyConf.get(LivyConf.KUBERNETES_DEFAULT_NAMESPACE).toOption val config = new ConfigBuilder() .withApiVersion("v1") .withMasterUrl(masterUrl) .withOption(oauthTokenValue) { - (token, configBuilder) => configBuilder.withOauthToken(token) + (token, builder) => builder.withOauthToken(token) } .withOption(oauthTokenFile) { - (file, configBuilder) => - 
configBuilder - .withOauthToken(Files.toString(new File(file), Charsets.UTF_8)) + (file, builder) => builder.withOauthToken(Files.toString(new File(file), Charsets.UTF_8)) } .withOption(caCertFile) { - (file, configBuilder) => configBuilder.withCaCertFile(file) + (file, builder) => builder.withCaCertFile(file) } .withOption(clientKeyFile) { - (file, configBuilder) => configBuilder.withClientKeyFile(file) + (file, builder) => builder.withClientKeyFile(file) } .withOption(clientCertFile) { - (file, configBuilder) => configBuilder.withClientCertFile(file) + (file, builder) => builder.withClientCertFile(file) + } + .withOption(clientNamespace) { + (namespace, builder) => builder.withNamespace(namespace) } .build() new LivyKubernetesClient( diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index c3c61d48f..4188289f1 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -118,7 +118,7 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { } describe("KubernetesClientFactory") { - it("should build KubernetesApi url from LivyConf masterUrl") { + it("should build KubernetesApi url from LivyConf master url") { def actual(sparkMaster: String): String = KubernetesClientFactory.sparkMasterToKubernetesApi(sparkMaster) @@ -136,7 +136,7 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { assert(nonNull(KubernetesClientFactory.createKubernetesClient(new LivyConf(false)))) } - it("should throw IllegalArgumentException in both oauth file and token provided") { + it("should throw IllegalArgumentException if both oauth file and token provided") { val conf = new LivyConf(false) .set(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE, "dummy_path") .set(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE, "dummy_value") From 
9a458b7e7450ca11995a70e88ba67fae8fcc3d76 Mon Sep 17 00:00:00 2001 From: jahstreet Date: Wed, 30 Oct 2019 22:24:18 +0100 Subject: [PATCH 07/16] Update livy.conf.template --- conf/livy.conf.template | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index d8c14265d..80934be48 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -151,10 +151,10 @@ # How often Livy polls YARN to refresh YARN app state. # livy.server.yarn.poll-interval = 5s -# If Livy can't find the Kubernetes app within this time, consider it lost. +# If Livy can't find the Kubernetes app within this time, consider it lost # livy.server.kubernetes.app-lookup-timeout = 600s # When the cluster is busy, we may fail to launch Kubernetes app in app-lookup-timeout, then it -# would cause session leakage, so we need to check session leakage. +# would cause session leakage, so we need to check session leakage # How long to check livy session leakage # livy.server.kubernetes.app-leakage.check-timeout = 600s # How often to check livy session leakage @@ -165,7 +165,7 @@ # livy.server.kubernetes.poll-interval = 15s # Comma-separated list of the Kubernetes namespaces to allow for applications creation. -# All namespaces are allowed if empty. +# All namespaces are allowed if empty # livy.server.kubernetes.allowedNamespaces = # Days to keep Livy server request logs. @@ -215,5 +215,5 @@ # livy.server.kubernetes.clientKeyFile = # Kubernetes client cert file path # livy.server.kubernetes.clientCertFile = -# Kubernetes client default namespace. 
+# Kubernetes client default namespace # livy.server.kubernetes.defaultNamespace = From 6a37bd3f69b6795ba1dbfccaf1a40389a4ab2bea Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sat, 2 Nov 2019 12:16:00 +0100 Subject: [PATCH 08/16] Minor refactoring Signed-off-by: Aliaksandr Sasnouskikh --- .../livy/utils/SparkKubernetesApp.scala | 58 +++++++++---------- .../livy/utils/SparkKubernetesAppSpec.scala | 5 +- 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index e151ab81f..95d840795 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -93,6 +93,33 @@ object SparkKubernetesApp extends Logging { withRetry(fn, retries - 1) case Failure(e) => throw e } + + private[utils] object KubernetesApplicationState { + val PENDING = "pending" + val RUNNING = "running" + val SUCCEEDED = "succeeded" + val FAILED = "failed" + } + + private[utils] def mapKubernetesState( + kubernetesAppState: String, + appTag: String): SparkApp.State.Value = { + import KubernetesApplicationState._ + kubernetesAppState.toLowerCase match { + case PENDING => + SparkApp.State.STARTING + case RUNNING => + SparkApp.State.RUNNING + case SUCCEEDED => + SparkApp.State.FINISHED + case FAILED => + SparkApp.State.FAILED + case other => // any other combination is invalid, so FAIL the application. 
+ error(s"Unknown Kubernetes state $other for app with tag $appTag") + SparkApp.State.FAILED + } + } + } class SparkKubernetesApp private[utils]( @@ -130,7 +157,7 @@ class SparkKubernetesApp private[utils]( appPromise.success(app) val appId = app.getApplicationId - Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId") + Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appTag") listener.foreach(_.appIdKnown(appId)) while (isRunning) { @@ -153,6 +180,7 @@ class SparkKubernetesApp private[utils]( changeState(SparkApp.State.FAILED) } finally { listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = None))) + info(s"Finished monitoring application $appTag with state $state") } } @@ -222,34 +250,6 @@ class SparkKubernetesApp private[utils]( } } - private def mapKubernetesState( - kubernetesAppState: String, - appTag: String): SparkApp.State.Value = { - import KubernetesApplicationState._ - kubernetesAppState.toLowerCase match { - case PENDING | CONTAINER_CREATING => - SparkApp.State.STARTING - case RUNNING => - SparkApp.State.RUNNING - case COMPLETED | SUCCEEDED => - SparkApp.State.FINISHED - case FAILED | ERROR => - SparkApp.State.FAILED - case other => // any other combination is invalid, so FAIL the application. 
- error(s"Unknown Kubernetes state $other for app with tag $appTag.") - SparkApp.State.FAILED - } - } -} - -object KubernetesApplicationState { - val PENDING = "pending" - val CONTAINER_CREATING = "containercreating" - val RUNNING = "running" - val COMPLETED = "completed" - val SUCCEEDED = "succeeded" - val FAILED = "failed" - val ERROR = "error" } object KubernetesConstants { diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index 4188289f1..cbb10fc64 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -31,7 +31,6 @@ import org.scalatest.FunSpec import org.scalatest.mock.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} -import org.apache.livy.utils.KubernetesApplicationState._ import org.apache.livy.utils.SparkApp.State class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { @@ -65,11 +64,11 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { val mockListener = mock[SparkAppListener] // Simulate Kubernetes app state progression. 
+ import SparkKubernetesApp.KubernetesApplicationState._ val applicationStateList = List( PENDING, - CONTAINER_CREATING, RUNNING, - COMPLETED + SUCCEEDED ) val stateIndex = new AtomicInteger(0) when(mockAppReport.getApplicationState).thenAnswer( From 79544812f2dedcff05b02f4718f672c2c22fc7fa Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sat, 2 Nov 2019 12:51:59 +0100 Subject: [PATCH 09/16] Remove appInfoChanged on finish monitoring app Signed-off-by: Aliaksandr Sasnouskikh --- .../main/scala/org/apache/livy/utils/SparkKubernetesApp.scala | 1 - .../scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala | 1 - 2 files changed, 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 95d840795..c8561c14e 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -179,7 +179,6 @@ class SparkKubernetesApp private[utils]( kubernetesDiagnostics = ArrayBuffer(e.getMessage) changeState(SparkApp.State.FAILED) } finally { - listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = None))) info(s"Finished monitoring application $appTag with state $state") } } diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index cbb10fc64..f6ea6eded 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -95,7 +95,6 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { verify(mockClient, atLeast(1)) .getApplicationReport(eqs(mockApp), anyInt(), anyString()) verify(mockListener).appIdKnown(appId) - verify(mockListener).infoChanged(eqs(AppInfo())) verify(mockListener).stateChanged(State.STARTING, State.RUNNING) 
verify(mockListener).stateChanged(State.RUNNING, State.FINISHED) } From 52b45e1b2a1026ff37496e075a8c2361f47d8897 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sat, 2 Nov 2019 16:04:55 +0100 Subject: [PATCH 10/16] Trigger build Signed-off-by: Aliaksandr Sasnouskikh --- .../scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index f6ea6eded..87fb36afb 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -90,7 +90,7 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { cleanupThread(app.kubernetesAppMonitorThread) { app.kubernetesAppMonitorThread.join(TEST_TIMEOUT.toMillis) assert(!app.kubernetesAppMonitorThread.isAlive, - "KubernetesAppMonitorThread should terminate after Kubernetes app is finished.") + "KubernetesAppMonitorThread should terminate after Kubernetes app is finished") verify(mockClient, atLeast(1)).getApplications(any(), anyString(), anyString()) verify(mockClient, atLeast(1)) .getApplicationReport(eqs(mockApp), anyInt(), anyString()) From 06f14db81b2e4eb20f9d2d5938357862dd1c00ed Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Wed, 13 Nov 2019 15:35:46 +0100 Subject: [PATCH 11/16] Resolve conversations Signed-off-by: Aliaksandr Sasnouskikh --- .../org/apache/livy/utils/SparkApp.scala | 2 +- .../livy/utils/SparkKubernetesApp.scala | 148 ++++++++++-------- 2 files changed, 84 insertions(+), 66 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index ac74e585d..ffe7eecc7 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ 
b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -80,7 +80,7 @@ object SparkApp { if (kubernetesNamespaces.nonEmpty && !kubernetesNamespaces.contains(targetNamespace)) { throw new IllegalArgumentException( s"Requested namespace $targetNamespace doesn't match the configured: " + - s"${kubernetesNamespaces.mkString(", ")}") + kubernetesNamespaces.mkString(", ")) } import KubernetesConstants._ diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index c8561c14e..4a38be396 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -34,14 +34,10 @@ import org.apache.livy.{LivyConf, Logging, Utils} object SparkKubernetesApp extends Logging { - private val RETRY_BACKOFF_MILLIS = 1000 - lazy val kubernetesClient: LivyKubernetesClient = KubernetesClientFactory.createKubernetesClient(livyConf) - private var livyConf: LivyConf = _ - private var sessionLeakageCheckTimeout: Long = _ - private var sessionLeakageCheckInterval: Long = _ + private val RETRY_BACKOFF_MILLIS = 1000 private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]() private val leakedAppsGCThread = new Thread() { @@ -51,21 +47,30 @@ object SparkKubernetesApp extends Logging { // kill the app if found it or remove it if exceeding a threshold val leakedApps = leakedAppTags.entrySet().iterator() val now = System.currentTimeMillis() - val apps = withRetry(kubernetesClient.getApplications()).groupBy(_.getApplicationTag) - while (leakedApps.hasNext) { - val leakedApp = leakedApps.next() - apps.get(leakedApp.getKey) match { - case Some(seq) => - seq.foreach(app => - if (withRetry(kubernetesClient.killApplication(app))) { - leakedApps.remove() - info(s"Killed leaked app with tag ${leakedApp.getKey}") - } - ) - case None if (leakedApp.getValue - now) > sessionLeakageCheckTimeout 
=> - leakedApps.remove() - info(s"Remove leaked Kubernetes app tag ${leakedApp.getKey}") + try { + val apps = withRetry(kubernetesClient.getApplications()).groupBy(_.getApplicationTag) + while (leakedApps.hasNext) { + val leakedApp = leakedApps.next() + apps.get(leakedApp.getKey) match { + case Some(seq) => + seq.foreach(app => + if (withRetry(kubernetesClient.killApplication(app))) { + leakedApps.remove() + info(s"Killed leaked app with tag ${leakedApp.getKey}") + } else { + warn(s"Leaked app with tag ${leakedApp.getKey} haven't been killed") + } + ) + case None if (leakedApp.getValue - now) > sessionLeakageCheckTimeout => + leakedApps.remove() + warn(s"Leaked app with tag ${leakedApp.getKey} doesn't exist") + } } + } catch { + case e: KubernetesClientException => + error("Kubernetes client failure", e) + case NonFatal(e) => + error("Failed to remove leaked apps", e) } } Thread.sleep(sessionLeakageCheckInterval) @@ -73,6 +78,10 @@ object SparkKubernetesApp extends Logging { } } + private var livyConf: LivyConf = _ + private var sessionLeakageCheckTimeout: Long = _ + private var sessionLeakageCheckInterval: Long = _ + def init(livyConf: LivyConf): Unit = { this.livyConf = livyConf sessionLeakageCheckInterval = @@ -85,7 +94,7 @@ object SparkKubernetesApp extends Logging { // Returning T, throwing the exception on failure @tailrec - private def withRetry[T](fn: => T, retries: Int = 3): T = + private def withRetry[T](fn: => T, retries: Int = 3): T = { Try { fn } match { case Success(x) => x case _ if retries > 1 => @@ -93,17 +102,11 @@ object SparkKubernetesApp extends Logging { withRetry(fn, retries - 1) case Failure(e) => throw e } - - private[utils] object KubernetesApplicationState { - val PENDING = "pending" - val RUNNING = "running" - val SUCCEEDED = "succeeded" - val FAILED = "failed" } private[utils] def mapKubernetesState( - kubernetesAppState: String, - appTag: String): SparkApp.State.Value = { + kubernetesAppState: String, + appTag: String): 
SparkApp.State.Value = { import KubernetesApplicationState._ kubernetesAppState.toLowerCase match { case PENDING => @@ -120,15 +123,22 @@ object SparkKubernetesApp extends Logging { } } + private[utils] object KubernetesApplicationState { + val PENDING = "pending" + val RUNNING = "running" + val SUCCEEDED = "succeeded" + val FAILED = "failed" + } } class SparkKubernetesApp private[utils]( - appTag: String, - appIdOption: Option[String], - process: Option[LineBufferedProcess], - listener: Option[SparkAppListener], - livyConf: LivyConf, - kubernetesClient: => LivyKubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit tests + appTag: String, + appIdOption: Option[String], + process: Option[LineBufferedProcess], + listener: Option[SparkAppListener], + livyConf: LivyConf, + // For unit tests + kubernetesClient: => LivyKubernetesClient = SparkKubernetesApp.kubernetesClient) extends SparkApp with Logging { import SparkKubernetesApp._ @@ -175,7 +185,7 @@ class SparkKubernetesApp private[utils]( kubernetesDiagnostics = ArrayBuffer("Application stopped by user.") changeState(SparkApp.State.KILLED) case NonFatal(e) => - error("Error while refreshing Kubernetes state", e) + error("Couldn't refresh Kubernetes state", e) kubernetesDiagnostics = ArrayBuffer(e.getMessage) changeState(SparkApp.State.FAILED) } finally { @@ -183,13 +193,14 @@ class SparkKubernetesApp private[utils]( } } - override def log(): IndexedSeq[String] = + override def log(): IndexedSeq[String] = { ("stdout: " +: kubernetesAppLog) ++ ("\nstderr: " +: (process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++ process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String]))) ++ ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics) + } - override def kill(): Unit = + override def kill(): Unit = { try { withRetry { kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout)) @@ -204,6 +215,7 @@ class SparkKubernetesApp private[utils]( } finally { 
process.foreach(_.destroy()) } + } private def isRunning: Boolean = { state != SparkApp.State.FAILED && @@ -227,9 +239,9 @@ class SparkKubernetesApp private[utils]( */ @tailrec private def getAppFromTag( - appTag: String, - pollInterval: Duration, - deadline: Deadline): KubernetesApplication = { + appTag: String, + pollInterval: Duration, + deadline: Deadline): KubernetesApplication = { withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag))) match { case Some(app) => app @@ -281,12 +293,13 @@ class KubernetesApplication(driverPod: Pod) { } private[utils] case class KubernetesAppReport( - driver: Option[Pod], - executors: Seq[Pod], - appLog: IndexedSeq[String]) { + driver: Option[Pod], + executors: Seq[Pod], + appLog: IndexedSeq[String]) { - def getApplicationState: String = + def getApplicationState: String = { driver.map(_.getStatus.getPhase.toLowerCase).getOrElse("unknown") + } def getApplicationLog: IndexedSeq[String] = appLog @@ -299,9 +312,11 @@ private[utils] case class KubernetesAppReport( private def buildSparkPodDiagnosticsPrettyString(podOption: Option[Pod]): String = { import scala.collection.JavaConverters._ - def printMap(map: Map[_, _]): String = map.map { - case (key, value) => s"$key=$value" - }.mkString(", ") + def printMap(map: Map[_, _]): String = { + map.map { + case (key, value) => s"$key=$value" + }.mkString(", ") + } if (podOption.isEmpty) return "unknown" val pod = podOption.get @@ -331,16 +346,15 @@ private[utils] case class KubernetesAppReport( } private[utils] class LivyKubernetesClient( - client: DefaultKubernetesClient, namespaces: Set[String] = Set.empty) { + client: DefaultKubernetesClient, namespaces: Set[String] = Set.empty) { import KubernetesConstants._ import scala.collection.JavaConverters._ def getApplications( - labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER), - appTagLabel: String = SPARK_APP_TAG_LABEL, - appIdLabel: String = SPARK_APP_ID_LABEL - ): 
Seq[KubernetesApplication] = + labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER), + appTagLabel: String = SPARK_APP_TAG_LABEL, + appIdLabel: String = SPARK_APP_ID_LABEL): Seq[KubernetesApplication] = { Option(namespaces).filter(_.nonEmpty) .map(_.map(client.inNamespace)) .getOrElse(Seq(client.inAnyNamespace())) @@ -350,15 +364,16 @@ private[utils] class LivyKubernetesClient( .withLabel(appIdLabel) .list.getItems.asScala.map(new KubernetesApplication(_))) .reduce(_ ++ _) + } - def killApplication(app: KubernetesApplication): Boolean = + def killApplication(app: KubernetesApplication): Boolean = { client.inNamespace(app.getApplicationNamespace).pods.delete(app.getApplicationPod) + } def getApplicationReport( - app: KubernetesApplication, - cacheLogSize: Int, - appTagLabel: String = SPARK_APP_TAG_LABEL - ): KubernetesAppReport = { + app: KubernetesApplication, + cacheLogSize: Int, + appTagLabel: String = SPARK_APP_TAG_LABEL): KubernetesAppReport = { val pods = client.inNamespace(app.getApplicationNamespace).pods .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava) .list.getItems.asScala @@ -368,22 +383,24 @@ private[utils] class LivyKubernetesClient( KubernetesAppReport(driver, executors, appLog) } - def getDefaultNamespace: String = client.getNamespace - private def getApplicationLog( - app: KubernetesApplication, cacheLogSize: Int): IndexedSeq[String] = + app: KubernetesApplication, cacheLogSize: Int): IndexedSeq[String] = { Try( client.inNamespace(app.getApplicationNamespace).pods .withName(app.getApplicationPod.getMetadata.getName) .tailingLines(cacheLogSize).getLog.split("\n").toIndexedSeq ).getOrElse(IndexedSeq.empty) + } - private def isDriver: Pod => Boolean = + private def isDriver: Pod => Boolean = { _.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER + } - private def isExecutor: Pod => Boolean = + private def isExecutor: Pod => Boolean = { _.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR + 
} + def getDefaultNamespace: String = client.getNamespace } private[utils] object KubernetesClientFactory { @@ -441,17 +458,18 @@ private[utils] object KubernetesClientFactory { } implicit class OptionString(val string: String) extends AnyVal { - def toOption: Option[String] = + def toOption: Option[String] = { if (string == null || string.trim.isEmpty) None else Some(string) + } } private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder) extends AnyVal { def withOption[T](option: Option[T]) - (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = + (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = { option.map(configurator(_, configBuilder)).getOrElse(configBuilder) + } } - } From e6bc1a2857d7c900aa7841b0de8ae68010970abc Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Thu, 5 Dec 2019 10:46:28 +0100 Subject: [PATCH 12/16] Minor tweaks Signed-off-by: Aliaksandr Sasnouskikh --- .../main/scala/org/apache/livy/utils/SparkKubernetesApp.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 4a38be396..cab0e0f97 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -249,7 +249,7 @@ class SparkKubernetesApp private[utils]( if (deadline.isOverdue) { process.foreach(_.destroy()) leakedAppTags.put(appTag, System.currentTimeMillis()) - throw new IllegalStateException(s"No Kubernetes application is found with tag" + + throw new IllegalStateException("No Kubernetes application is found with tag" + s" $appTag in ${livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT) / 1000}" + " seconds. 
This may be because 1) spark-submit fail to submit application to " + "Kubernetes; or 2) Kubernetes cluster doesn't have enough resources to start the " + @@ -260,7 +260,6 @@ class SparkKubernetesApp private[utils]( } } } - } object KubernetesConstants { From 5c0699227a40428404f04bd680f389428a07ba29 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Wed, 15 Jan 2020 10:14:48 +0100 Subject: [PATCH 13/16] Resolve conversations Signed-off-by: Aliaksandr Sasnouskikh --- .../livy/server/interactive/InteractiveSession.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index a1030d461..7a8bee270 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -402,10 +402,9 @@ class InteractiveSession( val driverProcess = client.flatMap { c => Option(c.getDriverProcess) } .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))) - if (livyConf.isRunningOnYarn() || driverProcess.isDefined) { - Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) - } else if (livyConf.isRunningOnKubernetes()) { - // Create SparkKubernetesApp anyway to recover app monitoring on Livy server restart + if (livyConf.isRunningOnYarn() || driverProcess.isDefined + // Create SparkKubernetesApp anyway to recover app monitoring on Livy server restart + || livyConf.isRunningOnKubernetes()) { Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) } else { None From 769ec2313922be9839d3a4c86b05d0ecb12716f4 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sat, 4 Apr 2020 15:16:57 +0200 Subject: [PATCH 14/16] Upgrade Kubernetes client version Signed-off-by: Aliaksandr Sasnouskikh --- pom.xml | 3 ++- 1 file changed, 2
insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 5998e2e10..576803b14 100644 --- a/pom.xml +++ b/pom.xml @@ -84,7 +84,7 @@ 2.4.5 2.4.5 ${spark.scala-2.11.version} - 4.1.1 + 4.6.4 3.0.0 1.9 4.5.3 @@ -1088,6 +1088,7 @@ https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz spark-3.0.0-bin-hadoop2.7 + 4.9.2 From 3f765122e67f7ebe413c1f7684c3878da978735d Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 5 Apr 2020 01:09:14 +0200 Subject: [PATCH 15/16] Fix import --- .../main/scala/org/apache/livy/utils/SparkKubernetesApp.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index cab0e0f97..6091705cb 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -27,7 +27,7 @@ import scala.language.postfixOps import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal -import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client._ import org.apache.livy.{LivyConf, Logging, Utils} From b87c0cebb65ce7f34e6b4b6b738095be6254cf69 Mon Sep 17 00:00:00 2001 From: Aliaksandr Sasnouskikh Date: Sun, 5 Jul 2020 10:17:02 +0200 Subject: [PATCH 16/16] Add jackson.version 2.10.3 to spark-3.0 profile to fit kubernetes client compatibility --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 576803b14..81588c675 100644 --- a/pom.xml +++ b/pom.xml @@ -1089,6 +1089,7 @@ spark-3.0.0-bin-hadoop2.7 4.9.2 + 2.10.3