
Question

How do I use Dask Distributed to parallelize reading a directory of files into individual DataFrames, which I then process with a custom function? Assume the number of files is something like 100,000.

Background

I'm new to Dask and not quite sure how to ask this (which terms to use, etc.), so here's a picture of what I'm trying to accomplish:

Overview

I have lots of small, individual .txt "ledger" files (e.g., line-delimited files with a timestamp and attribute values at the time of the timestamp) stored in HDFS.

In parallel, I'd like to ...

  1. Read each file into a DataFrame (note: I'm not trying to combine all the files into one, big df!);

  2. To each DataFrame, apply a custom function (see below); and then

  3. Merge each result (return from the custom function) into a final object & save it back to HDFS.

It seems like nearly every answer I find (when Googling related terms) is about loading multiple files into a single data frame.

What I'm processing and the function I'm using

Each ledger file / DataFrame:

+---------+------+-------------------+-----+
| location|status|          timestamp|wh_id|
+---------+------+-------------------+-----+
|  PUTAWAY|     I|2019-04-01 03:14:00|   20|
|PICKABLE1|     X|2019-04-01 04:24:00|   20|
|PICKABLE2|     X|2019-04-01 05:33:00|   20|
|PICKABLE2|     A|2019-04-01 06:42:00|   20|
|  HOTPICK|     A|2019-04-10 05:51:00|   20|
| ICEXCEPT|     A|2019-04-10 07:04:00|   20|
| ICEXCEPT|     X|2019-04-11 09:28:00|   20|
+---------+------+-------------------+-----+

Analysis function:

from dateutil.relativedelta import relativedelta
from datetime import datetime

def analyze(df):
    # Assumes the "timestamp" column already holds timestamp values
    # (e.g., converted with pyspark.sql.functions.to_timestamp)

    columns_with_age = ("location", "status")
    columns_without_age = ("wh_id",)

    # Pull the rows down once; the most-recent values come from the last row
    rows = df.collect()
    row_count = len(rows)
    last_row = rows[row_count - 1]

    # Create an empty "final row" dictionary
    final_row = {}

    # Carry over the columns that don't need an age calculation
    for c in columns_without_age:
        final_row[c] = last_row[c]

    # For each column for which we want to calculate an age value ...
    for c in columns_with_age:

        # Initialize loop values
        target_value = last_row[c]
        final_row[c] = target_value
        timestamp_at_lookback = last_row["timestamp"]
        look_back = 1
        different = False

        # Walk backwards until the value changes (or we run out of rows)
        while not different and look_back < row_count:
            previous_row = rows[row_count - 1 - look_back]
            if previous_row[c] == target_value:
                timestamp_at_lookback = previous_row["timestamp"]
                look_back += 1
            else:
                different = True

        # At this point, a difference has been found (or the whole history
        # shares the value), so calculate the age
        final_row["days_in_{}".format(c)] = relativedelta(
            datetime.now(), timestamp_at_lookback).days

    return final_row

As such, the ledger data / DataFrame would reduce to (assuming the calculation was run on 2019-04-14):

{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }
Dan
  • You can combine the reading and the file-level evaluation into a function, then use https://stackoverflow.com/questions/9786102/how-do-i-parallelize-a-simple-python-loop; after all files are processed you can combine the results into your final output – Lucas Apr 16 '19 at 14:19
  • You can read about `delayed`. Check out this [answer](https://stackoverflow.com/a/42597019/4819376). – rpanai Apr 16 '19 at 15:23
  • Seems like I might want to do a hybrid of the two above comments -- that is, combine the file read / evaluation into one function and then use Dask Distributed's `client.map(eval_func, [list of HDFS filepaths])`. – Dan Apr 17 '19 at 12:44
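
To make the comments above concrete, here's a minimal sketch of the hybrid approach (one function that reads a single file and evaluates it, fanned out with Dask Distributed's `client.map`). The scheduler address and path list are placeholders, reading `hdfs://` paths with `pandas.read_csv` assumes `pyarrow`/`fsspec` is installed, and `analyze` would need a pandas-compatible port, since the version above uses Spark's `collect()`:

import pandas as pd
from dask.distributed import Client

client = Client("scheduler-address:8786")     # placeholder: address of an existing Dask scheduler

def read_and_analyze(path):
    # Read one ledger file into its own DataFrame (not a combined one) ...
    df = pd.read_csv(path, parse_dates=["timestamp"])
    # ... reduce it with the per-file function and tag the result with its source file
    result = analyze(df)                      # assumes a pandas-compatible analyze()
    result["_id"] = path
    return result

futures = client.map(read_and_analyze, hdfs_paths)   # hdfs_paths: list of 'hdfs://...' strings, one task per file
results = client.gather(futures)                     # list of small dicts, one per ledger file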

1 Answer

Writing in parallel from many processes into a single output file is not really possible, because you don't know how long each of the results will be beforehand, so you don't know where in the file to place the other results. Furthermore, HDFS really likes to receive big blocks of contiguous data (maybe 64MB) rather than incremental updates.

There are a couple of things you could do:

  • write all of your outputs to separate files, and then run a separate job to concatenate them; this is a totally fine thing to do if the processing of the dataframes is large compared to the reading/writing time
  • use the distributed client.submit API and as_completed to write results to the output file from your main process. Note that you could make this respect the original ordering, if that is important, but it would take some extra work.
mdurant
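
For the second option, here's a rough sketch using `client.submit` with `as_completed`, writing each small result dict to a line-delimited file from the main process as soon as it finishes. The output path is a placeholder (for HDFS you could open the file via `fsspec` instead of the local `open`), and it reuses the hypothetical `read_and_analyze` helper sketched under the comments above:

import json
from dask.distributed import Client, as_completed

client = Client("scheduler-address:8786")             # placeholder scheduler address

futures = [client.submit(read_and_analyze, p) for p in hdfs_paths]

# Results arrive in completion order, not submission order
with open("ledger_summary.jsonl", "w") as f:          # placeholder local output path
    for future in as_completed(futures):
        record = future.result()
        f.write(json.dumps(record, default=str) + "\n")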