I have a folder (7.7 GB) with multiple pandas dataframes stored in Parquet format. I need to load all these dataframes into a Python dictionary, but since I only have 32 GB of RAM, I use the .loc method to load only the data I need.
Once all the dataframes are loaded in memory in the Python dictionary, I create a common index as the union of the indexes of all the dataframes, and then I reindex every dataframe with this new index.
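For example, with two toy dataframes (made-up column name and values, just to show the alignment I mean):

import pandas as pd

# two tiny dataframes with partially overlapping datetime indexes (made-up data)
a = pd.DataFrame({'close': [1.0, 2.0]}, index=pd.to_datetime(['2015-01-02', '2015-01-05']))
b = pd.DataFrame({'close': [10.0, 20.0]}, index=pd.to_datetime(['2015-01-02', '2015-01-06']))

common_index = a.index.union(b.index)            # union of the indexes
a = a.reindex(index=common_index, method='pad')  # missing rows are forward-filled
b = b.reindex(index=common_index, method='pad')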
I developed two scripts to do this: the first one works in a classic sequential way, the second one uses Dask in order to get some performance improvement from all the cores of my Threadripper 1920X.
Sequential code:
# Standard library imports
import os
import pathlib
import time

# Third party imports
import pandas as pd

# Local application imports


class DataProvider:

    def __init__(self):
        self.data = dict()

    def load_parquet(self, source_dir: str, timeframe_start: str, timeframe_end: str) -> None:
        t = time.perf_counter()
        symbol_list = list(file for file in os.listdir(source_dir) if file.endswith('.parquet'))

        # updating containers
        for symbol in symbol_list:
            path = pathlib.Path.joinpath(pathlib.Path(source_dir), symbol)
            name = symbol.replace('.parquet', '')
            self.data[name] = pd.read_parquet(path).loc[timeframe_start:timeframe_end]
        print(f'Loaded data in {round(time.perf_counter() - t, 3)} seconds.')

        t = time.perf_counter()
        # building index
        index = None
        for symbol in self.data:
            if index is not None:
                index = index.union(self.data[symbol].index)
            else:
                index = self.data[symbol].index
        print(f'Built index in {round(time.perf_counter() - t, 3)} seconds.')

        t = time.perf_counter()
        # reindexing data
        for symbol in self.data:
            self.data[symbol] = self.data[symbol].reindex(index=index, method='pad').itertuples()
        print(f'Reindexed data in {round(time.perf_counter() - t, 3)} seconds.')

if __name__ == '__main__' or __name__ == 'builtins':
    source = r'WindowsPath'
    x = DataProvider()
    x.load_parquet(source_dir=source, timeframe_start='2015', timeframe_end='2015')
Dask code:
# Standard library imports
import os
import pathlib
import time

# Third party imports
from dask.distributed import Client
import pandas as pd

# Local application imports


def __load_parquet__(directory, timeframe_start, timeframe_end):
    return pd.read_parquet(directory).loc[timeframe_start:timeframe_end]


def __reindex__(new_index, df):
    return df.reindex(index=new_index, method='pad').itertuples()


if __name__ == '__main__' or __name__ == 'builtins':
    client = Client()

    source = r'WindowsPath'
    start = '2015'
    end = '2015'

    t = time.perf_counter()
    file_list = [file for file in os.listdir(source) if file.endswith('.parquet')]

    # build data
    data = dict()
    for file in file_list:
        path = pathlib.Path.joinpath(pathlib.Path(source), file)
        symbol = file.replace('.parquet', '')
        data[symbol] = client.submit(__load_parquet__, path, start, end)
    print(f'Loaded data in {round(time.perf_counter() - t, 3)} seconds.')

    t = time.perf_counter()
    # build index
    index = None
    for symbol in data:
        if index is not None:
            index = index.union(data[symbol].result().index)
        else:
            index = data[symbol].result().index
    print(f'Built index in {round(time.perf_counter() - t, 3)} seconds.')

    t = time.perf_counter()
    # reindex
    for symbol in data:
        data[symbol] = client.submit(__reindex__, index, data[symbol].result())
    print(f'Reindexed data in {round(time.perf_counter() - t, 3)} seconds.')
I found the results pretty weird.
Sequential code:
- max memory consumption during computations: 30.2GB
- memory consumption at the end of computations: 15.6GB
- total memory consumption (without Windows and others): 11.6GB
- Loaded data in 54.289 seconds.
- Built index in 0.428 seconds.
- Reindexed data in 9.666 seconds.
Dask code:
- max memory consumption during computations: 25.2GB
- memory consumption at the end of computations: 22.6GB
- total memory consumption (without Windows and others): 18.9GB
- Loaded data in 0.638 seconds.
- Built index in 27.541 seconds.
- Reindexed data in 30.179 seconds.
My questions:
- Why is the memory consumption at the end of the computation so much higher with Dask?
- Why does building the common index and reindexing all the dataframes take so much longer with Dask?
Also, when running the Dask code, the console prints the following warning:
C:\Users\edit\Anaconda3\envs\edit\lib\site-packages\distributed\worker.py:901: UserWarning: Large object of size 5.41 MB detected in task graph:
(DatetimeIndex(['2015-01-02 09:30:00', '2015-01-02 ... s x 5 columns])
Consider scattering large objects ahead of time with client.scatter to reduce scheduler burden and keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s))
Even though the warning's suggestions seem really good, I don't get what's wrong with my code. Why is it telling me to keep data on the workers? I thought that with the submit method I was sending all the data to my client, so that the workers would have easy access to it. Thank you all for the help.
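In case it helps, this is how I imagine the warning would apply to my code (just my interpretation, reusing client, data, index and __reindex__ from the Dask script above):

    # scatter the large common index to the workers once, instead of shipping it
    # inside every task graph (broadcast=True sends a copy to every worker)
    index_future = client.scatter(index, broadcast=True)

    # pass the scattered index (a future) and the stored load futures directly
    # to submit, instead of the raw DatetimeIndex and the .result() calls
    for symbol in data:
        data[symbol] = client.submit(__reindex__, index_future, data[symbol])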