
I have a Spark dataframe with multiple columns. I want to find and remove rows which have duplicated values in a column (the other columns can be different).

I tried using dropDuplicates(col_name), but it only drops the duplicate entries while still keeping one record in the dataframe. What I need is to remove all entries that originally contained duplicate values.

I am using Spark 1.6 and Scala 2.10.
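
For example, with some made-up data where the value 4 is duplicated in the id column:

// assuming the implicits needed for toDF are in scope, e.g. sqlContext.implicits._ on Spark 1.6
val df = Seq((1, 1), (2, 2), (4, 4), (4, 5)).toDF("id", "num")

df.dropDuplicates(Seq("id")).show()
// this still keeps one of the id=4 rows -- not what I want
// the desired result contains only the rows (1, 1) and (2, 2)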

salmanbw
  • [How to make good reproducible Apache Spark Dataframe examples](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples) – philantrovert Apr 10 '18 at 06:46
  • If I understand correctly, you want to remove all entries of those that are duplicated in a single column? Or they are duplicates when considering multiple columns? – Shaido Apr 10 '18 at 07:02
  • @Shaido yes, all entries of those that are duplicated in a single column. – salmanbw Apr 10 '18 at 07:18
  • In Scala that would be as follows; I guess there should be a similar way to do that in Python, hope this helps. Get the column names: `val columns = df.schema.map(_.name)`, then run a foldLeft on that list of columns: `columns.foldLeft(df)((acc, elem) => acc.dropDuplicates(elem))` – SCouto Apr 10 '18 at 07:32
  • I saw you removed the pyspark tag and added you are using Scala, so I changed the answer to reflect this. – Shaido Apr 10 '18 at 09:52

3 Answers


I would use window functions for this. Let's say you want to remove all rows whose id value is duplicated:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.count

df
  .withColumn("cnt", count("*").over(Window.partitionBy($"id"))) // occurrences of each id
  .where($"cnt" === 1)                                           // keep ids that appear exactly once
  .drop("cnt")
  .show()
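
For instance, on a small made-up dataframe this keeps only the ids that occur exactly once (a sketch assuming the implicits for toDF and the $-notation are in scope):

val df = Seq((1, 1), (2, 2), (3, 3), (4, 4), (4, 5)).toDF("id", "num")

df
  .withColumn("cnt", count("*").over(Window.partitionBy($"id")))
  .where($"cnt" === 1)
  .drop("cnt")
  .show()
// +---+---+
// | id|num|
// +---+---+
// |  1|  1|
// |  2|  2|
// |  3|  3|
// +---+---+
// (row order may vary)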
Raphael Roth

This can be done by grouping by the column (or columns) to check for duplicates, then aggregating and filtering the results.

Example dataframe df:

+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  4|  5|
+---+---+

Grouping by the id column to remove its duplicates (the last two rows):

import org.apache.spark.sql.functions.{count, first}

val df2 = df.groupBy("id")
  .agg(first($"num").as("num"), count($"id").as("count")) // keep one num per id and count the occurrences
  .filter($"count" === 1)                                 // ids that appear exactly once
  .select("id", "num")

This will give you:

+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
+---+---+

Alternatively, it can be done using a join. It will be slower, but if there are many columns there is no need to use first($"num").as("num") for each one to keep them.

// ids that occur exactly once
val df2 = df.groupBy("id").agg(count($"id").as("count")).filter($"count" === 1).select("id")
// keep only the rows of df whose id survived the filter
val df3 = df.join(df2, Seq("id"), "inner")
Shaido
  • Since you are checking the count of the grouped column == 1, can't the join be replaced by `first($"num").as("num")`? It should result in the same output. – philantrovert Apr 10 '18 at 07:58
  • @philantrovert That is a good point, not sure how I missed that. I changed the answer to take that into account. – Shaido Apr 10 '18 at 08:02
  • @Shaido Your solution also worked fine, but I would like to avoid creating another dataframe for this problem; that's why I'm accepting Raphael's answer. – salmanbw Apr 10 '18 at 15:00
  • @salmanbw The first approach will not create any extra dataframe though? The only difference is that this one uses `groupBy` and `agg` while his uses window functions to get the result. – Shaido Apr 11 '18 at 01:34

I added a killDuplicates() method to the open-source spark-daria library that uses @Raphael Roth's solution. Here's how to use the code:

import com.github.mrpowers.spark.daria.sql.DataFrameExt._

df.killDuplicates(col("id"))

// you can also supply multiple Column arguments
df.killDuplicates(col("id"), col("another_column"))

Here's the code implementation:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count}

object DataFrameExt {

  implicit class DataFrameMethods(df: DataFrame) {

    def killDuplicates(cols: Column*): DataFrame = {
      df
        .withColumn(
          "my_super_secret_count",
          count("*").over(Window.partitionBy(cols: _*)) // occurrences of each key combination
        )
        .where(col("my_super_secret_count") === 1) // keep rows whose key appears exactly once
        .drop(col("my_super_secret_count"))
    }

  }

}

You might want to leverage the spark-daria library to keep this logic out of your codebase.

Powers