I am trying to use Dask to perform feature extraction with tsfresh on a very large dataset, but I am running into very long processing times.
My data has an 'id' column, a 'time' column and 21 channel columns (the ones listed in the melt call below), and it is all stored in Parquet files on my hard drive.
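A few rows look roughly like this (the values below are made up purely for illustration):

    id  time  FP1-F7  F7-T7  ...  FT10-T8
     1   0.0   -0.02   0.11  ...     0.05
     1   1.0    0.00   0.08  ...     0.04
     2   0.0    0.13  -0.01  ...    -0.02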
To begin with, I read the data into a Dask dataframe using the following code.
import dask
from dask import dataframe as dd
df = dd.read_parquet("/Users/oskar/Library/Mobile Documents/com~apple~CloudDocs/Documents/Studies/BSc Sem 7/Bachelor Project/programs/python/data/*/data.parquet")
I then initialise a Dask cluster.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=8,
                       threads_per_worker=1,
                       scheduler_port=8786,
                       memory_limit='2GB')
cluster.scheduler_address
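(For what it's worth, 8 workers × 2 GB per worker = 16 GB, which is all the memory my machine has.)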
After that I start a Dask client via tsfresh's ClusterDaskDistributor, pointing it at the cluster's scheduler address.
from tsfresh.utilities.distribution import ClusterDaskDistributor
dask_distributor = ClusterDaskDistributor(address="127.0.0.1:8786")
dask_distributor.client
I then melt 'df'...
dfm = df.melt(id_vars=["id", "time"],
              value_vars=['FP1-F7', 'F7-T7', 'T7-P7', 'P7-O1', 'FP1-F3', 'F3-C3', 'C3-P3', 'P3-O1', 'FP2-F4', 'F4-C4',
                          'C4-P4', 'P4-O2', 'FP2-F8', 'F8-T8', 'T8-P8', 'P8-O2', 'FZ-CZ', 'CZ-PZ', 'T7-FT9', 'FT9-FT10',
                          'FT10-T8'],
              var_name="kind",
              value_name="value")
... and group it.
dfm_grouped = dfm.groupby(["id", "kind"])
I then call 'dask_feature_extraction_on_chunk' on the grouped dataframe.
from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
from tsfresh.feature_extraction.settings import MinimalFCParameters
features = dask_feature_extraction_on_chunk(dfm_grouped,
                                            column_id="id",
                                            column_kind="kind",
                                            column_sort="time",
                                            column_value="value",
                                            default_fc_parameters=MinimalFCParameters())
Finally, I try to categorize the 'variable' column. This is the step that takes absolutely forever, and I'm wondering whether it is possible to speed it up.
features = features.categorize(columns=["variable"])
After that I intend to reset the index and pivot the table; presumably this will also take forever.
features = features.reset_index(drop=True)
feature_table = features.pivot_table(index="id",
                                     columns="variable",
                                     values="value",
                                     aggfunc="sum")
Not to mention the actual computation...
df_features = feature_table.compute()
Again: is there any way I can set up Dask to allow for faster computation? My computer has 16 GB of memory. Thank you.