diff --git a/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java b/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java
index ecaa747e..02f50f68 100644
--- a/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java
+++ b/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java
@@ -18,21 +18,32 @@
 package org.apache.spark.deploy.raydp;
 
 import java.util.List;
+import java.util.Optional;
 
 import io.ray.api.ActorHandle;
 import io.ray.api.Ray;
 
 public class ExternalShuffleServiceUtils {
+
+  private static String getShuffleServiceActorName(String node) {
+    return "raydp-shuffle-service-" + node.replace('.', '-');
+  }
+
   public static ActorHandle<RayExternalShuffleService> createShuffleService(
       String node, List<String> options) {
+    String actorName = getShuffleServiceActorName(node);
+    Optional<ActorHandle<RayExternalShuffleService>> existing = Ray.getActor(actorName);
+    if (existing.isPresent()) {
+      return existing.get();
+    }
+
     return Ray.actor(RayExternalShuffleService::new)
+              .setName(actorName)
               .setResource("node:" + node, 0.01)
-              .setJvmOptions(options).remote();
-  }
-
-  public static void startShuffleService(
-      ActorHandle<RayExternalShuffleService> handle) {
-    handle.task(RayExternalShuffleService::start).remote();
+              .setJvmOptions(options)
+              .setMaxRestarts(-1)
+              .setMaxTaskRetries(-1)
+              .remote();
   }
 
   public static void stopShuffleService(
diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala
index f4cc823d..275495b3 100644
--- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala
+++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala
@@ -151,16 +151,12 @@ class RayAppMaster(host: String,
       case RegisterExecutor(executorId, executorIp) =>
         val success = appInfo.registerExecutor(executorId)
         if (success) {
-          // external shuffle service is enabled
-          if (conf.getBoolean("spark.shuffle.service.enabled", false)) {
-            // the node executor is in has not started shuffle service
-            if (!nodesWithShuffleService.contains(executorIp)) {
-              logInfo(s"Starting shuffle service on ${executorIp}")
-              val service = ExternalShuffleServiceUtils.createShuffleService(
-                executorIp, shuffleServiceOptions.toBuffer.asJava)
-              ExternalShuffleServiceUtils.startShuffleService(service)
-              nodesWithShuffleService(executorIp) = service
-            }
+          if (conf.getBoolean("spark.shuffle.service.enabled", false) &&
+              !nodesWithShuffleService.contains(executorIp)) {
+            logInfo(s"Ensuring shuffle service on ${executorIp}")
+            val service = ExternalShuffleServiceUtils.createShuffleService(
+              executorIp, shuffleServiceOptions.toBuffer.asJava)
+            nodesWithShuffleService(executorIp) = service
           }
           setUpExecutor(executorId)
         }
diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala
index 374143af..d5d978dc 100644
--- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala
+++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.raydp
 
-import io.ray.api.Ray;
+import io.ray.api.Ray
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.ExternalShuffleService
@@ -28,7 +28,9 @@ class RayExternalShuffleService() extends Logging {
   val mgr = new SecurityManager(conf)
   val instance = new ExternalShuffleService(conf, mgr)
 
-  def start(): Unit = {
+  start()
+
+  final def start(): Unit = {
     instance.start()
   }
 