
I have a folder (7.7 GB) with multiple pandas DataFrames stored in Parquet format. I need to load all of them into a Python dictionary, but since I only have 32 GB of RAM, I use the .loc method to load only the data I need.

Once all the DataFrames are loaded into the dictionary, I build a common index from the indexes of all the DataFrames, then I reindex every DataFrame with that common index.

I wrote two scripts to do this: the first one works in a classic sequential way, the second one uses Dask in order to get some performance improvement from all the cores of my Threadripper 1920X.

Sequential code:

# Standard library imports
import os
import pathlib
import time

# Third party imports
import pandas as pd

# Local application imports


class DataProvider:

    def __init__(self):

        self.data = dict()

    def load_parquet(self, source_dir: str, timeframe_start: str, timeframe_end: str) -> None:

        t = time.perf_counter()

        symbol_list = list(file for file in os.listdir(source_dir) if file.endswith('.parquet'))

        # load each file, keeping only the requested timeframe
        for symbol in symbol_list:

            path = pathlib.Path.joinpath(pathlib.Path(source_dir), symbol)
            name = symbol.replace('.parquet', '')

            self.data[name] = pd.read_parquet(path).loc[timeframe_start:timeframe_end]

        print(f'Loaded data in {round(time.perf_counter() - t, 3)} seconds.')

        t = time.perf_counter()

        # building the common index as the union of all indexes
        index = None

        for symbol in self.data:

            if index is not None:
                index = index.union(self.data[symbol].index)  # union returns a new index, it is not in place
            else:
                index = self.data[symbol].index

        print(f'Built index in {round(time.perf_counter() - t, 3)} seconds.')

        t = time.perf_counter()

        # reindexing every dataframe onto the common index
        for symbol in self.data:

            self.data[symbol] = self.data[symbol].reindex(index=index, method='pad').itertuples()

        print(f'Indexed data in {round(time.perf_counter() - t, 3)} seconds.')


if __name__ == '__main__' or __name__ == 'builtins':

    source = r'WindowsPath'

    x = DataProvider()
    x.load_parquet(source_dir=source, timeframe_start='2015', timeframe_end='2015')

Dask code:

# Standard library imports
import os
import pathlib
import time

# Third party imports
from dask.distributed import Client
import pandas as pd

# Local application imports


def __load_parquet__(directory, timeframe_start, timeframe_end):
    return pd.read_parquet(directory).loc[timeframe_start:timeframe_end]


def __reindex__(new_index, df):
    return df.reindex(index=new_index, method='pad').itertuples()


if __name__ == '__main__' or __name__ == 'builtins':

    client = Client()

    source = r'WindowsPath'
    start = '2015'
    end = '2015'

    t = time.perf_counter()

    file_list = [file for file in os.listdir(source) if file.endswith('.parquet')]

    # build data
    data = dict()
    for file in file_list:

        path = pathlib.Path.joinpath(pathlib.Path(source), file)
        symbol = file.replace('.parquet', '')

        data[symbol] = client.submit(__load_parquet__, path, start, end)

    print(f'Loaded data in {round(time.perf_counter() - t, 3)} seconds.')

    t = time.perf_counter()

    # build index
    index = None
    for symbol in data:
        if index is not None:
            index = index.union(data[symbol].result().index)  # union returns a new index, it is not in place
        else:
            index = data[symbol].result().index

    print(f'Built index in {round(time.perf_counter() - t, 3)} seconds.')

    t = time.perf_counter()

    # reindex
    for symbol in data:
        data[symbol] = client.submit(__reindex__, index, data[symbol].result())

    print(f'Indexed data in {round(time.perf_counter() - t, 3)} seconds.')

I found the results pretty weird.

Sequential code:

  • max memory consumption during computations: 30.2GB
  • memory consumption at the end of computations: 15.6GB
  • total memory consumption (without Windows and others): 11.6GB
  • Loaded data in 54.289 seconds.
  • Built index in 0.428 seconds.
  • Reindexed data in 9.666 seconds.

Dask code:

  • max memory consumption during computations: 25.2GB
  • memory consumption at the end of computations: 22.6GB
  • total memory consumption (without Windows and others): 18.9GB
  • Loaded data in 0.638 seconds.
  • Built index in 27.541 seconds.
  • Reindexed data in 30.179 seconds.

My questions:

  1. Why is the memory consumption at the end of the computation so much higher with Dask?
  2. Why does building the common index and reindexing all the DataFrames take so much longer with Dask?

Also, when running the Dask code the console prints the following warning.

C:\Users\edit\Anaconda3\envs\edit\lib\site-packages\distributed\worker.py:901: UserWarning: Large object of size 5.41 MB detected in task graph:
(DatetimeIndex(['2015-01-02 09:30:00', '2015-01-02 ... s x 5 columns])
Consider scattering large objects ahead of time with client.scatter to reduce scheduler burden and keep data on workers
future = client.submit(func, big_data)    # bad
big_future = client.scatter(big_data)     # good
future = client.submit(func, big_future)  # good
% (format_bytes(len(b)), s))

Even though the warning's suggestions seem good, I don't understand what's wrong with my code. Why does it say to keep data on workers? I thought that with the submit method I was sending all the data to my client, so the workers would have easy access to it. Thank you all for the help.
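
For reference, this is how I read the warning's suggestion as applying to my reindex step. It is only a sketch based on the warning text, reusing the client, data and __reindex__ names from the Dask code above; I have not benchmarked it:

# Ship the large common index to the workers once, instead of serialising it
# into every submitted task. client.scatter returns a future pointing at the
# data already living on the workers.
index_future = client.scatter(index, broadcast=True)

for symbol in data:
    # Futures can be passed to submit directly; the loaded dataframes then stay
    # on the workers instead of being pulled back with .result() and re-sent.
    data[symbol] = client.submit(__reindex__, index_future, data[symbol])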

ilpomo
  • Why not use dask's [`read_parquet`](http://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.read_parquet) with the `filters` argument? – moshevi Nov 12 '18 at 08:26
  • There are a few reasons why I'm not using dask.dataframes: 1) The documentation says "This reads a directory of Parquet data into a Dask.dataframe, one file per partition." I'm not loading an entire folder as one big dataframe, but a single dataframe for each file in that folder. 2) I don't understand how to apply filters. How can I write something like dd.read_parquet(path=path, filters=[data.index = '2015')]? – ilpomo Nov 12 '18 at 09:33
  • Update: using any filter will raise NotImplementedError: Predicate pushdown not implemented. Looks like the filter option is not implemented in dask 0.20. – ilpomo Nov 12 '18 at 09:56
  • They are implemented, however I think only for fastparquet. Are you using pyarrow? – moshevi Nov 12 '18 at 10:00
  • Please also note this [answer](https://stackoverflow.com/a/51245341/9253013) about the `filters` argument. – moshevi Nov 12 '18 at 10:03
  • I am using pyarrow. Why should I use fastparquet? What's the difference between the two? – ilpomo Nov 12 '18 at 10:08
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/183491/discussion-between-moshevi-and-ilpomo). – moshevi Nov 12 '18 at 10:14
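
For reference, the `filters` syntax discussed in the comments above looks roughly like the sketch below. It assumes the DatetimeIndex was written to the parquet file as a column named 'index' (the column name is a guess) and that fastparquet is installed, since at the time filters were only implemented for the fastparquet engine:

import dask.dataframe as dd
import pandas as pd

# Hypothetical sketch: row-group level filtering on a single parquet file.
# Filters can only skip whole row groups whose min/max statistics fall outside
# the predicate, so this is a coarse pre-filter; an exact .loc slice on the
# timeframe would still be needed afterwards.
ddf = dd.read_parquet(
    path,                     # a single .parquet file, as in the loop above
    engine='fastparquet',
    filters=[('index', '>=', pd.Timestamp('2015-01-01')),
             ('index', '<=', pd.Timestamp('2015-12-31'))],
)
df = ddf.compute()            # back to a plain pandas DataFrame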

1 Answer


I am not an expert at all, just trying to help. You might want to try not using time.perf_counter and see if that changes anything.