Here is another approach, using a list comprehension to slice by the index, with verification (at the end) that the slicing is done correctly.
Imports
from datetime import datetime
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask import compute
Specify adjustable inputs
# Start date from which to create dummy data to use
data_start_date = "1700-01-01"
# Frequency of dummy data created (hourly)
data_freq = "H"
# number of rows of data to generate
nrows = 3_000_000
# Dask DataFrame chunk size; will be used later to determine how many files
# (of the dummy data generated here) will be exported to disk
chunksize = 75_000
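As a quick aside (this check is not part of the original workflow), nrows and chunksize together determine how many partitions, and therefore how many exported files, to expect later on
expected_partitions = nrows // chunksize  # 3_000_000 / 75_000 = 40
print(expected_partitions)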
Generate df1 with the slicing boundary dates
df1 = pd.DataFrame.from_records(
[
{"start": datetime(1850, 1, 6, 0, 0, 0), "end": datetime(1870, 9, 4, 23, 0, 0)},
{"start": datetime(1880, 7, 6, 0, 0, 0), "end": datetime(1895, 4, 9, 23, 0, 0)},
{"start": datetime(1910, 11, 25, 0, 0, 0), "end": datetime(1915, 5, 5, 23, 0, 0)},
{"start": datetime(1930, 10, 8, 0, 0, 0), "end": datetime(1940, 2, 8, 23, 0, 0)},
{"start": datetime(1945, 9, 9, 0, 0, 0), "end": datetime(1950, 1, 3, 23, 0, 0)},
]
)
print(df1)
start end
0 1850-01-06 1870-09-04 23:00:00
1 1880-07-06 1895-04-09 23:00:00
2 1910-11-25 1915-05-05 23:00:00
3 1930-10-08 1940-02-08 23:00:00
4 1945-09-09 1950-01-03 23:00:00
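If your real boundary dates arrive as strings rather than datetime objects, an equivalent way to build this DataFrame (a sketch, not required for the rest of the approach) is
df1_alt = pd.DataFrame(
    {
        "start": pd.to_datetime(
            ["1850-01-06", "1880-07-06", "1910-11-25", "1930-10-08", "1945-09-09"]
        ),
        "end": pd.to_datetime(
            [
                "1870-09-04 23:00",
                "1895-04-09 23:00",
                "1915-05-05 23:00",
                "1940-02-08 23:00",
                "1950-01-03 23:00",
            ]
        ),
    }
)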
Create dummy data
- we'll assign a column here named wanted, with all rows set to False
df = pd.DataFrame(
np.random.rand(nrows),
index=pd.date_range(data_start_date, periods=nrows, freq=data_freq),
columns=["value1"],
)
df.index.name = "date"
df["wanted"] = False
print(df.head())
value1 wanted
date
1700-01-01 00:00:00 0.504119 False
1700-01-01 01:00:00 0.582796 False
1700-01-01 02:00:00 0.383905 False
1700-01-01 03:00:00 0.995389 False
1700-01-01 04:00:00 0.592130 False
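As an optional check (not part of the original flow), we can confirm that the hourly dummy data spans all of the date ranges in df1
print(df.index.min(), df.index.max())  # expected: 1700-01-01 00:00:00 2042-03-28 23:00:00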
Now, we'll set wanted to True for the rows whose dates fall within the ranges in df1
- the reason for doing this is so that we can check later that our slicing is correct
- this step and the wanted column are not necessary in your real use-case; they are only needed to check our work
for _, row in df1.iterrows():
df.loc[row['start']: row['end'], "wanted"] = True
df = df.reset_index()
print(df.head())
print(df["wanted"].value_counts().to_frame())
date value1 wanted
0 1700-01-01 00:00:00 0.504119 False
1 1700-01-01 01:00:00 0.582796 False
2 1700-01-01 02:00:00 0.383905 False
3 1700-01-01 03:00:00 0.995389 False
4 1700-01-01 04:00:00 0.592130 False
wanted
False 2530800
True 469200
Note that calling .value_counts() on the wanted column shows the number of True values we should expect if we've sliced our data correctly. This was done with the data in a pandas.DataFrame; later, we'll do the same slicing with this data in a dask.DataFrame.
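That expected count of True values can also be cross-checked arithmetically (this check is not part of the original workflow): with hourly data, each closed [start, end] interval contains (end - start) hours plus 1 timestamps
expected_true = int(((df1["end"] - df1["start"]) / pd.Timedelta(hours=1) + 1).sum())
print(expected_true)  # 469200, matching the value_counts() output above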
Now, we'll export the data to multiple .parquet files locally
- often, we'll want to start with the data loaded directly from disk into dask
- to export the data to multiple .parquet files, we'll convert the pandas.DataFrame to a dask.DataFrame and set the chunksize parameter, which determines how many files are created (chunksize rows will be placed in each exported file)
ddf = dd.from_pandas(df, chunksize=chunksize)
ddf.to_parquet("data", engine="auto")
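Optionally, we can confirm how many part files were written (again, just a check; the exact file naming is engine-dependent, so we simply count anything ending in .parquet)
from pathlib import Path

print(len(list(Path("data").glob("*.parquet"))))  # expected: 40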
Now, load all the .parquet files directly into a single dask.DataFrame and set the date column as the index
- setting an index is computationally expensive, but we're only specifying it when reading the files directly into the dask.DataFrame and we don't change it after that
# Parquet already stores the dtypes and the datetime column, so only the index needs to be specified
ddf = dd.read_parquet("data/", index="date")
print(ddf)
Dask DataFrame Structure:
value1 wanted
npartitions=40
1700-01-01 00:00:00 float64 bool
1708-07-23 00:00:00 ... ...
... ... ...
2033-09-07 00:00:00 ... ...
2042-03-28 23:00:00 ... ...
Dask Name: read-parquet, 40 tasks
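Slicing on the index with .loc can skip partitions that don't overlap a requested date range, but only if dask knows the partition divisions. It's worth confirming that it does here (an optional check, not in the original)
print(ddf.npartitions)      # 40
print(ddf.known_divisions)  # True -- the divisions are shown in the repr above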
Now, we're ready to slice using the dates in df1. We'll do this with a list comprehension that iterates over each row in df1 and uses the row to slice the data (in the dask.DataFrame), and then call dd.concat on the resulting slices (as @joebeeson did)
slices = dd.concat([ddf.loc[row['start']: row['end']] for _, row in df1.iterrows()])
Finally, call compute on this lazy dask.DataFrame to get a single pandas.DataFrame sliced to the required dates
ddf_sliced_computed = compute(slices)[0].reset_index()
print(ddf_sliced_computed.head())
print(ddf_sliced_computed["wanted"].value_counts().to_frame())
date value1 wanted
0 1850-01-06 00:00:00 0.671781 True
1 1850-01-06 01:00:00 0.455022 True
2 1850-01-06 02:00:00 0.490212 True
3 1850-01-06 03:00:00 0.240171 True
4 1850-01-06 04:00:00 0.162088 True
wanted
True 469200
As you can see, we've sliced out rows with the correct number of True values in the wanted column. We can explicitly verify this using the pandas.DataFrame that we used earlier to generate the dummy data that was later written to disk
assert ddf_sliced_computed["wanted"].all()
assert (
    df[df["wanted"]]
    .reset_index(drop=True)
    .equals(ddf_sliced_computed[ddf_sliced_computed["wanted"]])
)
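For your real use-case (where no wanted column exists), the whole approach reduces to roughly this sketch, assuming the .parquet files live under data/ and df1 holds your start/end boundaries
ddf = dd.read_parquet("data/", index="date")
slices = dd.concat([ddf.loc[row["start"]: row["end"]] for _, row in df1.iterrows()])
result = compute(slices)[0]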
Notes
- This uses 3M rows. You are working with 30M rows, so you'll have to modify the dummy data generated at the start if you want to check timings, etc.