
There is already a nice question about this on SO, but the best answer is now 5 years old, so I think there should be better option(s) in 2018.

I am currently looking for a feature-engineering pipeline for a larger-than-memory dataset (using suitable dtypes).

The initial file is a CSV that doesn't fit in memory. Here are my needs (a sketch of one iteration follows the list):

  1. Create features (mainly using groupby operations on multiple columns).
  2. Merge the new features into the previous data (on disk, because it doesn't fit in memory).
  3. Use a subset (or all) of the columns/index for some ML applications.
  4. Repeat 1/2/3 (this is an iterative process, e.g. day 1: create 4 features, day 2: create 4 more, ...).
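
To make the loop concrete, here is a minimal sketch of one iteration with dask.dataframe; the file path and the columns 'user_id' and 'amount' are made up for illustration:

import dask.dataframe as dd

data = dd.read_csv('./data/big.csv', dtype={'user_id': 'int32', 'amount': 'float32'})

# 1. create a feature with a groupby on one (or several) columns
feat = data.groupby('user_id')['amount'].mean().to_frame('amount_mean_by_user')

# 2. merge the new feature back onto the base data
data = data.merge(feat, left_on='user_id', right_index=True, how='left')

# 3. take a subset of columns for an ML step
X = data[['amount', 'amount_mean_by_user']]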

Attempt with parquet and dask:

First, I split the big CSV file into multiple small parquet files. With this, dask is very efficient for calculating new features, but then I need to merge them into the initial dataset and, at the moment, we cannot add new columns to existing parquet files. Reading the CSV by chunks, merging and re-saving to multiple parquet files is too time-consuming, as feature engineering is an iterative process in this project.
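
Roughly, this attempt was something like the sketch below (the paths are placeholders; 'day' stands in for the grouping columns):

import dask.dataframe as dd

# split the big CSV into many small parquet files (one per partition)
dd.read_csv('./data/big.csv').to_parquet('./parquet/base', engine='fastparquet')

base = dd.read_parquet('./parquet/base', engine='fastparquet')
# computing a new feature is fast...
feat = base.groupby('day').size().to_frame('rows_per_day')
# ...but it cannot be appended as a new column to the existing parquet files,
# only written somewhere else
feat.to_parquet('./parquet/feat_rows_per_day', engine='fastparquet')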

Attempt with HDF and dask:

I then turned to HDF because we can add columns, use special queries, and it is still a binary file storage. Once again I split the big CSV file into multiple HDF files with the same key='base' for the base features, in order to get concurrent writing with dask (a single HDF file does not allow concurrent writes).

data = data.repartition(npartitions=10)  # otherwise it was saving ~8 MB files using to_hdf
data.to_hdf('./hdf/data-*.hdf', key='base', format='table', data_columns=['day'], get=dask.threaded.get)

(Side question: specifying data_columns seems useless for dask, as there is no "where" argument in dask.read_hdf?)
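
For reference, in plain pandas data_columns is what enables an on-disk where filter on a table-format file; something like this should work on one of the small files, assuming dask numbers them data-0.hdf, data-1.hdf, ...:

import pandas as pd

# 'day' was declared in data_columns with format='table', so it can be filtered on disk
subset = pd.read_hdf('./hdf/data-0.hdf', key='base', where='day == 1')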

Contrary to what I expected, I am not able to merge the new feature into the multiple small files with code like this:

data = dd.read_hdf('./hdf/data-*.hdf', key='base')
data['day_pow2'] = data['day']**2
data['day_pow2'].to_hdf('./hdf/data-*.hdf', key='added', get=dask.threaded.get) 

With dask.threaded I get "python stopped working" after 2%. With dask.multiprocessing.get it takes forever and creates new files.

What are the most appropriate tools (storage and processing) for this workflow?

Florian Mutel

2 Answers


I will just make a copy of a comment from the related issue on fastparquet: it is technically possible to add columns to existing parquet data-sets, but this is not implemented in fastparquet and possibly not in any other parquet implementation either.

Making code to do this might not be too onerous (but it is not currently planned): the calls to write columns happen sequentially, so new columns for writing would need to percolate down to this function, together with the file position corresponding to the current first byte of the metadata in the footer. In addition, the schema would need to be updated separately (this is simple). The process would need to be repeated for every file of a data-set. This is not an "answer" to the question, but perhaps someone fancies taking on the task.
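
In the meantime, a minimal sketch of what is possible today, i.e. rewriting the whole data-set with the extra column via dask + fastparquet (paths are illustrative):

import dask.dataframe as dd

df = dd.read_parquet('./parquet/base', engine='fastparquet')
df['day_pow2'] = df['day'] ** 2
# writes a complete new data-set; the original files are left untouched
df.to_parquet('./parquet/base_v2', engine='fastparquet')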

mdurant

I would seriously consider using a database (indexed access) as storage, or even using Apache Spark (for processing data in a distributed/clustered way) with Hive/Impala as a backend...
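
For a local, single-machine setup, a rough sketch of the ingest step with pandas + SQLAlchemy could look like this (the connection string, table name and chunk size are placeholders):

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:password@localhost/features')

# load the big CSV into the database in chunks that fit in memory
for chunk in pd.read_csv('./data/big.csv', chunksize=1_000_000):
    chunk.to_sql('base', engine, if_exists='append', index=False)

# new features can then be added with plain SQL, e.g.
#   ALTER TABLE base ADD COLUMN day_pow2 DOUBLE PRECISION;
#   UPDATE base SET day_pow2 = day * day;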

MaxU - stand with Ukraine
  • I am working in a local setup, no cluster access. Would SQLite be suitable? I am afraid about the read/write time – Florian Mutel Mar 29 '18 at 13:11
  • @FlorianMutel, I would try at least MySQL - it's more appropriate for a DB that doesn't fit into your RAM... – MaxU - stand with Ukraine Mar 29 '18 at 14:59
  • PostgreSQL seems the most common of the open-source solutions. However, you are adding columns, i.e., evolving the schema, which is a complex operation for traditional databases. – mdurant Apr 01 '18 at 19:57