
I've written a program in Python and pandas which takes a very large dataset (~4 million rows per month for 6 months), groups it by 2 of the columns (date and a label), and then applies a function to each group of rows. There are a variable number of rows in each grouping - anywhere from a handful of rows to thousands of rows. There are thousands of groups per month (label-date combos).

My current program uses multiprocessing, so it's pretty efficient, and I thought it would map well to Spark. I've worked with map-reduce before, but am having trouble implementing this in Spark. I'm sure I'm missing some concept in the pipelining, but everything I've read appears to focus on key-value processing, or on splitting a distributed dataset by arbitrary partitions, rather than on what I'm trying to do. Is there a simple example or paradigm for doing this? Any help would be greatly appreciated.

EDIT: Here's some pseudo-code for what I'm currently doing:

import pandas as pd
import multiprocessing as mp

reader = pd.read_csv()                         # load the month's data
pool = mp.Pool(processes=4)
labels = reader.label.unique()                 # list of unique labels
for label in labels:
    dates = reader[reader.label == label].date.unique()
    for date in dates:
        df = reader[(reader.label == label) & (reader.date == date)]
        pool.apply_async(process, (df,), callback=callbackFunc)
pool.close()
pool.join()

When I say asynchronous, I mean something analogous to pool.apply_async().

Dave
  • Is `process` an arbitrary function, or do you have anything particular in mind? Is it commutative and associative? – zero323 Sep 25 '15 at 18:15
  • @zero323 For this program, `process` is a function that reads external data for that date, and then iterates over all of the rows for the label on that date and calculates several different values to associate with each row. – Dave Sep 25 '15 at 18:17

1 Answer


As of now (PySpark 1.5.0) I see only three options:

  1. You can try to express your logic using SQL operations and UDFs. Unfortunately the Python API doesn't support UDAFs (User Defined Aggregate Functions), but it is still expressive enough, especially with window functions, to cover a wide range of scenarios (a minimal sketch follows the list).

    Access to external data sources can be handled in a couple of ways, including:

    • access inside a UDF with optional memoization
    • loading into a data frame and using a join operation
    • using a broadcast variable
  2. Converting the data frame to a PairRDD and using one of the following (a Python sketch appears after the Scala example below):

    • partitionBy + mapPartitions
    • reduceByKey / aggregateByKey
  3. Partitioning data by key and using local Pandas data frames per partition
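
To make option 1 concrete, here is a rough PySpark 1.5-style sketch. The column `value`, the derived column `score`, and the `external` data frame are made-up placeholders, and the data frames are assumed to come from a HiveContext (which window functions generally required at that version):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# df: DataFrame with columns label, date, value
# external: small DataFrame keyed by date holding the per-date reference data
w = Window.partitionBy("label", "date")

result = (df
          .join(external, "date")                                    # pull in the external data via a join
          .withColumn("group_avg", F.avg("value").over(w))           # aggregate over each (label, date) group
          .withColumn("score", F.col("value") - F.col("group_avg"))) # per-row value derived from its group

When the per-row calculation is too involved for built-in expressions, a plain UDF (`pyspark.sql.functions.udf`) can replace the arithmetic above, and a broadcast variable (`sc.broadcast(...)`) read inside the UDF is an alternative to the join when the external data is small.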

If Python is not a strong requirement, the Scala API (> 1.5.0) supports UDAFs, which enable something like this:

df.groupBy(some_columns: _*).agg(some_udaf)
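
Back in Python, the pair-RDD route (options 2 and 3) can be sketched roughly as follows; `process` is the same per-group function from the question, and the partition count is an arbitrary placeholder:

import pandas as pd

# Key each Row by (label, date) so partitionBy keeps a whole group on one partition
keyed = df.rdd.map(lambda row: ((row.label, row.date), row))

def process_partition(rows):
    # Rebuild a local pandas frame for this partition, then run the existing
    # per-group logic on every (label, date) group it contains.
    pdf = pd.DataFrame([r.asDict() for _, r in rows])
    if pdf.empty:
        return
    for (label, date), group in pdf.groupby(["label", "date"]):
        yield (label, date), process(group)

results = (keyed
           .partitionBy(200)                  # all rows with the same key land in one partition
           .mapPartitions(process_partition))

`reduceByKey` / `aggregateByKey` are the better fit when `process` can be rewritten as an incremental, combinable update instead of needing the whole group at once.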
zero323
  • That's something I didn't think about - Python is not a strong requirement, I could certainly switch over to Scala to try this out. For your Scala example, can you provide a little more detail (or a link to somewhere I can get started)? Thanks! – Dave Sep 28 '15 at 15:15
  • Two simple examples: http://stackoverflow.com/a/32750733/1560062, http://stackoverflow.com/a/32101530/1560062. You'll find some others in the Spark source. – zero323 Sep 29 '15 at 18:05