1

I have a data set that contains a timeseries per file. I am really happy how dask handles ~1k files on our cluster (one directory in my case). But I have around 50 directories.

The funny thing that happens is, that the building the dask graph seems to consume more memory and CPU than the actual problem. This being only on the scheduler. The following bare minimum code should only create the graph, but seems to do already a lot of pandas stuff on the scheduler:

    df=intake.open_csv(TRAIN_PATH+"{folder_name}/{file_name}.csv",csv_kwargs={"dtype": dtypes.to_dict()}).to_dask()
    features=df.groupby(['folder_name','file_name']).agg(["min","max"])

Note: I am using intake for the patterns here. I have been also using read_csv from dask with include_path_column=True and path as group. I managed to make the above steps faster, but then features.compute() seems to expand the graph leading effectively to the same situation, that the scheduler hangs before the cluster starts running.

The easiest would be to actually do use a dask antipattern and do a loop. However I wonder, if that can be done better (it is for educational purposes, so style and simplicity is important)

Is there a nice way to read many files in one graph without making the graph size scale beyond linear.

till
  • 570
  • 1
  • 6
  • 22
  • Actually what makes it easier is the fact, that the aggregation always only runs on one file. However, two small things I wanted to add: it would be great if dask could handle parallism independent of the number of workers. We had folder-based chunks, but the folders are of different size, why some workers ran empty early. And again not exploiting to much of the specificity would be great, because the idea was to have an educational example. – till Oct 24 '20 at 11:51
  • This detail is quite detailed and hard to answer here without further delving. I suggest you might want to play with rechunk and/or set_index. – mdurant Oct 29 '20 at 17:36
  • can you elaborate on this. The problem seems that dask generates 50k initial read nodes. How can this be avoided here. How do I get a dask_array from read_csv? (the answer below using a bag, actually generates partitions and works), however does not interact well with dataframes. – till Nov 07 '20 at 13:34

1 Answers1

0

This is a solution I came up with using map_partitions, that seems to generate a decent graph.

import dask.bag as db

def get_features(file):
    data= pd.read_csv(file)
    data["path"]=file
    return data.groupby("path").agg(["min","max"])

csvs=db.from_sequence(files)
features=csvs.map_partitions(lambda x: [get_features(f) for f in x]).\
reduction(pd.concat,pd.concat).compute()

The solution does not generalize beyond the use case ,though: e.g. if features would span more than one file. It also does not build a dask dataframe: that would be bad, if there are too many groups per file.

till
  • 570
  • 1
  • 6
  • 22
  • Note that you can make a bag into a dask dataframe, if the contents are pandas dataframes (as is the case for you). Or you could use a delayed function per back of files to produce a pandas dataframe and dd.from_delayed to make a dask dataframe. – mdurant Oct 29 '20 at 17:33
  • Actually I think the data needs to be a dictionary to generates a dataframe from a bag. I saw the answer on the other stackoverflow post using from_delayed, but this again creates huge graphs, when I tried. – till Nov 07 '20 at 13:37