
I am trying to use Dask to perform feature extraction with tsfresh on a very large dataset, but I am running into very long processing times.

My data looks as follows.

(image of the pandas DataFrame: an `id` column, a `time` column, and one value column per channel such as FP1-F7, F7-T7, ...)

I have it all stored in Parquet files on my hard drive.

To begin with, I import the data into a Dask DataFrame using the following code.

import dask
from dask import dataframe as dd

df = dd.read_parquet("/Users/oskar/Library/Mobile Documents/com~apple~CloudDocs/Documents/Studies/BSc Sem 7/Bachelor Project/programs/python/data/*/data.parquet")

I then initialise a Dask cluster.

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=8,
                       threads_per_worker=1, 
                       scheduler_port=8786, 
                       memory_limit='2GB')

cluster.scheduler_address

After that I connect tsfresh's Dask distributor (and its client) to the cluster.

from tsfresh.utilities.distribution import ClusterDaskDistributor

dask_distributor = ClusterDaskDistributor(address="127.0.0.1:8786")

dask_distributor.client

I then melt 'df'...

dfm = df.melt(id_vars=["id", "time"],
              value_vars=['FP1-F7','F7-T7','T7-P7','P7-O1','FP1-F3','F3-C3','C3-P3','P3-O1','FP2-F4','F4-C4',
                          'C4-P4','P4-O2','FP2-F8','F8-T8','T8-P8','P8-O2','FZ-CZ','CZ-PZ','T7-FT9','FT9-FT10',
                          'FT10-T8'],
              var_name="kind",
              value_name="value")

... and group it.

dfm_grouped = dfm.groupby(["id", "kind"])

I then call 'dask_feature_extraction_on_chunk' on the grouped data.

from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
from tsfresh.feature_extraction.settings import MinimalFCParameters

features = dask_feature_extraction_on_chunk(dfm_grouped, 
                                            column_id="id", 
                                            column_kind="kind",
                                            column_sort="time",
                                            column_value="value", 
                                            default_fc_parameters=MinimalFCParameters())

Finally, I try to categorize the result. This is the step that takes absolutely forever, and I'm wondering whether it is possible to speed it up.

features = features.categorize(columns=["variable"])

After that I intend to reset the index and pivot the table; presumably this will also take a very long time.

features = features.reset_index(drop=True)

feature_table = features.pivot_table(index="id",
                                     columns="variable",
                                     values="value",
                                     aggfunc="sum")

Not to mention the actual computation...

df_features = feature_table.compute()

Again - is there any way I can set up Dask to allow for faster computation? My computer has 16 GB of memory. Thank you.

Oskar
  • what does `dask_feature_extraction_on_chunk` return? – Paul H Oct 28 '22 at 19:25
  • presumably there's an implicit `.compute` called when you use `.categorize`, and so all of the lazy functions and methods that were called before that have to be executed. it would be interesting to see the performance if you wrote the output of `dask_feature_extraction_on_chunk` directly to a parquet file, read that file, and then measured the performance of `.categorize` in isolation (see the first sketch after these comments) – Paul H Oct 28 '22 at 19:29
  • yes exactly - see the docs for [dask.dataframe Categoricals](https://docs.dask.org/en/stable/dataframe-categoricals.html). Calling `.categorize` triggers a compute of the full pipeline in order to get the set of categories. what's more - this doesn't result in persisting or computing the dataframe, so any subsequent operations would need to redo the previous steps once a compute was triggered. to schedule a lazy categorization, use `.astype({'mycol': 'category', ...})` (see the second sketch below). this won't offer some of the optimizations available when categories are known, but it will preserve the laziness of the operation. – Michael Delgado Oct 28 '22 at 19:41
  • generally, though, you're falling prey to a common misconception for new users. when you call `dd.read_parquet`, or any of the subsequent operations, you're only scheduling the operation. you're not actually doing the work. so at some point when you hit a blocking operation like plot, write, or `.categorize()`, it suddenly takes a long time, and that's because you're doing everything you've scheduled. – Michael Delgado Oct 28 '22 at 19:45
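
A minimal sketch of what Paul H suggests above, reusing the `features` dataframe built in the question; the path "features_raw.parquet" is just a placeholder:

import time
from dask import dataframe as dd

# write the lazy tsfresh output to disk once; to_parquet triggers the compute
features.to_parquet("features_raw.parquet")

# read the already-computed features back in and time .categorize() on its own
features_raw = dd.read_parquet("features_raw.parquet")

start = time.time()
features_cat = features_raw.categorize(columns=["variable"])
print(f".categorize took {time.time() - start:.1f} s")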
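
And a sketch of Michael Delgado's lazy/persist alternative, again reusing `features` from the question; whether it actually helps depends on the workload:

# option 1: schedule the dtype change lazily instead of calling .categorize
# (Dask's pivot_table needs *known* categories, so this alone is not enough
# before the pivot step)
features_lazy = features.astype({"variable": "category"})

# option 2: persist the expensive tsfresh result in cluster memory once, so
# .categorize, .pivot_table and .compute don't redo all the earlier steps
features = features.persist()
features = features.categorize(columns=["variable"])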
