
I am using Delta Lake (OSS, version 0.7.0 with PySpark 3.0.1), and the table is modified by a MERGE every 5 minutes from a micro-batch PySpark script.

On the first run it created 18 small files (numTargetRowsInserted -> 32560). I then reran with the same data; although nothing had changed, the table was still touched, the version was updated, the number of small files grew to 400, and the previously added 18 files were marked as removed. Moreover, every MERGE except the first reports numTargetRowsCopied -> 32560 in its operation metrics. Why are the target rows copied again, and why are the older files marked as removed? Am I missing anything?
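For reference, I am reading these metrics from the table history; a minimal sketch of how (the path is a placeholder for my actual table location):

from delta.tables import DeltaTable

# operationMetrics is a column of the history DataFrame, one row per commit.
dt = DeltaTable.forPath(spark, "/path/to/target_table")
dt.history().select("version", "operation", "operationMetrics").show(truncate=False)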

The operationMetrics data is as follows:

operationMetrics

[numTargetRowsCopied -> 32560, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 18, executionTimeMs -> 0, numTargetRowsInserted -> 0, scanTimeMs -> 68457, numTargetRowsUpdated -> 0, numOutputRows -> 32560, numSourceRows -> 32560, numTargetFilesRemoved -> 400, rewriteTimeMs -> 66410]

[numTargetRowsCopied -> 32560, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 400, executionTimeMs -> 0, numTargetRowsInserted -> 0, scanTimeMs -> 16838, numTargetRowsUpdated -> 0, numOutputRows -> 32560, numSourceRows -> 32560, numTargetFilesRemoved -> 18, rewriteTimeMs -> 48810]

[numTargetRowsCopied -> 32560, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 18, executionTimeMs -> 0, numTargetRowsInserted -> 0, scanTimeMs -> 12399, numTargetRowsUpdated -> 0, numOutputRows -> 32560, numSourceRows -> 32560, numTargetFilesRemoved -> 18, rewriteTimeMs -> 15039]

[numTargetRowsCopied -> 32560, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 18, executionTimeMs -> 0, numTargetRowsInserted -> 0, scanTimeMs -> 12244, numTargetRowsUpdated -> 0, numOutputRows -> 32560, numSourceRows -> 32560, numTargetFilesRemoved -> 18, rewriteTimeMs -> 14828]

[numTargetRowsCopied -> 32560, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 18, executionTimeMs -> 0, numTargetRowsInserted -> 0, scanTimeMs -> 67154, numTargetRowsUpdated -> 0, numOutputRows -> 32560, numSourceRows -> 32560, numTargetFilesRemoved -> 400, rewriteTimeMs -> 70194]

[numTargetRowsCopied -> 32560, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 400, executionTimeMs -> 0, numTargetRowsInserted -> 0, scanTimeMs -> 20367, numTargetRowsUpdated -> 0, numOutputRows -> 32560, numSourceRows -> 32560, numTargetFilesRemoved -> 18, rewriteTimeMs -> 80719]

[numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 18, executionTimeMs -> 0, numTargetRowsInserted -> 32560, scanTimeMs -> 7035, numTargetRowsUpdated -> 0, numOutputRows -> 32560, numSourceRows -> 32560, numTargetFilesRemoved -> 0, rewriteTimeMs -> 11606]

Merge SQL:

MERGE INTO Target_table tgt
USING Source_table src
ON src.pk_col = tgt.pk_col
WHEN MATCHED AND src.operation = 'DELETE' THEN DELETE
WHEN MATCHED AND src.operation = 'UPDATE' THEN UPDATE SET *
WHEN NOT MATCHED AND src.operation != 'DELETE' THEN INSERT *
Rak

2 Answers


That's known behavior of Delta: MERGE rewrites every file that contains a record matching the ON clause, regardless of the conditions on WHEN MATCHED / WHEN NOT MATCHED.

In your case, since you're rerunning with the same data, the source rows still match rows in the table, so the ON condition finds matches; when the MATCHED/NOT MATCHED clauses are then evaluated, nothing qualifies, but the matched files have already been rewritten. To avoid this, you need to think about how to make the condition more specific; one option is sketched below.
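For example, here is a rough PySpark sketch (illustrative only, and the DELETE handling is simplified): pre-filter the source so rows that already exist unchanged in the target never reach the MERGE:

tgt = spark.table("Target_table")
src = spark.table("Source_table")

# DELETE markers must still match the target to take effect, so pass
# them through unfiltered.
deletes = src.filter("operation = 'DELETE'")
upserts = src.filter("operation != 'DELETE'")

# Drop upsert rows that are identical to what the target already holds
# (left-anti join on every target column): a re-run of the same batch
# then sends zero matching rows into the MERGE, so no files are rewritten.
changed = upserts.join(tgt, on=tgt.columns, how="left_anti")

changed.unionByName(deletes).createOrReplaceTempView("Source_table_changed")
# ...then run the same MERGE statement with Source_table_changed as the source.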

Look at this talk (and slides); it explains how MERGE works.

Alex Ott

Hi, try looking at this for basic guidelines.

  1. I recommend using partitioning to minimize the cost of intensive merge operations.
  2. Write with coalesce(1); this guarantees one file per partition, but watch out for the cardinality of the columns you choose for partitioning (a sketch follows below).
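A minimal sketch of point 2, assuming a hypothetical partition column event_date (yours will differ): coalesce(1) makes a single task write the output, so each partition directory ends up with exactly one file, at the cost of a serialized (slower) write:

# Initial creation of the table: one task writes everything, yielding
# one file per partition directory. event_date and the path are
# placeholders for your own column and location.
(spark.table("Source_table")
    .coalesce(1)
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("event_date")
    .save("/path/to/target_table"))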

This worked for my case.

  • Thank you Cristián! I got #1. – Rak Apr 21 '21 at 17:18
  • For #2, how do I enforce one file per partition? I thought I would have to run compaction on the Delta table after the MERGE. – Rak Apr 21 '21 at 17:25
  • Just create the table with coalesce(1) and check the files per partition; this depends on your Spark configuration and executors. See this: https://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv – Cristián Vargas Acevedo Apr 21 '21 at 17:39