
In the operation below (adapted from the Dask DataFrame API docs), if I don't attach to a scheduler (i.e. leave the line assigning the client variable commented out), the operation completes successfully, as expected.

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd

connection_loc = 'foobar.net:8786'
# client = Client(connection_loc)  # succeeds while commented out, fails once uncommented

df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [1., 2., 3., 4., 5.]})
ddf = dd.from_pandas(df, npartitions=2)
# rolling sum with an overlap of 2 rows before and 0 rows after each partition
foo = ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute()

The moment that same line is uncommented and a client connection is assigned, the following error occurs: TypeError: unorderable types: list() >= int() (see full traceback for more).

Examining the traceback, I can see that the bytestring the worker is trying to deserialize is not what I would expect it to be (see the first line of the full traceback: distributed.protocol.pickle - INFO - Failed to deserialize).

I've completely stopped and restarted the remote containers running both the worker and the scheduler to no avail. I've also used client.restart() with no luck. Any idea why this other task is being passed to the worker and throwing this error? Any solution to get Dask to stop doing this?

Full traceback:

dask_worker_1     | distributed.protocol.pickle - INFO - Failed to deserialize b"\x80\x04\x95+\x01\x00\x00\x00\x00\x00\x00(\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_fill_function\x94\x93\x94(h\x00\x8c\x0f_make_skel_func\x94\x93\x94h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x02KCC\x0e|\x00j\x00d\x01\x83\x01j\x01\x83\x00S\x00\x94NK\x02\x86\x94\x8c\x07rolling\x94\x8c\x03sum\x94\x86\x94\x8c\x02df\x94\x85\x94\x8c\x1fdask_method/dask_dist_matrix.py\x94\x8c\x08<lambda>\x94K\rC\x00\x94))t\x94R\x94]\x94}\x94\x87\x94R\x94}\x94N}\x94tRN\x8c3('from_pandas-ddc065084280667dd51853b144bdd4e8', 0)\x94NK\x02K\x00)}\x94t\x94."
dask_worker_1     | Traceback (most recent call last):
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
dask_worker_1     |     return pickle.loads(x)
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func
dask_worker_1     |     if cell_count >= 0 else
dask_worker_1     | TypeError: unorderable types: list() >= int()
dask_worker_1     | distributed.worker - WARNING - Could not deserialize task
dask_worker_1     | Traceback (most recent call last):
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 1113, in add_task
dask_worker_1     |     self.tasks[key] = _deserialize(function, args, kwargs, task)
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 573, in _deserialize
dask_worker_1     |     args = pickle.loads(args)
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
dask_worker_1     |     return pickle.loads(x)
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func
dask_worker_1     |     if cell_count >= 0 else
dask_worker_1     | TypeError: unorderable types: list() >= int()

Dask: 0.15.0, Distributed: 1.17.1, OS: Ubuntu 16.04.2 LTS

kuanb

2 Answers


I suspect that you have a mismatch in your cloudpickle versions between workers and clients. You'll have to ensure that all of your workers and clients have the same software setup. You can try the following command to help:

client.get_versions(check=True)

I don't think this check includes cloudpickle in dask.distributed version 1.17.1, but it should in all subsequent versions (it works now in master).
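If get_versions(check=True) itself fails with the same deserialization error (as noted in the comments below, the method itself depends on cloudpickle), a minimal fallback, not part of the original answer, is to run a small script by hand on the client machine and inside each scheduler and worker container, then compare the output line by line:

import sys
import dask
import distributed
import cloudpickle

# Print the versions that matter for (de)serialization; the getattr guard is
# only there in case an older cloudpickle release does not expose __version__.
print("python      :", sys.version.split()[0])
print("dask        :", dask.__version__)
print("distributed :", distributed.__version__)
print("cloudpickle :", getattr(cloudpickle, "__version__", "unknown"))

If any of those lines differ between machines, that mismatch is the likely culprit.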

MRocklin
  • Thanks, `client.get_versions(check=True)` created the same error. – kuanb Jun 28 '17 at 00:57
  • 1
    Ah, ideally it would return an informative error telling you what versions are mismatched where. I guess this method itself depends on cloudpickle to function though. Distributed software environments are hard. Most large deployments I know of use something to help manage them to keep everything in sync. – MRocklin Jun 28 '17 at 00:58
  • Thanks. I have two other questions if possible. I ask them here as they may be related. Upon resolving the inconsistent dependencies, I encounter a Tornado error on get_versions (`distributed.utils - ERROR - Stream is closed: while trying to call remote method 'broadcast'`). If I skip running get_versions, I'll receive the following error: `distributed.utils - ERROR - ("('apply-29db5629d323ed627f7f91b2363edb30', 0)", 'tcp://10.0.0.248:39689')`. – kuanb Jun 28 '17 at 02:01
  • 1
    I suspect that some versions are still mismatched. You might try using an environment manager like conda or virtualenv and explicitly passing around an environment file. – MRocklin Jun 28 '17 at 13:57
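Following up on the comment above about passing around an environment file, a minimal sketch of one way to do that with conda (the filename is illustrative, and this assumes conda is available on the client, the scheduler, and every worker):

conda env export > environment.yml    # run on the client to capture its packages
conda env create -f environment.yml   # run on each scheduler/worker machine

Recreating every machine's environment from the same file keeps dask, distributed, and cloudpickle (and everything else they serialize against) in sync.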
1

As the other answer mentions, this is almost certainly a mismatch in software versions. I had the same problem.

I did several things to get it all working again. I was using dask_ec2, so I'll include those changes here, but I don't know how you're setting up your cluster.

First, as I was using Ubuntu 16.04 locally, I figured the remote servers would be more likely to have the same libraries if they ran the same OS version, but this had an issue (see https://github.com/dask/dask-ec2/issues/98). In summary: I modified dask_ec2/salt.py so that the __install_salt_rest_api method downloads cherrypy==3.2.3 (see the linked issue for more detail).

Second, I set dask_ec2 to use the newer versions of Anaconda. In dask_ec2/formulas/salt/conda/settings.sls, change the download_url lines to:

{% set download_url = 'https://repo.continuum.io/archive/Anaconda2-5.0.1-Linux-x86_64.sh' %}

{% set download_url = 'https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh' %}

Third, I ran an update on my own computer, to ensure my own libraries were up to date:

E.g. from "Upgrading all packages with pip":

pip freeze --local | grep -v '^\-e' | cut -d = -f 1  | xargs -n1 pip install -U

and

conda update --all

I finally restarted the whole lot, and it worked fine.

lionfish