
I would like to drop all records that are duplicate entries but have a slight difference in the timestamp. The offset could be any amount of time, but for simplicity I will use 2 minutes.

+-------------------+-----+----+
|Date               |ColA |ColB|
+-------------------+-----+----+
|2017-07-04 18:50:21|ABC  |DEF |
|2017-07-04 18:50:26|ABC  |DEF |
|2017-07-04 18:50:21|ABC  |KLM |
+-------------------+-----+----+

I would like my dataframe to contain only these rows:

+-------------------+-----+----+
|Date               |ColA |ColB|
+-------------------+-----+----+
|2017-07-04 18:50:26|ABC  |DEF |
|2017-07-04 18:50:21|ABC  |KLM |
+-------------------+-----+----+

I tried something like this, but it does not remove the duplicates:

    val joinedDfNoDuplicates = joinedDFTransformed.as("df1").join(joinedDFTransformed.as("df2"),
      col("df1.ColA") === col("df2.ColA") &&
      col("df1.ColB") === col("df2.ColB") &&
      abs(unix_timestamp(col("df1.Date")) - unix_timestamp(col("df2.Date"))) > offset
    )

For now, I am just selecting distinct rows, or doing a group-by with min on certain columns (see Find minimum for a timestamp through Spark groupBy dataframe), but I would like a more robust solution. The reason is that data outside of that interval may be valid data. Also, the offset could change, so it might be within 5 seconds or 5 minutes depending on requirements.
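Roughly, that interim group-by-min approach looks like this (a sketch, where df stands for my dataframe):

    // Interim workaround: keep only the earliest timestamp per (ColA, ColB) group.
    // This can discard valid records that merely share the key columns.
    import org.apache.spark.sql.functions.min

    val grouped = df.groupBy("ColA", "ColB").agg(min("Date").as("Date"))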

Somebody suggested creating a UDF that compares the dates when all other columns are the same, but I am not sure exactly how to do that so that I either filter out the rows or add a flag and then remove those rows. Any help would be greatly appreciated.

A similar SQL question is here: Duplicate entries with different timestamp

Thanks!


1 Answer


I would do it like this:

  1. Define a Window that orders the records by date, partitioned over a dummy column.
  2. Add the dummy column with a constant value.
  3. Add a new column containing the date of the previous record.
  4. Calculate the difference between the date and the previous date.
  5. Filter your records based on the value of the difference.

The code could look something like the following:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions._
    import spark.implicits._ // for the $"..." column syntax

    val w = Window.partitionBy("dummy").orderBy("Date") // step 1

    df.withColumn("dummy", lit(1)) // step 2
      .withColumn("previousDate", lag($"Date", 1) over w) // step 3
      .withColumn("difference", unix_timestamp($"Date") - unix_timestamp($"previousDate")) // step 4

The solution above is valid if you have pairs of records that might be close in time. If you have more than two records, you can compare each record to the first record in the window (not the previous one): instead of lag($"Date", 1), use first($"Date"). In that case the difference column contains the time difference between the current record and the first record in the window.
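As a sketch, that variant could look like this (same assumptions as above; with the window ordered by Date, first($"Date") over w yields the earliest date seen so far in the partition):

    df.withColumn("dummy", lit(1))
      .withColumn("firstDate", first($"Date") over w)
      .withColumn("difference", unix_timestamp($"Date") - unix_timestamp($"firstDate"))
      .filter($"difference" === 0 || $"difference" > offset) // keep the first record and anything beyond the offset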
