52

This command works with HiveQL:

insert overwrite directory '/data/home.csv' select * from testtable;

But with Spark SQL I'm getting an error with an org.apache.spark.sql.hive.HiveQl stack trace:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable

Please guide me on how to write an export-to-CSV feature in Spark SQL.

Daniel Darabos
shashankS
  • This question/answer does not solve the problem for Spark 2.x... the real problem is **to export to standard CSV format**. Please [answer here](https://stackoverflow.com/q/58142220/287948). – Peter Krauss Sep 28 '19 at 00:40

7 Answers

88

You can use the statement below to write the contents of a DataFrame in CSV format:

df.write.csv("/data/home/csv")

If you need to write the whole DataFrame into a single CSV file, then use:

df.coalesce(1).write.csv("/data/home/sample.csv")
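If you also need a header row or want to overwrite a previous output directory, the writer options can be chained; a minimal sketch, assuming Spark 2.x and the same illustrative path:

df.coalesce(1)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv("/data/home/sample.csv")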

For Spark 1.x, you can use spark-csv to write the results into CSV files.

The Scala snippet below would help:

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")

To write the contents into a single file

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
sag
  • I tried the coalesce thing you mentioned. It creates a directory at the specified path with a "part" file and a file called "_SUCCESS". Do you know of a way to actually only get the one file? – Robert Hickman Jan 19 '18 at 01:24
  • No, I think there is no way to do it. – sag Jan 19 '18 at 08:43
  • it will not be a local file but an HDFS file – Alex B Sep 04 '18 at 18:53
  • I found a bug in this code: my original directory of partitioned CSVs has 1 extra column compared to the single CSV generated by this code. I know the code works for trivial cases, but my last 2 columns were of the format `concat('"', concat_ws(",", collect_list(some_column)), '"')`, which worked fine on insert overwrite but not when I selected all the columns and wrote to this format. Even the header was correct, but it incorrectly used the second-to-last column's values to fill both and ignored the rest – devssh Sep 06 '18 at 19:41
  • This is how my CSV partitions looked before `"USR",0,0,""css","shell","html","python","javascript"","381534,3960,1683,229869,1569090"` and this is how they look now `"\"USR\"",0,0,"\"\"css\"","\"shell\""` – devssh Sep 06 '18 at 19:48
  • I fixed it following this https://stackoverflow.com/questions/44395363/how-to-include-double-quotes-in-spark-sql-concat – devssh Sep 06 '18 at 20:02
  • `repartition(1)` might be faster than `coalesce(1)`, especially if there's heavy computation beforehand. – Leighton May 11 '20 at 21:48
50

Since Spark 2.x, spark-csv is integrated as a native data source. Therefore, the necessary statement simplifies to (Windows):

df.write
  .option("header", "true")
  .csv("file:///C:/out.csv")

or (UNIX):

df.write
  .option("header", "true")
  .csv("/var/out.csv")

Notice: as the comments say, this creates a directory with that name containing the partition files, not a standard CSV file. However, this is most likely what you want, since otherwise you would either crash your driver (out of RAM) or you would have to be working in a non-distributed environment.
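If a single standard CSV file really is required, one option is to merge the part files afterwards through the Hadoop FileSystem API. A minimal sketch, assuming Hadoop 2.x (copyMerge was removed in Hadoop 3), an existing SparkContext sc, and illustrative paths:

import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val hadoopConf = sc.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)
// concatenate every part file in the output directory into one destination file;
// note that if each part was written with a header, the headers are merged in too
FileUtil.copyMerge(fs, new Path("/var/out.csv"), fs, new Path("/var/out-merged.csv"), false, hadoopConf, null)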

Boern
  • Hi all, is there a way to replace the file? It fails when it tries to rewrite the file. – user3341078 Jun 14 '17 at 09:53
  • Sure! `.mode("overwrite").csv("/var/out.csv")` – Boern Jun 14 '17 at 11:46
  • In Spark 2.x it is creating the directory by that name. Any help? – GadaaDhaariGeek Feb 18 '19 at 13:53
  • My guess is that your partitions are inside that directory. – Boern Feb 18 '19 at 14:10
  • But **it is not a standard CSV file**, it is producing a folder with strange files (!). See https://stackoverflow.com/q/58142220/287948 – Peter Krauss Sep 28 '19 at 00:29
  • If you're using Spark because you're working with "big" datasets, you probably don't want to do anything like `coalesce(1)` or `toPandas()`, since that will most likely crash your driver (the whole dataset has to fit in the driver's RAM). On the other hand: if your data does fit into the RAM of a single machine, why are you torturing yourself with distributed computing? – Boern Nov 25 '19 at 14:44
32

The answer above with spark-csv is correct, but there is an issue: the library creates several files based on the DataFrame's partitioning, and this is not usually what we need. So you can combine all partitions into one:

df.coalesce(1).
    write.
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")

and rename the library's output file (named "part-00000") to the desired filename.
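The rename can also be done programmatically through the Hadoop FileSystem API instead of by hand. A minimal sketch, assuming an existing SparkContext sc and an illustrative destination filename:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
// coalesce(1) produced exactly one part file inside the "myfile.csv" directory
val partFile = fs.globStatus(new Path("myfile.csv/part-*"))(0).getPath
fs.rename(partFile, new Path("myfile-single.csv"))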

This blog post provides more details: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

dirceusemighini
Dmitry Petrov
  • One can add a mode as well, if one wishes to keep writing to an existing file. `resultDF.repartition(1).write.mode("append").format("com.databricks.spark.csv").option("header", "true").save("s3://...")` – Pramit Sep 15 '16 at 17:38
  • `coalesce(1)` requires the dataset to fit into the heap of a single machine and will most likely cause issues when working with large datasets – Boern Feb 16 '17 at 08:22
  • @DmitryPetrov Do we need to mention write.format("com...") option when including coalesce option ? – JKC Aug 21 '17 at 07:16
  • @JKC Yes, coalesce(1) just repartitions to a single partition (file). – Dmitry Petrov Aug 21 '17 at 08:24
  • @DmitryPetrov I understand that coalesce(1) just repartitions it to a single partition file but do we need to explicitly mention the write.format option in Spark 2.x when we use coalesce option ? – JKC Aug 21 '17 at 08:29
11

The simplest way is to map over the DataFrame's RDD and use mkString:

  df.rdd.map(x=>x.mkString(","))

As of Spark 1.5 (or even before that), df.map(r=>r.mkString(",")) would do the same. If you want CSV escaping, you can use Apache Commons Lang for that. For example, here's the code we're using:

import org.apache.commons.lang.StringEscapeUtils // escapeCsv; also available in commons-lang3
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

 // sc - existing spark context
 def DfToTextFile(path: String,
                   df: DataFrame,
                   delimiter: String = ",",
                   csvEscape: Boolean = true,
                   partitions: Int = 1,
                   compress: Boolean = true,
                   header: Option[String] = None,
                   maxColumnLength: Option[Int] = None) = {

    def trimColumnLength(c: String) = {
      val col = maxColumnLength match {
        case None => c
        case Some(len: Int) => c.take(len)
      }
      if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
    }
    def rowToString(r: Row) = {
      val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
      st.split("~-~").map(trimColumnLength).mkString(delimiter)
    }

    def addHeader(r: RDD[String]) = {
      val rdd = for (h <- header;
                     if partitions == 1; //headers only supported for single partitions
                     tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
      rdd.getOrElse(r)
    }

    val rdd = df.map(rowToString).repartition(partitions)
    val headerRdd = addHeader(rdd)

    if (compress)
      headerRdd.saveAsTextFile(path, classOf[GzipCodec])
    else
      headerRdd.saveAsTextFile(path)
  }
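A hypothetical call, assuming df is the DataFrame built earlier and that the output path and header string are only illustrative:

DfToTextFile("/data/home/csv_out", df, header = Some("col1,col2"), compress = false)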
Arnon Rotem-Gal-Oz
  • While this is the simplest answer (and a good one), if your text has double quotes, you'll have to account for them. – devonlazarus Aug 12 '15 at 05:05
  • Simply getting this error after creating the RDD for the table: `scala> df.rdd.map(x=>x.mkString(","));` `:18: error: value rdd is not a member of org.apache.spark.sql.SchemaRDD` – shashankS Aug 28 '15 at 10:47
2

With the help of spark-csv we can write to a CSV file.

val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")
1

The error message suggests this is not a supported feature in the query language. But you can save a DataFrame in any format as usual through the RDD interface (df.rdd.saveAsTextFile). Or you can check out https://github.com/databricks/spark-csv.
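For illustration, a minimal sketch of that RDD route, assuming df is the DataFrame from the question and an illustrative output path (note that mkString does no CSV quoting or escaping):

df.rdd.map(_.mkString(",")).saveAsTextFile("/data/home/csv_out")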

Daniel Darabos
  • `scala> df.write.format("com.databricks.spark.csv").save("/data/home.csv")` gives `:18: error: value write is not a member of org.apache.spark.sql.SchemaRDD`. Do I need to build the current jar with the databricks package again? – shashankS Aug 28 '15 at 10:49
  • `DataFrame.write` was added in Apache Spark 1.4.0. – Daniel Darabos Aug 28 '15 at 12:47
-3

To read the data into a DataFrame:

val p=spark.read.format("csv").options(Map("header"->"true","delimiter"->"^")).load("filename.csv")
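Note that this reads a CSV rather than writing one; the corresponding write with the same Spark 2.x API might look roughly like this (output path illustrative):

p.write.format("csv").options(Map("header" -> "true", "delimiter" -> "^")).save("output_folder")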