
I have a massive PySpark DataFrame on which I have to perform a group by, but I am running into serious performance issues. I need to optimise the code, and I have been reading that a reduceByKey is much more efficient.

This is an example of the DataFrame.

a = [('Bob', 562, 'Food', '12 May 2018'),
     ('Bob', 880, 'Food', '01 June 2018'),
     ('Bob', 380, 'Household', ' 16 June 2018'),
     ('Sue', 85, 'Household', ' 16 July 2018'),
     ('Sue', 963, 'Household', ' 16 Sept 2018')]
df = spark.createDataFrame(a, ["Person", "Amount", "Budget", "Date"])

Output:

+------+------+---------+-------------+
|Person|Amount|   Budget|         Date|
+------+------+---------+-------------+
|   Bob|   562|     Food|  12 May 2018|
|   Bob|   880|     Food| 01 June 2018|
|   Bob|   380|Household| 16 June 2018|
|   Sue|    85|Household| 16 July 2018|
|   Sue|   963|Household| 16 Sept 2018|
+------+------+---------+-------------+

I have implemented the following code; however, as mentioned before, the actual DataFrame is massive.

from pyspark.sql import functions as F

df_grouped = df.groupby('person').agg(F.collect_list(F.struct("Amount", "Budget", "Date")).alias("data"))

Output:

+------+--------------------------------------------------------------------------------+
|person|data                                                                            |
+------+--------------------------------------------------------------------------------+
|Sue   |[[85,Household, 16 July 2018], [963,Household, 16 Sept 2018]]                   |
|Bob   |[[562,Food,12 May 2018], [880,Food,01 June 2018], [380,Household, 16 June 2018]]|
+------+--------------------------------------------------------------------------------+

With the schema being:

root
 |-- person: string (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Amount: long (nullable = true)
 |    |    |-- Budget: string (nullable = true)
 |    |    |-- Date: string (nullable = true)

I need to convert the groupBy into a reduceByKey so that I can create the same schema as above.
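
Conceptually, I assume the RDD version would start with something like this (untested sketch, using plain list concatenation as the reduce function), but I don't see how to get back to the struct schema from there:

# Sketch only: one single-element list per record, then concatenate the lists per person
pairs = df.rdd.map(lambda r: (r['Person'], [(r['Amount'], r['Budget'], r['Date'])]))
reduced = pairs.reduceByKey(lambda a, b: a + b)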

Bryce Ramgovind

1 Answer


How about this,

def flatten(l, ltypes=(tuple,)):
    # Recursively unpack nested tuples (as produced by reduceByKey below)
    # into one flat sequence, preserving the original container type of `l`.
    ltype = type(l)
    l = list(l)
    i = 0
    while i < len(l):
        while isinstance(l[i], ltypes):
            if not l[i]:
                # drop empty tuples
                l.pop(i)
                i -= 1
                break
            else:
                # splice the tuple's contents in place of the tuple itself
                l[i:i + 1] = l[i]
        i += 1
    return ltype(l)

def nested_change(item, func):
    # Apply `func` to every leaf element of an arbitrarily nested list.
    if isinstance(item, list):
        return [nested_change(x, func) for x in item]
    return func(item)


def convert(*args):
    # Reduce function: pair the two incoming values into a tuple,
    # building a nested structure that flatten() unwinds afterwards.
    return args

cols = df.columns

df_final = df.rdd.map(lambda x: (x['Person'], [x[y] for y in cols if y != 'Person']))\
                 .reduceByKey(convert)\
                 .map(lambda x: (x[0], nested_change(list(flatten(x[1])), str)))\
                 .toDF(['person', 'data'])

df_final.show()
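
If it helps, a simpler variant of the same idea (a sketch only, not tested at scale) is to wrap each row's values in a one-element list so that reduceByKey can concatenate the lists directly, which avoids the flatten/nested_change helpers:

cols = [c for c in df.columns if c != 'Person']

# Sketch: one single-element list per record, concatenated per person.
# Like the version above, this yields an array of arrays of strings.
df_alt = df.rdd.map(lambda x: (x['Person'], [[str(x[c]) for c in cols]]))\
               .reduceByKey(lambda a, b: a + b)\
               .toDF(['person', 'data'])

df_alt.show(truncate=False)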
mayank agrawal