
I am using Spark with Scala and have a dataset that I want to group by, then send each resulting group (the GroupedData) to a custom function. In the custom function, I would process the rows and update an initially empty dataframe.

I have the below dataframe DF1:

+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+
| ACC_SECURITY|ACCOUNT_NO|COSTCENTER|    BU|   MPU|LONG_IND|SHORT_IND|SECURITY_ID|QUANTITY|POS_NEG_QUANTITY|PROCESSED|ALLOC_QUANTITY|NET_QUANTITY|
+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+
|3FA34789290X2|  3FA34789|    0800TS|BOXXBU|BOXXMP|    0101|     5279|      290X2|   18063|               P|         |             0|           0|
|3FA34782290X2|  3FA34782|    0800TS|BOXXBU|BOXXMP|    0102|     5322|      290X2|    -863|               N|         |             0|           0|
|3FA34789290X2|  3FA34789|    0800TS|BOXXBU|BOXXMP|    0101|     5279|      290X2| -108926|               N|         |             0|           0|
|9211530135G71|  92115301|    08036C|BOXXBU|BOXXMP|    0154|     8380|      35G71|    8003|               P|         |             0|           0|
|9211530235G71|  92115302|    08036C|BOXXBU|BOXXMP|    0144|     8382|      35G71|   -2883|               N|         |             0|           0|
+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+

After grouping on SECURITY_ID, I get 2 datasets based on SECURITY_ID values (290X2 and 35G71). These datasets have to be sent to a custom function.

I tried:

  1. groupBy on SECURITY_ID, but it requires an aggregation, which I don't have:

    DF1.groupBy("SECURITY_ID").agg(max("SECURITY_ID")).apply(F) 
    

    I don't want an aggregation, but I could still drop the aggregated column as long as I can pass a function F in the apply block on the grouped dataset. However, apply doesn't take a custom function.

  2. Window function on SECURITY_ID but I don't know how to execute a custom function based on each window:

    val window = Window.partitionBy("security_id") 
    val option2DF = DF1.withColumn("Quantity_Row", F over(window))
    

    I want to see how I can call a function F over the window, but not by adding a column (a sketch of one possible alternative follows this list).
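
One route that avoids both a dummy aggregation and a window is the typed Dataset API: groupByKey followed by flatMapGroups hands every group to an ordinary Scala function as an iterator of rows. A minimal sketch, assuming DF1's columns can be mapped onto a case class (the Position fields and the processGroup body below are placeholders, not part of the original question):

    import org.apache.spark.sql.SparkSession

    // Case class mirroring DF1's columns (names and types are assumed
    // from the sample output above).
    case class Position(
      ACC_SECURITY: String, ACCOUNT_NO: String, COSTCENTER: String,
      BU: String, MPU: String, LONG_IND: String, SHORT_IND: String,
      SECURITY_ID: String, QUANTITY: Long, POS_NEG_QUANTITY: String,
      PROCESSED: String, ALLOC_QUANTITY: Long, NET_QUANTITY: Long)

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    // Hypothetical custom function: receives all rows of one SECURITY_ID
    // group and returns the processed rows for that group.
    def processGroup(securityId: String, rows: Iterator[Position]): Iterator[Position] = {
      // per-group allocation / netting logic would go here
      rows
    }

    val resultDS = DF1.as[Position]      // typed view of DF1
      .groupByKey(_.SECURITY_ID)         // one group per SECURITY_ID
      .flatMapGroups(processGroup _)     // custom function applied per group

This keeps the per-group processing as plain Scala code rather than SQL expressions, at the cost of serializing each group to the JVM side.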

  • @Voila: I added that into the question. Note that you can [edit] a question yourself in the future when adding more information. – Shaido Sep 04 '19 at 06:45
  • As for the question, there are two approaches. You can collect all the values for each group as lists and then process them with a custom function (a UDF), or you can create a UDAF. This question describes it quite well (although there is no answer): https://stackoverflow.com/questions/49294294/spark-custom-aggregation-collect-listudf-vs-udaf – Shaido Sep 04 '19 at 06:53 (a sketch of the collect_list route follows below)
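
For reference, a minimal sketch of the first approach from the comment above (collect each group's values with collect_list, then process them with a UDF). The netQuantities function and the cast to long are assumptions; the real per-group logic would replace the simple sum:

    import org.apache.spark.sql.functions._

    // Hypothetical UDF standing in for the per-group processing;
    // here it only nets the collected quantities.
    val netQuantities = udf { (quantities: Seq[Long]) => quantities.sum }

    val option3DF = DF1
      .groupBy("SECURITY_ID")
      .agg(collect_list(col("QUANTITY").cast("long")).as("quantities"))
      .withColumn("NET_QUANTITY", netQuantities(col("quantities")))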
