From d307601a50d657833d7835463805d95a0dbf50e5 Mon Sep 17 00:00:00 2001 From: JAGANNATHANJP Date: Sat, 23 May 2026 19:59:16 +0530 Subject: [PATCH 1/4] Prevent threaded Dask workers in DistRDF backend --- .../distrdf/python/DistRDF/Backends/Dask/Backend.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py index 84a9a0bbd4abc..3ff54999a06c4 100644 --- a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py +++ b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py @@ -104,6 +104,17 @@ def __init__(self, daskclient: Optional[Client] = None): # N is the number of cores on the local machine. self.client = (daskclient if daskclient is not None else Client(LocalCluster(n_workers=os.cpu_count(), threads_per_worker=1, processes=True))) + + workers = self.client.scheduler_info()["workers"] + + for worker in workers.values(): + threads = worker.get("nthreads", 1) + + if threads > 1: + raise RuntimeError( + "DistRDF with Dask does not support threaded workers. " + "Please use processes=True and threads_per_worker=1." + ) def optimize_npartitions(self) -> int: """ From ec5e56a24a48c075ed2717437f660de0902ee20e Mon Sep 17 00:00:00 2001 From: JAGANNATHANJP Date: Sat, 23 May 2026 20:14:39 +0530 Subject: [PATCH 2/4] Prevent threaded Dask workers in DistRDF backend --- bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py index 3ff54999a06c4..1274d13ac3d13 100644 --- a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py +++ b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py @@ -111,10 +111,10 @@ def __init__(self, daskclient: Optional[Client] = None): threads = worker.get("nthreads", 1) if threads > 1: - raise RuntimeError( - "DistRDF with Dask does not support threaded workers. " - "Please use processes=True and threads_per_worker=1." - ) + raise RuntimeError( + "DistRDF with Dask does not support threaded workers. " + "Please use processes=True and threads_per_worker=1." + ) def optimize_npartitions(self) -> int: """ From 0c91ac57a1af0488baaf0cc7b9db066bf78444b3 Mon Sep 17 00:00:00 2001 From: JAGANNATHANJP Date: Sat, 23 May 2026 20:19:12 +0530 Subject: [PATCH 3/4] Prevent threaded Dask workers in DistRDF backend --- bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py index 1274d13ac3d13..5e3da6ad5a836 100644 --- a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py +++ b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py @@ -105,7 +105,10 @@ def __init__(self, daskclient: Optional[Client] = None): self.client = (daskclient if daskclient is not None else Client(LocalCluster(n_workers=os.cpu_count(), threads_per_worker=1, processes=True))) - workers = self.client.scheduler_info()["workers"] + workers = self.client.scheduler_info().get("workers", None) + + if workers is None: + return for worker in workers.values(): threads = worker.get("nthreads", 1) @@ -114,7 +117,7 @@ def __init__(self, daskclient: Optional[Client] = None): raise RuntimeError( "DistRDF with Dask does not support threaded workers. " "Please use processes=True and threads_per_worker=1." - ) + ) def optimize_npartitions(self) -> int: """ From 5bdb97d9e337f968144d16b4d463ac9bcd43b4fb Mon Sep 17 00:00:00 2001 From: JAGANNATHANJP Date: Mon, 1 Jun 2026 22:12:17 +0530 Subject: [PATCH 4/4] Add test for missing Dask worker information --- .../python/distrdf/backends/check_backend.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/roottest/python/distrdf/backends/check_backend.py b/roottest/python/distrdf/backends/check_backend.py index b7ae271ca26ea..34e3fc8b737fd 100644 --- a/roottest/python/distrdf/backends/check_backend.py +++ b/roottest/python/distrdf/backends/check_backend.py @@ -59,6 +59,27 @@ def test_optimize_npartitions(self, payload): backend = Backend.SparkBackend(sparkcontext=connection) assert backend.optimize_npartitions() == 2 + def test_dask_backend_handles_missing_workers(self, payload): + """ + Check that DaskBackend initialization succeeds when scheduler_info + does not provide worker information. + """ + connection, backend = payload + + if backend != "dask": + return + + from ROOT._distrdf.Backends.Dask import Backend + + original_scheduler_info = connection.scheduler_info + + try: + connection.scheduler_info = lambda: {} + backend = Backend.DaskBackend(daskclient=connection) + assert backend.client is connection + finally: + connection.scheduler_info = original_scheduler_info + class TestInitialization: """Check initialization method in the Dask backend"""