2

I am trying to save a DataFrame as a CSV file on my local drive. When I do so, a folder is created and the partition files are written inside it. Is there a way to avoid this?

My requirement: get a single, ordinary CSV file with the exact name given in the code.

Code Snippet: dataframe.coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("E:/dataframe.csv")

Ramkumar

3 Answers

1

TL;DR: You are trying to enforce sequential, in-core concepts on a distributed environment. It cannot end well.

Spark doesn't provide a utility like this. To create one in a semi-distributed fashion, you'd have to implement a multistep, source-dependent protocol where:

  • You write the header.
  • You write a data file for each partition.
  • You merge the files and give the result a new name.

Since this has limited applications, is useful only for smallish files, and can be very expensive with some sources (like object stores), nothing like this is implemented in Spark.
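For a local directory, the merge step of the protocol above could be sketched as follows. This is only an illustration, not something Spark ships: `MergeParts` and `mergeParts` are hypothetical names, and the sketch assumes the part files were written without a header (`option("header", "false")`), that each ends with a newline, and that they sit on a file system the driver can read directly.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, StandardOpenOption}

// Hypothetical helper, not part of Spark.
object MergeParts {
  /** Writes `header`, then every part-* file found in `partDir`, into `target`. */
  def mergeParts(partDir: File, header: String, target: File): Unit = {
    // Pick up only the data files (skips _SUCCESS and hidden .crc files),
    // ordered by name so part-00000 comes before part-00001.
    val parts = Option(partDir.listFiles()).getOrElse(Array.empty[File])
      .filter(f => f.isFile && f.getName.startsWith("part-"))
      .sortBy(_.getName)

    val out = Files.newOutputStream(
      target.toPath,
      StandardOpenOption.CREATE,
      StandardOpenOption.TRUNCATE_EXISTING,
      StandardOpenOption.WRITE)
    try {
      // Step 1: the header line.
      out.write((header + "\n").getBytes(StandardCharsets.UTF_8))
      // Steps 2 and 3: append each partition's data in order.
      parts.foreach(p => Files.copy(p.toPath, out))
    } finally out.close()
  }
}
```

Everything here runs sequentially on one machine, which is exactly the limitation described above.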

You can of course collect the data, use a standard CSV library (Univocity, Apache Commons CSV), and then write to the storage of your choice. This is sequential and requires multiple data transfers.

0

There is no automatic way to do this. I see two solutions:

  • If the local directory is mounted on all the executors: write the file as you did, then move/rename the part-*.csv file to the desired name.
  • If the directory is not available on all executors: collect the DataFrame to the driver and then create the file using plain Scala.

But both solutions more or less destroy parallelism, and thus the point of using Spark.
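The second option might look roughly like the sketch below. It is deliberately free of Spark so that it is self-contained: `DriverCsv`, `escape` and `writeCsv` are hypothetical names, the quoting is a minimal hand-rolled RFC 4180 style rule rather than a full CSV library, and the rows are assumed to have been converted to strings on the driver already.

```scala
import java.io.{File, PrintWriter}
import java.nio.charset.StandardCharsets

// Hypothetical helper for writing already-collected rows on the driver.
object DriverCsv {
  // Quote a field only when it contains a comma, a double quote or a line break;
  // embedded double quotes are doubled. Nulls are written as empty fields.
  def escape(field: String): String = {
    val value = Option(field).getOrElse("")
    if (value.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + value.replace("\"", "\"\"") + "\""
    else value
  }

  /** Writes the header and then each row as one comma-separated line. */
  def writeCsv(target: File, header: Seq[String], rows: Iterator[Seq[String]]): Unit = {
    val out = new PrintWriter(target, StandardCharsets.UTF_8.name())
    try {
      out.print(header.map(escape).mkString(",") + "\n")
      rows.foreach(r => out.print(r.map(escape).mkString(",") + "\n"))
    } finally out.close()
  }
}

// Assumed usage with a DataFrame named `dataframe` (needs a Spark session):
// DriverCsv.writeCsv(
//   new File("E:/dataframe.csv"),
//   dataframe.columns.toSeq,
//   dataframe.collect().iterator.map(_.toSeq.map(v => if (v == null) "" else v.toString)))
```

Since `collect()` pulls every row into driver memory, this only suits data that comfortably fits there.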

Raphael Roth
0

It is not possible directly, but you can do something like this:

dataframe.coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("E:/data/")

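import org.apache.hadoop.fs._

// Use the Hadoop FileSystem API to find the part file Spark wrote
val fs = FileSystem.get(sc.hadoopConfiguration)
val filePath = "E:/data/"
// coalesce(1) leaves a single part-* file in the output folder; take the first match
val fileName = fs.globStatus(new Path(filePath + "part*"))(0).getPath.getName
// Rename it; the file stays inside the output folder, now as dataframe.csv
fs.rename(new Path(filePath + fileName), new Path(filePath + "dataframe.csv"))

Souvik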