
I have a problem with a user-defined function built to concatenate values from one dataframe for rows whose index matches the index value in another dataframe.

Here are the simplified dataframes that I am trying to match:

a_df:
+-------+------+
| index | name |
+-------+------+    
| 1     | aaa  |
| 2     | bbb  |
| 3     | ccc  |
| 4     | ddd  |
| 5     | eee  |
+-------+------+

b_df:
+-------+------+
| index | code |
+-------+------+    
| 1     | 101  |
| 2     | 102  |
| 3     | 101  |
| 3     | 102  |
| 4     | 103  |
| 4     | 104  |
| 5     | 101  |
+-------+------+

udf function & call:

from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

def concatcodes(index, dataframe):
    res = dataframe.where(dataframe.index == index).collect()
    reslist = "|".join([value.code for value in res])
    return reslist

spark.udf.register("concatcodes", concatcodes, StringType())

resultDF = a_DF.withColumn("codes", lit(concatcodes(a_DF.index, b_df)))

I expect the function to be called once for each row of the a_DF dataframe, resulting in the following output:

+-------+------+-------+
| index | name |codes  |
+-------+------+-------+    
| 1     | aaa  |101    |
| 2     | bbb  |102    |
| 3     | ccc  |101|102|
| 4     | ddd  |103|104|
| 5     | eee  |101    |
+-------+------+-------+

However, the function seems to be called just once, with the whole column passed as its argument, resulting in the following output:

+-------+------+---------------------------+
| index | name |codes                      |
+-------+------+---------------------------+    
| 1     | aaa  |101|102|101|102|103|104|101|
| 2     | bbb  |101|102|101|102|103|104|101|
| 3     | ccc  |101|102|101|102|103|104|101|
| 4     | ddd  |101|102|101|102|103|104|101|
| 5     | eee  |101|102|101|102|103|104|101|
+-------+------+---------------------------+

I suppose I am doing something fundamentally wrong when it comes to calling a UDF in the .withColumn method, but I could not figure out what. I would very much appreciate someone pointing out what is wrong with my logic.

2 Answers


Firstly, you don't need a udf for this. The heart of your question is essentially Concatenating string by rows in pyspark and a join. The following will produce the desired output:

from pyspark.sql.functions import collect_list, concat_ws

resultDF = a_df.join(
    b_df.groupBy("index").agg(concat_ws("|", collect_list("code")).alias("code")), 
    on="index"
)

resultDF.show()
#+-----+----+-------+
#|index|name|   code|
#+-----+----+-------+
#|    3| ccc|101|102|
#|    5| eee|    101|
#|    1| aaa|    101|
#|    4| ddd|103|104|
#|    2| bbb|    102|
#+-----+----+-------+

Remember that Spark DataFrames are inherently unordered unless you explicitly introduce an order with a sort or orderBy.
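
If you want the rows back in the original index order for display, a small sketch of an explicit sort on the joined result (nothing beyond the resultDF built above):

resultDF.orderBy("index").show()
#+-----+----+-------+
#|index|name|   code|
#+-----+----+-------+
#|    1| aaa|    101|
#|    2| bbb|    102|
#|    3| ccc|101|102|
#|    4| ddd|103|104|
#|    5| eee|    101|
#+-----+----+-------+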


To address the issue with your attempt:

I suppose I am doing something fundamentally wrong when it comes to calling a UDF in the .withColumn method, but I could not figure out what

If you look at the execution plan for your code, you will see that the where(dataframe.index == index) portion is essentially being ignored.

resultDF = a_DF.withColumn("codes", lit(concatcodes(a_DF.index, b_df)))
resultDF.explain()
#== Physical Plan ==
#*(1) Project [index#0, name#1, 101|102|101|102|103|104|101 AS codes#64]
#+- Scan ExistingRDD[index#0,name#1]

The reason is that the udf you registered is never actually used here: concatcodes(a_DF.index, b_df) is an ordinary Python function call, so it is evaluated once on the driver, before withColumn even runs, with a Column object passed as index and the whole b_df passed as dataframe. You can't use a DataFrame inside a udf anyway, so the collect runs exactly once and lit() bakes the single string it returns into the plan as a constant for every row.
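
You can see that single evaluation without Spark doing any per-row work at all; a quick sketch, using the same a_DF and b_df as above:

# Plain Python call, evaluated once on the driver; the string it returns is
# exactly the literal that shows up in the physical plan above.
print(concatcodes(a_DF.index, b_df))
#101|102|101|102|103|104|101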

The bigger issue here is that calling collect inside the udf defeats the purpose of Spark (this is your fundamental misunderstanding). The whole point of using Spark is to distribute your computations in parallel across multiple executors. When you call collect, all of the data is brought into the local memory of the driver (and in your case, the resulting string is then shipped back to the executors as a constant).

Instead, use joins when you need to reference data from multiple Spark DataFrames. Think of udfs as essentially only being meant to operate on the values of a single Row of a single Spark DataFrame.
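
For illustration only, here is a minimal sketch of the kind of work a udf is suited for: a function applied to one value per row. The reformat_codes name and the "|" to ", " conversion are hypothetical, operating on the code column of the resultDF built above.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Hypothetical row-wise udf: it receives a single "code" string per row,
# never a whole DataFrame, so it parallelizes across executors.
reformat_codes = udf(lambda codes: ", ".join(codes.split("|")), StringType())

resultDF.withColumn("codes_csv", reformat_codes("code")).show()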

pault
  • Fantastic answer, thanks for the spot-on references and for touching on the broader context of what was wrong with my attempt; the suggested solution obviously works! – thegreatmoonmoon Aug 13 '19 at 19:57

This is my approach using pandas:

import pandas as pd

# Assumes a_df and b_df are pandas DataFrames (e.g. Spark DataFrames converted with toPandas()).
df = pd.merge(a_df, b_df, on="index")

df.groupby("index").agg({"name": "first", "code": list})

The result is

index name        code

1      aaa       [101]
2      bbb       [102]
3      ccc  [101, 102]
4      ddd  [103, 104]
5      eee       [101]
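
If the pipe-delimited string from the question is wanted instead of a Python list, a small variation of the same pandas aggregation (a sketch, using the same merged df as above) would be:

# Join the codes per index into a single "|"-separated string instead of a list.
df.groupby("index").agg({"name": "first", "code": lambda c: "|".join(map(str, c))})
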
Yatish Kadam
    This question is about spark dataframes, not `pandas` DataFrames – pault Aug 13 '19 at 17:22
  • Yes, I know; I did upvote your answer (and it should be accepted as the answer, too). I just put this here in case someone is looking for it. – Yatish Kadam Aug 13 '19 at 17:39