Question
How do I use Dask Distributed to parallelize reading a directory of files into individual DataFrames, which I then process with a custom function? Assume the number of files is something like 100,000.
Background
I'm new to Dask and not quite sure how to ask this (which terms to use, etc.), so here's a picture of what I'm trying to accomplish:
I have lots of small, individual .txt "ledger" files (e.g., line-delimited files with a timestamp and attribute values at the time of the timestamp) stored in HDFS.
In parallel, I'd like to ...
Read each file into a DataFrame (note: I'm not trying to combine all the files into one big df!);
To each DataFrame, apply a custom function (see below); and then
Merge each result (the return from the custom function) into a final object and save it back to HDFS (rough sketch of what I'm imagining below).
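Here's roughly what I'm imagining, as a minimal sketch rather than working code. I don't know whether dask.delayed is the right tool here; ledger_paths, the read_csv arguments, and reading hdfs:// paths via fsspec/pyarrow are all assumptions on my part, and analyze() is the per-file function shown further down (currently written against Spark DataFrames, so it may need rewriting for pandas):

import dask
import pandas as pd
from dask.distributed import Client

client = Client()  # connect to (or start) a distributed scheduler

@dask.delayed
def process_one(path):
    # Read a single ledger file into its own DataFrame (not into one big df).
    # Assumption: the files can be parsed by read_csv; adjust sep/columns to the real format.
    df = pd.read_csv(path, sep="|", parse_dates=["timestamp"])
    result = analyze(df)   # the custom per-file function shown further down
    result["_id"] = path   # tag the result with the file it came from
    return result

# ledger_paths would be the list of ~100,000 hdfs:// file paths (placeholder name)
results = dask.compute(*[process_one(p) for p in ledger_paths])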
It seems like nearly every answer I find (when Googling related terms) is about loading multiple files into a single DataFrame.
What I'm processing and the function I'm using
Each ledger file / DataFrame:
+---------+------+-------------------+-----+
| location|status| timestamp|wh_id|
+---------+------+-------------------+-----+
| PUTAWAY| I|2019-04-01 03:14:00| 20|
|PICKABLE1| X|2019-04-01 04:24:00| 20|
|PICKABLE2| X|2019-04-01 05:33:00| 20|
|PICKABLE2| A|2019-04-01 06:42:00| 20|
| HOTPICK| A|2019-04-10 05:51:00| 20|
| ICEXCEPT| A|2019-04-10 07:04:00| 20|
| ICEXCEPT| X|2019-04-11 09:28:00| 20|
+---------+------+-------------------+-----+
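(For reference, this is just the sample above re-typed as a small pandas DataFrame so it can be poked at locally; it isn't part of my pipeline, and my actual function below currently expects a Spark DataFrame:)

import pandas as pd

sample = pd.DataFrame({
    "location": ["PUTAWAY", "PICKABLE1", "PICKABLE2", "PICKABLE2",
                 "HOTPICK", "ICEXCEPT", "ICEXCEPT"],
    "status": ["I", "X", "X", "A", "A", "A", "X"],
    "timestamp": pd.to_datetime([
        "2019-04-01 03:14:00", "2019-04-01 04:24:00", "2019-04-01 05:33:00",
        "2019-04-01 06:42:00", "2019-04-10 05:51:00", "2019-04-10 07:04:00",
        "2019-04-11 09:28:00",
    ]),
    "wh_id": [20] * 7,
})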
Analysis function:
from dateutil.relativedelta import relativedelta
from datetime import datetime

def analyze(df):
    columns_with_age = ("location", "status")
    columns_without_age = ("wh_id",)

    # Collect once so we can walk the rows backwards without re-collecting in the loop
    rows = df.collect()
    row_count = len(rows)

    # Get the most-recent values (from the last row of the df)
    last_row = rows[-1]

    # Create an empty "final row" dictionary
    final_row = {}

    # For each column for which we want to calculate an age value ...
    for c in columns_with_age:
        # Initialize loop values
        target_value = last_row[c]
        final_row[c] = target_value
        timestamp_at_lookback = last_row["timestamp"]
        look_back = 1
        different = False

        # Walk backwards until the value changes (or we run out of rows)
        while not different and look_back < row_count:
            previous_row = rows[row_count - 1 - look_back]
            if previous_row[c] == target_value:
                timestamp_at_lookback = previous_row["timestamp"]
                look_back += 1
            else:
                different = True

        # At this point, a difference has been found, so calculate the age
        final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days

    # Carry over the columns that don't need an age calculation
    for c in columns_without_age:
        final_row[c] = last_row[c]

    return final_row
As such, the ledger data / DataFrame would reduce to (assuming the calculation was run on 2019-04-14):
{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }
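For the final merge-and-save step, I'm picturing something along these lines (again just a sketch; the output path and the Parquet format are placeholders, and writing to hdfs:// assumes fsspec/pyarrow can handle it):

import pandas as pd

# results: one dict per ledger file, shaped like the example above
final = pd.DataFrame(list(results))
final.to_parquet("hdfs:///path/to/output/ledger_summary.parquet")  # placeholder path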