When I run essentially the same calculations with dask against zarr data and parquet data, the zarr-based calculations are significantly faster. Why? Is it maybe because I did something wrong when I created the parquet files?
I've replicated the issue with fake data (see below) in a jupyter notebook to illustrate the kind of behavior I'm seeing. I'd appreciate any insight anyone has into why the zarr-based calculation is orders of magnitude faster than the parquet-based calculation.
The data I'm working with in real life is earth science model data. The particular data parameters are not important, but each parameter can be thought of as an array with latitude, longitude, and time dimensions.
To generate zarr files, I simply write out the multi-dimensional structure of my parameter and its dimensions.
To generate parquet, I first "flatten" the 3-D parameter array into a 1-D array, which becomes a single column in my data frame. I then add latitude, longitude, and time columns before writing the data frame out as parquet.
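To make that concrete, here's a toy version of the flattening on a tiny 2 x 2 x 2 array, just to show what the resulting table looks like (the real code below does the same thing at scale):
import numpy as np
import pandas as pd

# Tiny fake parameter: 2 lats x 2 lons x 2 times
lats = np.array([-45.0, 45.0])
lons = np.array([-90.0, 90.0])
times = np.array([1, 2])
data = np.arange(8).reshape(2, 2, 2)

# meshgrid + ravel turns the 3-D cube into one row per (lat, lon, time) point
mesh = np.meshgrid(lats, lons, times, indexing='ij')
df = pd.DataFrame({"lat": mesh[0].ravel(),
                   "lon": mesh[1].ravel(),
                   "time": mesh[2].ravel(),
                   "data": data.ravel()})
print(df)  # 8 rows, one per point in the cube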
This cell has all the imports needed for the rest of the code:
import pandas as pd
import numpy as np
import xarray as xr
import dask
import dask.array as da
import intake
from textwrap import dedent
This cell generates the fake data files, which total a bit more than 3 GB on disk:
def build_data(lat_resolution, lon_resolution, ntimes):
    """Build a fake geographical dataset with ntimes time steps and
    resolution lat_resolution x lon_resolution"""
    lats = np.linspace(-90.0 + lat_resolution / 2,
                       90.0 - lat_resolution / 2,
                       int(np.round(180 / lat_resolution)))
    lons = np.linspace(-180.0 + lon_resolution / 2,
                       180.0 - lon_resolution / 2,
                       int(np.round(360 / lon_resolution)))
    times = np.arange(start=1, stop=ntimes + 1)
    data = np.random.randn(len(lats), len(lons), len(times))
    return lats, lons, times, data
def create_zarr_from_data_set(lats, lons, times, data, zarr_dir):
    """Write zarr from a data set corresponding to the data passed in."""
    dar = xr.DataArray(data,
                       dims=('lat', 'lon', 'time'),
                       coords={'lat': lats, 'lon': lons, 'time': times},
                       name="data")
    ds = xr.Dataset({'data': dar,
                     'lat': ('lat', lats),
                     'lon': ('lon', lons),
                     'time': ('time', times)})
    ds.to_zarr(zarr_dir)
def create_parquet_from_data_frame(lats, lons, times, data, parquet_file):
    """Write a parquet file from a dataframe corresponding to the data passed in."""
    total_points = len(lats) * len(lons) * len(times)
    # Flatten the data array
    data_flat = np.reshape(data, (total_points, 1))
    # Use meshgrid to create the corresponding latitude, longitude, and time
    # columns
    mesh = np.meshgrid(lats, lons, times, indexing='ij')
    lats_flat = np.reshape(mesh[0], (total_points, 1))
    lons_flat = np.reshape(mesh[1], (total_points, 1))
    times_flat = np.reshape(mesh[2], (total_points, 1))
    df = pd.DataFrame(data=np.concatenate((lats_flat,
                                           lons_flat,
                                           times_flat,
                                           data_flat), axis=1),
                      columns=["lat", "lon", "time", "data"])
    df.to_parquet(parquet_file, engine="fastparquet")
def create_fake_data_files():
    """Create zarr and parquet files with fake data"""
    zarr_dir = "zarr"
    parquet_file = "data.parquet"
    lats, lons, times, data = build_data(0.1, 0.1, 31)
    create_zarr_from_data_set(lats, lons, times, data, zarr_dir)
    create_parquet_from_data_frame(lats, lons, times, data, parquet_file)
    with open("data_catalog.yaml", 'w') as f:
        catalog_str = dedent("""\
            sources:
              zarr:
                args:
                  urlpath: "./{}"
                description: "data in zarr format"
                driver: intake_xarray.xzarr.ZarrSource
                metadata: {{}}
              parquet:
                args:
                  urlpath: "./{}"
                description: "data in parquet format"
                driver: parquet
            """.format(zarr_dir, parquet_file))
        f.write(catalog_str)
##
# Generate the fake data
##
create_fake_data_files()
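As a quick sanity check on the sizes mentioned above, something like this reports what actually landed on disk (the paths match those used in create_fake_data_files):
import os

def dir_size_bytes(path):
    """Total size of all files under path, in bytes."""
    return sum(os.path.getsize(os.path.join(root, name))
               for root, _, names in os.walk(path)
               for name in names)

print("zarr size (bytes):   ", dir_size_bytes("zarr"))
print("parquet size (bytes):", os.path.getsize("data.parquet"))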
I ran several different kinds of calculations against the parquet and zarr files, but for simplicity in this example, I'll just pull a single parameter value out at a particular time, latitude, and longitude.
This cell builds the zarr and parquet directed acyclic graphs (DAGs) for the calculation:
# pick some arbitrary point to pull out of the data
lat_value = -0.05
lon_value = 10.95
time_value = 5
# open the data
cat = intake.open_catalog("data_catalog.yaml")
data_zarr = cat.zarr.to_dask()
data_df = cat.parquet.to_dask()
# build the DAG for getting a single point out of the zarr data
time_subset = data_zarr.where(data_zarr.time==time_value,drop=True)
lat_condition = da.logical_and(time_subset.lat < lat_value + 1e-9, time_subset.lat > lat_value - 1e-9)
lon_condition = da.logical_and(time_subset.lon < lon_value + 1e-9, time_subset.lon > lon_value - 1e-9)
geo_condition = da.logical_and(lat_condition,lon_condition)
zarr_subset = time_subset.where(geo_condition,drop=True)
# build the DAG for getting a single point out of the parquet data
parquet_subset = data_df[(data_df.lat > lat_value - 1e-9) &
                         (data_df.lat < lat_value + 1e-9) &
                         (data_df.lon > lon_value - 1e-9) &
                         (data_df.lon < lon_value + 1e-9) &
                         (data_df.time == time_value)]
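In case it's relevant, both DAGs can be inspected before computing; this uses the generic dask collection interface to compare partition/chunk counts and task-graph sizes:
# Compare how much work each compute() call will schedule
print("parquet partitions: ", data_df.npartitions)
print("parquet graph tasks:", len(parquet_subset.__dask_graph__()))
print("zarr chunks:        ", data_zarr['data'].data.numblocks)
print("zarr graph tasks:   ", len(zarr_subset['data'].data.__dask_graph__()))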
When I time compute() for each of the DAGs, I get wildly different results: the zarr-based subset takes a fraction of a second, while the parquet-based subset takes 15-30 seconds.
This cell does the zarr-based calculation:
%%time
zarr_point = zarr_subset.compute()
Zarr-based calculation time:
CPU times: user 6.19 ms, sys: 5.49 ms, total: 11.7 ms
Wall time: 12.8 ms
This cell does the parquet-based calculation:
%%time
parquet_point = parquet_subset.compute()
Parquet-based calculation time:
CPU times: user 18.2 s, sys: 28.1 s, total: 46.2 s
Wall time: 29.3 s
As you can see, the zarr-based calculation is much, much faster. Why?
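If it would help diagnose this, I can capture a profile of the slow compute with dask's built-in diagnostics, along these lines:
from dask.diagnostics import Profiler, ResourceProfiler, visualize

# Profile task execution and CPU/memory use during the parquet compute
with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof:
    parquet_subset.compute()

visualize([prof, rprof])  # renders the task and resource timelines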