When doing arbitrary stateful aggregation in Structured Streaming and using foreachBatch to merge updates into a Delta table, should I persist the batch DataFrame inside foreachBatch before upserting, or not?
It seems to me that persist is not required, since I'm writing to a single data sink.
On the other hand, I have a strong feeling that not persisting will cause a source re-scan and trigger the aggregation twice.
Any comments/thoughts?
import io.delta.tables.DeltaTable;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) ->
    deltaTable.as("table")
        .merge(batchDf.as("updates"), functions.expr("table.id = updates.id"))
        .whenNotMatched().insertAll() // new sessions are added
        .whenMatched().updateAll()    // existing sessions are updated
        .execute())
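
For reference, this is a minimal sketch of the persisting variant I'm asking about, assuming the same deltaTable and merge condition as above; the StorageLevel choice and the try/finally shape are just illustrative, not something I've confirmed is required:

import org.apache.spark.storage.StorageLevel;

foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) -> {
    // Cache the micro-batch so the merge reads it from the cache
    // instead of potentially recomputing the upstream aggregation.
    // MEMORY_AND_DISK is an arbitrary choice here.
    batchDf.persist(StorageLevel.MEMORY_AND_DISK());
    try {
        deltaTable.as("table")
            .merge(batchDf.as("updates"), functions.expr("table.id = updates.id"))
            .whenNotMatched().insertAll()
            .whenMatched().updateAll()
            .execute();
    } finally {
        batchDf.unpersist(); // release the cached batch once the merge is done
    }
})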