
I'm parallelizing a function over 32 cores and am having trouble accessing a shared dataframe `dask_paths`. All the code works correctly when I remove the line `dask_paths[dask_paths['od'] == (o, d)].compute()` (and the lines that depend on it). Incredibly, if I compute this for some fixed `o, d` outside the distributed code and then use that result, I get what I want (for that `o, d`). This means it really is the act of accessing `dask_paths` that fails inside the parallel computation. I am using the logic given in the dask docs for "embarrassingly parallelizable for loops". Moreover, I used to use `get_group` on a global `pd.DataFrame` called `grouped` for this logic, and it suffered from the same problem of accessing a global: even though the serial version runs in a couple of seconds, the parallel computation stalls before giving a cryptic error message (given at the bottom). I don't know why this is.

Note that `dask_paths` is a dask dataframe. This is the most basic logic for parallelizing with dask, so I'm not sure why it's failing. I am working in a Vertex AI Jupyter notebook on Google Cloud. There is no error trace, because the program simply stalls. All the data/dataframes have been defined in the global environment of the notebook in the cells above and are working fine. The Vertex AI notebook has 16 vCPUs and 100 GB RAM and runs on a Google Cloud VM. There is no reading or writing to any files, so that's not the issue.

# dask_paths['od'] takes on values like (100, 150)
# popular takes the form [[20, 25], [100, 150], [67, 83], ...],
#   is 2000 elements long, and every element is a list of length 2
def main():
    def pop2unique(pop): 
        df = dask_paths[dask_paths['od'] == (pop[0], pop[1])].compute()
        return df['last'].sum()
        
    lzs = []
    ncores = 32 
    dask_client.cluster.scale(10)

    futures = dask_client.map(pop2unique, popular[:10]) # this stalls 
    results = dask_client.gather(futures)

And `dask_paths` takes the form:

index         od        last
605096     (22, 25)      10
999336     (103, 88)     31
1304512    (101, 33)     9
1432383    (768, 21)     16

The client being used everywhere is given by

from dask.distributed import Client, progress
dask_client = Client(threads_per_worker=4, n_workers=8)

The error message I get is long and cryptic:

distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-9y17gy_r', purging
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 33
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 35
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 33', 'time': 1658860560.8544912}
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 36
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 31
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 35', 'time': 1658860560.8563268}
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 36', 'time': 1658860560.8576422}
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 34
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 31', 'time': 1658860560.8595085}
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 32
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 34', 'time': 1658860560.8609138}
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860560.862359}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 36', 'time': 1658860560.8576422}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 31', 'time': 1658860560.8595085}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 35', 'time': 1658860560.8563268}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 33', 'time': 1658860560.8544912}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 34', 'time': 1658860560.8609138}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860560.862359}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860560.862359}")>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
    response = await self.instantiate()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
    result = await self.process.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
    msg = await self._wait_until_connected(uid)
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
    raise msg["exception"]
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860560.862359}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 36', 'time': 1658860560.8576422}")>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
    response = await self.instantiate()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
    result = await self.process.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
    msg = await self._wait_until_connected(uid)
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
    raise msg["exception"]
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 36', 'time': 1658860560.8576422}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 35', 'time': 1658860560.8563268}")>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
    response = await self.instantiate()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
    result = await self.process.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
    msg = await self._wait_until_connected(uid)
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
    raise msg["exception"]
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 35', 'time': 1658860560.8563268}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 34', 'time': 1658860560.8609138}")>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
    response = await self.instantiate()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
    result = await self.process.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
    msg = await self._wait_until_connected(uid)
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
    raise msg["exception"]
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 34', 'time': 1658860560.8609138}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 33', 'time': 1658860560.8544912}")>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
    response = await self.instantiate()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
    result = await self.process.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
    msg = await self._wait_until_connected(uid)
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
    raise msg["exception"]
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 33', 'time': 1658860560.8544912}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 31', 'time': 1658860560.8595085}")>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
    response = await self.instantiate()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
    result = await self.process.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
    msg = await self._wait_until_connected(uid)
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
    raise msg["exception"]
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 31', 'time': 1658860560.8595085}
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-xd5jxrin', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-w_fmefrs', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-djg8ki4m', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-ho1hw10b', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-mbdw10vg', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-whk890cp', purging
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 32
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860564.692771}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860564.692771}
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7f82cf7eba10>>, <Task finished coro=<SpecCluster._correct_state_internal() done, defined at /opt/conda/lib/python3.7/site-packages/distributed/deploy/spec.py:310> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860564.692771}")>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.7/site-packages/distributed/deploy/spec.py", line 348, in _correct_state_internal
    await w  # for tornado gen.coroutine support
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
    response = await self.instantiate()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
    result = await self.process.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
    msg = await self._wait_until_connected(uid)
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
    raise msg["exception"]
  File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
    await worker
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
    await self.start()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
    await self._register_with_scheduler()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
    raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860564.692771}
Tanishq Kumar
  • any chance you could turn this into an [mre] or at the very least simplify your example so that all variables are defined/described and ideally you're not using any data structures specific to your workflow? we have no idea what `o, d` means. it would be a lot easier for us to understand your workflow if you could reproduce your problem using a much simpler `client.map(simple_read_function, list_of_filepaths)` or something. Is the problem in how your dask environment is set up and has access to files? or is it just a bug in whatever `popular` is? it's really hard for us to tell. – Michael Delgado Jul 26 '22 at 18:11
  • Sure! Give me 2 mins! – Tanishq Kumar Jul 26 '22 at 18:12
  • also it would be really helpful if your paths were defined in the question and if you gave more detail about your dask setup, where the data is, and how your workers are accessing it. are you on an AWS Kubernetes cluster reading from s3? or an academic computing facility reading from NFS? or your laptop on a LocalCluster? all this stuff matters for debugging data locality issues. Finally, if you're getting a specific error, please include the [full traceback](//realpython.com/python-traceback) - it includes lots of valuable debugging information. – Michael Delgado Jul 26 '22 at 18:15
  • Done! @MichaelDelgado Let me know what you think! I'm very puzzled and new to dask - thanks :) – Tanishq Kumar Jul 26 '22 at 18:20
  • @MichaelDelgado I have also added what the data looks like and the full stack trace of the error message. Help much appreciated, thanks! – Tanishq Kumar Jul 26 '22 at 18:40
  • yep definitely agree with MDurant's answer - generally you should be submitting/controlling tasks from your local process, and workers should not be working directly with dask collections. So you should never be calling compute within a mapped task. Also, mixing client.map and dask.dataframe generally isn't a good idea. I still don't really understand your issue, and what your example data has to do with filepaths and reading data. but changing around your workflow so that the local process is the only one working with dask collections would be my #1 tip – Michael Delgado Jul 26 '22 at 19:10
  • There is nothing to do with filepaths and reading data, "read" in the title just refers to `pop2unique` accessing the global `dask_paths` dataframe, and that causing failure. – Tanishq Kumar Jul 26 '22 at 19:11
  • got it. also definitely best to avoid passing around globals into dask tasks using [closures](https://stackoverflow.com/questions/13857/can-you-explain-closures-as-they-relate-to-python). instead, try to make dask tasks pure, idempotent functions whenever possible. – Michael Delgado Jul 26 '22 at 19:38

1 Answer


The errors you are seeing might not be related to your workflow - maybe a version conflict or similar.

However, you are mixing dask API paradigms. You have created a dask dataframe - which understands how to partition operations for dask to compute - but then chosen to create tasks manually yourself. This is a bad idea. Dask tasks should generally operate on one partition of a normal data structure (in this case, a pandas dataframe), not on a dask collection (in this case, the dask dataframe). It may be (I am not sure) that the attempt to serialise the dask dataframe and deserialise it on the workers is what is causing them to fail to start properly.
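One way to honour that rule is to do all the dask-collection work on the client and hand the mapped tasks only plain Python objects. A hypothetical sketch, using a small pandas frame to stand in for `dask_paths` (on the real data you would `.compute()` the grouped sums once up front):

```python
import pandas as pd

# Stand-in for dask_paths (made-up rows). On the real dask dataframe you
# would run dask_paths.groupby("od")["last"].sum().compute() once on the
# client instead.
paths = pd.DataFrame({
    "od": [(22, 25), (22, 25), (103, 88)],
    "last": [10, 5, 31],
})

# A plain dict with tuple keys: cheap to serialise to workers and safe
# to close over, unlike a dask collection.
sums_by_od = paths.groupby("od")["last"].sum().to_dict()

def pop2unique(pop):
    # pure lookup: no .compute(), no dask dataframe inside the task
    return sums_by_od.get((pop[0], pop[1]), 0)
```

With this shape, `client.map(pop2unique, popular)` sends each worker only a list and a small dict, not a dask collection.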

Your workflow at first glance looks like a full shuffle, but it does in fact parallelise OK, because you can groupby within each partition and then sum the results.

def per_partition_op(df):
    # df here is one pandas partition of the dask dataframe
    return df.groupby("od")["last"].sum()

df2 = df.map_partitions(per_partition_op)

At this point, you can just compute and work with the partial series, since it should already be of a manageable size:

partials = df2.compute()
results = partials.groupby(level=0).sum()
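The combine step can be sanity-checked without a cluster. This pandas-only sketch (with made-up data) treats each chunk as one partition, mirroring what `map_partitions` followed by `compute` and the final `groupby(level=0).sum()` would do:

```python
import pandas as pd

# Made-up data; each chunk below stands in for one dask partition.
df = pd.DataFrame({
    "od": ["a", "a", "b", "a", "b"],
    "last": [1, 2, 3, 4, 5],
})
chunks = [df.iloc[:3], df.iloc[3:]]  # two fake "partitions"

def per_partition_op(part):
    # same per-partition groupby-sum as in the answer
    return part.groupby("od")["last"].sum()

# concatenating the per-partition results plays the role of df2.compute()
partials = pd.concat([per_partition_op(c) for c in chunks])

# the same key can appear once per partition, so combine the partial sums
results = partials.groupby(level=0).sum()
```

Here the partials are `a: 3, b: 3` from the first chunk and `a: 4, b: 5` from the second, combining to `a: 7, b: 8`.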
mdurant