1

Spark dropDuplicates keeps the first instance and ignores all subsequent occurrences for that key. Is it possible to do remove duplicates while keeping the most recent occurrence?

For example if below are the micro batches that I get, then I want to keep the most recent record (sorted on timestamp field) for each country.

batchId: 0

Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06

batchId: 1

Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:03

Then output after batchId 1 should be below -

Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:06

Update-1 This is the current code that I have

//KafkaDF is a streaming dataframe created from Kafka as source
val streamingDF = kafkaDF.dropDuplicates("country")

streamingDF.writeStream
    .trigger(Trigger.ProcessingTime(10000L))
    .outputMode("update")
    .foreachBatch {
      (batchDF: DataFrame, batchId: Long) => {
        println("batchId: "+ batchId)
        batchDF.show()
      }
    }.start()

I want to output all rows which are either new or have greater timestamp than any record in previous batches processed so far. Example below

After batchId: 0 - Both countries appeared for first time so I should get them in output

Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06

After batchId: 1 - Belarus's timestamp is older than we I received in batch 0 so I don't display that in output. Australia is displayed as its timestamp is more recent than what I have seen so far.

Australia, 10, 2020-05-05 00:00:08

Now let's say batchId 2 comes up with both records as late arrival then it should not display anything in ouput for that batch.

Input batchId: 2

Australia, 10, 2020-05-05 00:00:01
Belarus, 10, 2020-05-05 00:00:01

After batchId: 2

.

Update-2

Adding input and expected records for each batch. Rows marked with red color are discarded and not shown in output as an another row with same country name and more recent timestamp is seen in previous batches enter image description here

conetfun
  • 1,605
  • 4
  • 17
  • 38
  • I found a question which asks exactly what I am looking for but unfortunately, there are no answers to it. https://stackoverflow.com/questions/50823468/retain-last-row-for-given-key-in-spark-structured-streaming All, I am looking for is dropDuplicates in reverse order so that it keeps the most recent record instead of first – conetfun Jul 05 '20 at 10:00
  • Where are you writing data ?? – Srinivas Jul 05 '20 at 11:57
  • @Srinivas I will be writing it to a persistent database like Oracle but I don’t want to solve it using lookup to oracle as I want to filter the late arriving records in the begninning itself – conetfun Jul 05 '20 at 11:59
  • Late arriving records means how much duration.. just think same records that came two days back can come today .. how are you going to handle that ?? – Srinivas Jul 05 '20 at 12:05
  • I can safely assume that my events won’t be late by more than an hour. – conetfun Jul 05 '20 at 12:07
  • Can you add few more sample records if possible ?? – Srinivas Jul 05 '20 at 12:11
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/217248/discussion-between-conetfun-and-srinivas). – conetfun Jul 05 '20 at 12:36
  • Hi @conetfun, how did you write to oracle database. I did not see the official document supoprting db as a sink, which is quite surprising for me. – soMuchToLearnAndShare Sep 14 '20 at 20:01

2 Answers2

2

In order to avoid late arriving events in streaming app you need to keep a state in your application, that keeps track of latest processed event per key in your case it is country.

case class AppState(country:String, latestTs:java.sql.Timestamp)

For a microbatch, you might receive multiple events on that when you do groupByKey(_.country) you will get a events belong to a key(country) and you need to compare against it with the state to find the latest input event and update the state with the latest timestamp for the key and proceed with the latest event for further processing. For late arriving events, it should return an Option[Event] and filter out the same in subsequent process.

Refer this blog for detailed explanation.

Sivakumar
  • 1,711
  • 14
  • 18
  • 1
    I can't thank you enough for this answer. Though, I had read about stateful transformations in last few days but your answer gave me that confidence to dive deep and give it a try and it works. – conetfun Jul 05 '20 at 20:09
  • Hi @conetfun, when you say give it a try, you meant the 'blog' article? was it something similar to this https://stackoverflow.com/questions/50933606/spark-streaming-select-record-with-max-timestamp-for-each-id-in-dataframe-pysp, it talks about mapGroupsWithState – soMuchToLearnAndShare Sep 14 '20 at 10:01
  • Hi @Sivakumar, 1). I am not sure I understand what you meant by ```For late arriving events, it should return an Option[Event] and filter out the same in subsequent process```. 2) does the state case class only need to keep the key and the latestTs? it does not need to keep other values? that means we need to join back to get other values. – soMuchToLearnAndShare Sep 14 '20 at 15:21
  • Hi @conetfun, I've posted a question sort of related to what your case. Please have a look and comment/answer if possible: https://stackoverflow.com/questions/63916475/is-it-possible-to-let-spark-structured-streamupdate-mode-to-write-to-db – soMuchToLearnAndShare Sep 16 '20 at 08:51
0

Try to usewindow function in spark streaming, check below for example.

val columns = Seq("country","id").map(col(_))
df.groupBy(window($"timestamp","10 minutes","5 minutes"), columns:_*)

You can also check same in this question, Solution is in python.

Srinivas
  • 8,957
  • 2
  • 12
  • 26
  • Thanks, My apologies as my question wasn't very clear earlier. I have made edits to make it verbose. In your example, you are grouping over window, so it will display one record per window. I just want to output the records as I stated in update- without window based groups – conetfun Jul 05 '20 at 09:53
  • Hi @conetfun, so what is the difference with or without window? without window, you are showing only 1 record across the history? with window you are showing 1 record each time the trigger checks, and check across the window length? thank you. – soMuchToLearnAndShare Sep 14 '20 at 09:02
  • 1
    @Minnie - For my requirement, Sliding window won't work as I would need to get all data from beginning to compare it with current batch data for filtering out the late arriving records (across batches). I was able to solve the same using `mapGroupsWithState` but my final solution was not to use that as it writes the state to disk with every batch due to which trigger execution took longer. I ended up maintaining state using a scala map and recreating an RDD for every new batch, filter late arriving rows, and updating map with latest values from current batch. – conetfun Sep 14 '20 at 20:36
  • @conetfun, I also found once I started to use ```mapGroupsWithState```, it started to write output for each trigger/batch, even though there is no new data being processed - **if** I use timeout conf with processing time. ```mapGroupsWithState``` signature says though For a streaming Dataset, the function will be invoked for each group repeatedly in every trigger, and ```updates to each group's state will be saved across invocations.``` how ever that means **saved across invocations**.... **but** if i use timeout conf with eventtime, it will not write the empty output out. – soMuchToLearnAndShare Sep 17 '20 at 16:45