Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<id>shade</id>
Expand Down
15 changes: 12 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<spark.scala-2.11.version>2.4.5</spark.scala-2.11.version>
<spark.scala-2.12.version>2.4.5</spark.scala-2.12.version>
<spark.version>${spark.scala-2.11.version}</spark.version>
<kubernetes.client.version>5.6.0</kubernetes.client.version>
<kubernetes.client.version>6.8.1</kubernetes.client.version>
<hive.version>3.0.0</hive.version>
<commons-codec.version>1.9</commons-codec.version>
<httpclient.version>4.5.13</httpclient.version>
Expand Down Expand Up @@ -1145,6 +1145,13 @@
</reporting>

<profiles>
<profile>
<id>hadoop3</id>
<properties>
<hadoop.major-minor.version>3</hadoop.major-minor.version>
<hadoop.version>3.4.0</hadoop.version>
</properties>
</profile>
<profile>
<id>hadoop2</id>
<properties>
Expand Down Expand Up @@ -1192,11 +1199,13 @@
<profile>
<id>spark3</id>
<properties>
<spark.version>3.2.3</spark.version>
<spark.version>3.5.0</spark.version>
<java.version>1.8</java.version>
<py4j.version>0.10.9.7</py4j.version>
<json4s.version>3.7.0-M11</json4s.version>
<netty.version>4.1.92.Final</netty.version>
<netty.version>4.1.108.Final</netty.version>
<jackson.version>2.15.2</jackson.version>
<jackson-databind.version>2.15.2</jackson-databind.version>
<spark.bin.name>spark-${spark.version}-bin-hadoop${hadoop.major-minor.version}</spark.bin.name>
<spark.bin.download.url>
https://archive.apache.org/dist/spark/spark-${spark.version}/${spark.bin.name}.tgz
Expand Down
1 change: 1 addition & 0 deletions repl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<id>shade</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class SQLInterpreterSpec extends BaseInterpreterSpec {
assert(resp.isInstanceOf[Interpreter.ExecuteError])
val error = resp.asInstanceOf[Interpreter.ExecuteError]
error.ename should be ("Error")
assert(error.evalue.contains("not found"))
assert(error.evalue.contains("TABLE_OR_VIEW_NOT_FOUND"))
}

it should "fail if submitting multiple queries" in withInterpreter { interpreter =>
Expand Down
2 changes: 2 additions & 0 deletions rsc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
Expand Down Expand Up @@ -114,6 +115,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
6 changes: 6 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>

<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-test-lib</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@ package org.apache.livy.server.batch

import java.lang.ProcessBuilder.Redirect
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.Random

import com.fasterxml.jackson.annotation.JsonIgnoreProperties

import org.apache.livy.{LivyConf, Logging, Utils}
import org.apache.livy.server.AccessManager
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.{FinishedSessionState, Session, SessionState}
import org.apache.livy.sessions.Session._
import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}


@JsonIgnoreProperties(ignoreUnknown = true)
case class BatchRecoveryMetadata(
id: Int,
Expand All @@ -40,6 +38,7 @@ case class BatchRecoveryMetadata(
appTag: String,
owner: String,
proxyUser: Option[String],
namespace: String,
version: Int = 1)
extends RecoveryMetadata

Expand All @@ -64,7 +63,7 @@ object BatchSession extends Logging {
mockApp: Option[SparkApp] = None): BatchSession = {
val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase()
val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)

val namespace = SparkApp.getNamespace(request.conf, livyConf)
def createSparkApp(s: BatchSession): SparkApp = {
val conf = SparkApp.prepareSparkConf(
appTag,
Expand Down Expand Up @@ -106,7 +105,8 @@ object BatchSession extends Logging {
childProcesses.decrementAndGet()
}
}
SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s))
val extrasMap: Map[String, String] = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace)
SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s), extrasMap)
}

info(s"Creating batch session $id: [owner: $owner, request: $request]")
Expand All @@ -120,6 +120,7 @@ object BatchSession extends Logging {
owner,
impersonatedUser,
sessionStore,
namespace,
mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp))
}

Expand All @@ -137,8 +138,9 @@ object BatchSession extends Logging {
m.owner,
m.proxyUser,
sessionStore,
m.namespace,
mockApp.map { m => (_: BatchSession) => m }.getOrElse { s =>
SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s))
SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s), Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> m.namespace))
})
}
}
Expand All @@ -152,6 +154,7 @@ class BatchSession(
owner: String,
override val proxyUser: Option[String],
sessionStore: SessionStore,
namespace: String,
sparkApp: BatchSession => SparkApp)
extends Session(id, name, owner, livyConf) with SparkAppListener {
import BatchSession._
Expand Down Expand Up @@ -204,5 +207,5 @@ class BatchSession(
override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }

override def recoveryMetadata: RecoveryMetadata =
BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser)
BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser, namespace)
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ case class InteractiveRecoveryMetadata(
// proxyUser is deprecated. It is available here only for backward compatibility
proxyUser: Option[String],
rscDriverUri: Option[URI],
namespace: String,
version: Int = 1)
extends RecoveryMetadata

Expand All @@ -93,6 +94,7 @@ object InteractiveSession extends Logging {
mockClient: Option[RSCClient] = None): InteractiveSession = {
val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase()
val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)
val namespace = SparkApp.getNamespace(request.conf, livyConf)

val client = mockClient.orElse {
val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(
Expand Down Expand Up @@ -153,6 +155,7 @@ object InteractiveSession extends Logging {
request.numExecutors,
request.pyFiles,
request.queue,
namespace,
mockApp)
}

Expand Down Expand Up @@ -193,6 +196,7 @@ object InteractiveSession extends Logging {
metadata.numExecutors,
metadata.pyFiles,
metadata.queue,
metadata.namespace,
mockApp)
}

Expand Down Expand Up @@ -433,6 +437,7 @@ class InteractiveSession(
val numExecutors: Option[Int],
val pyFiles: List[String],
val queue: Option[String],
val namespace: String,
mockApp: Option[SparkApp]) // For unit test.
extends Session(id, name, owner, ttl, idleTimeout, livyConf)
with SessionHeartbeat
Expand Down Expand Up @@ -462,11 +467,13 @@ class InteractiveSession(
app = mockApp.orElse {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
if (!livyConf.isRunningOnKubernetes()) {
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
val namespace = SparkApp.getNamespace(conf, livyConf)
val extrasMap: Map[String, String] = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace)
if (!livyConf.isRunningOnKubernetes()) {
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap))
} else {
// Create SparkApp for Kubernetes anyway
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap))
}
}

Expand Down Expand Up @@ -535,7 +542,7 @@ class InteractiveSession(
heartbeatTimeout.toSeconds.toInt, owner, ttl, idleTimeout,
driverMemory, driverCores, executorMemory, executorCores, conf,
archives, files, jars, numExecutors, pyFiles, queue,
proxyUser, rscDriverUri)
proxyUser, rscDriverUri, namespace)

override def state: SessionState = {
if (serverSideState == SessionState.Running) {
Expand Down
21 changes: 18 additions & 3 deletions server/src/main/scala/org/apache/livy/utils/SparkApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import scala.collection.JavaConverters._

import org.apache.livy.LivyConf

import java.io.{File, FileInputStream}
import java.util.Properties

object AppInfo {
val DRIVER_LOG_URL_NAME = "driverLogUrl"
val SPARK_UI_URL_NAME = "sparkUiUrl"
Expand Down Expand Up @@ -56,12 +59,23 @@ trait SparkAppListener {
*/
object SparkApp {
private val SPARK_YARN_TAG_KEY = "spark.yarn.tags"

val SPARK_KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
object State extends Enumeration {
val STARTING, RUNNING, FINISHED, FAILED, KILLED = Value
}
type State = State.Value

def getNamespace(conf: Map[String, String], livyConf: LivyConf): String = {
var namespace:String = conf.getOrElse(SPARK_KUBERNETES_NAMESPACE_KEY, "")
if(namespace == "") {
val sparkHome = livyConf.sparkHome().get //SPARK_HOME is mandatory for Livy
val sparkDefaultsPath = sparkHome + File.separator + "conf" + File.separator + "spark-defaults.conf"
val properties = new Properties()
properties.load(new FileInputStream(sparkDefaultsPath))
namespace = properties.getProperty(SPARK_KUBERNETES_NAMESPACE_KEY,"default")
}
namespace
}
/**
* Return cluster manager dependent SparkConf.
*
Expand Down Expand Up @@ -102,11 +116,12 @@ object SparkApp {
appId: Option[String],
process: Option[LineBufferedProcess],
livyConf: LivyConf,
listener: Option[SparkAppListener]): SparkApp = {
listener: Option[SparkAppListener],
extrasMap: Map[String, String]): SparkApp = {
if (livyConf.isRunningOnYarn()) {
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
} else if (livyConf.isRunningOnKubernetes()) {
new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf)
new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf, extrasMap)
} else {
require(process.isDefined, "process must not be None when Livy master is not YARN or" +
"Kubernetes.")
Expand Down
Loading