36

For example, the result of this:

df.filter("project = 'en'").select("title","count").groupBy("title").sum()

would return an Array.

How can I save a Spark DataFrame as a CSV file on disk?

Shaido
Hello lad
  • btw this doesn't return an array, but a DataFrame! [reference here](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedData) – eliasah Oct 16 '15 at 20:42
  • If the answer given solves your question, please accept it and up-vote so we can class this question as resolved! – eliasah Sep 08 '20 at 15:36

4 Answers

45

Before version 2.x, Apache Spark does not support writing CSV output to disk natively.

You have four available solutions though:

  1. You can convert your DataFrame into an RDD:

    def convertToReadableString(r : Row) = ???
    df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath)
    

    This will create a folder at filepath. Under that path you'll find one part file per partition (e.g. part-000*).

    What I usually do, if I want to concatenate all the partitions into one big CSV, is

    cat filepath/part* > mycsvfile.csv
    

    Some will use coalesce(1,false) to create one partition from the RDD. It's usually bad practice, since it forces all of the data you are writing into a single partition on one executor, which can easily overwhelm it.

    Note that df.rdd will return an RDD[Row]. A rough sketch of what convertToReadableString might look like is shown below, after this list.

  2. With Spark < 2.0, you can use the Databricks spark-csv library:

    • Spark 1.4+:

      df.write.format("com.databricks.spark.csv").save(filepath)
      
    • Spark 1.3:

      df.save(filepath,"com.databricks.spark.csv")
      
  3. With Spark 2.x the spark-csv package is not needed as it's included in Spark.

    df.write.format("csv").save(filepath)
    
  4. You can convert to a local pandas DataFrame and use its to_csv method (PySpark only).

Note: Solutions 1, 2 and 3 will result in CSV format files (part-*) generated by the underlying Hadoop API that Spark calls when you invoke save. You will have one part- file per partition.
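
As a rough illustration of option 1, here is what a naive convertToReadableString might look like. It simply joins a row's fields with commas and does no quoting or escaping of embedded separators, which is exactly why the spark-csv / built-in writers (options 2 and 3) are usually preferable:

    import org.apache.spark.sql.Row

    // Naive sketch: join each field with commas. Real CSV needs proper quoting
    // and escaping, which options 2 and 3 handle for you.
    def convertToReadableString(r: Row): String =
      r.toSeq.map(v => if (v == null) "" else v.toString).mkString(",")

    df.rdd.map(convertToReadableString).saveAsTextFile(filepath)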

eliasah
  • I think that `spark-csv` is the preferred solution. It is not easy to create a correct csv line from scratch; all the dialects and proper escaping can be quite tricky. – zero323 Oct 16 '15 at 15:58
  • In PySpark you can also convert a small table to Pandas and save it locally, but this is probably a Scala question. – zero323 Oct 16 '15 at 16:00
  • If you feel like adding the information to the answer @zero323, please feel free to do so! – eliasah Oct 16 '15 at 16:01
  • Guys, do you know if it is possible to avoid the hadoopish format and store the data to a file under a file name or `s3` key name of my choice, instead of the directory with `_SUCCESS` and `part-*` files? – lisak May 19 '16 at 20:40
  • I posted solution using spark-csv – Ajk Aug 12 '16 at 08:28
  • Tried the 3rd point (for spark 2.x) to save csv on local machine with file:///path_to_file. But I am unable to locate the csv files anywhere. Have I missed anything? https://stackoverflow.com/questions/53307036/spark-scala-cannot-find-the-file-written-as-csv-in-local-storage – Mihir Mistry Nov 14 '18 at 19:54
  • @MihirMistry windows ? – eliasah Nov 14 '18 at 19:56
  • @eliasah Yes on windows with spark 2.1.1 and scala 2.11 You can check the code here: https://stackoverflow.com/questions/53307036/spark-scala-cannot-find-the-file-written-as-csv-in-local-storage – Mihir Mistry Nov 14 '18 at 19:57
27

Writing a DataFrame to disk as CSV is similar to reading from CSV. If you want your result in a single file, you can use coalesce.

df.coalesce(1)
      .write
      .option("header","true")
      .option("sep",",")
      .mode("overwrite")
      .csv("output/path")

If your result is an array, you should use a language-specific solution rather than the Spark DataFrame API, because results like that are returned to the driver machine.
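
If you really do have a small, already-collected result on the driver, plain JVM I/O is enough. A rough sketch (quoting and escaping deliberately left out, so it only suits simple values):

    import java.io.PrintWriter

    // Hypothetical example: collect a small result to the driver and write it locally.
    val rows = df.limit(1000).collect()
    val out = new PrintWriter("local-result.csv")
    try {
      out.println(df.columns.mkString(","))                  // header line
      rows.foreach(r => out.println(r.toSeq.mkString(",")))  // data lines
    } finally {
      out.close()
    }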

Erkan Şirin
4

I had a similar issue where I had to save the contents of a DataFrame to a CSV file with a name I defined. df.write.format("csv").save("<my-path>") was creating a directory rather than a file, so I had to come up with the following solution. Most of the code is taken from the following dataframe-to-csv answer, with small modifications to the logic.

import java.io.File

import org.apache.spark.sql.DataFrame

def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = {
    // Arbitrary temporary directory that will hold the single part file
    val tmpParquetDir = "Posts.tmp.parquet"

    // Write everything into one partition so only one part file is produced
    df.repartition(1).write.
        format("com.databricks.spark.csv").
        option("header", header.toString).
        option("delimiter", sep).
        save(tmpParquetDir)

    // Find the part file (its exact name varies between Spark versions),
    // rename it to the requested output file, then clean up the temporary directory
    val dir = new File(tmpParquetDir)
    val newFileRegex = tmpParquetDir + File.separatorChar + "part-00000.*"
    val tmpTsvFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRegex))(0).toString
    new File(tmpTsvFile).renameTo(new File(tsvOutput))

    dir.listFiles.foreach(f => f.delete)
    dir.delete
}
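
A hypothetical call would then look like this (the output file name is just an example; note the function renames the part file on the driver's local filesystem, so it only suits data small enough for a single partition):

    // e.g. save the DataFrame from the question as a single local CSV with a header
    saveDfToCsv(df, "pageviews.csv", header = true)
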
Jai Prakash
2

I had a similar problem: I needed to write a CSV file on the driver while connected to a cluster in client mode.

I wanted to reuse the same CSV-generation code as spark-csv to avoid potential errors.

I checked the spark-csv code and found the code responsible for converting a DataFrame into a raw CSV RDD[String] in com.databricks.spark.csv.CsvSchemaRDD.

Sadly, the end of that method is hardcoded to write the result out as a text file.

I copy-pasted that code, removed the final lines that write the text file, and returned the RDD directly instead.

My code:

/*
  This is copy-pasted from com.databricks.spark.csv.CsvSchemaRDD.
  spark-csv has a handy method that converts a DataFrame into a raw CSV RDD[String],
  but the last lines of that method are hardcoded to write the result out as a text file.
  For our case we need the RDD itself.
 */
import org.apache.commons.csv.QuoteMode
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

object DataframeToRawCsvRDD {

  val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat

  // Note: ExecutionContext here is the author's own application context that exposes
  // a sparkContext; it is not scala.concurrent.ExecutionContext.
  def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
           (implicit ctx: ExecutionContext): RDD[String] = {
    val delimiter = parameters.getOrElse("delimiter", ",")
    val delimiterChar = if (delimiter.length == 1) {
      delimiter.charAt(0)
    } else {
      throw new Exception("Delimiter cannot be more than one character.")
    }

    val escape = parameters.getOrElse("escape", null)
    val escapeChar: Character = if (escape == null) {
      null
    } else if (escape.length == 1) {
      escape.charAt(0)
    } else {
      throw new Exception("Escape character cannot be more than one character.")
    }

    val quote = parameters.getOrElse("quote", "\"")
    val quoteChar: Character = if (quote == null) {
      null
    } else if (quote.length == 1) {
      quote.charAt(0)
    } else {
      throw new Exception("Quotation cannot be more than one character.")
    }

    val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
    val quoteMode: QuoteMode = if (quoteModeString == null) {
      null
    } else {
      QuoteMode.valueOf(quoteModeString.toUpperCase)
    }

    val nullValue = parameters.getOrElse("nullValue", "null")

    val csvFormat = defaultCsvFormat
      .withDelimiter(delimiterChar)
      .withQuote(quoteChar)
      .withEscape(escapeChar)
      .withQuoteMode(quoteMode)
      .withSkipHeaderRecord(false)
      .withNullString(nullValue)

    val generateHeader = parameters.getOrElse("header", "false").toBoolean
    val headerRdd = if (generateHeader) {
      ctx.sparkContext.parallelize(Seq(
        csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
      ))
    } else {
      ctx.sparkContext.emptyRDD[String]
    }

    val rowsRdd = dataFrame.rdd.map(row => {
      csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
    })

    headerRdd union rowsRdd
  }

}
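
A hypothetical usage, matching the author's goal of producing a single CSV on the driver. Here `ctx` stands in for whatever application context the code above expects (it is not scala.concurrent.ExecutionContext), and the output file name is just an example:

    // Placeholder for the author's application context exposing sparkContext
    implicit val ctx: ExecutionContext = ???

    val csvRdd = DataframeToRawCsvRDD(df, Map("header" -> "true", "delimiter" -> ","))

    // Collect on the driver and write one local CSV file
    val out = new java.io.PrintWriter("driver-side-output.csv")
    try csvRdd.collect().foreach(out.println) finally out.close()
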
Ajk