50

Say I have a Spark DataFrame that I want to save to disk as a CSV file. In Spark 2.0.0+, one can convert a DataFrame (Dataset[Row]) to a DataFrameWriter and use the .csv method to write the file.

The function is defined as

def csv(path: String): Unit
    path : the location/folder name and not the file name.

Spark stores the output at the specified location by creating CSV files named part-*.csv.

Is there a way to save the CSV with a specified filename instead of part-*.csv? Or is it possible to specify a prefix to use instead of part-r?

Code :

df.coalesce(1).write.csv("sample_path")

Current Output :

sample_path
|
+-- part-r-00000.csv

Desired Output :

sample_path
|
+-- my_file.csv

Note: coalesce(1) is used to produce a single output file, and the executor has enough memory to collect the DataFrame without running out of memory.

Amelio Vazquez-Reina
Spandan Brahmbhatt
  • Unfortunately I cannot answer here, so for java users: val fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); File dir = new File(System.getProperty("user.dir") + "/my.csv/"); File[] files = dir.listFiles((d, name) -> name.endsWith(".csv")); fs.rename(new Path(files[0].toURI()), new Path(System.getProperty("user.dir") + "/csvDirectory/newData.csv")); fs.delete(new Path(System.getProperty("user.dir") + "/my.csv/"), true); – Anna Klein Jul 20 '20 at 07:43
  • 1
    I get around this issue in pyspark by calling toPandas() and then just saving via a pandas dataframe. – Chris Ivan Feb 17 '22 at 04:23

1 Answer

50

It's not possible to do this directly with Spark's save.

Spark writes through the Hadoop file format, which requires data to be partitioned - that's why you get part-* files. You can easily rename the file after processing, just like in this question.

In Scala it will look like:

import org.apache.hadoop.fs._
val fs = FileSystem.get(sc.hadoopConfiguration)
// find the name of the part file Spark wrote inside the output directory
val file = fs.globStatus(new Path("csvDirectory/data.csv/part*"))(0).getPath.getName

// move it out under the desired name, then remove the leftover output directory
fs.rename(new Path("csvDirectory/data.csv/" + file), new Path("csvDirectory/mydata.csv"))
fs.delete(new Path("csvDirectory/data.csv"), true)

or just:

import org.apache.hadoop.fs._
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.rename(new Path("csvDirectory/data.csv/part-00000"), new Path("csvDirectory/newData.csv"))
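If the output lands on the local filesystem rather than HDFS or S3, the same rename-and-cleanup can be sketched with plain java.nio instead of the Hadoop API. This is only a sketch, assuming the question's layout; the directory and file names (sample_path, my_file.csv) are the hypothetical ones from the question:

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RenameSparkOutput {

    // Move the single part-*.csv out of the Spark output directory, then
    // delete the directory (which still holds _SUCCESS and checksum files).
    // Local filesystem only; on HDFS or S3 use Hadoop's FileSystem.rename.
    static void renameOutput(Path outputDir, Path target) throws IOException {
        List<Path> contents;
        try (Stream<Path> s = Files.list(outputDir)) {
            contents = s.collect(Collectors.toList());
        }
        Path part = contents.stream()
                .filter(p -> p.getFileName().toString().matches("part-.*\\.csv"))
                .findFirst()
                .orElseThrow(() -> new IOException("no part file in " + outputDir));
        Files.move(part, target, StandardCopyOption.REPLACE_EXISTING);
        for (Path p : contents) {
            Files.deleteIfExists(p); // _SUCCESS etc.; the part file is already gone
        }
        Files.delete(outputDir);
    }

    public static void main(String[] args) throws IOException {
        // Demo with a fake Spark output directory standing in for "sample_path"
        Path dir = Files.createTempDirectory("sample_path");
        Files.write(dir.resolve("part-00000.csv"), "a,b\n1,2\n".getBytes());
        Files.createFile(dir.resolve("_SUCCESS"));
        Path target = dir.getParent().resolve("my_file.csv");
        renameOutput(dir, target);
        System.out.println(Files.exists(target) && !Files.exists(dir)); // prints "true"
    }
}
```

Note this does not apply to distributed storage: a local Files.move cannot reach files on HDFS or S3, which is why the Hadoop FileSystem calls above are the general solution.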

Edit: As mentioned in the comments, you can also write your own OutputFormat; see the documentation for information on using this approach to set the file name.

Shafique Jamal
T. Gawęda
  • @eliasah Didn't know it. But still it's not out of the box in Spark – T. Gawęda Feb 03 '17 at 13:22
  • 1
    However good point, I will edit my answer later to show this possibility :) – T. Gawęda Feb 03 '17 at 13:25
  • it's always good to remind spark users that there is an underlying hadoop/hdfs layer – eliasah Feb 03 '17 at 13:28
  • I think you have two typos. (1) getting field values from scala arrays is in (brackets) not [square brackets]. So it should be fs.globStatus(new Path("path/file.csv/part*"))(0). (2) I think the sc.hadoopConfiguration does not take parameters so you should omit the () that you have at the end. My scala REPL throws this very clear error: error: org.apache.hadoop.conf.Configuration does not take parameters. But thanks for the answer! Super Helpful! – Max Oct 10 '17 at 21:47
  • @Max Thank you :) This code was ported from Java version, I've edited my answer :) – T. Gawęda Oct 11 '17 at 15:00
  • 2
    Thanks! consider these changes val file = fs.globStatus(new Path(s"$sinkDir/part*"))(0).getPath() fs.rename(file, new Path(s"$sinkDir/gremlin.fastq")) – Rubber Duck Apr 03 '19 at 11:50
  • 2
    sc.hadoopConfiguration does not take parameters in the second example as well, so parentheses () should be omitted there too :) – hooke Apr 23 '19 at 12:37
  • I'm new to spark + Hadoop.. what implication to resources and performance would this approach have? Will it all be executed on the spark Driver? – Jordan Simba Feb 02 '21 at 21:18
  • If I have multiple files in a folder, will `fs.globStatus(new Path("path/file.csv/part*"))(0).getPath().getName()` give me the latest one? – hipokito Nov 01 '21 at 13:17