2

I have the following DataFrame df:

+-----------+----------+---------------+                                        
|item_id    |country_id|level          |
+-----------+----------+---------------+
|     312330|  13535670|             82|
|     312330|  13535670|            369|
|     312330|  13535670|            376|
|     319840|  69731210|            127|
|     319840|  69730600|            526|
|     311480|  69628930|            150|
|     311480|  69628930|            138|
|     311480|  69628930|            405|
+-----------+----------+---------------+

How can I delete duplicates while keeping the minimum value of level for each duplicated pair of item_id and country_id?

The expected output:

+-----------+----------+---------------+                                        
|item_id    |country_id|level          |
+-----------+----------+---------------+
|     312330|  13535670|             82|
|     319840|  69731210|            127|
|     319840|  69730600|            526|
|     311480|  69628930|            138|
+-----------+----------+---------------+

I know how to delete duplicates without conditions using dropDuplicates, but I don't know how to do it for my particular case.
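
The unconditional version I am referring to is roughly the sketch below (assuming the df above); it keeps an arbitrary row per (item_id, country_id) pair, not necessarily the one with the minimum level:

// Drops duplicates on the pair of key columns, but keeps an arbitrary
// row for each pair -- there is no way to say "keep the one with the
// smallest level".
df.dropDuplicates("item_id", "country_id").show(false)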

Markus

1 Answer

10

One method is to use orderBy (the default is ascending order), then groupBy and aggregate with first:

import org.apache.spark.sql.functions.first

df.orderBy("level")
  .groupBy("item_id", "country_id")
  .agg(first("level").as("level"))
  .show(false)

You can also specify the order explicitly, using .asc for ascending and .desc for descending, as below:

df.orderBy($"level".asc).groupBy("item_id", "country_id").agg(first("level").as("level")).show(false)

You can also do the same with a window and the row_number function, as below:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val windowSpec = Window.partitionBy("item_id", "country_id").orderBy($"level".asc)

df.withColumn("rank", row_number().over(windowSpec))
  .filter($"rank" === 1)
  .drop("rank")
  .show()
Ramesh Maharjan
  • The first method will take the minimum value of `level`, not the maximum, right? – Markus Jan 05 '18 at 11:52
  • 1
    first will take the first row of the grouping. If the ordering is ascending then it's the minimum, and if it's descending then the maximum. – Ramesh Maharjan Jan 05 '18 at 11:56
  • `orderBy($"level".asc)` is marked in red for me. I use Spark 2.2 and Scala 2.11.8 – Markus Jan 05 '18 at 12:26
  • 2
    Why would somebody downvote without even commenting? Please comment on the drawbacks if you really want to downvote, so that I can improve the answer; and if the answer is inappropriate, then I shall delete it. – Ramesh Maharjan Jan 05 '18 at 12:35
  • 2
    I didn't downvote, but are you sure `orderBy` with `first` is guaranteed to work? In my experience it doesn't always do what you might expect in a distributed setup. – Jasper-M Jan 05 '18 at 12:53
  • Yes, definitely @Jasper-M. It works because the Spark API is designed to work in a distributed way: groupBy distributes the grouped DataFrame to the executors, orders it, and takes the first row from each group. And furthermore, I have included a window function too, which is better than groupBy. – Ramesh Maharjan Jan 05 '18 at 15:41
  • 1
    In the meantime I saw that the accepted answer to the question which this one duplicates says exactly the same as me, so I tend to agree with the downvoter that this answer is partly incorrect. – Jasper-M Jan 05 '18 at 16:31
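
For completeness, when columns other than level also have to be kept per row, a common order-independent pattern is to take the minimum of a struct whose first field is level. A sketch below uses a hypothetical extra column other_col that is not in the question's data:

import org.apache.spark.sql.functions.{col, min, struct}

// Structs compare field by field, so putting "level" first means the
// minimum struct belongs to the row with the smallest level.
// "other_col" is a hypothetical extra column, not in the question's data.
df.groupBy("item_id", "country_id")
  .agg(min(struct(col("level"), col("other_col"))).as("m"))
  .select(col("item_id"), col("country_id"), col("m.level").as("level"), col("m.other_col").as("other_col"))
  .show(false)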