
In the case of arbitrary stateful aggregation in Structured Streaming, using foreachBatch to merge updates into a Delta table, should I persist the batch DataFrame inside foreachBatch before upserting, or not?

It seems to me that persist is not required, since I'm writing to a single data sink.

On the other hand, I have a strong feeling that not persisting will cause a source re-scan and trigger the aggregation twice.

Any comments/thoughts?

foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) ->
        deltaTable.as("table")
                .merge(batchDf.as("updates"), functions.expr("table.id = updates.id"))
                .whenNotMatched().insertAll()   // new session to be added
                .whenMatched().updateAll()
                .execute())
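
For context, the snippet above is the sink side of a query that looks roughly like this (just a sketch; sessionUpdates stands for the Dataset produced by my mapGroupsWithState/flatMapGroupsWithState step, and checkpointDir is a placeholder path):

sessionUpdates                                        // Dataset<Row> from the stateful aggregation
        .writeStream()
        .outputMode("update")
        .option("checkpointLocation", checkpointDir)  // placeholder
        .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) ->
                deltaTable.as("table")
                        .merge(batchDf.as("updates"), functions.expr("table.id = updates.id"))
                        .whenNotMatched().insertAll()
                        .whenMatched().updateAll()
                        .execute())
        .start();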
Yurii Oleynikov

2 Answers


So the answer from delta-users (https://groups.google.com/g/delta-users/c/Ihm6PMilCdI) is:

DeltaTable.merge (upsert) does two passes on the source data.

So if you DO care about the Spark metrics or logs from the arbitrary stateful aggregation inside mapGroupsWithState/flatMapGroupsWithState, do persist/cache the batch DataFrame before the merge inside foreachBatch; otherwise the emitted metrics will have doubled (x2) values and the aggregation logs will be emitted twice.
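
In Java that looks roughly like this (just a sketch, reusing deltaTable and the id join key from the question; the storage level and the try/finally are my own choices, not something stated in the thread; needs org.apache.spark.storage.StorageLevel on the imports):

foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) -> {
    // Materialize the micro-batch once so merge's two passes over the source
    // do not re-run the stateful aggregation (and its logs/metrics) twice.
    batchDf.persist(StorageLevel.MEMORY_AND_DISK());
    try {
        deltaTable.as("table")
                .merge(batchDf.as("updates"), functions.expr("table.id = updates.id"))
                .whenNotMatched().insertAll()
                .whenMatched().updateAll()
                .execute();
    } finally {
        batchDf.unpersist();   // release the cached micro-batch after the merge
    }
})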

Yurii Oleynikov

Let me quote the page linked below:

To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it.

I don't know if you have already visited that page, but it seems that you are correct: persist is not necessary in your case. It is only essential when writing to multiple locations.

Source: https://docs.databricks.com/spark/latest/structured-streaming/foreach.html
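
The pattern that quote describes looks roughly like this (a sketch only; the two output paths are placeholders, not something from the question):

foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) -> {
    batchDf.persist();                                                  // compute the micro-batch once
    batchDf.write().format("delta").mode("append").save(firstPath);    // first location
    batchDf.write().format("parquet").mode("append").save(secondPath); // second location
    batchDf.unpersist();                                               // then release the cache
})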

Marco Massetti
  • Yep, I've seen that page, and that's what caused me to think that persisting is not required. Moreover, in the Databricks notebook they are not persisting (https://docs.databricks.com/_static/notebooks/merge-in-streaming.html). On the other hand, it seems that if I do not persist, I see the logs from my arbitrary aggregation stage twice. – Yurii Oleynikov Jan 18 '20 at 22:45
  • I drew the same conclusion as you, but I didn't get that you still have duplicates. I even checked the Spark docs and found a possible explanation in the "Note" part (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch): "By default, foreachBatch provides only at-least-once write guarantees. However, you can use the batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee." – Marco Massetti Jan 18 '20 at 22:58
  • Basically, they are saying that it doesn't matter what you do before; use the batchId and do the deduplication yourself (see the rough sketch after these comments). – Marco Massetti Jan 18 '20 at 23:00
  • Can you elaborate a bit, or refer to some example of de-duplication using batchId and foreachBatch? – Yurii Oleynikov Jan 18 '20 at 23:12
  • This answer should cover that part better than me https://stackoverflow.com/questions/57704664/how-does-structured-streaming-ensure-exactly-once-writing-semantics-for-file-sin – Marco Massetti Jan 18 '20 at 23:21
  • Thanks a lot, Marco! – Yurii Oleynikov Jan 18 '20 at 23:40
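
To expand on the batchId-based de-duplication mentioned in the comments, here is a rough, illustrative sketch only: the side log path, its column name and the use of Parquet are all made up for the example, and the check + merge + log append are not one atomic transaction, so this is not a complete exactly-once solution. spark, deltaTable and functions are assumed to be in scope as in the question; java.util.Collections and org.apache.spark.sql.Encoders are needed on the imports.

foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) -> {
    String batchLogPath = "/tmp/merge-batch-log";            // hypothetical bookkeeping location

    // Skip the batch if this batchId was already recorded (i.e. the batch was replayed).
    boolean alreadyProcessed;
    try {
        alreadyProcessed = spark.read().parquet(batchLogPath)
                .filter(functions.col("batchId").equalTo(batchId))
                .count() > 0;
    } catch (Exception e) {                                   // log not created yet
        alreadyProcessed = false;
    }
    if (alreadyProcessed) {
        return;
    }

    deltaTable.as("table")
            .merge(batchDf.as("updates"), functions.expr("table.id = updates.id"))
            .whenNotMatched().insertAll()
            .whenMatched().updateAll()
            .execute();

    // Record the batch as processed.
    spark.createDataset(Collections.singletonList(batchId), Encoders.LONG())
            .toDF("batchId")
            .write().mode("append").parquet(batchLogPath);
})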