diff --git a/api/src/main/java/org/apache/livy/LivyClientBuilder.java b/api/src/main/java/org/apache/livy/LivyClientBuilder.java index 1d7ec0183..654b5502a 100644 --- a/api/src/main/java/org/apache/livy/LivyClientBuilder.java +++ b/api/src/main/java/org/apache/livy/LivyClientBuilder.java @@ -36,6 +36,7 @@ public final class LivyClientBuilder { public static final String LIVY_URI_KEY = "livy.uri"; + public static final String LIVY_SESSION_ID_KEY = "livy.sessionId"; private static final ServiceLoader CLIENT_FACTORY_LOADER = ServiceLoader.load(LivyClientFactory.class, classLoader()); @@ -96,11 +97,32 @@ public LivyClientBuilder(boolean loadDefaults) throws IOException { } } + /** + * Sets the URI of the Livy server the client will connect to. If the URI contains + *
sessions/{sessionId}
, the client will connect to the specified existing session; + * otherwise it will create a new session. + * + * @param uri The URI of Livy server. + * @return The builder itself. + */ public LivyClientBuilder setURI(URI uri) { config.setProperty(LIVY_URI_KEY, uri.toString()); return this; } + /** + * Sets the session ID the client will connect to. If a session ID is set, the chosen session + * will be used with its own configurations, so Spark configurations will be ignored. If not set, + * a new session will be created when the client is built. + * + * @param sessionId The ID of the session to attach to. + * @return the builder itself. + */ + public LivyClientBuilder setSessionId(int sessionId) { + config.setProperty(LIVY_SESSION_ID_KEY, String.valueOf(sessionId)); + return this; + } + public LivyClientBuilder setConf(String key, String value) { if (value != null) { config.setProperty(key, value); diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index f40148f94..82d76d5ee 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -33,6 +33,7 @@ import org.apache.livy.Job; import org.apache.livy.JobHandle; import org.apache.livy.LivyClient; +import org.apache.livy.LivyClientBuilder; import org.apache.livy.client.common.Serializer; import static org.apache.livy.client.common.HttpMessages.*; @@ -59,9 +60,17 @@ class HttpClient implements LivyClient { // unused. Matcher m = Pattern.compile("(.*)" + LivyConnection.SESSIONS_URI + "/([0-9]+)") .matcher(uri.getPath()); + String sessionIdFromConf = httpConf.get(LivyClientBuilder.LIVY_SESSION_ID_KEY); try { - if (m.matches()) { + if (sessionIdFromConf != null && m.matches()) { + throw new IllegalArgumentException( + "Cannot set existing session both from URI and configuration"); + } else if (sessionIdFromConf != null) { + this.conn = new LivyConnection(uri, httpConf); + this.sessionId = Integer.parseInt(sessionIdFromConf); + conn.post(null, SessionInfo.class, "/%d/connect", sessionId); + } else if (m.matches()) { URI base = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), m.group(1), uri.getQuery(), uri.getFragment()); diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala index f53d9f5b4..7ca4f796a 100644 --- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala +++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala @@ -181,8 +181,8 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni testJob(false, response = Some(null)) } - withClient("should connect to existing sessions") { - var sid = client.asInstanceOf[HttpClient].getSessionId() + withClient("should connect to existing sessions using the URI") { + val sid = client.asInstanceOf[HttpClient].getSessionId() val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" + s"${LivyConnection.SESSIONS_URI}/$sid" val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).build() @@ -190,6 +190,25 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni verify(session, never()).stop() } + withClient("should connect to existing sessions using the conf") { + val sid = client.asInstanceOf[HttpClient].getSessionId() + val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" + val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).setSessionId(sid).build() + newClient.stop(false) + verify(session, never()).stop() + } + + withClient("should throw an exception if the sessionId is set through conf and URI") { + val sid = client.asInstanceOf[HttpClient].getSessionId() + val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" + + s"${LivyConnection.SESSIONS_URI}/$sid" + intercept[IllegalArgumentException]{ + val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).setSessionId(sid).build() + newClient.stop(false) + verify(session, never()).stop() + } + } + withClient("should tear down clients") { client.stop(true) verify(session, times(1)).stop()