Say I have a Spark DataFrame that I want to save as a CSV file. Since Spark 2.0.0, the DataFrameWriter class directly supports saving it as a CSV file.

The default behavior is to save the output in multiple part-*.csv files inside the path provided.

How would I save a DataFrame with:

  1. Path mapping to the exact file name instead of folder
  2. Header available in first line
  3. Save as a single file instead of multiple files.

One way to deal with it is to coalesce the DataFrame and then save the file.

df.coalesce(1).write.option("header", "true").csv("sample_file.csv")

However, this has the disadvantage of collecting all the data on the master machine, which requires a master with enough memory.

Is it possible to write a single CSV file without using coalesce? If not, is there a more efficient way than the above code?

Spandan Brahmbhatt
  • Did you solve this issue? – pietrop Apr 11 '17 at 13:21
  • No. I am still using `.coalesce(1)` to create a single file. – Spandan Brahmbhatt Apr 11 '17 at 13:25
  • If the file is huge and you are worried about memory on master, then it seems having part files is better. Especially for further analysis, having one file misses the point of HDFS. I don't see a Spark way of doing it other than `coalesce(1)` or `repartition(1)`. If you wanted multiple workers to append to the same file, they would have to do it sequentially or wait for each other to finish, or records would be out of order; that would be hard and annoying to orchestrate. – Davos May 16 '17 at 08:01
  • For some reason, even when using '.coalesce(1)' or '.repartition(1)', I still get a folder as the output and not a single text file or CSV. – ukbaz May 26 '17 at 10:31
  • @ukbaz That is the default behavior. You will have a folder and inside it there will be a single file `part-*`. – Spandan Brahmbhatt May 30 '17 at 21:21
  • I simply do not understand why this question is asked continually. It totally misses the point of Spark and distributed computing in general. – user4601931 Aug 04 '17 at 03:21
  • @user4601931: Results if valuable need to be stored or published not discarded. We do not compute for the pure fun of watching CPU loads. This is not said to you as a joke but because people continually ask as you did to challenge our reasons or challenge people to produce justifications for storing results. – Geoffrey Anderson Jun 06 '18 at 18:32
  • @GeoffreyAnderson It's been a while since my comment, but I don't remember penning it with the intent of suggesting to discard results. Reading it again, I don't see how you could have gotten that impression. Perhaps spend some more time with Spark or other distributed frameworks, which store or publish or otherwise consume the output of a Spark application without caring how many files comprise the output, and then maybe you'll understand why "people continually ask as I did." – user4601931 Jun 07 '18 at 00:33
  • @GeoffreyAnderson Being less pedantic, if you do have a use for Spark where the output is small enough and must, under every circumstance, be read as a single file in the next step of whatever process, there's always `cat *.txt > my_one_single_file.txt`. – user4601931 Jun 07 '18 at 00:43
  • @SpandanBrahmbhatt Check my solution, it works like a charm. In the link below, look for the answer by sri hari kali charan tummala (https://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv/59772398#59772398) – sri hari kali charan Tummala Jan 18 '20 at 16:38
  • This should not have been flagged as a duplicate. This question asks for "Path mapping to the exact file name instead of folder", which requires a completely different implementation than simply outputting a single file. I wrote a blog post explaining how to write out a single file with a specific name: https://mungingdata.com/apache-spark/output-one-file-csv-parquet/. It's a shame I can't answer this question because it's important for the Spark community. – Powers Jun 18 '20 at 15:07
  • Where can I find a good definition of `coalesce`? Googling returned, inter alia, the following, but I'm not sure if they are "valid" in the context of Spark, etc: https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions023.htm#SQLRF00617, https://www.w3schools.com/sql/func_sqlserver_coalesce.asp, http://www.hplsql.org/coalesce, https://docs.cloudera.com/runtime/7.2.2/impala-sql-reference/topics/impala-conditional-functions.html#conditional_functions__coalesce, http://www.thelandbeyondspreadsheets.com/what-is-the-hive-sql-coalesce-function-what-does-it-do-and-why-on-earth-is-it-useful/. – nutty about natty Jan 15 '21 at 16:54

8 Answers

Just solved this myself using pyspark with dbutils to get the .csv and rename to the wanted filename.

save_location= "s3a://landing-bucket-test/export/"+year
csv_location = save_location+"temp.folder"
file_location = save_location+'export.csv'

df.repartition(1).write.csv(path=csv_location, mode="append", header="true")

file = dbutils.fs.ls(csv_location)[-1].path
dbutils.fs.cp(file, file_location)
dbutils.fs.rm(csv_location, recurse=True)

This answer can be improved by not using [-1], but the .csv seems to always be last in the folder. Simple and fast solution if you only work on smaller files and can use repartition(1) or coalesce(1).
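
One way to avoid relying on [-1] is to select the entry by name instead of by position. The following is only a sketch: `pick_part_file` is a hypothetical helper, the listing is made up for illustration, and it assumes the data file is named part-*.csv as described in the question.

```python
from typing import Iterable


def pick_part_file(paths: Iterable[str]) -> str:
    """Return the single Spark part file among the given paths.

    Hypothetical helper: assumes the data file is named part-*.csv and that
    marker files such as _SUCCESS may sit next to it.
    """
    candidates = []
    for path in paths:
        name = path.rstrip("/").rsplit("/", 1)[-1]  # last path component
        if name.startswith("part-") and name.endswith(".csv"):
            candidates.append(path)
    if len(candidates) != 1:
        raise ValueError(f"expected exactly one part file, found {len(candidates)}")
    return candidates[0]


# Made-up listing, standing in for the paths returned by a directory listing
listing = [
    "s3a://bucket/export/temp.folder/_SUCCESS",
    "s3a://bucket/export/temp.folder/_committed_123",
    "s3a://bucket/export/temp.folder/part-00000-abc.csv",
]
print(pick_part_file(listing))
```

With dbutils, the input could be built as `[f.path for f in dbutils.fs.ls(csv_location)]`. Raising on anything other than exactly one match also catches the case where mode="append" has left part files from an earlier run in the temporary folder.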

Giorgos Myrianthous
user1217169

Use: df.toPandas().to_csv("sample_file.csv", header=True)

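toPandas() collects the entire DataFrame into the driver's memory, so this only suits data that fits there. See the documentation for details: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame.toPandas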

osbon123
df.coalesce(1).write.option("inferSchema", "true").csv("/newFolder", header="true", dateFormat="yyyy-MM-dd HH:mm:ss")
Mohammad Kanan
manny
  • Welcome to Stack Overflow. Your code might help answer the question, but without an explanation of what it does, it reads as a low-quality answer. Please review how to answer (https://stackoverflow.com/help/how-to-answer). – Mohammad Kanan May 21 '18 at 21:08
The following Scala method works in local or client mode and writes the df to a single CSV file with the chosen name. It requires that the df fit into driver memory; otherwise collect() will blow up.

import java.io.{BufferedWriter, OutputStreamWriter}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

val SPARK_WRITE_LOCATION = some_directory  // placeholder: temporary directory for the write
val WRITE_DIRECTORY: String = ???          // final output directory (not defined in the original answer)
val SPARKSESSION = SparkSession.builder().getOrCreate()

def saveResults(results: DataFrame, filename: String): Unit = {
    // Use the configured file system, or the local one when running in local mode
    var fs = FileSystem.get(SPARKSESSION.sparkContext.hadoopConfiguration)

    if (SPARKSESSION.conf.get("spark.master").contains("local")) {
      fs = FileSystem.getLocal(new Configuration())
    }

    // Start from an empty temporary location
    val tempWritePath = new Path(SPARK_WRITE_LOCATION)

    if (fs.exists(tempWritePath)) {
      val deleted = fs.delete(tempWritePath, true)
      assert(deleted)
    }

    if (results.count > 0) {
      val hadoopFilepath = new Path(SPARK_WRITE_LOCATION, filename)
      val writeStream = fs.create(hadoopFilepath, true)
      val bw = new BufferedWriter(new OutputStreamWriter(writeStream, "UTF-8"))

      // collect() pulls every row to the driver; values are joined without CSV quoting
      val rows = results.collect()
      for (row <- rows) {
        val rowString = row.mkString(start = "", sep = ",", end = "\n")
        bw.write(rowString)
      }

      bw.close()
      writeStream.close()

      // Copy the finished file to its final location, replacing any previous output
      val resultsWritePath = new Path(WRITE_DIRECTORY, filename)

      if (fs.exists(resultsWritePath)) {
        fs.delete(resultsWritePath, true)
      }
      fs.copyToLocalFile(false, hadoopFilepath, resultsWritePath, true)
    } else {
      System.exit(-1)
    }
}
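
Note that the row.mkString call above joins values with plain commas, so a field that itself contains a comma or a newline would corrupt the output. Below is a minimal sketch of the same collect-then-write idea in Python, using the standard csv module to handle quoting and to emit a header line. `write_rows_as_csv` is a hypothetical helper, and the plain tuples stand in for the rows that a collect() call would return.

```python
import csv
import io
from typing import Iterable, Sequence, TextIO


def write_rows_as_csv(out: TextIO, header: Sequence[str], rows: Iterable[Sequence]) -> None:
    """Write a header line followed by the rows, quoting fields only where needed."""
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)


# Made-up rows; the first one contains a comma inside a field
buf = io.StringIO()
write_rows_as_csv(buf, ["id", "name"], [(1, "Smith, John"), (2, "Ann")])
print(buf.getvalue())
```

With PySpark this could presumably be called as `write_rows_as_csv(f, df.columns, df.collect())` on a file opened with `newline=""`. The memory caveat is the same as for the Scala method: everything passes through the driver.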
Bryan

This solution is based on a shell script and is not parallelized, but it is still very fast, especially on SSDs. It uses cat and output redirection on Unix systems. Suppose that the CSV directory containing the partitions is located at /my/csv/dir and that the output file is /my/csv/output.csv:

#!/bin/bash
echo "col1,col2,col3" > /my/csv/output.csv
for i in /my/csv/dir/*.csv ; do
    echo "Processing $i"
    cat "$i" >> /my/csv/output.csv
    rm "$i"
done
echo "Done"

It removes each partition after appending it to the final CSV in order to free space.

"col1,col2,col3" is the CSV header (here we have three columns named col1, col2 and col3). You must tell Spark not to put the header in each partition (this is accomplished with .option("header", "false")), because the shell script adds it.

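For environments without a shell, the same merge can be sketched in Python. This is an illustration rather than a drop-in replacement: `merge_part_files` is a hypothetical helper, the demo files are made up, and it assumes the partitions were written without headers, as described above.

```python
import shutil
import tempfile
from pathlib import Path


def merge_part_files(part_dir: Path, output: Path, header: str) -> int:
    """Concatenate part_dir/*.csv into output, header first.

    Each part is deleted after it has been appended, mirroring the shell
    script. Returns the number of parts merged.
    """
    parts = sorted(part_dir.glob("*.csv"))  # fixed order: part-00000, part-00001, ...
    with output.open("wb") as out:
        out.write((header + "\n").encode("utf-8"))
        for part in parts:
            with part.open("rb") as src:
                shutil.copyfileobj(src, out)  # streams the file, no full read into memory
            part.unlink()
    return len(parts)


# Demo on throwaway files standing in for Spark's output directory
with tempfile.TemporaryDirectory() as tmp:
    part_dir = Path(tmp) / "dir"
    part_dir.mkdir()
    (part_dir / "part-00000.csv").write_bytes(b"1,2,3\n")
    (part_dir / "part-00001.csv").write_bytes(b"4,5,6\n")
    output = Path(tmp) / "output.csv"
    merged = merge_part_files(part_dir, output, "col1,col2,col3")
    result = output.read_bytes().decode("utf-8")

print(result)
```

Sorting the part files keeps the rows in partition order, which the shell glob also does implicitly.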
pietrop

For those still wanting to do this, here's how I got it done using Spark 2.1 in Scala with some java.nio.file help.

Based on https://fullstackml.com/how-to-export-data-frame-from-apache-spark-3215274ee9d6

val df: org.apache.spark.sql.DataFrame = ??? // data frame to write
val file: java.nio.file.Path = ??? // target output file (i.e. 'out.csv')

import java.nio.file.Files
import scala.collection.JavaConversions._

// write csv into temp directory which contains the additional spark output files
// could use Files.createTempDirectory instead
val tempDir = file.getParent.resolve(file.getFileName + "_tmp")
df.coalesce(1)
    .write.format("com.databricks.spark.csv")
    .option("header", "true")
    .save(tempDir.toAbsolutePath.toString)

// find the actual csv file
val tmpCsvFile = Files.walk(tempDir, 1).iterator().toSeq.find { p => 
    val fname = p.getFileName.toString
    fname.startsWith("part-00000") && fname.endsWith(".csv") && Files.isRegularFile(p)
}.get

// move to desired final path
Files.move(tmpCsvFile, file)

// delete temp directory
Files.walk(tempDir)
    .sorted(java.util.Comparator.reverseOrder())
    .iterator().toSeq
    .foreach(Files.delete(_))
Jaroslav Bezděk
Thien

FileUtil.copyMerge() from the Hadoop API should solve your problem. Note that this method exists in Hadoop 2.x but was removed in Hadoop 3.0.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}

See Write single CSV file using spark-csv

Community
shants

This is how distributed computing works! Multiple files inside a directory are exactly what a distributed job produces, and this is not a problem in itself, since most downstream software can read a directory of part files.

Your question should be "how is it possible to download a CSV composed of multiple files?" There are already lots of solutions for that on SO.

Another approach could be to use Spark as a JDBC source (with the awesome Spark Thrift server), write a SQL query and transform the result to CSV.

In order to prevent OOM in the driver (since the driver will get ALL the data), use incremental collect (spark.sql.thriftServer.incrementalCollect=true), more info at http://www.russellspitzer.com/2017/05/19/Spark-Sql-Thriftserver/.


A small recap of Spark's "data partition" concept:

INPUT (X PARTITIONs) -> COMPUTING (Y PARTITIONs) -> OUTPUT (Z PARTITIONs)

Between "stages", data can be transferred between partitions; this is the "shuffle". You want Z = 1 with Y > 1 and no data movement between partitions? That is impossible.

Thomas Decaux