Spark dropDuplicates
keeps the first instance and ignores all subsequent occurrences for that key. Is it possible to remove duplicates while keeping the most recent occurrence instead?
For example, if the micro-batches below are what I receive, then I want to keep the most recent record (based on the timestamp field) for each country.
batchId: 0
Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06
batchId: 1
Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:03
Then the output after batchId 1 should be:
Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:06
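On a plain (non-streaming) DataFrame, "keep the newest row per country" is easy to express with a window function. A minimal sketch of the semantics I'm after, assuming a batch DataFrame df with columns country, count and timestamp:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank rows per country by timestamp, newest first, and keep only rank 1.
val w = Window.partitionBy("country").orderBy(col("timestamp").desc)

val latestPerCountry = df
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")

The problem is that this kind of window function is not supported on streaming DataFrames, so it does not carry over to my streaming job directly.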
Update-1
This is the current code I have:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// kafkaDF is a streaming DataFrame created with Kafka as the source
val streamingDF = kafkaDF.dropDuplicates("country")

streamingDF.writeStream
  .trigger(Trigger.ProcessingTime(10000L))
  .outputMode("update")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    println("batchId: " + batchId)
    batchDF.show()
  }
  .start()
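One partial workaround I noticed (an observation, not something I have verified in depth): inside foreachBatch, batchDF is an ordinary batch DataFrame, so the window trick above does work there. It only dedupes within a single micro-batch, though; it has no memory of earlier batches, so it could not drop the late Belarus row in batch 1:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

kafkaDF.writeStream
  .trigger(Trigger.ProcessingTime(10000L))
  .outputMode("update")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Within-batch dedup only: keep the newest row per country in this batch.
    val w = Window.partitionBy("country").orderBy(col("timestamp").desc)
    val latest = batchDF
      .withColumn("rn", row_number().over(w))
      .filter(col("rn") === 1)
      .drop("rn")
    println("batchId: " + batchId)
    latest.show()
  }
  .start()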
I want to output all rows that are either new or have a more recent timestamp than any record for the same country in the batches processed so far. Example below.
After batchId: 0 - Both countries appeared for the first time, so both should appear in the output:
Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06
After batchId: 1 - Belarus's timestamp is older than the one I received in batch 0, so it is not displayed in the output. Australia is displayed because its timestamp is more recent than anything seen so far.
Australia, 10, 2020-05-05 00:00:08
Now let's say batchId 2 comes in with both records as late arrivals; then it should not display anything in the output for that batch.
Input batchId: 2
Australia, 10, 2020-05-05 00:00:01
Belarus, 10, 2020-05-05 00:00:01
After batchId: 2
(no output, since both records are late arrivals)
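What I'm considering (a sketch, not tested code): keep the newest timestamp per country in explicit state with flatMapGroupsWithState, and emit a row only when a batch brings something newer. The CountryRecord case class and the assumption that the Kafka value is already parsed into (country, count, timestamp) are mine:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._   // spark is the active SparkSession

// Assumed shape of a parsed record
case class CountryRecord(country: String, count: Int, timestamp: Timestamp)

// Emit the newest row of this batch only if it is newer than the newest
// timestamp remembered for this country; otherwise emit nothing.
def keepMostRecent(
    country: String,
    rows: Iterator[CountryRecord],
    state: GroupState[Timestamp]): Iterator[CountryRecord] = {
  val newest = rows.maxBy(_.timestamp.getTime)
  if (state.getOption.forall(seen => newest.timestamp.after(seen))) {
    state.update(newest.timestamp)   // advance the per-country high-water mark
    Iterator.single(newest)
  } else {
    Iterator.empty                   // late arrival, drop it
  }
}

val dedupedDF = kafkaDF
  .as[CountryRecord]
  .groupByKey(_.country)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(keepMostRecent)

If this behaves the way I expect, batch 2 above would produce no output: both timestamps compare older than the stored state, so the function returns an empty iterator for each country. flatMapGroupsWithState with OutputMode.Update should also line up with the update output mode my query already uses.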
Update-2
Adding input and expected records for each batch. Rows marked in red are discarded and not shown in the output, because another row with the same country name and a more recent timestamp was seen in an earlier batch.