I have a Spark dataframe named df, which is partitioned on the column date. I need to save this dataframe to S3 in CSV format. When writing it, I need to delete on S3 only the partitions (i.e. the dates) for which the dataframe has data to write; all the other partitions need to remain intact.

I saw here that this is exactly the job of the option spark.sql.sources.partitionOverwriteMode set to dynamic.

However, it does not seem to work for me with CSV files.
If I use it on parquet with the following command, it works perfectly:

df.write \
    .option("partitionOverwriteMode", "dynamic") \
    .partitionBy("date") \
    .format("parquet") \
    .mode("overwrite") \
    .save(output_dir)
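
For reference, the same behaviour can also be enabled at the session level rather than per write; a minimal sketch, assuming spark is the active SparkSession:

# session-wide equivalent of the per-write option above
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")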

But if I use it on CSV with the following command, it does not work:

df.write \
    .option("partitionOverwriteMode", "dynamic") \
    .partitionBy("date") \
    .format("csv") \
    .mode("overwrite") \
    .save(output_dir)

Why is this the case? Any idea how this behaviour could be achieved with CSV outputs?

aprospero

3 Answers

I need to delete on S3 only the partitions (i.e. the dates) for which the dataframe has data to write

Assuming you have a convenient list of the dates you are processing, you can use the replaceWhere option to determine which partitions to overwrite (delete and replace).

For example:

df.write \
    .partitionBy("date") \
    .option("replaceWhere", "date >= '2020-12-14' AND date <= '2020-12-15'") \
    .format("csv") \
    .mode("overwrite") \
    .save(output_dir)

A more dynamic approach works if you have the start_date and end_date stored in variables:

start_date = "2022-01-01"
end_date = "2022-01-14"

condition = f"date >= '{start_date}' AND date <= '{end_date}'"

df.write \
    .partitionBy("date") \
    .option("replaceWhere", condition) \
    .format("csv") \
    .mode("overwrite") \
    .save(output_dir)
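
If the dates are instead held as a list (as raised in the comments below), the same kind of condition could be assembled from it. A sketch, assuming a hypothetical Python list of date strings named dates:

dates = ["2022-01-03", "2022-01-07", "2022-01-11"]  # hypothetical dates to overwrite
# builds e.g. "date IN ('2022-01-03', '2022-01-07', '2022-01-11')"
condition = "date IN ({})".format(", ".join(f"'{d}'" for d in dates))
# then pass it to the writer as before: .option("replaceWhere", condition)
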
tjheslin1
  • I guess this also works with a list of dates placed in OR conditions. However, do you think there is any limitation with respect to the maximum number of dates? I.e. if I have 1000 separate dates in OR conditions, do you think there are any performance limitations? – aprospero Apr 20 '22 at 14:41
  • There might be a limit, yes. If you share how you have these dates stored (e.g. in `val`s), I can update my answer with the best way I know of to incorporate them into the `replaceWhere`. – tjheslin1 Apr 20 '22 at 15:49
  • I just tried to add the condition but unfortunately it does not work. All the remaining partitions (which should not be touched by the condition) are deleted. Any help? – aprospero May 13 '22 at 07:07
  • Can you update your question with your updated query? I'll be able to update my answer accordingly from this. – tjheslin1 May 15 '22 at 08:40

Which Spark version do you use? For Spark < 2.0.0 it may not be possible to use partitioning along with the CSV format.

mckraqs
  • I am on Spark 3.2.1. I saw that "spark.sql.sources.partitionOverwriteMode" works from 2.3.0 onwards, but apparently not for the CSV file format. – aprospero Apr 20 '22 at 14:46

If you are not on EMR and are using the S3A committers to safely commit work to S3, then the partitioned committer can be configured to delete all data in the destination partitions before committing new work, leaving all other partitions alone.
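
A minimal configuration sketch, assuming the spark-hadoop-cloud bindings (PathOutputCommitProtocol) are on the classpath; the keys used below are the standard S3A committer settings:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # route dataframe writes through the Hadoop path output committers
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    # pick the S3A "partitioned" staging committer
    .config("spark.hadoop.fs.s3a.committer.name", "partitioned")
    # "replace": delete existing data only in the partitions being written to
    .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace")
    .getOrCreate()
)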

stevel
  • If I use "replace" with .option("fs.s3a.committer.staging.conflict-mode", "replace") but it does not seem to work. The partitions which should be untouched are deleted. – aprospero May 13 '22 at 07:31