1

I want to write data in delta tables incrementally while replacing (overwriting) partitions already present in sink. Example: Consider this data inside my delta table already partionned by id column:

+---+---+
| id|  x|
+---+---+
|  1|  A|
|  2|  B|
|  3|  C|
+---+---+

Now, I would like to insert the following dataframe:

+---+---------+
| id|        x|
+---+---------+
|  2|      NEW|
|  2|      NEW|
|  4|        D|
|  5|        E|
+---+---------+

The desired output is this

+---+---------+
| id|        x|
+---+---------+
|  1|        A|
|  2|      NEW|
|  2|      NEW|
|  3|        C|
|  4|        D|
|  5|        E|
+---+---------+

What I did is the following:

df = spark.read.format("csv").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/in/input.csv")
Ids=[x.id for x in df.select("id").distinct().collect()]
for Id in Ids:
  df.filter(df.id==Id).write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id == '$i'".format(i=Id)).mode("append").save("/mnt/blob/datafinance/bronze/simba/test/res/")
spark.read.format("delta").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/res/").show()

And this is the result:

+---+---------+
| id|        x|
+---+---------+
|  2|        B|
|  1|        A|
|  5|        E|
|  2|      NEW|
|  2|NEW AUSSI|
|  3|        C|
|  4|        D|
+---+---------+

As you can see it appended all value without replacing the partition id=2 which was already present in table.

I think it is because of mode("append"). But changing it to mode("overwrite") throws the following error:

Data written out does not match replaceWhere 'id == '$i''.

Can anyone tell me how to achieve what I want please ?

Thank you.

wohlstad
  • 12,661
  • 10
  • 26
  • 39
Haha
  • 973
  • 16
  • 43
  • I think you should use [dynamic overwrite option](https://stackoverflow.com/a/50006527/6080276). I did an answer about [replaceWhere option](https://stackoverflow.com/questions/67540077/would-replacewhere-causes-deletion/67542956#67542956), maybe you can understand why your process isn't working as expected – Kafels Aug 13 '21 at 11:04
  • Is my code not suitable for such use case ? Can you please apply your method on my case ? Seems the same thing for me. – Haha Aug 13 '21 at 12:21
  • Dynamic overwrite doesn't need to filter, it's only `df.write.save('path', format='delta', mode='overwrite')` and Spark does the work for you. `replaceWhere` might be useful when you are dealing with date partitions or range values – Kafels Aug 13 '21 at 12:36
  • So I replaced `df.filter(df.id==Id).write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id == '$i'".format(i=Id)).mode("append").save("/mnt/blob/datafinance/bronze/simba/test/res/")` with `spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") and df.write.partitionBy(PartitionKey).save('/mnt/blob/datafinance/bronze/simba/test/res/', format='delta', mode='overwrite')` and it overwrote the whole dataframe and not only id=2 partition – Haha Aug 13 '21 at 12:52
  • Delete and update is the case here – thebluephantom Aug 13 '21 at 15:04
  • Do you mean that I should delete id=2 partitions and then reinsert all ? Not possible to do it with replaceWhere ? – Haha Aug 13 '21 at 15:21

1 Answers1

0

I actually had an error in the code. I replaced

.option("replaceWhere", "id == '$i'".format(i=idd))

with

.option("replaceWhere", "id == '{i}'".format(i=idd))

and it worked.

Thanks to @ggordon who noticed me about the error on another question.

Haha
  • 973
  • 16
  • 43