
I have a use case where I need to drop duplicate rows of a DataFrame (where duplicate means they share the same 'id' field) while keeping the row with the highest 'timestamp' (Unix timestamp) field.

I found the dropDuplicates method (I'm using PySpark), but it gives no control over which row is kept.
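To illustrate (a minimal sketch of my situation, with made-up values; the SparkSession setup is just for the example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, 12345678, "first"), (1, 23456789, "latest")],
    ["id", "timestamp", "data"],
)

# dropDuplicates keeps an arbitrary row per id; I want the row
# with timestamp 23456789 to survive
df.dropDuplicates(["id"]).show()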

Can anyone help? Thanks in advance.

– arnaud briche

2 Answers


A manual map and reduce might be needed to provide the functionality you want.

def selectRowByTimeStamp(x, y):
    # Of two rows sharing an id, keep the one with the later timestamp
    if x.timestamp > y.timestamp:
        return x
    return y

# Key each row by its id, then reduce every group of rows sharing
# an id down to the single row with the highest timestamp
dataMap = data.map(lambda x: (x.id, x))
uniqueData = dataMap.reduceByKey(selectRowByTimeStamp)

Here we key all of the data by id. Then, as each group is reduced, we keep the record with the highest timestamp. When the reduce finishes, exactly one record remains for each id.
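For instance, a runnable sketch of the above (the SparkSession setup and sample rows are mine, shaped to match the question's schema):

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample rows with the id/timestamp fields from the question
data = spark.sparkContext.parallelize([
    Row(id=1, timestamp=12345678, data="this is a test"),
    Row(id=1, timestamp=23456789, data="another test"),
    Row(id=2, timestamp=2345678, data="2nd test"),
])

dataMap = data.map(lambda x: (x.id, x))
uniqueData = dataMap.reduceByKey(selectRowByTimeStamp)  # defined above

# One row per id remains, each with the highest timestamp for that id
print(uniqueData.values().collect())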

– David

You can do something like this:

val df = Seq(
  (1,12345678,"this is a test"),
  (1,23456789, "another test"),
  (2,2345678,"2nd test"),
  (2,1234567, "2nd another test")
).toDF("id","timestamp","data")

+---+---------+----------------+
| id|timestamp|            data|
+---+---------+----------------+
|  1| 12345678|  this is a test|
|  1| 23456789|    another test|
|  2|  2345678|        2nd test|
|  2|  1234567|2nd another test|
+---+---------+----------------+

df.join(
  df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
  $"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
+---+---------+------------+
| id|timestamp|        data|
+---+---------+------------+
|  1| 23456789|another test|
|  2|  2345678|    2nd test|
+---+---------+------------+

If you expect there could be a repeated timestamp for an id (see comments below), you could do this:

df.dropDuplicates(Seq("id", "timestamp")).join(
  df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
  $"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
– David Griffin
    It is close but doesn't guarantee a single row per id. – zero323 Apr 14 '16 at 16:58
  • Hmm, you mean if the timestamp is the same? – David Griffin Apr 14 '16 at 17:03
  • Yes, exactly. It should be possible to simply take an arbitrary one here I guess. – zero323 Apr 14 '16 at 17:05
  • That's one of those things where if you know your data it might not be an issue. And like you said, you could always do: `df.groupBy($"id", $"timestamp").agg(last($"data"))` before you do the rest of it. – David Griffin Apr 14 '16 at 17:09
  • It is. `drop_duplicates` could be more universal than `last`. You can handle the complete row without mixing values. – zero323 Apr 14 '16 at 17:12
  • Yup. Edited. Never used `dropDuplicates` before. On the other hand, you don't seem to have any control over which row is dropped -- I'm guessing it's arbitrary. – David Griffin Apr 14 '16 at 17:21
  • Thanks :) Already upvoted so I cannot do it once more ;) – zero323 Apr 14 '16 at 19:40
  • @David Griffin the row kept is determined as by a first()/head() call: for a given data set, the row that first() would return is kept, and the remaining duplicate rows are removed when drop_duplicates() is called. – Kanav Sharma Jul 06 '17 at 12:10