
I'm working with a Dask Cluster on GCP. I'm using this code to deploy it:

from dask_cloudprovider.gcp import GCPCluster
from dask.distributed import Client

environment_vars = {
    'EXTRA_PIP_PACKAGES': '"gcsfs"'  # installed on the scheduler and workers at container startup
}

cluster = GCPCluster(
    n_workers=32,
    docker_image='daskdev/dask:2021.2.0',
    env_vars=environment_vars,
    network='my-network',
    # filesystem_size=150,
    machine_type='e2-standard-16',
    projectid='my-project-id',
    zone='us-central1-a',
    on_host_maintenance="MIGRATE"
)

client = Client(cluster)

Then I read CSV files with the following code:

import dask.dataframe as dd

col_dtypes = {
    'var1': 'float64',
    'var2': 'object',
    'var3': 'object',
    'var4': 'float64'
}

df = dd.read_csv('gs://my_bucket/files-*.csv', blocksize=None, dtype=col_dtypes)
df = df.persist()

Everything works fine, but when I try to run a query or a calculation, I get an error. For instance, this piece of code:

df.var1.value_counts().compute()

This is the output:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-14-711a7c21ed42> in <module>
----> 1 df.var1.value_counts().compute()

/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    279         dask.base.compute
    280         """
--> 281         (result,) = compute(self, traverse=False, **kwargs)
    282         return result
    283 

/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    561         postcomputes.append(x.__dask_postcompute__())
    562 
--> 563     results = schedule(dsk, keys, **kwargs)
    564     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    565 

/opt/conda/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2653                     should_rejoin = False
   2654             try:
-> 2655                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2656             finally:
   2657                 for f in futures.values():

/opt/conda/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1962             else:
   1963                 local_worker = None
-> 1964             return self.sync(
   1965                 self._gather,
   1966                 futures,

/opt/conda/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    836             return future
    837         else:
--> 838             return sync(
    839                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    840             )

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1827                             exc = CancelledError(key)
   1828                         else:
-> 1829                             raise exception.with_traceback(traceback)
   1830                         raise exc
   1831                     if errors == "skip":

/opt/conda/lib/python3.8/site-packages/dask/optimization.py in __call__()
    961         if not len(args) == len(self.inkeys):
    962             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 963         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    964 
    965     def __reduce__(self):

/opt/conda/lib/python3.8/site-packages/dask/core.py in get()
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

/opt/conda/lib/python3.8/site-packages/dask/core.py in _execute_task()
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/opt/conda/lib/python3.8/site-packages/dask/utils.py in apply()
     33 def apply(func, args, kwargs=None):
     34     if kwargs:
---> 35         return func(*args, **kwargs)
     36     else:
     37         return func(*args)

/opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py in apply_and_enforce()
   5474             return meta
   5475         if is_dataframe_like(df):
-> 5476             check_matching_columns(meta, df)
   5477             c = meta.columns
   5478         else:

/opt/conda/lib/python3.8/site-packages/dask/dataframe/utils.py in check_matching_columns()
    690 def check_matching_columns(meta, actual):
    691     # Need nan_to_num otherwise nan comparison gives False
--> 692     if not np.array_equal(np.nan_to_num(meta.columns), np.nan_to_num(actual.columns)):
    693         extra = methods.tolist(actual.columns.difference(meta.columns))
    694         missing = methods.tolist(meta.columns.difference(actual.columns))

/opt/conda/lib/python3.8/site-packages/pandas/core/generic.py in __getattr__()
   5268             or name in self._accessors
   5269         ):
-> 5270             return object.__getattribute__(self, name)
   5271         else:
   5272             if self._info_axis._can_hold_identifiers_and_holds_name(name):

pandas/_libs/properties.pyx in pandas._libs.properties.AxisProperty.__get__()

/opt/conda/lib/python3.8/site-packages/pandas/core/generic.py in __getattr__()
   5268             or name in self._accessors
   5269         ):
-> 5270             return object.__getattribute__(self, name)
   5271         else:
   5272             if self._info_axis._can_hold_identifiers_and_holds_name(name):

AttributeError: 'DataFrame' object has no attribute '_data'

The version of Pandas in my Dockerfile is 1.0.1, so I already tried upgrading Pandas (to version 1.2.2), but it didn't work. What am I doing wrong?

Alexandre Moraes

1 Answer

My guess is that you have a version mismatch somewhere. What does client.get_versions(check=True) say?
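
For illustration, here is a minimal sketch of running that check and then inspecting the result by hand; the nested layout of the returned dict (per-node 'packages' entries) is an assumption about typical distributed output, not something quoted from this thread:

# `client` is the distributed Client created in the question above.
# check=True warns (or raises) if the client, scheduler and workers disagree
# on key packages.
client.get_versions(check=True)

# The same call without check=True returns a nested dict that can be inspected
# by hand, e.g. to compare the pandas version on each node (layout assumed):
versions = client.get_versions()
print(versions['client']['packages'].get('pandas'))
print(versions['scheduler']['packages'].get('pandas'))
for worker_addr, info in versions['workers'].items():
    print(worker_addr, info['packages'].get('pandas'))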

MRocklin
  • says:

    /opt/conda/lib/python3.8/site-packages/distributed/client.py:3656: UserWarning: Mismatched versions found

    | Package | client        | scheduler     | workers       |
    +---------+---------------+---------------+---------------+
    | blosc   | 1.10.2        | 1.9.2         | 1.9.2         |
    | lz4     | 3.1.3         | 3.1.1         | 3.1.1         |
    | msgpack | 1.0.2         | 1.0.0         | 1.0.0         |
    | python  | 3.8.6.final.0 | 3.8.0.final.0 | 3.8.0.final.0 |

    – Paula Vallejo Feb 26 '21 at 16:09
  • My previous reply looks awful... but you are right, I have a version mismatch. How can I fix it? I've tried, but I couldn't. – Paula Vallejo Feb 26 '21 at 16:13
  • Hrm, that seems fine. I'm surprised. I expected this to be a pandas or dask version mismatch. I recommend raising an issue on github. – MRocklin Feb 26 '21 at 16:13
  • Do you know why the version of Pandas in the Dask Dockerfile is 1.0.1 instead of 1.2.2? Should I upgrade it or keep it as is? – Paula Vallejo Feb 26 '21 at 17:51
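
One possible way to narrow such a mismatch (not confirmed in this thread; the pinned versions below are only illustrative, taken from the warning above) is to pin the flagged packages on the scheduler and workers through the same EXTRA_PIP_PACKAGES hook already used for gcsfs, or alternatively to install the image's versions in the client environment:

# Sketch: pin the packages the warning flagged so the containers match the client.
# The Python interpreter itself (3.8.6 vs 3.8.0) cannot be aligned this way; that
# usually means matching the client environment to the Docker image instead.
environment_vars = {
    'EXTRA_PIP_PACKAGES': '"gcsfs lz4==3.1.3 msgpack==1.0.2 blosc==1.10.2"'
}

# ...then pass env_vars=environment_vars to GCPCluster exactly as in the question.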