I have a dataset that contains one timeseries per file. I am really happy with how dask handles ~1k files (one directory in my case) on our cluster. But I have around 50 directories.
The funny thing is that building the dask graph seems to consume more memory and CPU than the actual computation, and this happens only on the scheduler. The following bare-minimum code should only create the graph, but it already seems to do a lot of pandas work on the scheduler:
df = intake.open_csv(
    TRAIN_PATH + "{folder_name}/{file_name}.csv",
    csv_kwargs={"dtype": dtypes.to_dict()},
).to_dask()
features = df.groupby(["folder_name", "file_name"]).agg(["min", "max"])
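A rough way I check how big the graph gets before anything is computed (this is just my own diagnostic, using the df and features objects from above):

# nothing is read or computed here, only task counts are inspected
print(df.npartitions)                  # at least one partition per file
print(len(features.__dask_graph__()))  # total number of tasks in the graph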
Note: I am using intake for the path patterns here. I have also tried dask's read_csv with include_path_column=True and path as the group key. I managed to make the steps above faster that way, but then features.compute() seems to expand the graph again, leading to effectively the same situation: the scheduler hangs before the cluster starts running.
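For completeness, the read_csv variant I mean looks roughly like this (a sketch, assuming the same TRAIN_PATH layout and dtypes as above, with one level of subdirectories):

import dask.dataframe as dd

# same aggregation, but with dask's own reader and the generated "path" column as group key
df = dd.read_csv(
    TRAIN_PATH + "*/*.csv",
    dtype=dtypes.to_dict(),
    include_path_column=True,
)
features = df.groupby("path").agg(["min", "max"])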
The easiest way out would be to use a dask antipattern and loop over the directories (see the sketch below). However, I wonder if this can be done better (it is for educational purposes, so style and simplicity are important).
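The loop version I have in mind would be something like this (one compute per directory, so the scheduler only ever sees ~1k files at a time; TRAIN_PATH and dtypes as above, directory layout assumed):

import os
import pandas as pd
import dask.dataframe as dd

partial_results = []
for folder in sorted(os.listdir(TRAIN_PATH)):
    part = dd.read_csv(
        os.path.join(TRAIN_PATH, folder, "*.csv"),
        dtype=dtypes.to_dict(),
        include_path_column=True,
    )
    # compute each directory separately, so no single graph covers all ~50k files
    partial_results.append(part.groupby("path").agg(["min", "max"]).compute())

features = pd.concat(partial_results)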
Is there a nice way to read many files into one graph without the graph size scaling worse than linearly in the number of files?