I currently use Vaex to generate binned data for histograms and to decimate large time-series data. Essentially, I reduce millions of time-series points into a number of bins and compute the mean, max and min for each bin. I would like to compare Vaex (reading HDF5 files) against Dask (reading Parquet files), while keeping everything out-of-core (i.e. not loading the full dataset into RAM).
Update 3 (I've deleted previous updates):
Dask is 30% faster than Vaex on the first run, but Vaex is about 4.5 times faster on repeated runs. I believe Vaex gets this speed-up from memory mapping. Is there a way in Dask to improve the execution times of the repeated runs?
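To illustrate the kind of repeated-run caching I have in mind, here is a minimal sketch using Dask's persist(); as I understand it, this materialises the partitions in RAM, which works against the out-of-core requirement, so I would prefer something closer to Vaex's memory mapping:
import dask.dataframe as dd
# Sketch only: persist() computes the partitions once and keeps them in memory,
# so repeated aggregations reuse the cached data instead of re-reading the Parquet files.
# This is no longer out-of-core, which is why I am asking about alternatives.
parquet_dd = dd.read_parquet(r'F:\temp\DaskVaexx\randomData.parquet')
parquet_dd = parquet_dd.persist()
# Any subsequent groupby / aggregate on parquet_dd now skips the disk read.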
First, create some random data and write it to files. Warning: this will generate about 1.5 GB of data.
import numpy as np
import vaex as vx
import pandas as pd
import dask.dataframe as dd
import os
#cwd = os.getcwd() # Change this to your directory for path to save hdf and parquet files
cwd = r'F:\temp\DaskVaexx' # Write files to this directory. Use a fast SSD for fast read calculations in Dask/Vaex
### Create random data
size = 20000000 # number of rows
scale = 1.
scaleLocal = 20
np.random.seed(0)
x_data = np.arange(size)
y_data = np.cumsum(np.random.randn(size) * scale) + np.random.randn(size) * scaleLocal
np.random.seed(1)
scaleLocal2 = 3
y_data2 = np.cumsum(np.random.randn(size) * scale) + np.random.randn(size) * scaleLocal2
df = pd.DataFrame({'t': x_data.astype(np.float32),'Channel1' : y_data.astype(np.float32),'Channel2' : y_data2.astype(np.float32)})
# df
#Create Dask dataframe
dask_df = dd.from_pandas(df, npartitions=1)
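# Note: npartitions=1 means the Parquet dataset below is written as a single part file;
# more partitions would give Dask more parallelism when reading it back.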
# Create a Vaex dataset from pandas and then export it to HDF5
dataVX = vx.from_pandas(df)
dataVX.export_hdf5(os.path.join(cwd, 'randomData.hdf'))
# Create a parquet folder and files from dask dataframe
dask_df.to_parquet(os.path.join(cwd, 'randomData.parquet'))
# Create a hdf file from dask dataframe
#dask_df.to_hdf(os.path.join(cwd, 'randomDataDask.hdf'), '/data')
Now do the Vaex and Dask processing:
import dask.dataframe as dd
import dask.array as da
import vaex as vx
import dask
import time
import os
import numpy as np
import pandas as pd
#
bins = 1000
minLimit = 0
maxLimit = 1000000
timeArrayName = 't'
column = 'Channel1'
# filePath = os.getcwd() # location of hdf and parquet data
filePath = r'F:\temp\DaskVaexx' # location of hdf and parquet data
# ------------------------------
# Vaex code
startTime = time.time()
dataVX = vx.open(os.path.join(filePath,r'randomData.hdf'))
# Calculate the min & max of the column for each time bin
minMaxVaexOutputArray = dataVX.minmax(column, binby=[timeArrayName], shape=(bins,), limits=[minLimit,maxLimit])
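# minmax with binby returns an array of shape (bins, 2) holding [min, max] per bin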
VaexResults_df = pd.DataFrame(data = minMaxVaexOutputArray, columns = ['min','max'])
#Calculate the mean of a columnar dataset for each bin
VaexResults_df['mean'] = dataVX.mean(column, binby=[timeArrayName], shape=(bins,), limits=[minLimit, maxLimit])
print('Vaex hdf computation time: ' + str(time.time() - startTime))
# dataVX.close_files() # option to close down the opened Vaex dataset
# ------------------------------
# ------------------------------
# Dask computation
startTime = time.time()
# Read parquet file or folder of files
parquet_dd = dd.read_parquet(os.path.join(filePath,r'randomData.parquet'))
# Create a column assigning each time value an integer bin index; values outside
# [minLimit, maxLimit) become NaN via where() and are dropped by the groupby
parquet_dd['timeGroups'] = parquet_dd[timeArrayName].where((parquet_dd[timeArrayName]>=minLimit) & (parquet_dd[timeArrayName]<maxLimit)) // ((maxLimit - minLimit ) / bins)
# Groupby using the virtual column
df3 = parquet_dd[column].groupby(parquet_dd['timeGroups']).aggregate(['min', 'max', 'mean'])
# Execute the Dask graph and collect the results into a pandas DataFrame
DaskResults_df = dask.compute(df3)[0]
print('Dask with parquet computation time: ' + str(time.time() - startTime))
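As a sanity check that the two pipelines produce the same binned statistics, something like the following can be appended (a sketch only; the Dask result is indexed by timeGroups while the Vaex result uses a plain RangeIndex, and the bin-edge conventions may differ slightly, so I compare positionally with a tolerance):
# Compare the Vaex and Dask results bin by bin (positional comparison)
daskSorted_df = DaskResults_df.sort_index()
for stat in ['min', 'max', 'mean']:
    match = np.allclose(VaexResults_df[stat].to_numpy(),
                        daskSorted_df[stat].to_numpy(),
                        rtol=1e-5, atol=1e-5)
    print(stat + ' values match: ' + str(match))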