I am using the current version of dask ('0.7.5', github: [a1]) because the data is too large for memory, and partitioned calculations via the dask.dataframe API generally work fine. But for a large DataFrame stored as a record ctable in bcolz ('0.12.1', github: [a2]) I get an IndexError when doing this:
import dask.dataframe as dd
import numpy as np
import pandas as pd
import bcolz

# open the on-disk bcolz ctable and build a partitioned dask DataFrame from it
ctable = bcolz.open('test.bcolz', mode='r')
df_dd = dd.from_bcolz(ctable, chunksize=int(1E6))

# some calculations: combine pairs of columns into complex-valued series
z1 = (df_dd[['c0', 'c1']]*np.r_[1, 1j]).sum(axis=1)
z2 = (df_dd[['c2', 'c3']]*np.r_[1, 1j]).sum(axis=1)
df_dd_out = dd.concat([z1.to_frame('z1'), z2.to_frame('z2')], axis=1)

# actual computation
df_dd_out.compute()
The error (abbreviated traceback output) was:
# ...
File "/usr/local/lib/python3.5/dist-packages/dask/async.py", line 481, in get_async
raise(remote_exception(res, tb))
dask.async.IndexError: index out of bounds
The error only occurs at the dd.concat step. Something like
out = (z1.to_frame('z1') + z2.to_frame('z2')).compute()
works fine.
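Since only the concat fails, one check I added (just an inspection sketch of my own, not part of the test script, assuming .npartitions/.divisions behave as documented) is to look at the partition structure of both operands before concatenating:
# inspect partition structure of both operands before concatenating;
# unknown or mismatched divisions might explain an index error on concat
print(z1.npartitions, z2.npartitions)
print(z1.divisions)
print(z2.divisions)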
The error also appears in some cases when parts of the data are first read into memory, at least for npartitions > 1 and specific data sizes:
ctable_mem_b = ctable[:int(1E7)]  # larger in-memory copy
df_dd_mem_b = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b),
                             npartitions=10)
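For completeness, the same pipeline applied to this in-memory copy looks like this (the _mem names are just for this post; the exact code is in the test file below):
# repeat the complex-valued combination and the concat step,
# now on the DataFrame built from the in-memory records
z1_mem = (df_dd_mem_b[['c0', 'c1']]*np.r_[1, 1j]).sum(axis=1)
z2_mem = (df_dd_mem_b[['c2', 'c3']]*np.r_[1, 1j]).sum(axis=1)
dd.concat([z1_mem.to_frame('z1'), z2_mem.to_frame('z2')], axis=1).compute()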
See the full test code in _test_dask_error.py and the full output with tracebacks in _test_out.txt.
At that point I stopped my investigation, because I have no clue how to trace this error in async.py back to its root cause. I will of course report it as a bug (unless it turns out to be a user/usage error). But: how can I debug this to find the root cause?
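The only idea I have so far (a rough sketch, assuming dask.async.get_sync is available in this dask version and compute() accepts a get= keyword) is to force the single-threaded scheduler so the original exception is raised directly instead of being wrapped as dask.async.IndexError, and then inspect it with a post-mortem debugger. I am not sure this is the right approach:
import pdb
import dask.async

try:
    # run on the single-threaded scheduler so the real traceback surfaces
    df_dd_out.compute(get=dask.async.get_sync)
except IndexError:
    # drop into the debugger at the frame that actually raised the error
    pdb.post_mortem()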
[a1]: https://github.com/blaze/dask/tree/077b1b82ad03f855a960d252df2aaaa72b5b1cc5
[a2]: https://github.com/Blosc/bcolz/tree/562fd3092d1fee17372c11cadca54d1dab10cf9a