From 892f591975194190b039512998e74efd6077d5f8 Mon Sep 17 00:00:00 2001 From: Gabor Roczei <1918366+roczei@users.noreply.github.com> Date: Wed, 8 Nov 2023 20:01:41 +0530 Subject: [PATCH 1/2] [LIVY-1042] Add SSL/TLS support for ZooKeeper connection Introduced an optional TLS encryption for the Curator based ZooKeeper client used during session recovery. Changes by Asif Khatri: - Added five LivyConf entries (LIVY_ZK_CLIENT_SECURE, LIVY_ZK_CLIENT_SOCKET, LIVY_ZK_KEYSTORE_PASS, LIVY_ZK_TRUSTSTORE_FILE, LIVY_ZK_TRUSTSTORE_PASS) and document them in livy.conf.template. - Added ZooKeeperManager.createZKClientConfig that assembles a ZKClientConfig from keystore/truststore settings; applied to the CuratorFramework builder only when livy.server.zk.client.secure=true. Test improvements: - Extracted mockCurator() helper in ZooKeeperStateStoreSpec to eliminate repeated CuratorFramework mock setup boilerplate. - Added an "SSL config" describe block with withSslMock helper (SslTestFixture) and tests covering SSL property mapping and both the secure and non-secure construction paths. Was this patch authored or co-authored using generative AI tooling? Yes, this was co-authored using Cursor to help generate the new test cases. Generated-by: Cursor 3.5.17 Co-authored-by: Asif Khatri <123077165+askhatri@users.noreply.github.com> --- conf/livy.conf.template | 7 ++ .../main/scala/org/apache/livy/LivyConf.scala | 7 ++ .../server/recovery/ZooKeeperManager.scala | 32 ++++++- .../recovery/ZooKeeperStateStoreSpec.scala | 94 +++++++++++++++++-- 4 files changed, 129 insertions(+), 11 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index b4dd2f436..0947d005a 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -172,6 +172,13 @@ # JAAS login context name for ZooKeeper SASL authentication. # livy.server.zk.sasl.login-context = Client +# The SSL configuration for zookeeper. +# livy.server.zk.client.secure=false +# livy.server.zk.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty +# livy.server.zk.ssl.keyStore.password= +# livy.server.zk.ssl.truststore.location= +# livy.server.zk.ssl.truststore.password= + # If Livy can't find the yarn app within this time, consider it lost. # livy.server.yarn.app-lookup-timeout = 120s # When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 03ba46fca..006ab5dff 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -367,6 +367,13 @@ object LivyConf { val SESSION_ALLOW_CUSTOM_CLASSPATH = Entry("livy.server.session.allow-custom-classpath", false) + val LIVY_ZK_CLIENT_SOCKET = Entry("livy.server.zk.clientCnxnSocket", + "org.apache.zookeeper.ClientCnxnSocketNetty") + val LIVY_ZK_KEYSTORE_PASS = Entry("livy.server.zk.ssl.keyStore.password", null) + val LIVY_ZK_TRUSTSTORE_FILE = Entry("livy.server.zk.ssl.truststore.location", null) + val LIVY_ZK_TRUSTSTORE_PASS = Entry("livy.server.zk.ssl.truststore.password", null) + val LIVY_ZK_CLIENT_SECURE = Entry("livy.server.zk.client.secure", false) + val SPARK_MASTER = "spark.master" val SPARK_DEPLOY_MODE = "spark.submit.deployMode" val SPARK_JARS = "spark.jars" diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala index eb10f6312..e4499dd6a 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala @@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener} import org.apache.curator.retry.RetryNTimes import org.apache.zookeeper.KeeperException.NoNodeException +import org.apache.zookeeper.client.ZKClientConfig import org.apache.livy.LivyConf import org.apache.livy.Logging @@ -63,6 +64,26 @@ class ZooKeeperManager( "Correct format is ,. e.g. 5,100") } + private[recovery] def createZKClientConfig = { + val clientConfig = new ZKClientConfig + clientConfig.setProperty("zookeeper.client.secure", "true") + clientConfig.setProperty("zookeeper.clientCnxnSocket", + livyConf.get(LivyConf.LIVY_ZK_CLIENT_SOCKET)) + clientConfig.setProperty("zookeeper.ssl.keyStore.location", + livyConf.get(LivyConf.SSL_KEYSTORE)) + clientConfig.setProperty("zookeeper.ssl.keyStore.password", + livyConf.get(LivyConf.LIVY_ZK_KEYSTORE_PASS)) + clientConfig.setProperty("zookeeper.ssl.keyStore.type", + livyConf.get(LivyConf.SSL_KEYSTORE_TYPE)) + clientConfig.setProperty("zookeeper.ssl.trustStore.location", + livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_FILE)) + clientConfig.setProperty("zookeeper.ssl.trustStore.password", + livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_PASS)) + clientConfig.setProperty("zookeeper.ssl.trustStore.type", + livyConf.get(LivyConf.SSL_KEYSTORE_TYPE)) + clientConfig + } + private val curatorClient = mockCuratorClient.getOrElse { if (livyConf.getBoolean(LivyConf.ZK_SASL_ENABLED)) { System.setProperty("zookeeper.sasl.client", "true") @@ -73,10 +94,13 @@ class ZooKeeperManager( info(s"ZooKeeper SASL authentication enabled with login context: " + s"${Option(loginContext).getOrElse("Client")}") } - CuratorFrameworkFactory.builder() - .connectString(zkAddress) - .retryPolicy(retryPolicy) - .build() + val builder = CuratorFrameworkFactory.builder() + builder.connectString(zkAddress) + builder.retryPolicy(retryPolicy) + if (livyConf.getBoolean(LivyConf.LIVY_ZK_CLIENT_SECURE)) { + builder.zkClientConfig(createZKClientConfig) + } + builder.build() } curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener { diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala index f3ffd9b66..d005dcfd6 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala @@ -41,12 +41,17 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { val key = "key" val prefixedKey = s"/livy/$key" - def withMock[R](testBody: TestFixture => R): R = { - val curatorClient = mock[CuratorFramework] - when(curatorClient.getUnhandledErrorListenable()) + def mockCurator(): CuratorFramework = { + val cc = mock[CuratorFramework] + when(cc.getUnhandledErrorListenable()) .thenReturn(mock[Listenable[UnhandledErrorListener]]) - when(curatorClient.getConnectionStateListenable()) + when(cc.getConnectionStateListenable()) .thenReturn(mock[Listenable[ConnectionStateListener]]) + cc + } + + def withMock[R](testBody: TestFixture => R): R = { + val curatorClient = mockCurator() val zkManager = new ZooKeeperManager(conf, Some(curatorClient)) zkManager.start() val stateStore = new ZooKeeperStateStore(conf, zkManager) @@ -191,9 +196,7 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { } it("should register a ConnectionStateListener that handles all connection states") { - val curatorClient = mock[CuratorFramework] - when(curatorClient.getUnhandledErrorListenable()) - .thenReturn(mock[Listenable[UnhandledErrorListener]]) + val curatorClient = mockCurator() val listenable = mock[Listenable[ConnectionStateListener]] when(curatorClient.getConnectionStateListenable()).thenReturn(listenable) @@ -219,5 +222,82 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { System.getProperty("zookeeper.sasl.client") shouldBe null System.getProperty("zookeeper.sasl.clientconfig") shouldBe null } + + describe("SSL config") { + case class SslTestFixture(zkManager: ZooKeeperManager, curatorClient: CuratorFramework) + + def makeSslConf(): LivyConf = { + val c = new LivyConf() + c.set(LivyConf.RECOVERY_STATE_STORE_URL, "/tmp/livy") + c.set(LivyConf.SSL_KEYSTORE, "/tmp/keystore.jks") + c.set(LivyConf.SSL_KEYSTORE_PASSWORD, "keystorePass") + c.set(LivyConf.SSL_KEY_PASSWORD, "keyPass") + c.set(LivyConf.SSL_KEYSTORE_TYPE, "JKS") + c.set(LivyConf.LIVY_ZK_KEYSTORE_PASS, "keystorePass") + c.set(LivyConf.LIVY_ZK_TRUSTSTORE_FILE, "/tmp/truststore.jks") + c.set(LivyConf.LIVY_ZK_TRUSTSTORE_PASS, "truststorePass") + c + } + + def withSslMock[R](sslConf: LivyConf)(testBody: SslTestFixture => R): R = { + val curatorClient = mockCurator() + val zkManager = new ZooKeeperManager(sslConf, Some(curatorClient)) + zkManager.start() + testBody(SslTestFixture(zkManager, curatorClient)) + } + + it("createZKClientConfig should set secure flag and socket class") { + withSslMock(makeSslConf()) { f => + verify(f.curatorClient).start() + val zkConfig = f.zkManager.createZKClientConfig + zkConfig.getProperty("zookeeper.client.secure") shouldBe "true" + zkConfig.getProperty("zookeeper.clientCnxnSocket") shouldBe + "org.apache.zookeeper.ClientCnxnSocketNetty" + } + } + + it("createZKClientConfig should set keystore location, password and type from LivyConf") { + withSslMock(makeSslConf()) { f => + verify(f.curatorClient).start() + val zkConfig = f.zkManager.createZKClientConfig + zkConfig.getProperty("zookeeper.ssl.keyStore.location") shouldBe "/tmp/keystore.jks" + zkConfig.getProperty("zookeeper.ssl.keyStore.password") shouldBe "keystorePass" + zkConfig.getProperty("zookeeper.ssl.keyStore.type") shouldBe "JKS" + } + } + + it("createZKClientConfig should set truststore location, password and type from LivyConf") { + withSslMock(makeSslConf()) { f => + verify(f.curatorClient).start() + val zkConfig = f.zkManager.createZKClientConfig + zkConfig.getProperty("zookeeper.ssl.trustStore.location") shouldBe + "/tmp/truststore.jks" + zkConfig.getProperty("zookeeper.ssl.trustStore.password") shouldBe "truststorePass" + // trustStore.type reuses SSL_KEYSTORE_TYPE + zkConfig.getProperty("zookeeper.ssl.trustStore.type") shouldBe "JKS" + } + } + + it("should build successfully when LIVY_ZK_CLIENT_SECURE is enabled") { + val sslConf = makeSslConf() + sslConf.set(LivyConf.LIVY_ZK_CLIENT_SECURE, true) + noException should be thrownBy { + val zkManager = new ZooKeeperManager(sslConf, Some(mockCurator())) + zkManager.start() + zkManager.stop() + } + } + + it("should build successfully when LIVY_ZK_CLIENT_SECURE is disabled") { + val noSslConf = new LivyConf() + noSslConf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host") + noSslConf.set(LivyConf.LIVY_ZK_CLIENT_SECURE, false) + noException should be thrownBy { + val zkManager = new ZooKeeperManager(noSslConf, Some(mockCurator())) + zkManager.start() + zkManager.stop() + } + } + } } } From 1305b0e90e01d19d271cec029dc34223be3e0f74 Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Tue, 9 Jun 2026 18:59:54 +0200 Subject: [PATCH 2/2] Implemented Arnav's suggestion When livy.server.zk.client.secure=true, the following are required: livy.keystore, livy.server.zk.ssl.keyStore.password livy.server.zk.ssl.truststore.location, livy.server.zk.ssl.truststore.password. --- conf/livy.conf.template | 4 +++- .../livy/server/recovery/ZooKeeperManager.scala | 12 ++++++++++++ .../recovery/ZooKeeperStateStoreSpec.scala | 17 +++++++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 0947d005a..0299dca7f 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -172,7 +172,9 @@ # JAAS login context name for ZooKeeper SASL authentication. # livy.server.zk.sasl.login-context = Client -# The SSL configuration for zookeeper. +# The SSL configuration for zookeeper. When livy.server.zk.client.secure=true, the following +# are required: livy.keystore, livy.server.zk.ssl.keyStore.password, +# livy.server.zk.ssl.truststore.location, livy.server.zk.ssl.truststore.password. # livy.server.zk.client.secure=false # livy.server.zk.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty # livy.server.zk.ssl.keyStore.password= diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala index e4499dd6a..ff483e5df 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala @@ -64,6 +64,18 @@ class ZooKeeperManager( "Correct format is ,. e.g. 5,100") } + if (livyConf.getBoolean(LivyConf.LIVY_ZK_CLIENT_SECURE)) { + Seq( + (LivyConf.SSL_KEYSTORE, livyConf.get(LivyConf.SSL_KEYSTORE)), + (LivyConf.LIVY_ZK_KEYSTORE_PASS, livyConf.get(LivyConf.LIVY_ZK_KEYSTORE_PASS)), + (LivyConf.LIVY_ZK_TRUSTSTORE_FILE, livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_FILE)), + (LivyConf.LIVY_ZK_TRUSTSTORE_PASS, livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_PASS)) + ).foreach { case (entry, value) => + require(value != null && !value.trim.isEmpty, + s"Please config ${entry.key} when ${LivyConf.LIVY_ZK_CLIENT_SECURE.key}=true.") + } + } + private[recovery] def createZKClientConfig = { val clientConfig = new ZKClientConfig clientConfig.setProperty("zookeeper.client.secure", "true") diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala index d005dcfd6..e8222d09a 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala @@ -298,6 +298,23 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { zkManager.stop() } } + + Seq( + (LivyConf.SSL_KEYSTORE, "keystore location"), + (LivyConf.LIVY_ZK_KEYSTORE_PASS, "keystore password"), + (LivyConf.LIVY_ZK_TRUSTSTORE_FILE, "truststore location"), + (LivyConf.LIVY_ZK_TRUSTSTORE_PASS, "truststore password") + ).foreach { case (entry, label) => + it(s"should fail fast when LIVY_ZK_CLIENT_SECURE is enabled but $label is missing") { + val sslConf = makeSslConf() + sslConf.set(LivyConf.LIVY_ZK_CLIENT_SECURE, true) + sslConf.set(entry, null) + val thrown = the[IllegalArgumentException] thrownBy { + new ZooKeeperManager(sslConf) + } + thrown.getMessage should include(entry.key) + } + } } } }