
I have a PySpark DataFrame with one column containing one-hot encoded vectors. I want to aggregate the one-hot encoded vectors by vector addition after a groupBy.

e.g. df[userid, action], Row 1: ["1234", [1, 0, 0]], Row 2: ["1234", [0, 1, 0]]

I want the output row to be ["1234", [1, 1, 0]], i.e. the vector is the sum of all vectors grouped by userid.

How can I achieve this? PySpark's sum aggregate operation does not support vector addition.
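
For reference, a minimal reproduction of the example data, assuming the vectors are stored as Spark ML DenseVectors (e.g. the output of OneHotEncoder); names are illustrative:

```python
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.getOrCreate()

# Two actions for the same user, each a one-hot encoded DenseVector.
df = spark.createDataFrame(
    [("1234", Vectors.dense([1, 0, 0])),
     ("1234", Vectors.dense([0, 1, 0]))],
    ["userid", "action"],
)
# Desired result after grouping by userid: ("1234", [1, 1, 0])
```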

  • Related: [Applying UDFs on GroupedData in PySpark (with functioning python example)](https://stackoverflow.com/q/40006395/6910411) and [How to define a custom aggregation function to sum a column of Vectors?](https://stackoverflow.com/q/33899977/6910411) – zero323 Jan 14 '19 at 18:56

1 Answer


You have several options:

  1. Create a user-defined aggregate function. The problem is that you will need to write the user-defined aggregate function in Scala and wrap it for use in Python.
  2. You can use the collect_list function to collect all values into a list and then write a UDF to combine them.
  3. You can move to the RDD API and use aggregate or aggregateByKey.

Both options 2 and 3 would be relatively inefficient (costing both CPU and memory); minimal sketches of each are shown below.
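
A minimal sketch of option 2, assuming the `action` column holds Spark ML vectors (e.g. from OneHotEncoder) and that NumPy is available for the element-wise sum; names are illustrative:

```python
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

@udf(returnType=VectorUDT())
def sum_vectors(vectors):
    # Element-wise sum of all vectors collected for one group.
    return Vectors.dense(np.sum([v.toArray() for v in vectors], axis=0))

summed = (df.groupBy("userid")
            .agg(F.collect_list("action").alias("actions"))
            .select("userid", sum_vectors("actions").alias("action")))
```

And a sketch of option 3 using the RDD API, with reduceByKey as a simple stand-in for aggregateByKey (same assumptions apply):

```python
from pyspark.ml.linalg import Vectors

summed_rdd = (df.rdd
                .map(lambda row: (row["userid"], row["action"].toArray()))
                .reduceByKey(lambda a, b: a + b)  # NumPy element-wise addition
                .map(lambda kv: (kv[0], Vectors.dense(kv[1]))))

summed_df = summed_rdd.toDF(["userid", "action"])
```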

  • I understand why (2) is inefficient, as it shuffles all the data to collect the items in actual lists. But why is (3) inefficient? – Thomas B. Dec 11 '17 at 16:58
  • @ThomasB. (3) is inefficient because you lose all the DataFrame optimizations (e.g. Catalyst optimizations, whole-stage codegen, etc.). Since your input is a DataFrame, Spark also needs to convert the data from the DataFrame representation to the RDD representation. Lastly, in Python all RDD operations basically serialize the data to Python, have Python operate on it, and return the results, which is relatively slow. – Assaf Mendelson Dec 12 '17 at 06:49