diff --git a/conf/livy.conf.template b/conf/livy.conf.template index b4dd2f436..0299dca7f 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -172,6 +172,15 @@ # JAAS login context name for ZooKeeper SASL authentication. # livy.server.zk.sasl.login-context = Client +# The SSL configuration for zookeeper. When livy.server.zk.client.secure=true, the following +# are required: livy.keystore, livy.server.zk.ssl.keyStore.password, +# livy.server.zk.ssl.truststore.location, livy.server.zk.ssl.truststore.password. +# livy.server.zk.client.secure=false +# livy.server.zk.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty +# livy.server.zk.ssl.keyStore.password= +# livy.server.zk.ssl.truststore.location= +# livy.server.zk.ssl.truststore.password= + # If Livy can't find the yarn app within this time, consider it lost. # livy.server.yarn.app-lookup-timeout = 120s # When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 03ba46fca..006ab5dff 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -367,6 +367,13 @@ object LivyConf { val SESSION_ALLOW_CUSTOM_CLASSPATH = Entry("livy.server.session.allow-custom-classpath", false) + val LIVY_ZK_CLIENT_SOCKET = Entry("livy.server.zk.clientCnxnSocket", + "org.apache.zookeeper.ClientCnxnSocketNetty") + val LIVY_ZK_KEYSTORE_PASS = Entry("livy.server.zk.ssl.keyStore.password", null) + val LIVY_ZK_TRUSTSTORE_FILE = Entry("livy.server.zk.ssl.truststore.location", null) + val LIVY_ZK_TRUSTSTORE_PASS = Entry("livy.server.zk.ssl.truststore.password", null) + val LIVY_ZK_CLIENT_SECURE = Entry("livy.server.zk.client.secure", false) + val SPARK_MASTER = "spark.master" val SPARK_DEPLOY_MODE = "spark.submit.deployMode" val SPARK_JARS = "spark.jars" diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala index eb10f6312..ff483e5df 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala @@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener} import org.apache.curator.retry.RetryNTimes import org.apache.zookeeper.KeeperException.NoNodeException +import org.apache.zookeeper.client.ZKClientConfig import org.apache.livy.LivyConf import org.apache.livy.Logging @@ -63,6 +64,38 @@ class ZooKeeperManager( "Correct format is ,. e.g. 5,100") } + if (livyConf.getBoolean(LivyConf.LIVY_ZK_CLIENT_SECURE)) { + Seq( + (LivyConf.SSL_KEYSTORE, livyConf.get(LivyConf.SSL_KEYSTORE)), + (LivyConf.LIVY_ZK_KEYSTORE_PASS, livyConf.get(LivyConf.LIVY_ZK_KEYSTORE_PASS)), + (LivyConf.LIVY_ZK_TRUSTSTORE_FILE, livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_FILE)), + (LivyConf.LIVY_ZK_TRUSTSTORE_PASS, livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_PASS)) + ).foreach { case (entry, value) => + require(value != null && !value.trim.isEmpty, + s"Please config ${entry.key} when ${LivyConf.LIVY_ZK_CLIENT_SECURE.key}=true.") + } + } + + private[recovery] def createZKClientConfig = { + val clientConfig = new ZKClientConfig + clientConfig.setProperty("zookeeper.client.secure", "true") + clientConfig.setProperty("zookeeper.clientCnxnSocket", + livyConf.get(LivyConf.LIVY_ZK_CLIENT_SOCKET)) + clientConfig.setProperty("zookeeper.ssl.keyStore.location", + livyConf.get(LivyConf.SSL_KEYSTORE)) + clientConfig.setProperty("zookeeper.ssl.keyStore.password", + livyConf.get(LivyConf.LIVY_ZK_KEYSTORE_PASS)) + clientConfig.setProperty("zookeeper.ssl.keyStore.type", + livyConf.get(LivyConf.SSL_KEYSTORE_TYPE)) + clientConfig.setProperty("zookeeper.ssl.trustStore.location", + livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_FILE)) + clientConfig.setProperty("zookeeper.ssl.trustStore.password", + livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_PASS)) + clientConfig.setProperty("zookeeper.ssl.trustStore.type", + livyConf.get(LivyConf.SSL_KEYSTORE_TYPE)) + clientConfig + } + private val curatorClient = mockCuratorClient.getOrElse { if (livyConf.getBoolean(LivyConf.ZK_SASL_ENABLED)) { System.setProperty("zookeeper.sasl.client", "true") @@ -73,10 +106,13 @@ class ZooKeeperManager( info(s"ZooKeeper SASL authentication enabled with login context: " + s"${Option(loginContext).getOrElse("Client")}") } - CuratorFrameworkFactory.builder() - .connectString(zkAddress) - .retryPolicy(retryPolicy) - .build() + val builder = CuratorFrameworkFactory.builder() + builder.connectString(zkAddress) + builder.retryPolicy(retryPolicy) + if (livyConf.getBoolean(LivyConf.LIVY_ZK_CLIENT_SECURE)) { + builder.zkClientConfig(createZKClientConfig) + } + builder.build() } curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener { diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala index f3ffd9b66..e8222d09a 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala @@ -41,12 +41,17 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { val key = "key" val prefixedKey = s"/livy/$key" - def withMock[R](testBody: TestFixture => R): R = { - val curatorClient = mock[CuratorFramework] - when(curatorClient.getUnhandledErrorListenable()) + def mockCurator(): CuratorFramework = { + val cc = mock[CuratorFramework] + when(cc.getUnhandledErrorListenable()) .thenReturn(mock[Listenable[UnhandledErrorListener]]) - when(curatorClient.getConnectionStateListenable()) + when(cc.getConnectionStateListenable()) .thenReturn(mock[Listenable[ConnectionStateListener]]) + cc + } + + def withMock[R](testBody: TestFixture => R): R = { + val curatorClient = mockCurator() val zkManager = new ZooKeeperManager(conf, Some(curatorClient)) zkManager.start() val stateStore = new ZooKeeperStateStore(conf, zkManager) @@ -191,9 +196,7 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { } it("should register a ConnectionStateListener that handles all connection states") { - val curatorClient = mock[CuratorFramework] - when(curatorClient.getUnhandledErrorListenable()) - .thenReturn(mock[Listenable[UnhandledErrorListener]]) + val curatorClient = mockCurator() val listenable = mock[Listenable[ConnectionStateListener]] when(curatorClient.getConnectionStateListenable()).thenReturn(listenable) @@ -219,5 +222,99 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { System.getProperty("zookeeper.sasl.client") shouldBe null System.getProperty("zookeeper.sasl.clientconfig") shouldBe null } + + describe("SSL config") { + case class SslTestFixture(zkManager: ZooKeeperManager, curatorClient: CuratorFramework) + + def makeSslConf(): LivyConf = { + val c = new LivyConf() + c.set(LivyConf.RECOVERY_STATE_STORE_URL, "/tmp/livy") + c.set(LivyConf.SSL_KEYSTORE, "/tmp/keystore.jks") + c.set(LivyConf.SSL_KEYSTORE_PASSWORD, "keystorePass") + c.set(LivyConf.SSL_KEY_PASSWORD, "keyPass") + c.set(LivyConf.SSL_KEYSTORE_TYPE, "JKS") + c.set(LivyConf.LIVY_ZK_KEYSTORE_PASS, "keystorePass") + c.set(LivyConf.LIVY_ZK_TRUSTSTORE_FILE, "/tmp/truststore.jks") + c.set(LivyConf.LIVY_ZK_TRUSTSTORE_PASS, "truststorePass") + c + } + + def withSslMock[R](sslConf: LivyConf)(testBody: SslTestFixture => R): R = { + val curatorClient = mockCurator() + val zkManager = new ZooKeeperManager(sslConf, Some(curatorClient)) + zkManager.start() + testBody(SslTestFixture(zkManager, curatorClient)) + } + + it("createZKClientConfig should set secure flag and socket class") { + withSslMock(makeSslConf()) { f => + verify(f.curatorClient).start() + val zkConfig = f.zkManager.createZKClientConfig + zkConfig.getProperty("zookeeper.client.secure") shouldBe "true" + zkConfig.getProperty("zookeeper.clientCnxnSocket") shouldBe + "org.apache.zookeeper.ClientCnxnSocketNetty" + } + } + + it("createZKClientConfig should set keystore location, password and type from LivyConf") { + withSslMock(makeSslConf()) { f => + verify(f.curatorClient).start() + val zkConfig = f.zkManager.createZKClientConfig + zkConfig.getProperty("zookeeper.ssl.keyStore.location") shouldBe "/tmp/keystore.jks" + zkConfig.getProperty("zookeeper.ssl.keyStore.password") shouldBe "keystorePass" + zkConfig.getProperty("zookeeper.ssl.keyStore.type") shouldBe "JKS" + } + } + + it("createZKClientConfig should set truststore location, password and type from LivyConf") { + withSslMock(makeSslConf()) { f => + verify(f.curatorClient).start() + val zkConfig = f.zkManager.createZKClientConfig + zkConfig.getProperty("zookeeper.ssl.trustStore.location") shouldBe + "/tmp/truststore.jks" + zkConfig.getProperty("zookeeper.ssl.trustStore.password") shouldBe "truststorePass" + // trustStore.type reuses SSL_KEYSTORE_TYPE + zkConfig.getProperty("zookeeper.ssl.trustStore.type") shouldBe "JKS" + } + } + + it("should build successfully when LIVY_ZK_CLIENT_SECURE is enabled") { + val sslConf = makeSslConf() + sslConf.set(LivyConf.LIVY_ZK_CLIENT_SECURE, true) + noException should be thrownBy { + val zkManager = new ZooKeeperManager(sslConf, Some(mockCurator())) + zkManager.start() + zkManager.stop() + } + } + + it("should build successfully when LIVY_ZK_CLIENT_SECURE is disabled") { + val noSslConf = new LivyConf() + noSslConf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host") + noSslConf.set(LivyConf.LIVY_ZK_CLIENT_SECURE, false) + noException should be thrownBy { + val zkManager = new ZooKeeperManager(noSslConf, Some(mockCurator())) + zkManager.start() + zkManager.stop() + } + } + + Seq( + (LivyConf.SSL_KEYSTORE, "keystore location"), + (LivyConf.LIVY_ZK_KEYSTORE_PASS, "keystore password"), + (LivyConf.LIVY_ZK_TRUSTSTORE_FILE, "truststore location"), + (LivyConf.LIVY_ZK_TRUSTSTORE_PASS, "truststore password") + ).foreach { case (entry, label) => + it(s"should fail fast when LIVY_ZK_CLIENT_SECURE is enabled but $label is missing") { + val sslConf = makeSslConf() + sslConf.set(LivyConf.LIVY_ZK_CLIENT_SECURE, true) + sslConf.set(entry, null) + val thrown = the[IllegalArgumentException] thrownBy { + new ZooKeeperManager(sslConf) + } + thrown.getMessage should include(entry.key) + } + } + } } }