0

I have a DataFrame df as shown below:

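| customers | product | val_id | rule_name | rule_id | priority |
|-----------|---------|--------|-----------|---------|----------|
| 1         | A       | 1      | ABC       | 123     | 1        |
| 3         | Z       | r      | ERF       | 789     | 2        |
| 2         | B       | X      | ABC       | 123     | 2        |
| 2         | B       | X      | DEF       | 456     | 3        |
| 1         | A       | 1      | DEF       | 456     | 2        |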

I want to create a new DataFrame df2 that contains exactly one row per customer ID. Because rule_name and rule_id differ across rows for the same customer, I want to keep, for each customer, the row with the highest priority (that is, the lowest priority number, where 1 is the highest). The final result should be:

| customers | product | val_id | rule_name | rule_id | priority |
|-----------|---------|--------|-----------|---------|----------|
| 1         | A       | 1      | ABC       | 123     | 1        |
| 3         | Z       | r      | ERF       | 789     | 2        |
| 2         | B       | X      | ABC       | 123     | 2        |

Can anyone please help me achieve this using Spark with Scala? Any help will be appreciated.

zero323
Rahul Cha

4 Answers

4

You basically want to select the rows with the extreme value in a column, per group. This is a really common problem, so there's even a whole tag for it. Also see the question "SQL Select only rows with Max Value on a Column", which has a nice answer.

Here's an example for your specific case.

Note that this could select multiple rows for a customer, if there are multiple rows for that customer with the same (minimum) priority value.

This example is in PySpark, but it should be straightforward to translate to Scala:

from pyspark.sql import functions as F

# Find the best (lowest) priority for each customer. This DF has only two columns.
cusPriDF = df.groupBy("customers").agg(F.min(df["priority"]).alias("priority"))
# Now join back to keep only those rows and recover all the other columns.
bestRowsDF = df.join(cusPriDF, on=["customers", "priority"], how="inner")
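If ties on the minimum priority matter, one way to guarantee a single row per customer is a window function with `row_number`. The following is only a minimal Scala sketch, not part of the original answer. It assumes a local `SparkSession` and uses `rule_id` as a tie-breaker, which is an arbitrary choice for illustration since the question does not say how ties should be resolved. It is written as script-style statements, for example for pasting into spark-shell.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("best-priority-window").getOrCreate()
import spark.implicits._

// Sample data copied from the question.
val df = Seq(
  (1, "A", "1", "ABC", 123, 1),
  (3, "Z", "r", "ERF", 789, 2),
  (2, "B", "X", "ABC", 123, 2),
  (2, "B", "X", "DEF", 456, 3),
  (1, "A", "1", "DEF", 456, 2)
).toDF("customers", "product", "val_id", "rule_name", "rule_id", "priority")

// Number the rows of each customer: lowest priority number first.
// rule_id is a hypothetical tie-breaker so that row_number is deterministic.
val byCustomer = Window.partitionBy("customers").orderBy(col("priority").asc, col("rule_id").asc)

// Keep only the top-ranked row per customer and drop the helper column.
val df2 = df
  .withColumn("rn", row_number().over(byCustomer))
  .filter(col("rn") === 1)
  .drop("rn")

df2.show()
```

Unlike the join-based approach, this returns exactly one row per customer even when several rows share the minimum priority, at the cost of having to choose a tie-breaking rule.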
Corey
0

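To create df2, first order df by priority, then group by customers and take the first value of every other column. Be aware that Spark documents `first` as non-deterministic after a shuffle, so the preceding `orderBy` is not guaranteed to be respected on larger data. Like this: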

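import org.apache.spark.sql.functions.first
val columns = df.schema.map(_.name).filterNot(_ == "customers").map(name => first(name).as(name))

// show returns Unit, so assign df2 first and display it separately
val df2 = df.orderBy("priority").groupBy("customers").agg(columns.head, columns.tail: _*)
df2.show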

On the sample data it gives the expected output:

+----------+--------+-------+----------+--------+---------+
| customers| product| val_id| rule_name| rule_id| priority|
+----------+--------+-------+----------+--------+---------+
|         1|       A|      1|       ABC|     123|        1|
|         3|       Z|      r|       ERF|     789|        2|
|         2|       B|      X|       ABC|     123|        2|
+----------+--------+-------+----------+--------+---------+
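Because `first` after `groupBy` depends on the order in which rows reach the aggregation, which Spark does not guarantee after a shuffle, a possible alternative is to aggregate with `min` over a struct whose first field is `priority`. This is a hedged sketch rather than the answer author's method. It assumes a local `SparkSession`, and it relies on Spark comparing structs field by field from left to right, so the remaining fields act as implicit tie-breakers.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, min, struct}

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("best-priority-struct").getOrCreate()
import spark.implicits._

// Sample data copied from the question.
val df = Seq(
  (1, "A", "1", "ABC", 123, 1),
  (3, "Z", "r", "ERF", 789, 2),
  (2, "B", "X", "ABC", 123, 2),
  (2, "B", "X", "DEF", 456, 3),
  (1, "A", "1", "DEF", 456, 2)
).toDF("customers", "product", "val_id", "rule_name", "rule_id", "priority")

// Pack each row into a struct with priority first, so min() picks the row
// with the lowest priority number; later fields only matter on ties.
val packed = struct(col("priority"), col("product"), col("val_id"), col("rule_name"), col("rule_id"))

// One aggregation per customer, then unpack the winning struct back into columns.
val best = df
  .groupBy("customers")
  .agg(min(packed).as("best"))
  .select(col("customers"), col("best.*"))

best.show()
```

The column order of `best` follows the struct (priority comes right after customers); add a final `select` if the original column order is needed.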
himanshuIIITian
0

Corey beat me to it, but here's the Scala version:

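import org.apache.spark.sql.functions.min
import spark.implicits._ // needed for toDF outside spark-shell; assumes a SparkSession named spark

val df = Seq(
  (1, "A", "1", "ABC", 123, 1),
  (3, "Z", "r", "ERF", 789, 2),
  (2, "B", "X", "ABC", 123, 2),
  (2, "B", "X", "DEF", 456, 3),
  (1, "A", "1", "DEF", 456, 2)).toDF("customers", "product", "val_id", "rule_name", "rule_id", "priority")
// Lowest priority number per customer
val priorities = df.groupBy("customers").agg(min(df.col("priority")).alias("priority"))
// Join back on both columns to recover the full rows
val top_rows = df.join(priorities, Seq("customers", "priority"), "inner")
top_rows.show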

+---------+--------+-------+------+---------+-------+
|customers|priority|product|val_id|rule_name|rule_id|
+---------+--------+-------+------+---------+-------+
|        1|       1|      A|     1|      ABC|    123|
|        3|       2|      Z|     r|      ERF|    789|
|        2|       2|      B|     X|      ABC|    123|
+---------+--------+-------+------+---------+-------+
Garren S
0

Use a min aggregation on the priority column, grouping the DataFrame by customers, then inner join the original DataFrame with the aggregated DataFrame and select the required columns.

import org.apache.spark.sql.functions.min

// min, not max: the highest priority is the lowest priority number
val aggregatedDF = dataframe.groupBy("customers").agg(min("priority").as("priority_1"))
  .withColumnRenamed("customers", "customers_1")

val finalDF = dataframe.join(aggregatedDF, dataframe("customers") === aggregatedDF("customers_1") && dataframe("priority") === aggregatedDF("priority_1"))
finalDF.select("customers", "product", "val_id", "rule_name", "rule_id", "priority").show

You should then have the desired result.

Ramesh Maharjan