
I currently use Vaex to generate binned data for histograms and to decimate large time-series data. Essentially, I reduce millions of time-series points into a number of bins and compute the mean, max, and min for each bin. I would like to compare Vaex (reading HDF5 files) with Dask (reading Parquet files), keeping the processing out-of-core.
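In pandas terms, the decimation I am after is roughly the following toy sketch (purely in-memory, just to show the per-bin aggregation; the data here is only for illustration):

import numpy as np
import pandas as pd

# Toy version of the binned decimation: assign each sample to one of
# `bins` equal-width time bins, then compute min/max/mean per bin.
bins = 10
toy = pd.DataFrame({'t': np.arange(100, dtype=np.float32),
                    'Channel1': np.random.randn(100).astype(np.float32)})
binLabels = pd.cut(toy['t'], bins=bins, labels=False)  # integer bin index per row
decimated = toy.groupby(binLabels)['Channel1'].agg(['min', 'max', 'mean'])
print(decimated)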

Update 3 (I've deleted previous updates):

Dask is about 30% faster than Vaex on the first run, but Vaex is about 4.5 times faster on repeated runs. I believe Vaex gets this speed-up through memory mapping. Is there a way to improve Dask's execution time on repeated runs?
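One approach suggested in the comments is dask.persist() and/or the distributed scheduler. A minimal sketch of what I understand that to mean, assuming the Parquet files generated by the setup below and that the persisted data fits in RAM (which relaxes the out-of-core constraint):

import os
import dask.dataframe as dd

# Optional: the distributed scheduler can sometimes help even on one machine
# from dask.distributed import Client
# client = Client()

filePath = r'F:\temp\DaskVaexx'  # same folder used by the setup code below

# Read the Parquet data once and keep the loaded partitions in memory, so that
# repeated aggregations reuse them instead of re-reading from disk.
parquet_dd = dd.read_parquet(os.path.join(filePath, 'randomData.parquet'))
parquet_dd = parquet_dd.persist()

# Subsequent .compute() calls on results derived from parquet_dd now start from
# the in-memory partitions rather than the Parquet files.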

First, create some random data and generate the data files. Warning: this will write roughly 1.5 GB of data to disk.

import numpy as np
import vaex as vx
import pandas as pd
import dask.dataframe as dd
import os

#cwd = os.getcwd() # Change this to your directory for path to save hdf and parquet files 
cwd = r'F:\temp\DaskVaexx' # Write files to this directory.  Use a fast SSD for fast read calculations in Dask/Vaex

### Create random data
size = 20000000 # number of rows
scale = 1.
scaleLocal = 20
np.random.seed(0)
x_data = np.arange(size)
y_data = np.cumsum(np.random.randn(size)  * scale) + np.random.randn(size) * scaleLocal

np.random.seed(1)
scaleLocal2 = 3
y_data2 = np.cumsum(np.random.randn(size)  * scale) + np.random.randn(size) * scaleLocal2
df = pd.DataFrame({'t': x_data.astype(np.float32),'Channel1' : y_data.astype(np.float32),'Channel2' : y_data2.astype(np.float32)})
# df

# Create a Dask dataframe from the pandas dataframe
dask_df = dd.from_pandas(df, npartitions=1)

# Create a Vaex dataset from pandas, then export it to HDF5
dataVX = vx.from_pandas(df)
dataVX.export_hdf5(os.path.join(cwd, 'randomData.hdf'))

# Create a parquet folder and files from dask dataframe
dask_df.to_parquet(os.path.join(cwd, 'randomData.parquet'))

# Optionally, create an HDF5 file from the Dask dataframe
#dask_df.to_hdf(os.path.join(cwd, 'randomDataDask.hdf'), '/data')

Now do the Vaex and Dask processing:

import dask.dataframe as dd
import dask.array as da
import vaex as vx
import dask
import time
import os
import numpy as np
import pandas as pd

#

bins = 1000
minLimit = 0
maxLimit = 1000000
timeArrayName = 't'
column = 'Channel1'

# filePath = os.getcwd() # location of hdf and parquet data
filePath = r'F:\temp\DaskVaexx' # location of hdf and parquet data

# ------------------------------
# Vaex code

startTime = time.time()

dataVX = vx.open(os.path.join(filePath,r'randomData.hdf'))

# Calculate the min & max of the column for each bin
minMaxVaexOutputArray = dataVX.minmax(column, binby=[timeArrayName],  shape=(bins,), limits=[minLimit,maxLimit])

VaexResults_df = pd.DataFrame(data = minMaxVaexOutputArray, columns = ['min','max'])

# Calculate the mean of the column for each bin
VaexResults_df['mean'] = dataVX.mean(column, binby=[timeArrayName],  shape=(bins,), limits=[minLimit, maxLimit]) 

print('Vaex hdf computation time: ' + str(time.time() - startTime))

# dataVX.close_files() # option to close down the opened Vaex dataset
# ------------------------------

# ------------------------------
# Dask computation
startTime = time.time()

# Read parquet file or folder of files
parquet_dd = dd.read_parquet(os.path.join(filePath,r'randomData.parquet'))  

# Create a column that assigns each time value an integer bin index; values outside
# [minLimit, maxLimit) become NaN and are excluded by the groupby
parquet_dd['timeGroups'] = parquet_dd[timeArrayName].where(
    (parquet_dd[timeArrayName] >= minLimit) & (parquet_dd[timeArrayName] < maxLimit)
) // ((maxLimit - minLimit) / bins)

# Groupby using the virtual column
df3 = parquet_dd[column].groupby(parquet_dd['timeGroups']).aggregate(['min', 'max', 'mean'])

# Execute the Dask graph and return the results as a pandas DataFrame
DaskResults_df = dask.compute(df3)[0]

print('Dask with parquet computation time: ' + str(time.time() - startTime))
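
As raised in the comments, Dask can also read the HDF5 file directly. A rough sketch, continuing the script above and assuming the commented-out dask_df.to_hdf(...) export in the setup was actually run, so that randomDataDask.hdf exists with key '/data':

# ------------------------------
# Dask computation reading HDF5 directly (for comparison with Parquet)
startTime = time.time()

hdf_dd = dd.read_hdf(os.path.join(filePath, 'randomDataDask.hdf'), '/data')

# Same binning column as for the Parquet version
hdf_dd['timeGroups'] = hdf_dd[timeArrayName].where(
    (hdf_dd[timeArrayName] >= minLimit) & (hdf_dd[timeArrayName] < maxLimit)
) // ((maxLimit - minLimit) / bins)

df4 = hdf_dd[column].groupby(hdf_dd['timeGroups']).aggregate(['min', 'max', 'mean'])
DaskHdfResults_df = dask.compute(df4)[0]

print('Dask with hdf computation time: ' + str(time.time() - startTime))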
  • Could you provide a sample of the data? How did you convert from hdf to parquet, and did you try dask with the hdf directly? – mdurant Jul 26 '18 at 16:27
  • @mdurant I have not tried dask with hdf directly. I have updated the post to generate some random data with a decent amount of rows. I would really appreciate you looking into this. – DougR Jul 26 '18 at 23:29
  • Running your code, I get an output for vaex of size (3, 1000) but the three dask outputs are pandas series of size (1000000,). – mdurant Jul 29 '18 at 21:48
  • @mdurant . Thanks for having a look at this. I have modified the code and updated the post above. I needed to add a 'where' filter on the groupby Dask command to get the same output array size. I have edited the comments above to update the new relative execution times. This is the important one "Dask is 30% faster than Vaex for the 1st run but then Vaex 4.5 times faster with repeated runs. I believe Vaex gets this speed-up through memory mapping. Is there a way in Dask to improve the execution times of the repeated runs? " – DougR Jul 30 '18 at 12:12
  • You could try `dask.persist()` run on various of the intermediate dataframes. – mdurant Jul 30 '18 at 13:33
  • Also, I suppose you are using the default scheduler, but you can try with the distributed one, sometimes better even on a single machine. – mdurant Jul 30 '18 at 15:32
  • Thanks, I will try your suggestions. Also, I have found that Dask is quicker with HDF5 than Parquet in this case. – DougR Jul 30 '18 at 16:10
  • It sounds like you are in a good position to write an answer to your own question with a summary of your findings. – mdurant Jul 30 '18 at 16:17
