134

I have a Spark Streaming application which produces a dataset every minute. I need to save/overwrite the results of the processed data.

When I tried to overwrite the dataset, org.apache.hadoop.mapred.FileAlreadyExistsException stops the execution.

I set the Spark property set("spark.files.overwrite","true"), but there is no luck.

How do I overwrite or pre-delete the files from Spark?

maasg
Vijay Innamuri

9 Answers

129

UPDATE: Suggest using DataFrames, plus something like ... .write.mode(SaveMode.Overwrite) ....
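
For example, a minimal sketch (assuming org.apache.spark.sql.SaveMode is imported; df and the output path here are placeholders):

df.write.mode(SaveMode.Overwrite).parquet("/tmp/myPath")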

Handy pimp:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

implicit class PimpedStringRDD(rdd: RDD[String]) {
  def write(p: String)(implicit ss: SparkSession): Unit = {
    import ss.implicits._
    rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
  }
}

For older versions try

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(yourSparkConf)

In Spark 1.1.0 you can set conf settings using the spark-submit script with the --conf flag.
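
For instance (the class and jar names below are just placeholders):

spark-submit --class com.example.MyStreamingApp --conf spark.hadoop.validateOutputSpecs=false my-app.jar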

WARNING (older versions): According to @piggybox there is a bug in Spark where it will only overwrite the files it needs to in order to write its part- files; any other files will not be removed.

samthebest
  • For `Spark 1.4`: `df.write.mode(SaveMode.Overwrite).parquet(path)` – Minh Ha Pham Jul 17 '15 at 07:43
  • For Spark SQL you have options to define the SaveMode, but for core Spark you don't have anything like that. Would really like to see that kind of feature for saveAsTextFile and other transformations – Murtaza Kanchwala Aug 04 '15 at 08:36
  • A hidden problem: compared to @pzecevic's solution to wipe out the whole folder through HDFS, with this approach Spark will only overwrite the part files with the same file names in the output folder. This works most of the time, but if there is something else in the folder, such as extra part files from another Spark/Hadoop job, these files will not be overwritten. – piggybox Nov 06 '15 at 19:13
  • @piggybox Well that sounds like a bug in Spark. It should wipe out the whole folder, if it doesn't it's pretty unintuitive, especially since this is how Scalding works and how Spark used to work up to 0.9 I think. Having to do a manual delete is utterly ridiculous, imagine having to do a `rm` every time we did a `mv`!! Better raise a JIRA :). – samthebest Nov 06 '15 at 19:44
  • You can also use `df.write.mode(mode: String).parquet(path)` where mode: String can be: "overwrite", "append", "ignore", "error". – rye Jun 09 '16 at 16:07
  • but what if we want to save `rdd` data? You mean, we need to first convert `rdd` to `df`? – avocado Sep 09 '17 at 11:52
  • @avocado Yup think so, the Spark APIs just get worse and worse on every release :P – samthebest Sep 10 '17 at 12:55
58

Since df.save(path, source, mode) is deprecated (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame), use

df.write.format(source).mode("overwrite").save(path)

where df.write is a DataFrameWriter.

'source' can be ("com.databricks.spark.avro" | "parquet" | "json")
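
For instance, with Parquet as the source (df and the path are placeholders):

df.write.format("parquet").mode("overwrite").save("/tmp/myPath")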

Curycu
30

From the pyspark.sql.DataFrame.save documentation (currently at 1.3.1), you can specify mode='overwrite' when saving a DataFrame:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

I've verified that this will even remove leftover partition files. So if you had, say, 10 partitions/files originally but then overwrote the folder with a DataFrame that only had 6 partitions, the resulting folder will have the 6 partitions/files.

See the Spark SQL documentation for more information about the mode options.
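
To illustrate the point about leftover partition files, a sketch using the newer DataFrameWriter API (df and the path are placeholders):

df.repartition(10).write.mode("overwrite").parquet("/tmp/myPath")  // writes 10 part files
df.repartition(6).write.mode("overwrite").parquet("/tmp/myPath")   // the folder now holds only 6 part files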

dnlbrky
  • True and helpful, thanks, but a DataFrame-specific solution - `spark.hadoop.validateOutputSpecs` will work across all Spark APIs. – samthebest Jul 17 '15 at 10:02
  • For some reason, `spark.hadoop.validateOutputSpecs` didn't work for me on 1.3, but this does. – Eric Walker Sep 02 '15 at 23:01
  • @samthebest With the `save(... , mode=` route, you could overwrite one set of files, append another, etc. within the same Spark context. Wouldn't `spark.hadoop.validateOutputSpecs` limit you to just one mode per context? – dnlbrky Sep 03 '15 at 15:20
  • @dnlbrky The OP didn't ask to append. As I said, true, useful, but unnecessary. If the OP had asked "how do I append" then a whole range of answers could be given. But let's not go into that. Also I advise you to consider using the Scala version of DataFrames as it has type safety and more checking - for example if you had a typo in "overwrite" you wouldn't find out until that DAG was evaluated - which in a Big Data job could be 2 hours later!! If you use the Scala version the compiler will check everything up front! Pretty cool, and very important for Big Data. – samthebest Sep 05 '15 at 13:38
28

df.write.mode('overwrite').parquet("/output/folder/path") works if you want to overwrite a Parquet file using Python. This is in Spark 1.6.2; the API may be different in later versions.

AlexLordThorsen
akn
27

The documentation for the parameter spark.files.overwrite says this: "Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source." So it has no effect on the saveAsTextFile method.

You could do this before saving the file:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
// Recursively delete the output path if it already exists, ignoring any failure
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _: Throwable => }

As explained here: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html
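
Putting it together, a minimal usage sketch (filepath and rdd are placeholders): delete the target directory first, then save as usual.

try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _: Throwable => }
rdd.saveAsTextFile(filepath)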

Alberto Bonsanto
pzecevic
4
val jobName = "WordCount"
// Overwrite the output directory in Spark: set("spark.hadoop.validateOutputSpecs", "false")
val conf = new SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)
vaquar khan
3

This overloaded version of the save function works for me:

yourDF.save(outputPath, org.apache.spark.sql.SaveMode.valueOf("Overwrite"))

The example above would overwrite an existing folder. SaveMode can take these values as well (https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html):

Append: Append mode means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

ErrorIfExists: ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.

Ignore: Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.
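
For example, the same overloaded save call with a different mode (yourDF and outputPath are placeholders):

yourDF.save(outputPath, org.apache.spark.sql.SaveMode.Append)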

Shay
3

Spark – Overwrite the output directory:

By default, Spark doesn't overwrite the output directory on S3, HDFS, or any other file system; when you try to write the DataFrame contents to an existing directory, Spark throws a runtime error. To overcome this, Spark provides the enumeration org.apache.spark.sql.SaveMode.Overwrite to overwrite the existing folder.

We need to pass this Overwrite as an argument to the mode() function of the DataFrameWriter class, for example:

df.write.mode(SaveMode.Overwrite).csv("/tmp/out/foldername")

or you can use the overwrite string.

df.write.mode("overwrite").csv("/tmp/out/foldername")

Besides Overwrite, SaveMode also offers other modes like SaveMode.Append, SaveMode.ErrorIfExists and SaveMode.Ignore.

For older versions of Spark, you can use the following to overwrite the output directory with the RDD contents.

sparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sparkContext = new SparkContext(sparkConf)

1

If you are willing to use your own custom output format, you would be able to get the desired behaviour with RDD as well.

Have a look at the following classes: FileOutputFormat, FileOutputCommitter

In FileOutputFormat you have a method named checkOutputSpecs, which checks whether the output directory exists. In FileOutputCommitter you have commitJob, which usually transfers data from the temporary directory to its final place.

I wasn't able to verify it yet (I will as soon as I have a few free minutes), but theoretically: if I extend FileOutputFormat and override checkOutputSpecs with a method that doesn't throw an exception when the directory already exists, and adjust the commitJob method of my custom output committer to perform whichever logic I want (e.g. overwrite some of the files, append others), then I may be able to achieve the desired behaviour with RDDs as well.

The output format is passed to saveAsNewAPIHadoopFile (which is the method saveAsTextFile also calls to actually save the files). And the output committer is configured at the application level.
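
As a rough, untested sketch of the first half of that idea (the class name is made up, and note that this skips everything the parent's checkOutputSpecs does, not just the directory-exists check):

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

class OverwritingTextOutputFormat[K, V] extends TextOutputFormat[K, V] {
  // The parent implementation throws FileAlreadyExistsException when the
  // output directory already exists; do nothing instead so the job can proceed.
  override def checkOutputSpecs(context: JobContext): Unit = ()
}

// Hypothetical usage via the new Hadoop API (rdd: RDD[String] and the path are placeholders):
rdd.map(line => (NullWritable.get(), new Text(line)))
  .saveAsNewAPIHadoopFile(
    "/tmp/output",
    classOf[NullWritable],
    classOf[Text],
    classOf[OverwritingTextOutputFormat[NullWritable, Text]])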

Michael Kopaniov
  • I would avoid going near subclassing FileOutputCommitter if you can help it: that's a scary bit of code. Hadoop 3.0 adds a plugin point where FileOutputFormat can take different implementations of a refactored superclass (PathOutputCommitter). The S3 one from Netflix will write-in-place into a partitioned tree, only doing conflict resolution (fail, delete, add) at job commit, and only in the updated partitions – stevel Jan 20 '18 at 12:47