From 00018fb4f6320168ee9f189fdc9c4d591e0785a4 Mon Sep 17 00:00:00 2001 From: epsilonwang Date: Thu, 14 May 2026 08:48:27 +0000 Subject: [PATCH 1/3] fix(shuffle): make shuffle service actors named and idempotent Prevent duplicate shuffle service actors on the same node by assigning unique names and reusing existing actors. Also enable fault tolerance with max restarts and task retries, and remove the now-unnecessary explicit start call. Signed-off-by: epsilonwang --- .../raydp/ExternalShuffleServiceUtils.java | 23 ++++++++++++++----- .../spark/deploy/raydp/RayAppMaster.scala | 16 +++++-------- .../raydp/RayExternalShuffleService.scala | 4 +++- 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java b/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java index ecaa747e..02f50f68 100644 --- a/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java +++ b/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java @@ -18,21 +18,32 @@ package org.apache.spark.deploy.raydp; import java.util.List; +import java.util.Optional; import io.ray.api.ActorHandle; import io.ray.api.Ray; public class ExternalShuffleServiceUtils { + + private static String getShuffleServiceActorName(String node) { + return "raydp-shuffle-service-" + node.replace('.', '-'); + } + public static ActorHandle createShuffleService( String node, List options) { + String actorName = getShuffleServiceActorName(node); + Optional> existing = Ray.getActor(actorName); + if (existing.isPresent()) { + return existing.get(); + } + return Ray.actor(RayExternalShuffleService::new) + .setName(actorName) .setResource("node:" + node, 0.01) - .setJvmOptions(options).remote(); - } - - public static void startShuffleService( - ActorHandle handle) { - handle.task(RayExternalShuffleService::start).remote(); + .setJvmOptions(options) + .setMaxRestarts(-1) + .setMaxTaskRetries(-1) + .remote(); } public static void stopShuffleService( diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala index f4cc823d..f289c112 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala @@ -151,16 +151,12 @@ class RayAppMaster(host: String, case RegisterExecutor(executorId, executorIp) => val success = appInfo.registerExecutor(executorId) if (success) { - // external shuffle service is enabled - if (conf.getBoolean("spark.shuffle.service.enabled", false)) { - // the node executor is in has not started shuffle service - if (!nodesWithShuffleService.contains(executorIp)) { - logInfo(s"Starting shuffle service on ${executorIp}") - val service = ExternalShuffleServiceUtils.createShuffleService( - executorIp, shuffleServiceOptions.toBuffer.asJava) - ExternalShuffleServiceUtils.startShuffleService(service) - nodesWithShuffleService(executorIp) = service - } + if (conf.getBoolean("spark.shuffle.service.enabled", false) && + !nodesWithShuffleService.contains(executorIp)) { + logInfo(s"Starting shuffle service on ${executorIp}") + val service = ExternalShuffleServiceUtils.createShuffleService( + executorIp, shuffleServiceOptions.toBuffer.asJava) + nodesWithShuffleService(executorIp) = service } setUpExecutor(executorId) } diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala index 374143af..7a8bf74a 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.raydp -import io.ray.api.Ray; +import io.ray.api.Ray import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.ExternalShuffleService @@ -28,6 +28,8 @@ class RayExternalShuffleService() extends Logging { val mgr = new SecurityManager(conf) val instance = new ExternalShuffleService(conf, mgr) + start() + def start(): Unit = { instance.start() } From 02aafe986a436831dbc13ab6158e332837415ab8 Mon Sep 17 00:00:00 2001 From: epsilonwang Date: Mon, 18 May 2026 03:58:23 +0000 Subject: [PATCH 2/3] fix: address PR review feedback - Mark RayExternalShuffleService.start() as final to prevent overridable method call from constructor pitfall - Change log message from 'Starting shuffle service' to 'Ensuring shuffle service' to accurately reflect that the actor may already exist --- .../main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala | 2 +- .../apache/spark/deploy/raydp/RayExternalShuffleService.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala index f289c112..275495b3 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala @@ -153,7 +153,7 @@ class RayAppMaster(host: String, if (success) { if (conf.getBoolean("spark.shuffle.service.enabled", false) && !nodesWithShuffleService.contains(executorIp)) { - logInfo(s"Starting shuffle service on ${executorIp}") + logInfo(s"Ensuring shuffle service on ${executorIp}") val service = ExternalShuffleServiceUtils.createShuffleService( executorIp, shuffleServiceOptions.toBuffer.asJava) nodesWithShuffleService(executorIp) = service diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala index 7a8bf74a..d5d978dc 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala @@ -30,7 +30,7 @@ class RayExternalShuffleService() extends Logging { start() - def start(): Unit = { + final def start(): Unit = { instance.start() } From 788c2d0653840104cb491b489baea536f2cdbc06 Mon Sep 17 00:00:00 2001 From: epsilonwang Date: Thu, 21 May 2026 03:13:58 +0000 Subject: [PATCH 3/3] retrigger CI