From c720de8fbf93f137f00a75c1f4e0af8babc02414 Mon Sep 17 00:00:00 2001 From: "jianzhen.wu" Date: Tue, 13 May 2025 14:44:30 +0800 Subject: [PATCH 1/5] SPDI-165014. RscClient stop should fail driverRpc --- rsc/src/main/java/org/apache/livy/rsc/RSCClient.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index ee9c9012f..2c112201b 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -230,19 +230,23 @@ public synchronized void stop(boolean shutdownContext) { LOG.warn("Exception while waiting for end session reply.", e); Utils.propagate(e); } finally { + IOException ex = new IOException("RSCClient instance stopped."); if (driverRpc.isSuccess()) { try { driverRpc.get().close(); } catch (Exception e) { LOG.warn("Error stopping RPC.", e); } + } else { + driverRpc.setFailure(ex); + LOG.warn("Set driverRpc as failure in stopping RSCClient."); } // Report failure for all pending jobs, so that clients can react. 
for (Map.Entry<String, JobHandleImpl<?>> e : jobs.entrySet()) { LOG.info("Failing pending job {} due to shutdown.", e.getKey()); try { - e.getValue().setFailure(new IOException("RSCClient instance stopped.")); + e.getValue().setFailure(ex); } catch (Exception e2) { LOG.info("Job " + e.getKey() + " already failed.", e2); } From 1ec6b35bacf7d663ac69002c4eb6e3ed5e5828b0 Mon Sep 17 00:00:00 2001 From: "jianzhen.wu" Date: Tue, 13 May 2025 14:49:02 +0800 Subject: [PATCH 2/5] should set failure when driverRpc is not done --- rsc/src/main/java/org/apache/livy/rsc/RSCClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index 2c112201b..36b10924f 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -237,7 +237,7 @@ public synchronized void stop(boolean shutdownContext) { } catch (Exception e) { LOG.warn("Error stopping RPC.", e); } - } else { + } else if (!driverRpc.isDone()){ driverRpc.setFailure(ex); LOG.warn("Set driverRpc as failure in stopping RSCClient."); } From ada250519c5d016f1b88d3f7d40a4a9456c02fe0 Mon Sep 17 00:00:00 2001 From: "jianzhen.wu" Date: Fri, 9 May 2025 18:40:30 +0800 Subject: [PATCH 3/5] SPDI-165011. 
should ensureRunning before building RPC to spark --- .../server/interactive/InteractiveSession.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 4250794dc..ae9d6c19e 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -566,13 +566,13 @@ class InteractiveSession( } def statements: IndexedSeq[Statement] = { - ensureActive() + ensureRunning() val r = client.get.getReplJobResults().get() r.statements.toIndexedSeq } def getStatement(stmtId: Int): Option[Statement] = { - ensureActive() + ensureRunning() val r = client.get.getReplJobResults(stmtId, 1).get() if (r.statements.length < 1) { None @@ -625,19 +625,19 @@ class InteractiveSession( } def addFile(uri: URI): Unit = { - ensureActive() + ensureRunning() recordActivity() client.get.addFile(resolveURI(uri, livyConf)).get() } def addJar(uri: URI): Unit = { - ensureActive() + ensureRunning() recordActivity() client.get.addJar(resolveURI(uri, livyConf)).get() } def jobStatus(id: Long): Any = { - ensureActive() + ensureRunning() val clientJobId = operations(id) recordActivity() // TODO: don't block indefinitely? 
@@ -646,7 +646,7 @@ class InteractiveSession( } def cancelJob(id: Long): Unit = { - ensureActive() + ensureRunning() recordActivity() operations.remove(id).foreach { client.get.cancel } } @@ -689,7 +689,7 @@ class InteractiveSession( } private def performOperation(job: Array[Byte], jobType: String, sync: Boolean): Long = { - ensureActive() + ensureRunning() recordActivity() val future = client.get.bypass(ByteBuffer.wrap(job), jobType, sync) val opId = operationCounter.incrementAndGet() From 6b3903598c7ed3d57edd555e8186bea870d54be9 Mon Sep 17 00:00:00 2001 From: "jianzhen.wu" Date: Tue, 13 May 2025 17:19:35 +0800 Subject: [PATCH 4/5] statements API should not get stuck --- server/src/main/scala/org/apache/livy/LivyConf.scala | 2 ++ .../apache/livy/server/interactive/InteractiveSession.scala | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 8346b4b5b..d1d16366f 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -356,6 +356,8 @@ object LivyConf { val SESSION_ALLOW_CUSTOM_CLASSPATH = Entry("livy.server.session.allow-custom-classpath", false) + val REQUEST_TIMEOUT = Entry("livy.server.request.timeout", "3s") + val SPARK_MASTER = "spark.master" val SPARK_DEPLOY_MODE = "spark.submit.deployMode" val SPARK_JARS = "spark.jars" diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index ae9d6c19e..b4d645745 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -567,13 +567,15 @@ class InteractiveSession( def statements: IndexedSeq[Statement] = { ensureRunning() - val r = client.get.getReplJobResults().get() + val r = 
client.get.getReplJobResults().get( + livyConf.getTimeAsMs(LivyConf.REQUEST_TIMEOUT), TimeUnit.MILLISECONDS) r.statements.toIndexedSeq } def getStatement(stmtId: Int): Option[Statement] = { ensureRunning() - val r = client.get.getReplJobResults(stmtId, 1).get() + val r = client.get.getReplJobResults(stmtId, 1).get( + livyConf.getTimeAsMs(LivyConf.REQUEST_TIMEOUT), TimeUnit.MILLISECONDS) if (r.statements.length < 1) { None } else { From bb499387ba6319f76958bfa0dc11bb3b19bacf25 Mon Sep 17 00:00:00 2001 From: "jianzhen.wu" Date: Tue, 13 May 2025 17:23:03 +0800 Subject: [PATCH 5/5] job API should not get stuck --- .../livy/server/interactive/InteractiveSession.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index b4d645745..2471815a2 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -629,13 +629,15 @@ class InteractiveSession( def addFile(uri: URI): Unit = { ensureRunning() recordActivity() - client.get.addFile(resolveURI(uri, livyConf)).get() + client.get.addFile(resolveURI(uri, livyConf)).get( + livyConf.getTimeAsMs(LivyConf.REQUEST_TIMEOUT), TimeUnit.MILLISECONDS) } def addJar(uri: URI): Unit = { ensureRunning() recordActivity() - client.get.addJar(resolveURI(uri, livyConf)).get() + client.get.addJar(resolveURI(uri, livyConf)).get( + livyConf.getTimeAsMs(LivyConf.REQUEST_TIMEOUT), TimeUnit.MILLISECONDS) } def jobStatus(id: Long): Any = { @@ -643,7 +645,8 @@ class InteractiveSession( val clientJobId = operations(id) recordActivity() // TODO: don't block indefinitely? 
- val status = client.get.getBypassJobStatus(clientJobId).get() + val status = client.get.getBypassJobStatus(clientJobId).get( + livyConf.getTimeAsMs(LivyConf.REQUEST_TIMEOUT), TimeUnit.MILLISECONDS) new JobStatus(id, status.state, status.result, status.error) }