
I'm trying to find an effective way of saving the result of my Spark job as a CSV file. I'm using Spark with Hadoop, and so far all my files are saved as part-00000.

Any ideas on how to make Spark save to a file with a specified name?

Ram Ghadiyaram
Karusmeister
  • Possible duplicate of [How to write the resulting RDD to a csv file in Spark python](http://stackoverflow.com/questions/31898964/how-to-write-the-resulting-rdd-to-a-csv-file-in-spark-python) – gsamaras Aug 02 '16 at 03:06
  • @gsamaras given the timing, that question might be a possible duplicate of this one :] – Karusmeister Aug 24 '16 at 15:41
  • The content matters to me more than the timing, but that's fine. You asked a great question, that's why I upvoted too! :) – gsamaras Aug 24 '16 at 16:50
  • Possible duplicate of [Write single CSV file using spark-csv](https://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv) – mrsrinivas Sep 07 '17 at 06:28

5 Answers


Since Spark uses the Hadoop FileSystem API to write data to files, this is more or less inevitable. If you do

rdd.saveAsTextFile("foo")

It will be saved as "foo/part-XXXXX", with one part-* file for every partition in the RDD you are trying to save. Each partition is written to a separate file for fault tolerance. If the task writing the 3rd partition (i.e. to part-00002) fails, Spark simply re-runs the task and overwrites the partially written or corrupted part-00002, with no effect on the other parts. If all tasks wrote to the same file, it would be much harder to recover a single task from failure.

The part-XXXXX files are usually not a problem if you are going to consume the output again in Spark or another Hadoop-based framework: because they all use the HDFS API, asking them to read "foo" makes them read all the part-XXXXX files inside foo.
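To make the "read the whole directory" behaviour concrete, here is a minimal sketch of what a consumer does with such an output directory. It assumes the output sits on the local filesystem rather than HDFS, and the class and method names (`PartReader`, `readAllParts`) are made up for illustration; it is not how Spark itself reads the data.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: reads a Spark-style output directory on the local filesystem. */
final class PartReader {

    /** Returns the lines of every part-* file in dir, in part-number order. */
    static List<String> readAllParts(Path dir) throws IOException {
        List<Path> parts = new ArrayList<>();
        // The glob skips marker files such as _SUCCESS and hidden .crc checksum files.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "part-*")) {
            for (Path p : stream) {
                parts.add(p);
            }
        }
        // Zero-padded names (part-00000, part-00001, ...) sort in partition order.
        Collections.sort(parts);

        List<String> lines = new ArrayList<>();
        for (Path p : parts) {
            lines.addAll(Files.readAllLines(p, StandardCharsets.UTF_8));
        }
        return lines;
    }
}
```

Calling `PartReader.readAllParts(Paths.get("foo"))` would then return the records of all partitions as one list, regardless of how many part files the job produced.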

Ram Ghadiyaram
Tathagata Das

I suggest doing it this way (Java example):

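import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// Shuffle into a single partition so that only one part file is written.
theRddToPrint.coalesce(1, true).saveAsTextFile(textFileName);
// anyUtilClass stands for any helper of yours that returns a Hadoop FileSystem.
FileSystem fs = anyUtilClass.getHadoopFileSystem(rootFolder);
FileUtil.copyMerge(
    fs, new Path(textFileName),
    fs, new Path(textFileNameDestiny),
    true, fs.getConf(), null);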
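
As far as I know, `FileUtil.copyMerge` was removed in Hadoop 3.0, so the snippet above needs a Hadoop 2.x client. The sketch below shows the same idea (concatenate the part files into one named file, then optionally delete the source directory) using only `java.nio`. It is an illustration under assumptions, not a Hadoop replacement: it works on the local filesystem only, expects a flat output directory, and the names `LocalCopyMerge` and `copyMerge` are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical local-filesystem stand-in for the merge step. */
final class LocalCopyMerge {

    /** Lists the entries of dir that match the glob, sorted by name. */
    private static List<Path> list(Path dir, String glob) throws IOException {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path p : stream) {
                result.add(p);
            }
        }
        Collections.sort(result);
        return result;
    }

    /** Concatenates srcDir/part-* into dstFile; dstFile must live outside srcDir. */
    static void copyMerge(Path srcDir, Path dstFile, boolean deleteSource) throws IOException {
        try (OutputStream out = Files.newOutputStream(dstFile)) {
            for (Path part : list(srcDir, "part-*")) {
                Files.copy(part, out); // appends the raw bytes of this part
            }
        }
        if (deleteSource) {
            // Assumes a flat directory: part files, _SUCCESS and .crc files only.
            for (Path entry : list(srcDir, "*")) {
                Files.delete(entry);
            }
            Files.delete(srcDir);
        }
    }
}
```

A call such as `LocalCopyMerge.copyMerge(Paths.get("foo"), Paths.get("result.csv"), true)` would leave a single `result.csv` and remove the `foo` directory.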

nhahtdh
adoalonso

Extending Tathagata Das's answer to Spark 2.x and Scala 2.11:

Using Spark SQL, we can do this in a one-liner:

// implicits for helper functions like .toDF
import spark.implicits._

val df = Seq(
  ("first", 2.0),
  ("choose", 7.0),
  ("test", 1.5)
).toDF("name", "vals")

// write DataFrame/Dataset to external storage
df.write
  .format("csv")
  .save("csv/file/location")

Then you can go ahead and proceed with adoalonso's answer.
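
If the DataFrame is first reduced to a single partition (for example with `coalesce(1)`, which the snippet above does not do), the output directory contains exactly one part file and a plain rename is enough instead of a merge. The following is a rough sketch under those assumptions: local filesystem only, and `SinglePartRenamer` / `moveSinglePart` are hypothetical names. The exact part file name is chosen by Spark, so the code matches it with a glob rather than hard-coding it.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: gives the single part file of an output directory a chosen name. */
final class SinglePartRenamer {

    /** Moves the only part-* file in outputDir to target and returns the target path. */
    static Path moveSinglePart(Path outputDir, Path target) throws IOException {
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(outputDir, "part-*")) {
            for (Path p : stream) {
                parts.add(p);
            }
        }
        // Refuse to guess when the job wrote zero or several partitions.
        if (parts.size() != 1) {
            throw new IllegalStateException(
                "expected exactly one part file in " + outputDir + ", found " + parts.size());
        }
        return Files.move(parts.get(0), target, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

The explicit check is deliberate: silently picking one of several part files would drop data, so the sketch fails loudly instead.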

Aniket Kulkarni
mrsrinivas

I have an idea, but no ready code snippet. Internally (as the name suggests), Spark uses Hadoop's OutputFormat (as well as InputFormat when reading from HDFS).

In Hadoop's FileOutputFormat there is a protected member, setOutputName, which you can call from a subclass to set a different base name.

Ram Ghadiyaram
David Gruzman

It's not really a clean solution, but inside a foreachRDD() you can do basically whatever you like, including creating a new file.

In my solution this is what I do: I save the output on HDFS (for fault tolerance reasons), and inside a foreachRDD I also create a TSV file with statistics in a local folder.

I think you could probably do the same if that's what you need.

http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html#output-operations

Ram Ghadiyaram
gprivitera