diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 456bec749..7d6cdb4d5 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -157,6 +157,9 @@ # If the Livy Web UI should be included in the Livy Server. Enabled by default. # livy.ui.enabled = true +# Used to build links to Spark History Server pages on Spark App completion (Kubernetes only) +# livy.ui.history-server-url = http://spark-history-server + # Whether to enable Livy server access control, if it is true then all the income requests will # be checked if the requested user has permission. # livy.server.access-control.enabled = false @@ -185,3 +188,55 @@ # livy.server.auth..class = # livy.server.auth..param. = # livy.server.auth..param. = + +# Manual authentication to KubeApiserver (by default configured with Kubernetes ServiceAccount +# if deployed to Kubernetes cluster as a Pod) +# Kubernetes oauth token file path +# livy.server.kubernetes.oauthTokenFile = +# Kubernetes oauth token string value +# livy.server.kubernetes.oauthTokenValue = +# Kubernetes CA cert file path +# livy.server.kubernetes.caCertFile = +# Kubernetes client key file path +# livy.server.kubernetes.clientKeyFile = +# Kubernetes client cert file path +# livy.server.kubernetes.clientCertFile = + +# If Livy can't find the Kubernetes app within this time, consider it lost. +# livy.server.kubernetes.app-lookup-timeout = 600s +# When the cluster is busy, we may fail to launch Kubernetes app in app-lookup-timeout, then it would +# cause session leakage, so we need to check session leakage. +# How long to check livy session leakage +# livy.server.kubernetes.app-leakage.check-timeout = 600s +# How often to check livy session leakage +# livy.server.kubernetes.app-leakage.check-interval = 60s + +# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods state, logs, description +# details, routes, etc...) 
+# livy.server.kubernetes.poll-interval = 15s + +# Whether to create Kubernetes Nginx Ingress for Spark UI. If set to true, configure the desired +# options below +# livy.server.kubernetes.ingress.create = false +# Kubernetes Nginx Ingress protocol. If set to https, refer to the Ingress TLS section below +# livy.server.kubernetes.ingress.protocol = http +# Kubernetes Nginx Ingress host. Be sure to set it to the FQDN of your Nginx Ingress Controller +# proxy server +# livy.server.kubernetes.ingress.host = localhost +# Kubernetes secret name for Nginx Ingress TLS. Is omitted if 'livy.server.kubernetes.ingress.protocol' +# is not https +# livy.server.kubernetes.ingress.tls.secretName = spark-cluster-tls +# Kubernetes Nginx Ingress additional configuration snippet for specific config options +# livy.server.kubernetes.ingress.additionalConfSnippet = +# Kubernetes Nginx Ingress additional annotations for specific config options, e.g. for configuring +# basic auth of external oauth2 proxy. Format: annotation1=value1;annotation2=value2;... 
+# livy.server.kubernetes.ingress.additionalAnnotations = + +# Set to true to enable Grafana Loki integration and configure options below +# livy.server.kubernetes.grafana.loki.enabled = false +# Grafana UI root endpoint to build links based on +# livy.server.kubernetes.grafana.url = http://localhost:3000 +# Grafana Datasource name for Loki +# livy.server.kubernetes.grafana.loki.datasource = loki +# Time range from now to past to get logs for +# livy.server.kubernetes.grafana.timeRange = 6h diff --git a/pom.xml b/pom.xml index d2e535a70..81588c675 100644 --- a/pom.xml +++ b/pom.xml @@ -84,6 +84,7 @@ 2.4.5 2.4.5 ${spark.scala-2.11.version} + 4.6.4 3.0.0 1.9 4.5.3 @@ -316,6 +317,18 @@ ${metrics.version} + + io.fabric8 + kubernetes-client + ${kubernetes.client.version} + + + com.fasterxml.jackson.core + * + + + + io.netty netty-all @@ -1075,6 +1088,8 @@ https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz spark-3.0.0-bin-hadoop2.7 + 4.9.2 + 2.10.3 diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java index a8f31f79d..7fd62554a 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java @@ -169,6 +169,14 @@ private void initializeServer() throws Exception { // on the cluster, it would be tricky to solve that problem in a generic way. livyConf.set(RPC_SERVER_ADDRESS, null); + // If we are running on Kubernetes, get RPC_SERVER_ADDRESS from "spark.driver.host" option + // this option is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep: + // line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc" + if (conf.get("spark.master").startsWith("k8s")) { + livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host")); + } + + if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) { // Test flag is turned on so we will just infinite loop here. 
It should cause // timeout and we should still see yarn application being cleaned up. diff --git a/server/pom.xml b/server/pom.xml index 188158b03..992a32e8c 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -151,6 +151,11 @@ ${zookeeper.version} + + io.fabric8 + kubernetes-client + + org.apache.httpcomponents httpclient diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css b/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css index fc2ca3075..a7df0ec2a 100644 --- a/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css +++ b/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css @@ -41,6 +41,12 @@ td .progress { margin: 0; } +.with-scroll-bar { + display: block; + overflow-y: scroll; + max-height: 200px; +} + #session-summary { margin: 20px 0; } \ No newline at end of file diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js b/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js index 6e3570204..b0d0fc02e 100644 --- a/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js +++ b/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js @@ -26,7 +26,7 @@ function loadSessionsTable(sessions) { tdWrap(session.proxyUser) + tdWrap(session.kind) + tdWrap(session.state) + - tdWrap(logLinks(session, "session")) + + tdWrapWithClass(logLinks(session, "session"), "with-scroll-bar") + "" ); }); @@ -42,7 +42,7 @@ function loadBatchesTable(sessions) { tdWrap(session.owner) + tdWrap(session.proxyUser) + tdWrap(session.state) + - tdWrap(logLinks(session, "batch")) + + tdWrapWithClass(logLinks(session, "batch"), "with-scroll-bar") + "" ); }); diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js b/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js index f2d743ae6..af9352512 100644 --- 
a/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js +++ b/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js @@ -52,10 +52,23 @@ function driverLogLink(session) { } } +function executorsLogLinks(session) { + var executorLogUrls = session.appInfo.executorLogUrls; + if (executorLogUrls != null) { + return executorLogUrls.split(";").map(function (pair) { + var nameAndLink = pair.split("#"); + return divWrap(anchorLink(nameAndLink[1], nameAndLink[0])); + }).join(""); + } else { + return ""; + } +} + function logLinks(session, kind) { var sessionLog = divWrap(uiLink(kind + "/" + session.id + "/log", "session")); var driverLog = divWrap(driverLogLink(session)); - return sessionLog + driverLog; + var executorsLogs = executorsLogLinks(session); + return sessionLog + driverLog + executorsLogs; } function appIdLink(session) { @@ -75,6 +88,18 @@ function tdWrap(val) { return "" + inner + ""; } +function tdWrapWithClass(val, cl) { + var inner = ""; + if (val != null) { + inner = val; + } + var clVal = ""; + if (cl != null) { + clVal = " class=\"" + cl + "\""; + } + return "" + inner + ""; +} + function preWrap(inner) { return "
" + escapeHtml(inner) + "
"; } diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js b/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js index c87e5ca40..3a23dc982 100644 --- a/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js +++ b/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js @@ -23,6 +23,18 @@ function sumWrap(name, val) { } } +function sumWrapWithClass(name, val, cl) { + var clVal = ""; + if (cl != null) { + clVal = " class=\"" + cl + "\""; + } + if (val != null) { + return "" + name + ": " + val + ""; + } else { + return ""; + } +} + function formatError(output) { var errStr = output.evalue + "\n"; var trace = output.traceback; @@ -93,7 +105,7 @@ function appendSummary(session) { sumWrap("Proxy User", session.proxyUser) + sumWrap("Session Kind", session.kind) + sumWrap("State", session.state) + - sumWrap("Logs", logLinks(session, "session")) + + sumWrapWithClass("Logs", logLinks(session, "session"), "with-scroll-bar") + "" ); } diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 1b06fdebd..f33397d41 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -63,6 +63,7 @@ object LivyConf { val SERVER_BASE_PATH = Entry("livy.ui.basePath", "") val UI_ENABLED = Entry("livy.ui.enabled", true) + val UI_HISTORY_SERVER_URL = Entry("livy.ui.history-server-url", "http://spark-history-server") val REQUEST_HEADER_SIZE = Entry("livy.server.request-header.size", 131072) val RESPONSE_HEADER_SIZE = Entry("livy.server.response-header.size", 131072) @@ -249,6 +250,52 @@ object LivyConf { // how often to check livy session leakage val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s") + // Kubernetes oauth token file path. 
+ val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "") + // Kubernetes oauth token string value. + val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "") + // Kubernetes CA cert file path. + val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "") + // Kubernetes client key file path. + val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "") + // Kubernetes client cert file path. + val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "") + + // If Livy can't find the Kubernetes app within this time, consider it lost. + val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s") + // How often Livy polls Kubernetes to refresh Kubernetes app state. + val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s") + + // How long to check livy session leakage. + val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT = + Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s") + // How often to check livy session leakage. + val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL = + Entry("livy.server.kubernetes.app-leakage.check-interval", "60s") + + // Whether to create Kubernetes Nginx Ingress for Spark UI. + val KUBERNETES_INGRESS_CREATE = Entry("livy.server.kubernetes.ingress.create", false) + // Kubernetes Nginx Ingress protocol. + val KUBERNETES_INGRESS_PROTOCOL = Entry("livy.server.kubernetes.ingress.protocol", "http") + // Kubernetes Nginx Ingress host. + val KUBERNETES_INGRESS_HOST = Entry("livy.server.kubernetes.ingress.host", "localhost") + // Kubernetes Nginx Ingress additional configuration snippet. + val KUBERNETES_INGRESS_ADDITIONAL_CONF_SNIPPET = + Entry("livy.server.kubernetes.ingress.additionalConfSnippet", "") + // Kubernetes Nginx Ingress additional annotations: key1=value1;key2=value2;... . 
+ val KUBERNETES_INGRESS_ADDITIONAL_ANNOTATIONS = + Entry("livy.server.kubernetes.ingress.additionalAnnotations", "") + // Kubernetes secret name for Nginx Ingress TLS. + // Is omitted if 'livy.server.kubernetes.ingress.protocol' value doesn't end with 's' + val KUBERNETES_INGRESS_TLS_SECRET_NAME = + Entry("livy.server.kubernetes.ingress.tls.secretName", "spark-cluster-tls") + + val KUBERNETES_GRAFANA_LOKI_ENABLED = Entry("livy.server.kubernetes.grafana.loki.enabled", false) + val KUBERNETES_GRAFANA_URL = Entry("livy.server.kubernetes.grafana.url", "http://localhost:3000") + val KUBERNETES_GRAFANA_LOKI_DATASOURCE = + Entry("livy.server.kubernetes.grafana.loki.datasource", "loki") + val KUBERNETES_GRAFANA_TIME_RANGE = Entry("livy.server.kubernetes.grafana.timeRange", "6h") + // Whether session timeout should be checked, by default it will be checked, which means inactive // session will be stopped after "livy.server.session.timeout" val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true) @@ -360,6 +407,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { /** Return true if spark master starts with yarn. */ def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn") + /** Return true if spark master starts with k8s. */ + def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s") + /** Return the spark deploy mode Livy sessions should use. 
*/ def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 3e715bdf1..a21669d42 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -19,8 +19,8 @@ package org.apache.livy.server import java.io.{BufferedInputStream, InputStream} import java.net.InetAddress -import java.util.concurrent._ import java.util.EnumSet +import java.util.concurrent._ import javax.servlet._ import scala.collection.JavaConverters._ @@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManag import org.apache.livy.server.ui.UIServlet import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager} import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF +import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp} import org.apache.livy.utils.LivySparkUtils._ -import org.apache.livy.utils.SparkYarnApp class LivyServer extends Logging { @@ -142,10 +142,13 @@ class LivyServer extends Logging { testRecovery(livyConf) - // Initialize YarnClient ASAP to save time. + // Initialize YarnClient/KubernetesClient ASAP to save time. if (livyConf.isRunningOnYarn()) { SparkYarnApp.init(livyConf) Future { SparkYarnApp.yarnClient } + } else if (livyConf.isRunningOnKubernetes()) { + SparkKubernetesApp.init(livyConf) + Future { SparkKubernetesApp.kubernetesClient } } if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") { @@ -415,10 +418,10 @@ class LivyServer extends Logging { } private[livy] def testRecovery(livyConf: LivyConf): Unit = { - if (!livyConf.isRunningOnYarn()) { - // If recovery is turned on but we are not running on YARN, quit. 
+ if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) { + // If recovery is turned on but we are not running on YARN or Kubernetes, quit. require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF, - "Session recovery requires YARN.") + s"Session recovery requires YARN or Kubernetes.") } } } diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index c4c273acd..838160122 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -402,7 +402,9 @@ class InteractiveSession( val driverProcess = client.flatMap { c => Option(c.getDriverProcess) } .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))) - if (livyConf.isRunningOnYarn() || driverProcess.isDefined) { + if (!livyConf.isRunningOnKubernetes()){ + driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) + } else if (livyConf.isRunningOnYarn() || driverProcess.isDefined) { Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) } else { None @@ -481,6 +483,7 @@ class InteractiveSession( transition(SessionState.ShuttingDown) sessionStore.remove(RECOVERY_SESSION_TYPE, id) client.foreach { _.stop(true) } + if(livyConf.isRunningOnKubernetes()) app.foreach(_.kill()) } catch { case _: Exception => app.foreach { diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index 9afe28162..13ec692be 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -24,12 +24,20 @@ import org.apache.livy.LivyConf object AppInfo { val DRIVER_LOG_URL_NAME = "driverLogUrl" val SPARK_UI_URL_NAME = "sparkUiUrl" + val 
EXECUTORS_LOG_URLS_NAME = "executorLogUrls" } -case class AppInfo(var driverLogUrl: Option[String] = None, var sparkUiUrl: Option[String] = None) { +case class AppInfo( + var driverLogUrl: Option[String] = None, + var sparkUiUrl: Option[String] = None, + var executorLogUrls: Option[String] = None) { import AppInfo._ def asJavaMap: java.util.Map[String, String] = - Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, SPARK_UI_URL_NAME -> sparkUiUrl.orNull).asJava + Map( + DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, + SPARK_UI_URL_NAME -> sparkUiUrl.orNull, + EXECUTORS_LOG_URLS_NAME -> executorLogUrls.orNull + ).asJava } trait SparkAppListener { @@ -71,13 +79,20 @@ object SparkApp { sparkConf ++ Map( SPARK_YARN_TAG_KEY -> mergedYarnTags, "spark.yarn.submit.waitAppCompletion" -> "false") + } else if (livyConf.isRunningOnKubernetes()) { + import KubernetesConstants._ + sparkConf ++ Map( + s"spark.kubernetes.driver.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag, + s"spark.kubernetes.executor.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag, + "spark.kubernetes.submission.waitAppCompletion" -> "false") } else { sparkConf } } /** - * Return a SparkApp object to control the underlying Spark application via YARN or spark-submit. + * Return a SparkApp object to control the underlying Spark application via YARN, Kubernetes + * or spark-submit. * * @param uniqueAppTag A tag that can uniquely identify the application. 
*/ @@ -89,8 +104,11 @@ object SparkApp { listener: Option[SparkAppListener]): SparkApp = { if (livyConf.isRunningOnYarn()) { new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf) + } else if (livyConf.isRunningOnKubernetes()) { + new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf) } else { - require(process.isDefined, "process must not be None when Livy master is not YARN.") + require(process.isDefined, "process must not be None when Livy master is not YARN or" + + " Kubernetes.") new SparkProcApp(process.get, listener) } } diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala new file mode 100644 index 000000000..c3d0ad29e --- /dev/null +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -0,0 +1,679 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.livy.utils + +import java.net.URLEncoder +import java.util.Collections +import java.util.concurrent.TimeoutException + +import scala.annotation.tailrec +import scala.collection.mutable.ArrayBuffer +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.extensions.{Ingress, IngressBuilder} +import io.fabric8.kubernetes.client.{ConfigBuilder, _} +import org.apache.commons.lang.StringUtils + +import org.apache.livy.{LivyConf, Logging, Utils} + +object SparkKubernetesApp extends Logging { + + // KubernetesClient is thread safe. Create once, share it across threads. + lazy val kubernetesClient: DefaultKubernetesClient = + KubernetesClientFactory.createKubernetesClient(livyConf) + + private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]() + + private val leakedAppsGCThread = new Thread() { + override def run(): Unit = { + import KubernetesExtensions._ + while (true) { + if (!leakedAppTags.isEmpty) { + // Kill the app if found; otherwise drop its tag once it outlives the check timeout. + val iter = leakedAppTags.entrySet().iterator() + val now = System.currentTimeMillis() + val apps = withRetry(kubernetesClient.getApplications()) + while (iter.hasNext) { + val entry = iter.next() + var isRemoved = false + apps.find(_.getApplicationTag.contains(entry.getKey)) + .foreach({ + app => + info(s"Kill leaked app ${app.getApplicationId}") + withRetry(kubernetesClient.killApplication(app)) + iter.remove() + isRemoved = true + }) + if (!isRemoved) { + if ((now - entry.getValue) > sessionLeakageCheckTimeout) { + iter.remove() + info(s"Remove leaked Kubernetes app tag ${entry.getKey}") + } + } + } + } + Thread.sleep(sessionLeakageCheckInterval) + } + } + } + + private var livyConf: LivyConf = _ + + private var cacheLogSize: Int = _ + private var 
appLookupTimeout: FiniteDuration = _ + private var pollInterval: FiniteDuration = _ + + private var sessionLeakageCheckTimeout: Long = _ + private var sessionLeakageCheckInterval: Long = _ + + def init(livyConf: LivyConf): Unit = { + this.livyConf = livyConf + + cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE) + appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds + pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds + + sessionLeakageCheckInterval = + livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL) + sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT) + + leakedAppsGCThread.setDaemon(true) + leakedAppsGCThread.setName("LeakedAppsGCThread") + leakedAppsGCThread.start() + } + + // Retry fn up to n times with a backoff; return its result or rethrow the last failure. + @tailrec + private def withRetry[T](fn: => T, n: Int = 3, retryBackoff: Long = 1000): T = { + Try { fn } match { + case Success(x) => x + case _ if n > 1 => + Thread.sleep(Math.max(retryBackoff, 1000)) + withRetry(fn, n - 1, retryBackoff) + case Failure(e) => throw e + } + } + +} + +class SparkKubernetesApp private[utils] ( + appTag: String, + appIdOption: Option[String], + process: Option[LineBufferedProcess], + listener: Option[SparkAppListener], + livyConf: LivyConf, + kubernetesClient: => KubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit test. + extends SparkApp + with Logging { + + import KubernetesExtensions._ + import SparkKubernetesApp._ + + private val appPromise: Promise[KubernetesApplication] = Promise() + private[utils] var state: SparkApp.State = SparkApp.State.STARTING + private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String] + private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String] + + // Exposed for unit test. + // TODO Instead of spawning a thread for every session, create a centralized thread and + // batch Kubernetes queries. 
+ private[utils] val kubernetesAppMonitorThread = Utils + .startDaemonThread(s"kubernetesAppMonitorThread-$this") { + try { + // Get KubernetesApplication by appTag. + val app: KubernetesApplication = try { + getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow) + } catch { + case e: Exception => + appPromise.failure(e) + throw e + } + appPromise.success(app) + val appId = app.getApplicationId + + Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId") + listener.foreach(_.appIdKnown(appId)) + + if (livyConf.getBoolean(LivyConf.KUBERNETES_INGRESS_CREATE)) { + withRetry(kubernetesClient.createSparkUIIngress(app, livyConf)) + } + + var appInfo = AppInfo() + while (isRunning) { + try { + Clock.sleep(pollInterval.toMillis) + + // Refresh application state + val appReport = withRetry { + kubernetesClient.getApplicationReport(livyConf, app, cacheLogSize = cacheLogSize) + } + + kubernetesAppLog = appReport.getApplicationLog + kubernetesDiagnostics = appReport.getApplicationDiagnostics + changeState(mapKubernetesState(appReport.getApplicationState, appTag)) + + val latestAppInfo = AppInfo( + appReport.getDriverLogUrl, + appReport.getTrackingUrl, + appReport.getExecutorsLogUrls + ) + if (appInfo != latestAppInfo) { + listener.foreach(_.infoChanged(latestAppInfo)) + appInfo = latestAppInfo + } + } catch { + // TODO analyse available exceptions + case e: Throwable => + throw e + } + } + debug(s"$appId $state ${kubernetesDiagnostics.mkString(" ")}") + } catch { + case _: InterruptedException => + kubernetesDiagnostics = ArrayBuffer("Application stopped by user.") + changeState(SparkApp.State.KILLED) + case NonFatal(e) => + error(s"Error while refreshing Kubernetes state", e) + kubernetesDiagnostics = ArrayBuffer(e.getMessage) + changeState(SparkApp.State.FAILED) + } finally { + listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = Option(buildHistoryServerUiUrl( + livyConf, Try(appPromise.future.value.get.get.getApplicationId).getOrElse("unknown") + ))))) + 
} + } + + override def log(): IndexedSeq[String] = + ("stdout: " +: kubernetesAppLog) ++ + ("\nstderr: " +: (process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++ + process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String]))) ++ + ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics) + + override def kill(): Unit = synchronized { + try { + withRetry(kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout))) + } catch { + // We cannot kill the Kubernetes app without the appTag. + // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure. + // We don't want a stuck session that can't be deleted. Emit a warning and move on. + case _: TimeoutException | _: InterruptedException => + warn("Deleting a session while its Kubernetes application is not found.") + kubernetesAppMonitorThread.interrupt() + } finally { + process.foreach(_.destroy()) + } + } + + private def isRunning: Boolean = { + state != SparkApp.State.FAILED && + state != SparkApp.State.FINISHED && + state != SparkApp.State.KILLED + } + + private def changeState(newState: SparkApp.State.Value): Unit = { + if (state != newState) { + listener.foreach(_.stateChanged(state, newState)) + state = newState + } + } + + /** + * Find the corresponding KubernetesApplication from an application tag. + * + * @param appTag The application tag tagged on the target application. + * If the tag is not unique, it returns the first application it found. + * @return KubernetesApplication or the failure. 
+ */ + @tailrec + private def getAppFromTag( + appTag: String, + pollInterval: Duration, + deadline: Deadline): KubernetesApplication = { + import KubernetesExtensions._ + + withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag))) + match { + case Some(app) => app + case None => + if (deadline.isOverdue) { + process.foreach(_.destroy()) + leakedAppTags.put(appTag, System.currentTimeMillis()) + throw new IllegalStateException(s"No Kubernetes application is found with tag" + + s" $appTag in ${livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT) / 1000}" + + " seconds. This may be because 1) spark-submit fail to submit application to " + + "Kubernetes; or 2) Kubernetes cluster doesn't have enough resources to start the " + + "application in time. Please check Livy log and Kubernetes log to know the details.") + } else { + Clock.sleep(pollInterval.toMillis) + getAppFromTag(appTag, pollInterval, deadline) + } + } + } + + // Exposed for unit test. + private[utils] def mapKubernetesState( + kubernetesAppState: String, + appTag: String + ): SparkApp.State.Value = { + import KubernetesApplicationState._ + kubernetesAppState.toLowerCase match { + case PENDING | CONTAINER_CREATING => + SparkApp.State.STARTING + case RUNNING => + SparkApp.State.RUNNING + case COMPLETED | SUCCEEDED => + SparkApp.State.FINISHED + case FAILED | ERROR => + SparkApp.State.FAILED + case other => // any other combination is invalid, so FAIL the application. 
+ error(s"Unknown Kubernetes state $other for app with tag $appTag.") + SparkApp.State.FAILED + } + } + + private def buildHistoryServerUiUrl(livyConf: LivyConf, appId: String): String = + s"${livyConf.get(LivyConf.UI_HISTORY_SERVER_URL)}/history/$appId/jobs/" + +} + +object KubernetesApplicationState { + val PENDING = "pending" + val CONTAINER_CREATING = "containercreating" + val RUNNING = "running" + val COMPLETED = "completed" + val SUCCEEDED = "succeeded" + val FAILED = "failed" + val ERROR = "error" +} + +object KubernetesConstants { + val SPARK_APP_ID_LABEL = "spark-app-selector" + val SPARK_APP_TAG_LABEL = "spark-app-tag" + val SPARK_ROLE_LABEL = "spark-role" + val SPARK_EXEC_ID_LABEL = "spark-exec-id" + val SPARK_ROLE_DRIVER = "driver" + val SPARK_ROLE_EXECUTOR = "executor" + val SPARK_UI_URL_LABEL = "spark-ui-url" + + val CREATED_BY_LIVY_LABEL = Map("created-by" -> "livy") + +} + +class KubernetesApplication(driverPod: Pod) { + + import KubernetesConstants._ + + private val appTag = driverPod.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL) + private val appId = driverPod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL) + private val namespace = driverPod.getMetadata.getNamespace + + def getApplicationTag: String = appTag + + def getApplicationId: String = appId + + def getApplicationNamespace: String = namespace + + def getApplicationPod: Pod = driverPod +} + +private[utils] case class KubernetesAppReport(driver: Option[Pod], executors: Seq[Pod], + appLog: IndexedSeq[String], ingress: Option[Ingress], livyConf: LivyConf) { + + import KubernetesConstants._ + + private val grafanaUrl = livyConf.get(LivyConf.KUBERNETES_GRAFANA_URL) + private val timeRange = livyConf.get(LivyConf.KUBERNETES_GRAFANA_TIME_RANGE) + private val lokiDatasource = livyConf.get(LivyConf.KUBERNETES_GRAFANA_LOKI_DATASOURCE) + private val sparkAppTagLogLabel = SPARK_APP_TAG_LABEL.replaceAll("-", "_") + private val sparkRoleLogLabel = SPARK_ROLE_LABEL.replaceAll("-", "_") + private val 
sparkExecIdLogLabel = SPARK_EXEC_ID_LABEL.replaceAll("-", "_") + + def getApplicationState: String = + driver.map(_.getStatus.getPhase.toLowerCase).getOrElse("unknown") + + def getApplicationLog: IndexedSeq[String] = appLog + + def getDriverLogUrl: Option[String] = { + if (livyConf.getBoolean(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED)) { + val appTag = driver.map(_.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL)) + if (appTag.isDefined && appTag.get != null) { + return Some( + s"""$grafanaUrl/explore?left=""" + URLEncoder.encode( + s"""["now-$timeRange","now","$lokiDatasource",""" + + s"""{"expr":"{$sparkAppTagLogLabel=\\"${appTag.get}\\",""" + + s"""$sparkRoleLogLabel=\\"$SPARK_ROLE_DRIVER\\"}"},""" + + s"""{"ui":[true,true,true,"exact"]}]""", "UTF-8") + ) + } + } + None + } + + def getExecutorsLogUrls: Option[String] = { + if (livyConf.getBoolean(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED)) { + val urls = executors.map(_.getMetadata.getLabels).flatMap(labels => { + val sparkAppTag = labels.get(SPARK_APP_TAG_LABEL) + val sparkExecId = labels.get(SPARK_EXEC_ID_LABEL) + if (sparkAppTag != null && sparkExecId != null) { + val sparkRole = labels.getOrDefault(SPARK_ROLE_LABEL, SPARK_ROLE_EXECUTOR) + Some(s"executor-$sparkExecId#$grafanaUrl/explore?left=" + URLEncoder.encode( + s"""["now-$timeRange","now","$lokiDatasource",""" + + s"""{"expr":"{$sparkAppTagLogLabel=\\"$sparkAppTag\\",""" + + s"""$sparkRoleLogLabel=\\"$sparkRole\\",""" + + s"""$sparkExecIdLogLabel=\\"$sparkExecId\\"}"},""" + + s"""{"ui":[true,true,true,"exact"]}]""", "UTF-8")) + } else { + None + } + }) + if (urls.nonEmpty) return Some(urls.mkString(";")) + } + None + } + + def getTrackingUrl: Option[String] = { + val host = ingress.flatMap(i => Try(i.getSpec.getRules.get(0).getHost).toOption) + val path = driver + .map(_.getMetadata.getLabels.getOrDefault(SPARK_APP_TAG_LABEL, "unknown")) + val protocol = livyConf.get(LivyConf.KUBERNETES_INGRESS_PROTOCOL) + if (host.isDefined && path.isDefined) 
Some(s"$protocol://${host.get}/${path.get}") + else None + } + + def getApplicationDiagnostics: IndexedSeq[String] = { + (Seq(driver) ++ executors.sortBy(_.getMetadata.getName).map(Some(_))) + .filter(_.nonEmpty) + .map(opt => buildSparkPodDiagnosticsPrettyString(opt.get)) + .flatMap(_.split("\n")).toIndexedSeq + } + + private def buildSparkPodDiagnosticsPrettyString(pod: Pod): String = { + import scala.collection.JavaConverters._ + def printMap(map: Map[_, _]): String = map.map { + case (key, value) => s"$key=$value" + }.mkString(", ") + + if (pod == null) return "unknown" + + s"${pod.getMetadata.getName}.${pod.getMetadata.getNamespace}:" + + s"\n\tnode: ${pod.getSpec.getNodeName}" + + s"\n\thostname: ${pod.getSpec.getHostname}" + + s"\n\tpodIp: ${pod.getStatus.getPodIP}" + + s"\n\tstartTime: ${pod.getStatus.getStartTime}" + + s"\n\tphase: ${pod.getStatus.getPhase}" + + s"\n\treason: ${pod.getStatus.getReason}" + + s"\n\tmessage: ${pod.getStatus.getMessage}" + + s"\n\tlabels: ${printMap(pod.getMetadata.getLabels.asScala.toMap)}" + + s"\n\tcontainers:" + + s"\n\t\t${ + pod.getSpec.getContainers.asScala.map(container => + s"${container.getName}:" + + s"\n\t\t\timage: ${container.getImage}" + + s"\n\t\t\trequests: ${printMap(container.getResources.getRequests.asScala.toMap)}" + + s"\n\t\t\tlimits: ${printMap(container.getResources.getLimits.asScala.toMap)}" + + s"\n\t\t\tcommand: ${container.getCommand} ${container.getArgs}" + ).mkString("\n\t\t") + }" + + s"\n\tconditions:" + + s"\n\t\t${pod.getStatus.getConditions.asScala.mkString("\n\t\t")}" + } + +} + +private[utils] object KubernetesExtensions { + import KubernetesConstants._ + + implicit class KubernetesClientExtensions(client: KubernetesClient) { + import scala.collection.JavaConverters._ + + private val NGINX_CONFIG_SNIPPET: String = + """ + |proxy_set_header Accept-Encoding ""; + |sub_filter_last_modified off; + |sub_filter '' ' '; + |sub_filter 'href="/' 'href="'; + |sub_filter 'src="/' 'src="'; + 
|sub_filter "/api/v1/applications" "/%s/api/v1/applications"; + |sub_filter "/static/executorspage-template.html" "/%s/static/executorspage-template.html"; + |sub_filter_once off; + |sub_filter_types text/html text/css text/javascript application/javascript; + """.stripMargin + + def getApplications( + labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER), + appTagLabel: String = SPARK_APP_TAG_LABEL, + appIdLabel: String = SPARK_APP_ID_LABEL + ): Seq[KubernetesApplication] = { + client.pods.inAnyNamespace + .withLabels(labels.asJava) + .withLabel(appTagLabel) + .withLabel(appIdLabel) + .list.getItems.asScala.map(new KubernetesApplication(_)) + } + + def killApplication(app: KubernetesApplication): Boolean = { + client.pods.inAnyNamespace.delete(app.getApplicationPod) + } + + def getApplicationReport( + livyConf: LivyConf, + app: KubernetesApplication, + cacheLogSize: Int, + appTagLabel: String = SPARK_APP_TAG_LABEL + ): KubernetesAppReport = { + val pods = client.pods.inNamespace(app.getApplicationNamespace) + .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava) + .list.getItems.asScala + val driver = pods.find(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER) + val executors = + pods.filter(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR) + val appLog = Try( + client.pods.inNamespace(app.getApplicationNamespace) + .withName(app.getApplicationPod.getMetadata.getName) + .tailingLines(cacheLogSize).getLog.split("\n").toIndexedSeq + ).getOrElse(IndexedSeq.empty) + val ingress = client.extensions.ingresses.inNamespace(app.getApplicationNamespace) + .withLabel(SPARK_APP_TAG_LABEL, app.getApplicationTag) + .list.getItems.asScala.headOption + KubernetesAppReport(driver, executors, appLog, ingress, livyConf) + } + + def createSparkUIIngress(app: KubernetesApplication, livyConf: LivyConf): Unit = { + val sparkUIService = buildSparkUIService(app) + + val annotationsString = 
livyConf.get(LivyConf.KUBERNETES_INGRESS_ADDITIONAL_ANNOTATIONS) + var annotations: Seq[(String, String)] = Seq.empty + if (annotationsString != null && annotationsString.trim.nonEmpty) { + annotations = annotationsString + .split(";").map(_.split("=")) + .map(array => array.head -> array.tail.mkString("=")).toSeq + } + + val sparkUIIngress = buildSparkUIIngress( + app, + livyConf.get(LivyConf.KUBERNETES_INGRESS_PROTOCOL), + livyConf.get(LivyConf.KUBERNETES_INGRESS_HOST), + sparkUIService, + livyConf.get(LivyConf.KUBERNETES_INGRESS_TLS_SECRET_NAME), + livyConf.get(LivyConf.KUBERNETES_INGRESS_ADDITIONAL_CONF_SNIPPET), + annotations: _* + ) + val resources: Seq[HasMetadata] = Seq(sparkUIService, sparkUIIngress) + addOwnerReference(app.getApplicationPod, resources: _*) + client.resourceList(resources.asJava).createOrReplace() + } + + private def buildSparkUIIngress( + app: KubernetesApplication, protocol: String, host: String, service: Service, + tlsSecretName: String, additionalConfSnippet: String, additionalAnnotations: (String, String)* + ): Ingress = { + val appTag = app.getApplicationTag + + val annotations = Map( + "kubernetes.io/ingress.class" -> "nginx", + "nginx.ingress.kubernetes.io/rewrite-target" -> "/$1", + "nginx.ingress.kubernetes.io/proxy-redirect-from" -> s"http://$$host/", + "nginx.ingress.kubernetes.io/proxy-redirect-to" -> s"/$appTag/", + "nginx.ingress.kubernetes.io/configuration-snippet" -> + NGINX_CONFIG_SNIPPET.concat(additionalConfSnippet).format(appTag, appTag, appTag) + ) ++ additionalAnnotations + + val builder = new IngressBuilder() + .withApiVersion("extensions/v1beta1") + .withNewMetadata() + .withName(fixResourceName(s"${app.getApplicationPod.getMetadata.getName}-ui")) + .withNamespace(app.getApplicationNamespace) + .addToAnnotations(annotations.asJava) + .addToLabels(SPARK_APP_TAG_LABEL, appTag) + .addToLabels(CREATED_BY_LIVY_LABEL.asJava) + .endMetadata() + .withNewSpec() + .addNewRule() + .withHost(host) + .withNewHttp() + 
.addNewPath() + .withPath(s"/$appTag/?(.*)") + .withNewBackend() + .withServiceName(service.getMetadata.getName) + .withNewServicePort(service.getSpec.getPorts.get(0).getName) + .endBackend() + .endPath() + .endHttp() + .endRule() + if (protocol.endsWith("s") && tlsSecretName != null && tlsSecretName.nonEmpty) { + builder.addNewTl().withSecretName(tlsSecretName).addToHosts(host).endTl() + } + builder.endSpec().build() + } + + private def fixResourceName(name: String): String = + StringUtils.stripEnd(StringUtils.left(name, 63), "-").toLowerCase + + private def buildSparkUIService( + app: KubernetesApplication, + portName: String = "spark-ui", + port: Int = 4040 + ): Service = { + new ServiceBuilder() + .withNewMetadata() + .withName(fixResourceName(s"${app.getApplicationPod.getMetadata.getName}-ui")) + .withNamespace(app.getApplicationNamespace) + .addToLabels(SPARK_APP_TAG_LABEL, app.getApplicationTag) + .addToLabels(CREATED_BY_LIVY_LABEL.asJava) + .endMetadata() + .withNewSpec() + .withClusterIP("None") + .addToSelector(SPARK_APP_TAG_LABEL, app.getApplicationTag) + .addToSelector(SPARK_ROLE_LABEL, SPARK_ROLE_DRIVER) + .addNewPort() + .withName(portName) + .withPort(port) + .endPort() + .endSpec() + .build() + } + + // Add a OwnerReference to the given resources making the driver pod an owner of them so when + // the driver pod is deleted, the resources are garbage collected. 
+ private def addOwnerReference(owner: Pod, resources: HasMetadata*): Unit = { + val driverPodOwnerReference = new OwnerReferenceBuilder() + .withName(owner.getMetadata.getName) + .withApiVersion(owner.getApiVersion) + .withUid(owner.getMetadata.getUid) + .withKind(owner.getKind) + .withController(true) + .build() + resources.foreach { + resource => + val originalMetadata = resource.getMetadata + originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference)) + } + } + + } + +} + +private[utils] object KubernetesClientFactory { + import java.io.File + import com.google.common.base.Charsets + import com.google.common.io.Files + + private implicit class OptionString(val string: String) extends AnyVal { + def toOption: Option[String] = if (string == null || string.isEmpty) None else Option(string) + } + + def createKubernetesClient(livyConf: LivyConf): DefaultKubernetesClient = { + val masterUrl = sparkMasterToKubernetesApi(livyConf.sparkMaster()) + + val oauthTokenFile = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE).toOption + val oauthTokenValue = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE).toOption + require(oauthTokenFile.isEmpty || oauthTokenValue.isEmpty, + s"Cannot specify OAuth token through both " + + s"a file $oauthTokenFile and a value $oauthTokenValue.") + + val caCertFile = livyConf.get(LivyConf.KUBERNETES_CA_CERT_FILE).toOption + val clientKeyFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_KEY_FILE).toOption + val clientCertFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_CERT_FILE).toOption + + val config = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(masterUrl) + .withOption(oauthTokenValue) { + (token, configBuilder) => configBuilder.withOauthToken(token) + } + .withOption(oauthTokenFile) { + (file, configBuilder) => + configBuilder + .withOauthToken(Files.toString(new File(file), Charsets.UTF_8)) + } + .withOption(caCertFile) { + (file, configBuilder) => configBuilder.withCaCertFile(file) + } + 
.withOption(clientKeyFile) { + (file, configBuilder) => configBuilder.withClientKeyFile(file) + } + .withOption(clientCertFile) { + (file, configBuilder) => configBuilder.withClientCertFile(file) + } + .build() + new DefaultKubernetesClient(config) + } + + def sparkMasterToKubernetesApi(sparkMaster: String): String = { + val replaced = sparkMaster.replaceFirst("k8s://", "") + if (!replaced.startsWith("http")) s"https://$replaced" + else replaced + } + + private implicit class OptionConfigurableConfigBuilder( + val configBuilder: ConfigBuilder) extends AnyVal { + def withOption[T] + (option: Option[T]) + (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = { + option.map { + opt => configurator(opt, configBuilder) + }.getOrElse(configBuilder) + } + } + +} diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala new file mode 100644 index 000000000..b1a718506 --- /dev/null +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.livy.utils + +import java.util.Objects._ + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.extensions.{Ingress, IngressRule, IngressSpec} +import org.mockito.Mockito.when +import org.scalatest.FunSpec +import org.scalatest.mock.MockitoSugar._ + +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} + +class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { + + describe("KubernetesAppReport") { + import scala.collection.JavaConverters._ + + it("should return application state") { + val status = when(mock[PodStatus].getPhase).thenReturn("Status").getMock[PodStatus] + val driver = when(mock[Pod].getStatus).thenReturn(status).getMock[Pod] + assertResult("status") { + KubernetesAppReport(Option(driver), Seq.empty, IndexedSeq.empty, None, new LivyConf(false)) + .getApplicationState + } + assertResult("unknown") { + KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, new LivyConf(false)) + .getApplicationState + } + } + + def livyConf(lokiEnabled: Boolean): LivyConf = new LivyConf(false) + .set(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED, lokiEnabled) + + def podMockWithLabels(labelMap: Map[String, String]): Pod = { + val metaWithLabel = when(mock[ObjectMeta].getLabels).thenReturn(labelMap.asJava) + .getMock[ObjectMeta] + when(mock[Pod].getMetadata).thenReturn(metaWithLabel).getMock[Pod] + } + + def driverMock(labelExists: Boolean): Option[Pod] = { + val labels = if (labelExists) Map(KubernetesConstants.SPARK_APP_TAG_LABEL -> "app_tag") + else Map.empty[String, String] + Some(podMockWithLabels(labels)) + } + + it("should return driver log url") { + + def test(labelExists: Boolean, lokiEnabled: Boolean, shouldBeDefined: Boolean): Unit = + assertResult(shouldBeDefined) { + KubernetesAppReport( + driverMock(labelExists), Seq.empty, IndexedSeq.empty, None, livyConf(lokiEnabled) + ).getDriverLogUrl.isDefined + } + + test(labelExists = false, lokiEnabled = false, shouldBeDefined = false) + 
test(labelExists = false, lokiEnabled = true, shouldBeDefined = false) + test(labelExists = true, lokiEnabled = false, shouldBeDefined = false) + test(labelExists = true, lokiEnabled = true, shouldBeDefined = true) + assert(KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, livyConf(true)) + .getDriverLogUrl.isEmpty) + } + + it("should return executors log urls") { + def executorMock(labelsExist: Boolean): Option[Pod] = { + val labels = if (labelsExist) { + Map(KubernetesConstants.SPARK_APP_TAG_LABEL -> "app_tag", + KubernetesConstants.SPARK_EXEC_ID_LABEL -> "exec-1") + } else { + Map.empty[String, String] + } + Some(podMockWithLabels(labels)) + } + + def test(labelExists: Boolean, lokiEnabled: Boolean, shouldBeDefined: Boolean): Unit = + assertResult(shouldBeDefined) { + KubernetesAppReport( + None, Seq(executorMock(labelExists).get), IndexedSeq.empty, None, livyConf(lokiEnabled) + ).getExecutorsLogUrls.isDefined + } + + test(labelExists = false, lokiEnabled = false, shouldBeDefined = false) + test(labelExists = false, lokiEnabled = true, shouldBeDefined = false) + test(labelExists = true, lokiEnabled = false, shouldBeDefined = false) + test(labelExists = true, lokiEnabled = true, shouldBeDefined = true) + assert(KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, livyConf(true)) + .getExecutorsLogUrls.isEmpty) + } + + it("should return driver ingress url") { + + def livyConf(protocol: Option[String]): LivyConf = { + val conf = new LivyConf() + protocol.map(conf.set(LivyConf.KUBERNETES_INGRESS_PROTOCOL, _)).getOrElse(conf) + } + + def ingressMock(host: Option[String]): Ingress = { + val ingressRules = host.map(h => + List(when(mock[IngressRule].getHost).thenReturn(h).getMock[IngressRule])) + .getOrElse(List.empty).asJava + val ingressSpec = when(mock[IngressSpec].getRules) + .thenReturn(ingressRules).getMock[IngressSpec] + when(mock[Ingress].getSpec).thenReturn(ingressSpec).getMock[Ingress] + } + + def test(driver: Option[Pod], ingress: 
Option[Ingress], + protocol: Option[String], shouldBeDefined: Boolean): Unit = { + assertResult(shouldBeDefined) { + KubernetesAppReport(driver, Seq.empty, IndexedSeq.empty, ingress, livyConf(protocol)) + .getTrackingUrl.isDefined + } + } + + val hostname = Some("hostname") + val protocol = Some("protocol") + + test(None, None, None, shouldBeDefined = false) + test(None, None, protocol, shouldBeDefined = false) + test(None, Some(ingressMock(None)), None, shouldBeDefined = false) + test(None, Some(ingressMock(None)), protocol, shouldBeDefined = false) + test(None, Some(ingressMock(hostname)), None, shouldBeDefined = false) + test(None, Some(ingressMock(hostname)), protocol, shouldBeDefined = false) + + test(driverMock(true), None, None, shouldBeDefined = false) + test(driverMock(true), None, protocol, shouldBeDefined = false) + test(driverMock(true), Some(ingressMock(None)), None, shouldBeDefined = false) + test(driverMock(true), Some(ingressMock(None)), protocol, shouldBeDefined = false) + test(driverMock(true), Some(ingressMock(hostname)), None, shouldBeDefined = true) + test(driverMock(true), Some(ingressMock(hostname)), protocol, shouldBeDefined = true) + + test(driverMock(false), None, None, shouldBeDefined = false) + test(driverMock(false), None, protocol, shouldBeDefined = false) + test(driverMock(false), Some(ingressMock(None)), None, shouldBeDefined = false) + test(driverMock(false), Some(ingressMock(None)), protocol, shouldBeDefined = false) + test(driverMock(false), Some(ingressMock(hostname)), None, shouldBeDefined = true) + test(driverMock(false), Some(ingressMock(hostname)), protocol, shouldBeDefined = true) + + assertResult(s"${protocol.get}://${hostname.get}/app_tag") { + KubernetesAppReport(driverMock(true), Seq.empty, IndexedSeq.empty, + Some(ingressMock(hostname)), livyConf(protocol)).getTrackingUrl.get + } + assertResult(s"${protocol.get}://${hostname.get}/unknown") { + KubernetesAppReport(driverMock(false), Seq.empty, IndexedSeq.empty, + 
Some(ingressMock(hostname)), livyConf(protocol)).getTrackingUrl.get + } + } + + } + + describe("KubernetesClientFactory") { + it("should build KubernetesApi url from LivyConf masterUrl") { + def actual(sparkMaster: String): String = + KubernetesClientFactory.sparkMasterToKubernetesApi(sparkMaster) + + val masterUrl = "kubernetes.default.svc:443" + + assertResult(s"https://local")(actual(s"https://local")) + assertResult(s"https://$masterUrl")(actual(s"k8s://$masterUrl")) + assertResult(s"http://$masterUrl")(actual(s"k8s://http://$masterUrl")) + assertResult(s"https://$masterUrl")(actual(s"k8s://https://$masterUrl")) + assertResult(s"http://$masterUrl")(actual(s"http://$masterUrl")) + assertResult(s"https://$masterUrl")(actual(s"https://$masterUrl")) + } + + it("should create KubernetesClient with default configs") { + assert(nonNull(KubernetesClientFactory.createKubernetesClient(new LivyConf(false)))) + } + + it("should throw IllegalArgumentException in both oauth file and token provided") { + val conf = new LivyConf(false) + .set(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE, "dummy_path") + .set(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE, "dummy_value") + intercept[IllegalArgumentException] { + KubernetesClientFactory.createKubernetesClient(conf) + } + } + } + +}