
I have streaming data coming in from Kafka into a DataFrame. I want to remove duplicates based on Id and keep only the latest record based on timestamp.

Sample data is like this:

Id  Name    count   timestamp
1   Vikas   20      2018-09-19T10:10:10
2   Vijay   50      2018-09-19T10:10:20
3   Vilas   30      2018-09-19T10:10:30
4   Vishal  10      2018-09-19T10:10:40
1   Vikas   50      2018-09-19T10:10:50
4   Vishal  40      2018-09-19T10:11:00
1   Vikas   10      2018-09-19T10:11:10
3   Vilas   20      2018-09-19T10:11:20

The output that I am expecting would be:

Id  Name    count   timestamp
1   Vikas   10      2018-09-19T10:11:10
2   Vijay   50      2018-09-19T10:10:20
3   Vilas   20      2018-09-19T10:11:20
4   Vishal  40      2018-09-19T10:11:00

Older duplicates are removed and only the most recent record per Id is kept, based on the timestamp field.
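To make the expected semantics concrete, here is a plain-Scala sketch (no Spark) of the dedup I want, using the sample rows above; ISO-8601 timestamps compare correctly as plain strings:

```scala
// Plain-Scala sketch of the desired dedup semantics (no Spark):
// group by Id and keep the row with the greatest timestamp.
case class Record(id: Int, name: String, count: Int, timestamp: String)

val rows = List(
  Record(1, "Vikas", 20, "2018-09-19T10:10:10"),
  Record(2, "Vijay", 50, "2018-09-19T10:10:20"),
  Record(3, "Vilas", 30, "2018-09-19T10:10:30"),
  Record(4, "Vishal", 10, "2018-09-19T10:10:40"),
  Record(1, "Vikas", 50, "2018-09-19T10:10:50"),
  Record(4, "Vishal", 40, "2018-09-19T10:11:00"),
  Record(1, "Vikas", 10, "2018-09-19T10:11:10"),
  Record(3, "Vilas", 20, "2018-09-19T10:11:20")
)

val latest = rows
  .groupBy(_.id)                 // one group per Id
  .values
  .map(_.maxBy(_.timestamp))     // latest record in each group
  .toList
  .sortBy(_.id)
```

This yields exactly the four rows in the expected output above.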

I am using watermarking on the timestamp field. I have tried "df.dropDuplicates", but it keeps the older records intact and discards anything newer.

Current code is as follows:

df = df.withWatermark("timestamp", "1 Day").dropDuplicates("Id", "timestamp")

How can I implement a custom dedup method so that the latest record is kept as the unique record?

Any help is appreciated.


1 Answer


Sort by the timestamp column first, before dropping the duplicates.

df.withWatermark("timestamp", "1 Day")
  .sort($"timestamp".desc)
  .dropDuplicates("Id", "timestamp")
    "Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode." We are using watermarking, so in our case, it is not a complete output mode. – Vikas Gite Sep 20 '18 at 09:22
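Since sorting is not allowed on a streaming DataFrame, one commonly suggested workaround (a sketch only, untested here) is to aggregate per Id and take the max of a struct whose first field is the timestamp; Spark compares structs field by field, so `max` picks the latest row per key. Note this turns the query into a streaming aggregation, so the sink must use update (or complete) output mode:

```scala
// Sketch (untested): instead of sorting the stream, aggregate per Id and
// keep the struct with the greatest timestamp. Because struct comparison
// is field-by-field, putting "timestamp" first makes max() select the
// latest row for each Id.
import org.apache.spark.sql.functions.{col, max, struct}

val latest = df
  .withWatermark("timestamp", "1 day")
  .groupBy(col("Id"))
  .agg(max(struct(col("timestamp"), col("Name"), col("count"))).as("latest"))
  .selectExpr("Id", "latest.*")
```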