Similar to Kafka's log compaction, there are quite a few use cases where it is required to keep only the last update for a given key and use the result, for example, for joining data.

How can this be achieved in Spark structured streaming (preferably using PySpark)?

For example, suppose I have the table

key    | time   | value
----------------------------
A      | 1      | foo
B      | 2      | foobar
A      | 2      | bar
A      | 15     | foobeedoo

Now I would like to retain the last value for each key as state (with watermarking), i.e. to have access to the dataframe

key    | time   | value
----------------------------
B      | 2      | foobar
A      | 15     | foobeedoo

that I might like to join against another stream.

Preferably this should be done without wasting the one supported aggregation step. I suppose I would need something like a dropDuplicates() function with reverse order, as sketched below.
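
For illustration, in batch terms the operation I have in mind would look roughly like this (`df` and the column names follow the example table above; note that, as pointed out in the comments, dropDuplicates retains an arbitrary row per key, so even in batch this is not strictly guaranteed):

```scala
import org.apache.spark.sql.functions.desc

// Batch-only illustration of "dropDuplicates with reverse order":
// sort newest-first, then keep one row per key.
val latestPerKey = df
  .orderBy(desc("time"))
  .dropDuplicates("key")
```

However, sorting is not supported on streaming Datasets, so this does not carry over to structured streaming.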

Please note that this question is explicitly about structured streaming and how to solve the problem without constructs that waste the aggregation step (hence, anything based on window functions or a max aggregation is not a good answer). (In case you do not know: chaining aggregations is currently unsupported in structured streaming.)

  • Possible duplicate of [Find maximum row per group in Spark DataFrame](https://stackoverflow.com/questions/35218882/find-maximum-row-per-group-in-spark-dataframe) – zero323 Oct 28 '18 at 15:18
  • Any deduplication has to be done through a shuffle. You won't have your cake and eat it too. – zero323 Oct 29 '18 at 22:59
  • Drop duplicates works by aggregation and retains an arbitrary object. Additionally, if I am not mistaken, the current implementation of the Kafka data source is not aware of the Kafka partitioning key, and therefore cannot factor it into the execution plan. – zero323 Nov 01 '18 at 11:42
  • I think @user6910411 left nice answers. "it is required to keep only the last update on a given key": your question itself indicates you need to shuffle by key. – Jungtaek Lim Nov 09 '18 at 01:12
  • Any progress here? It's an excellent question with few views. Surprising, at least to me. – thebluephantom Feb 07 '19 at 15:54
  • My conclusion is that it is not possible. – thebluephantom Feb 08 '19 at 13:32

1 Answer

Use flatMapGroupsWithState or mapGroupsWithState: group by key, sort the values by time inside the flatMapGroupsWithState function, and store the last row for each key in the GroupState.
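
A minimal Scala sketch of this approach (flatMapGroupsWithState is only available in the Scala/Java API, not PySpark; the `Record` case class and the streaming `input` Dataset are assumptions based on the question's example table, and `maxBy(_.time)` stands in for sorting by time and keeping the last row):

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Assumed record type matching the example table (key, time, value).
case class Record(key: String, time: Long, value: String)

// For each key, keep the newest row across this batch and any previous state.
def keepLatest(
    key: String,
    rows: Iterator[Record],
    state: GroupState[Record]): Iterator[Record] = {
  val latest = (rows ++ state.getOption.iterator).maxBy(_.time)
  state.update(latest)
  Iterator(latest)
}

// `input` is a streaming Dataset[Record], e.g. parsed from Kafka.
// Requires a SparkSession in scope and `import spark.implicits._` for the encoders.
val latestPerKey = input
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout())(keepLatest)
```

This sketch keeps state forever (NoTimeout); to honor the watermarking requirement from the question, you would configure an event-time timeout together with a watermark and evict stale keys in the same function.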

zhicheng
  • Hi @zhicheng, the official doc says the following. Does that mean the result of the aggregation you proposed cannot be used for a join? ```As of Spark 2.4, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used. Cannot use streaming aggregations before joins. Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.``` Or maybe you were proposing using 'Complete' mode? – soMuchToLearnAndShare Sep 14 '20 at 13:13
  • Hi @zhicheng, the official doc says ```Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.```. How did you do the sorting? – soMuchToLearnAndShare Sep 14 '20 at 21:18
  • If you want to join the dataframe produced by flatMapGroupsWithState, the output mode should be "append". Maybe you have misunderstood what I mean: I mean using sorting inside the flatMapGroupsWithState function. – zhicheng Sep 25 '20 at 09:29
  • Okay, I did not understand that you use sorting inside of the state update function. By the way, one cannot use append mode if the query uses `flatMapGroupsWithState`/`mapGroupsWithState` – soMuchToLearnAndShare Sep 25 '20 at 17:32