diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 2a55c0493..63f0f4fa9 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -69,8 +69,8 @@ object BatchSession extends Logging { val conf = SparkApp.prepareSparkConf( appTag, livyConf, - prepareConf( - request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf)) + prepareConf(owner, request.conf, + request.jars, request.files, request.archives, request.pyFiles, livyConf)) require(request.file != null, "File is required.") val builder = new SparkProcessBuilder(livyConf) diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index bccdb4d92..204f8d403 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -79,8 +79,8 @@ object InteractiveSession extends Logging { val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner) val client = mockClient.orElse { - val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf( - request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf)) + val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(owner, request.conf, + request.jars, request.files, request.archives, request.pyFiles, livyConf)) val builderProperties = prepareBuilderProp(conf, request.kind, livyConf) diff --git a/server/src/main/scala/org/apache/livy/sessions/Session.scala b/server/src/main/scala/org/apache/livy/sessions/Session.scala index 67f78d4d2..4a8c7696b 100644 --- a/server/src/main/scala/org/apache/livy/sessions/Session.scala +++ b/server/src/main/scala/org/apache/livy/sessions/Session.scala @@ -48,6 +48,7 @@ object Session { * - Verify that file URIs don't reference non-whitelisted local resources */ def prepareConf( + owner: String, conf: Map[String, String], jars: Seq[String], files: Seq[String], @@ -90,7 +91,9 @@ object Session { val masterConfList = Map(LivyConf.SPARK_MASTER -> livyConf.sparkMaster()) ++ livyConf.sparkDeployMode().map(LivyConf.SPARK_DEPLOY_MODE -> _).toMap - conf ++ masterConfList ++ merged + val ownerConf = Map("spark.livy.owner" -> (if (owner == null) "" else owner)) + + conf ++ masterConfList ++ merged ++ ownerConf } /** diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala index 05b41bb05..bb7946697 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala @@ -59,30 +59,40 @@ class SessionSpec extends FunSuite with LivyBaseUnitTestSuite { conf.set(LivyConf.LOCAL_FS_WHITELIST, "/allowed") // Test baseline. - assert(Session.prepareConf(Map(), Nil, Nil, Nil, Nil, conf) === Map("spark.master" -> "local")) + assert(Session.prepareConf("", Map(), Nil, Nil, Nil, Nil, conf) + === Map("spark.master" -> "local", "spark.livy.owner" -> "")) // Test validations. intercept[IllegalArgumentException] { - Session.prepareConf(Map("spark.do_not_set" -> "1"), Nil, Nil, Nil, Nil, conf) + Session.prepareConf("", Map("spark.do_not_set" -> "1"), Nil, Nil, Nil, Nil, conf) } conf.sparkFileLists.foreach { key => intercept[IllegalArgumentException] { - Session.prepareConf(Map(key -> "file:/not_allowed"), Nil, Nil, Nil, Nil, conf) + Session.prepareConf("", Map(key -> "file:/not_allowed"), Nil, Nil, Nil, Nil, conf) } } intercept[IllegalArgumentException] { - Session.prepareConf(Map(), Seq("file:/not_allowed"), Nil, Nil, Nil, conf) + Session.prepareConf("", Map(), Seq("file:/not_allowed"), Nil, Nil, Nil, conf) } intercept[IllegalArgumentException] { - Session.prepareConf(Map(), Nil, Seq("file:/not_allowed"), Nil, Nil, conf) + Session.prepareConf("", Map(), Nil, Seq("file:/not_allowed"), Nil, Nil, conf) } intercept[IllegalArgumentException] { - Session.prepareConf(Map(), Nil, Nil, Seq("file:/not_allowed"), Nil, conf) + Session.prepareConf("", Map(), Nil, Nil, Seq("file:/not_allowed"), Nil, conf) } intercept[IllegalArgumentException] { - Session.prepareConf(Map(), Nil, Nil, Nil, Seq("file:/not_allowed"), conf) + Session.prepareConf("", Map(), Nil, Nil, Nil, Seq("file:/not_allowed"), conf) } + // Test owner + assert(Session.prepareConf("MyOwner", Map(), Nil, Nil, Nil, Nil, conf) + .get("spark.livy.owner") === Some("MyOwner")) + assert(Session.prepareConf("", Map(), Nil, Nil, Nil, Nil, conf) + .get("spark.livy.owner") === Some("")) + assert(Session.prepareConf(null, Map(), Nil, Nil, Nil, Nil, conf) + .get("spark.livy.owner") === Some("")) + + // Test that file lists are merged and resolved. val base = "/file1.txt" val other = Seq("/file2.txt") @@ -91,7 +101,7 @@ class SessionSpec extends FunSuite with LivyBaseUnitTestSuite { val userLists = Seq(LivyConf.SPARK_JARS, LivyConf.SPARK_FILES, LivyConf.SPARK_ARCHIVES, LivyConf.SPARK_PY_FILES) val baseConf = userLists.map { key => (key -> base) }.toMap - val result = Session.prepareConf(baseConf, other, other, other, other, conf) + val result = Session.prepareConf("", baseConf, other, other, other, other, conf) userLists.foreach { key => assert(result.get(key) === expected) } }