
I am trying to write a dataframe to an S3 location after repartitioning. But whenever the write stage fails and Spark retries the stage, it throws a FileAlreadyExistsException.

When I re-submit the job it works fine, provided Spark completes the stage in a single attempt.

Below is my code block:

df.repartition(<some-value>).write.format("orc").option("compression", "zlib").mode("Overwrite").save(path)

I believe Spark should remove the files left behind by the failed stage before retrying. I understand this would be avoided by setting the retry count to zero, but the stage is expected to fail occasionally, so that is not a proper solution (see the sketch below for what I mean by disabling retries).
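
For context, this is roughly what "setting retry to zero" means; a minimal sketch, assuming spark.task.maxFailures and spark.stage.maxConsecutiveAttempts are the relevant knobs, and exactly the workaround I want to avoid:

    import org.apache.spark.sql.SparkSession

    // Sketch only: disabling retries hides the symptom but gives up fault tolerance.
    // spark.task.maxFailures: attempts per task before the stage is failed (default 4).
    // spark.stage.maxConsecutiveAttempts: stage-level retry limit (default 4).
    val spark = SparkSession.builder()
      .config("spark.task.maxFailures", "1")
      .config("spark.stage.maxConsecutiveAttempts", "1")
      .getOrCreate()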

Below is the error:

Job aborted due to stage failure: Task 0 in stage 6.1 failed 4 times, most recent failure: Lost task 0.3 in stage 6.1 (TID 740, ip-address, executor 170): org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:s3://<bucket-name>/<path-to-object>/part-00000-c3c40a57-7a50-41da-9ce2-555753cab63a-c000.zlib.orc
    at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.checkExistenceIfNotOverwriting(RegularUploadPlanner.java:36)
    at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.plan(RegularUploadPlanner.java:30)
    at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.UploadPlannerChain.plan(UploadPlannerChain.java:37)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:601)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:932)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:242)
    at org.apache.orc.impl.PhysicalFsWriter.<init>(PhysicalFsWriter.java:95)
    at org.apache.orc.impl.WriterImpl.<init>(WriterImpl.java:170)
    at org.apache.orc.OrcFile.createWriter(OrcFile.java:843)
    at org.apache.orc.mapreduce.OrcOutputFormat.getRecordWriter(OrcOutputFormat.java:50)
    at org.apache.spark.sql.execution.datasources.orc.OrcOutputWriter.<init>(OrcOutputWriter.scala:43)
    at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anon$1.newInstance(OrcFileFormat.scala:121)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:233)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:

I am using Spark 2.4 on EMR. Please suggest a solution.

Edit 1: Please note the issue is not related to overwrite mode; I am already using it. As the question title suggests, the issue is with leftover files from a failed stage. The Spark UI screenshot below may make this clearer.

[Spark UI screenshot]

Arghya Saha
  • Might be useful: https://stackoverflow.com/questions/27033823/how-to-overwrite-the-output-directory-in-spark – Bitswazsky Aug 13 '19 at 06:15
  • Hi @Bitswazsky: Thanks but the issue is about stage failure, overwrite is working fine as mentioned. – Arghya Saha Aug 13 '19 at 06:18
  • @ArghyaSaha Can you clarify the following - When a task fails (for any reason) after creating partial files, subsequent attempts of that task and that stage fail with a FileAlreadyExists exception (because Spark doesn't clean up that file). Your goal is that the Job/Stage should succeed in a subsequent attempt. Is that right? – moriarty007 Aug 13 '19 at 06:35
  • @prakharjain That's correct, but I do not want to re-submit the job or disable Spark retries to fix it. I found an article but am not sure if it works for my case. https://docs.qubole.com/en/latest/troubleshooting-guide/spark-ts/troubleshoot-spark.html#filealreadyexistsexception-in-spark-jobs – Arghya Saha Aug 13 '19 at 06:38
  • https://docs.qubole.com/en/latest/troubleshooting-guide/spark-ts/troubleshoot-spark.html#filealreadyexistsexception-in-spark-jobs - These configs are specific to Qubole's Spark offering and won't work on EMR. – moriarty007 Aug 13 '19 at 06:45
  • How about spark.hadoop.mapreduce.output.textoutputformat.overwrite=true, as it's not specific to Qubole? But it says it works with DirectFileOutputCommitter (DFOC), not sure what that means! – Arghya Saha Aug 13 '19 at 06:54
  • "spark.hadoop.mapreduce.output.textoutputformat.overwrite" one is also available in Qubole's Spark offering. – moriarty007 Aug 14 '19 at 05:33

1 Answer


Set spark.hadoop.orc.overwrite.output.file=true in your Spark config.

You can find more details on this config here - OrcConf.java
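
A minimal sketch of where this property could be set, assuming the job builds its own SparkSession; it could equally be passed to spark-submit as --conf spark.hadoop.orc.overwrite.output.file=true:

    import org.apache.spark.sql.SparkSession

    // The spark.hadoop. prefix forwards the key into the Hadoop/ORC configuration,
    // so a retried task overwrites the partial file left by the failed attempt
    // instead of failing with FileAlreadyExistsException.
    val spark = SparkSession.builder()
      .appName("orc-overwrite-example")  // hypothetical app name
      .config("spark.hadoop.orc.overwrite.output.file", "true")
      .getOrCreate()

Note this only changes how the ORC writer handles a file that already exists; it is separate from SaveMode.Overwrite on the DataFrameWriter.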

QuickSilver
moriarty007
  • Thanks for answering, but the issue is not with overwrite; check my code in the question, I am already using it. The issue is leftover files from a stage failure. – Arghya Saha Aug 13 '19 at 06:20
  • Can you tell the value of these configs in your setup - "spark.sql.sources.outputCommitterClass", "spark.sql.sources.commitProtocolClass" ? – moriarty007 Aug 13 '19 at 06:21
  • @ArghyaSaha The overwrite that I have suggested in my answer is different from what you have done. This is not SaveMode.Overwrite. Using the "spark.hadoop.orc.overwrite.output.file" config, we are telling the underlying ORC writer to overwrite the already existing file. This will prevent the FileAlreadyExists exception and your job should pass. – moriarty007 Aug 13 '19 at 06:44
  • All right, I will try this config and get back. Thanks! – Arghya Saha Aug 13 '19 at 06:49
  • This seems to be working, I accepted it as the answer. Just wanted to check if there is a similar property for other file types? – Arghya Saha Aug 18 '19 at 04:34
  • I faced a similar problem a while back -- [my answer](https://stackoverflow.com/a/62926226/3728233) might help for a more generic solution. – sbrk Jul 31 '20 at 07:37
  • @moriarty007 would you happen to know if a similar config exists for csv.gz files? or where I should look to find the config path? – J Schmidt Mar 23 '21 at 18:53