
I've noticed that Spark's `collect` function is extremely slow on large sets of data, so I'm trying to fix this using `parallelize`.

My main method creates the Spark session and passes it to the `get_data` function.

from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder.appName('app_name').getOrCreate()
    return get_data(spark)

Here is where I try to parallelize my `collect` call:

def get_data(spark):
    df = all_data(spark)
    data = spark.sparkContext.parallelize(df.select('my_column').distinct().collect())
    return map(lambda row: row['my_column'], data)

This does not work and returns this error:

TypeError: 'RDD' object is not iterable
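
As far as I can tell, the built-in `map` needs a regular iterable, while an RDD only has its own `.map` method, so the call fails as soon as `map` tries to iterate the RDD, e.g.:

data = spark.sparkContext.parallelize(df.select('my_column').distinct().collect())
# map(lambda row: row['my_column'], data)          # TypeError: 'RDD' object is not iterable
data.map(lambda row: row['my_column']).collect()   # RDD.map works, but still pulls everything to the driver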

Does anyone have any ideas on how to parallelize or increase the performance of the `get_data` function?

ChaseHardin
  • There is no way to parallelize a `collect` operation. It is by definition a method that serializes your data through the master node. Don't try to `collect` large amounts of data; let Spark handle the heavy lifting and only collect aggregated results. – pault Sep 19 '19 at 17:34
  • @pault so how would you handle this if I need the `collect` so that I can map over the data? – ChaseHardin Sep 19 '19 at 17:37
  • @pault to provide some additional context, I'm trying to put the row values in some sort of a list/array so that I can pass them as values to the pyspark `pivot` function. – ChaseHardin Sep 19 '19 at 17:52
  • https://stackoverflow.com/questions/30244910/how-to-pivot-spark-dataframe – mazaneicha Sep 19 '19 at 21:02
  • @ChaseHardin - if you have a fixed list that doesn't change you can `broadcast` that list and pass it into the `.pivot("col", list_variable)`. If your list is dynamic (I have faced this challenge), the only way I know is running a `collect()` with a `flatMap` to flatten into a list and then `broadcast`. – thePurplePython Sep 19 '19 at 21:26
  • @thePurplePython, unfortunately, it's a dynamic list. How is the `flatMap` more performant than an original `map` function? Do you have an example of `broadcast` with the `pivot`? – ChaseHardin Sep 19 '19 at 21:28
  • @thePurplePython, do you have cluster configurations that work well when working with large data sets and pivots? Mainly thinking worker/driver types. – ChaseHardin Sep 19 '19 at 21:33
  • @ChaseHardin - provided an example below; cluster configurations mostly depend on the cluster hardware specs and cluster manager – thePurplePython Sep 19 '19 at 23:32

1 Answer


Here are examples of static and dynamic approaches that use a broadcast variable (a read-only variable cached in each executor's memory, which avoids shipping a copy of the list from the driver with every distributed task) to supply the distinct values of a column to `pivot`. Also, if you don't provide a hard-coded list of values during the pivot, it will trigger an extra job (a wide transformation/shuffle) just to compute the distinct values of that column.

Disclaimer: there may be a better-performing alternative out there for the dynamic approach.

print(spark.version)
2.4.3

import pyspark.sql.functions as F

# sample data
rawData = [(1, "a"),
           (1, "b"),
           (1, "c"),
           (2, "a"),
           (2, "b"),
           (2, "c"),
           (3, "a"),
           (3, "b"),
           (3, "c")]

df = spark.createDataFrame(rawData).toDF("id","value")

# static list example
l = ["a", "b", "c"]
l = spark.sparkContext.broadcast(l)

pivot_static_df = df\
  .groupby("id")\
  .pivot("value", l.value)\
  .agg(F.expr("first(value)"))

pivot_static_df.show()
+---+---+---+---+
| id|  a|  b|  c|
+---+---+---+---+
|  1|  a|  b|  c|
|  3|  a|  b|  c|
|  2|  a|  b|  c|
+---+---+---+---+
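
For comparison, this is the variant that triggers the extra job mentioned above: leaving the value list out of `pivot` makes Spark first run a separate distinct job to discover the column values (a sketch on the same `df`):

# pivot without an explicit value list; Spark runs an extra job
# to compute the distinct values of "value" before pivoting
pivot_inferred_df = df\
  .groupby("id")\
  .pivot("value")\
  .agg(F.expr("first(value)"))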

# dynamic list example
v = df.select("value").distinct().rdd.flatMap(lambda x: x).collect()
v = spark.sparkContext.broadcast(v)

print(v.value)
['c', 'b', 'a']

pivot_dynamic_df = df\
  .groupby("id")\
  .pivot("value", l.value)\
  .agg(F.expr("first(value)"))

pivot_dynamic_df.show()
+---+---+---+---+
| id|  c|  b|  a|
+---+---+---+---+
|  1|  c|  b|  a|
|  3|  c|  b|  a|
|  2|  c|  b|  a|
+---+---+---+---+
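
To tie this back to the question, the same dynamic pattern could replace the `parallelize`/`map` attempt in `get_data`. A sketch, assuming the `all_data` and `my_column` names from the question:

def get_data(spark):
    df = all_data(spark)
    # distinct values of the single column, flattened straight into a Python list
    values = df.select('my_column').distinct().rdd.flatMap(lambda x: x).collect()
    # broadcast so the list isn't re-shipped with every task that uses it
    return spark.sparkContext.broadcast(values)

The broadcast's `.value` can then be passed as the second argument to `.pivot('my_column', ...)`, as in the examples above.
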
thePurplePython