
I got the following error on the scheduler while running Dask on a distributed job:

distributed.core - ERROR -
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/distributed/core.py", line 269, in write
    frames = protocol.dumps(msg)
  File "/usr/local/lib/python3.4/dist-packages/distributed/protocol.py", line 81, in dumps
    frames = dumps_msgpack(small)
  File "/usr/local/lib/python3.4/dist-packages/distributed/protocol.py", line 153, in dumps_msgpack
    payload = msgpack.dumps(msg, use_bin_type=True)
  File "/usr/local/lib/python3.4/dist-packages/msgpack/__init__.py", line 47, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 231, in msgpack._packer.Packer.pack (msgpack/_packer.cpp:231)
  File "msgpack/_packer.pyx", line 239, in msgpack._packer.Packer.pack (msgpack/_packer.cpp:239)
MemoryError

Is this running out of memory on the scheduler or on one of the workers? Or both??

  • Given that the error comes from the scheduler my guess is that it's the scheduler that is running out of memory. This is pretty rare under normal operation. It would be interesting to know in your question what you're doing when this occurs. – MRocklin Jul 23 '16 at 11:50
  • One common case is that you're gathering a huge amount of data back from the workers to the client in a call to `e.gather(...)` or `collection.compute()` – MRocklin Jul 23 '16 at 12:55

1 Answer


The most common cause of this error is trying to collect too much data back to a single machine, as in the following example using dask.dataframe:

import dask.dataframe as dd

df = dd.read_csv('s3://bucket/lots-of-data-*.csv')  # lazy: partitions the CSVs across the cluster
df.compute()  # concatenates every partition into one local pandas DataFrame

This loads all of the data into RAM across the cluster (which is fine), and then tries to bring the entire result back to the local machine by way of the scheduler (which probably can't handle your hundreds of gigabytes of data all in one place). Worker-to-client communications pass through the scheduler, so it is the first single machine to receive all of the data and the first machine likely to fail.

If this is the case then you probably want to use the Executor.persist method instead, which triggers the computation but leaves the results on the cluster:

df = dd.read_csv('s3://bucket/lots-of-data-*.csv')
df = e.persist(df)  # e is the distributed Executor; results stay in memory on the workers

Generally we only use df.compute() for small results that we want to view in our local session.
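
As a rough sketch of that pattern (the column name amount below is only a placeholder, not something from the question): persist the full dataframe on the cluster, then compute only a small aggregate, so that just a single value travels back through the scheduler to the client.

df = dd.read_csv('s3://bucket/lots-of-data-*.csv')
df = e.persist(df)                   # full dataframe stays distributed across the workers
total = df.amount.sum().compute()    # placeholder column; only one number returns to the client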

MRocklin
  • In my case the scheduler and the client are running on the same machine. So how can I retrieve `df` after using `persist` then? Wouldn't it cause the same problem as before as `df` needs to be streamed back to the scheduler / client? – JRR Jul 23 '16 at 16:25
  • If you're on the same machine and have a dataset that fits in memory then maybe just use Pandas instead? I wouldn't use dask.dataframe if it's not necessary. Alternatively, consider the single-machine scheduler. – MRocklin Jul 24 '16 at 11:14