CancelledError in successfully created AWS ECS Fargate Cluster #427

@Thodorissio

Issue:

I am creating an AWS ECS Fargate cluster using the Dask Cloudprovider library, following . Although the cluster is created successfully (its status is active) and the workers are started, the dask operation fails with a CancelledError.

A similar error occurs even when I try other dask array operations (e.g. an add operation).

Minimal Complete Verifiable Example:

import dask.array as da
import logging
from dask_cloudprovider.aws import FargateCluster
from distributed import Client

SCHEDULER_CPU = 1024
SCHEDULER_MEM = 4096
WORKER_CPU = 4096
WORKER_MEM = 16384
NUM_WORKERS = 4

logging.basicConfig(level=logging.INFO)


def main():
    cluster_name_template = "dask-fargate-test"

    logging.info("Creating Fargate cluster...")
    cluster = FargateCluster(
        cluster_name_template=cluster_name_template,
        n_workers=NUM_WORKERS,
        worker_cpu=WORKER_CPU,
        worker_mem=WORKER_MEM,
        scheduler_cpu=SCHEDULER_CPU,
        scheduler_mem=SCHEDULER_MEM,
    )
    logging.info("Fargate cluster created.")

    logging.info(f"Scheduler address: {cluster.scheduler_address}")
    # client = Client(address=cluster.scheduler_address, timeout="60s")
    client = Client(cluster)
    logging.info(f"Connected to Dask client: {client}")

    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    y = x + x.T
    z = y[::2, 5000:].mean(axis=1).mean()
    res = z.compute()
    logging.info(f"Result: {res}")


if __name__ == "__main__":
    main()

Runtime logs

INFO:root:Creating Fargate cluster...
INFO:aiobotocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:aiobotocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
C:\Users\thodo\anaconda3\lib\contextlib.py:126: UserWarning: Creating your cluster is taking a surprisingly long time. This is likely due to pending resources on AWS. Hang tight!
  next(self.gen)
INFO:root:Fargate cluster created.
INFO:root:Scheduler address: tcp://3.73.63.81:8786
INFO:numexpr.utils:NumExpr defaulting to 8 threads.
C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py:1393: VersionMismatchWarning: Mismatched versions found

+-------------+---------------+-----------------+-----------------+
| Package     | Client        | Scheduler       | Workers         |
+-------------+---------------+-----------------+-----------------+
| cloudpickle | 2.0.0         | 3.0.0           | 3.0.0           |
| dask        | 2024.5.0      | 2024.5.2        | 2024.5.2        |
| distributed | 2024.5.0      | 2024.5.2        | 2024.5.2        |
| lz4         | None          | 4.3.3           | 4.3.3           |
| msgpack     | 1.0.2         | 1.0.8           | 1.0.8           |
| pandas      | 1.3.4         | 2.2.2           | 2.2.2           |
| python      | 3.9.7.final.0 | 3.10.12.final.0 | 3.10.12.final.0 |
| toolz       | 0.11.1        | 0.12.0          | 0.12.0          |
| tornado     | 6.1           | 6.4             | 6.4             |
+-------------+---------------+-----------------+-----------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
INFO:root:Connected to Dask client: <Client: 'tcp://172.31.4.49:8786' processes=20 threads=80, memory=298.02 GiB>
Traceback (most recent call last):
  File "C:\Users\thodo\Documents\dask-fargate-demo\dask_fargate.py", line 42, in <module>
    main()
  File "C:\Users\thodo\Documents\dask-fargate-demo\dask_fargate.py", line 37, in main
    res = z.compute()
  File "C:\Users\thodo\anaconda3\lib\site-packages\dask\base.py", line 375, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "C:\Users\thodo\anaconda3\lib\site-packages\dask\base.py", line 661, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py", line 2233, in _gather
    raise exc
concurrent.futures._base.CancelledError: ('mean_agg-aggregate-d9d3bd3022a1c263a4fd8de53f2c280c',)
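
To make the VersionMismatchWarning table in the log above easier to scan, here is a minimal sketch that lists the packages whose major.minor version differs between the client and the cluster. It is only an illustration: the helper names `major_minor` and `mismatches` are hypothetical, the version strings are copied by hand from the table, and comparing on major.minor is an assumption of this sketch, not the rule that distributed applies. Whether any of these differences is related to the CancelledError is not established here.

```python
# Versions copied by hand from the VersionMismatchWarning table in the log.
CLIENT = {
    "cloudpickle": "2.0.0",
    "dask": "2024.5.0",
    "distributed": "2024.5.0",
    "lz4": None,  # not installed on the client
    "msgpack": "1.0.2",
    "pandas": "1.3.4",
    "python": "3.9.7.final.0",
    "toolz": "0.11.1",
    "tornado": "6.1",
}

# The scheduler and the workers report identical versions in the table,
# so a single dict stands in for both.
CLUSTER = {
    "cloudpickle": "3.0.0",
    "dask": "2024.5.2",
    "distributed": "2024.5.2",
    "lz4": "4.3.3",
    "msgpack": "1.0.8",
    "pandas": "2.2.2",
    "python": "3.10.12.final.0",
    "toolz": "0.12.0",
    "tornado": "6.4",
}


def major_minor(version):
    """Return (major, minor) as ints, or None when the package is missing."""
    if version is None:
        return None
    parts = version.split(".")
    return int(parts[0]), int(parts[1])


def mismatches(client, cluster):
    """Return the sorted names of packages whose major.minor version differs."""
    return sorted(
        name
        for name in client
        if major_minor(client[name]) != major_minor(cluster.get(name))
    )


for name in mismatches(CLIENT, CLUSTER):
    print(f"{name}: client={CLIENT[name]} cluster={CLUSTER[name]}")
```

With the values from the table, this reports cloudpickle, lz4, pandas, python, toolz and tornado; dask, distributed and msgpack differ only at the patch level.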

Additional Issue:

A similar CancelledError is raised when I try to reconnect to the scheduler address of the created cluster (after the initial connection made when I created the cluster):

import logging
from distributed import Client

logging.basicConfig(level=logging.INFO)


def main():
    logging.info("Connecting to existing cluster address...")
    client = Client(address="tcp://3.73.63.81:8786", timeout="60s")
    logging.info(f"Connected to Dask client: {client}")


if __name__ == "__main__":
    main()

Output:

INFO:root:Connecting to existing cluster address...
Traceback (most recent call last):
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\comm\tcp.py", line 546, in connect
    stream = await self.client.connect(
  File "C:\Users\thodo\anaconda3\lib\site-packages\tornado\tcpclient.py", line 275, in connect
    af, addr, stream = await connector.start(connect_timeout=timeout)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\thodo\anaconda3\lib\asyncio\tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\comm\core.py", line 342, in connect
    comm = await wait_for(
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\utils.py", line 1961, in wait_for
    return await asyncio.wait_for(fut, timeout)
  File "C:\Users\thodo\anaconda3\lib\asyncio\tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\thodo\Documents\dask-fargate-demo\conn_demo.py", line 14, in <module>
    main()
  File "C:\Users\thodo\Documents\dask-fargate-demo\conn_demo.py", line 9, in main
    client = Client(address="tcp://3.73.63.81:8786", timeout="60s")
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py", line 1017, in __init__
    self.start(timeout=timeout)
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py", line 1219, in start
    sync(self.loop, self._start, **kwargs)
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\utils.py", line 434, in sync
    raise error
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\utils.py", line 408, in f
    result = yield future
  File "C:\Users\thodo\anaconda3\lib\site-packages\tornado\gen.py", line 762, in run
    value = future.result()
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py", line 1298, in _start
    await self._ensure_connected(timeout=timeout)
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py", line 1360, in _ensure_connected
    comm = await connect(
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\comm\core.py", line 368, in connect
    raise OSError(
OSError: Timed out trying to connect to tcp://3.73.63.81:8786 after 60 s
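
The traceback above ends in a TCP connect timeout, so one way to narrow it down is to check whether the scheduler port is reachable at all, independently of Dask. The following is a minimal sketch using only the Python standard library; `can_reach` is a hypothetical helper name, and a successful TCP connection only shows that the port accepts connections, not that a Dask scheduler is answering on it.

```python
import socket
from urllib.parse import urlsplit


def can_reach(address, timeout=5.0):
    """Return True if a plain TCP connection to `address` succeeds.

    `address` is a URL-style string such as "tcp://host:port".
    """
    parts = urlsplit(address)
    try:
        # create_connection raises OSError (including timeouts and
        # refused connections) when the endpoint cannot be reached.
        with socket.create_connection(
            (parts.hostname, parts.port), timeout=timeout
        ):
            return True
    except OSError:
        return False
```

Calling `can_reach("tcp://3.73.63.81:8786")` from the same machine that runs conn_demo.py would show whether the timeout also happens outside of the Dask client.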

Local Environment:

  • dask: 2024.5.0
  • dask-cloudprovider: 2022.10.0
  • distributed: 2024.5.0
  • Python version: 3.9.7
  • Operating System: Windows
  • Install method (conda, pip, source): pip
