55

Suppose that df is a dataframe in Spark. The way to write df into a single CSV file is

df.coalesce(1).write.option("header", "true").csv("name.csv")

This will write the dataframe into a CSV file contained in a folder called name.csv but the actual CSV file will be called something like part-00000-af091215-57c0-45c4-a521-cd7d9afb5e54.csv.

I would like to know if it is possible to avoid the folder name.csv and to have the actual CSV file called name.csv and not part-00000-af091215-57c0-45c4-a521-cd7d9afb5e54.csv. The reason is that I need to write several CSV files which later on I will read together in Python, but my Python code makes use of the actual CSV names and also needs to have all the single CSV files in a folder (and not a folder of folders).
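To illustrate, the downstream Python code reads those files by their actual names, roughly along these lines (the folder path here is just a placeholder):

import glob
import os

import pandas as pd

# Placeholder folder that should end up holding the individually named CSV files
csv_folder = "/data/reports"

# Read every CSV in the folder, keyed by its actual file name
frames = {
    os.path.basename(path): pd.read_csv(path)
    for path in glob.glob(os.path.join(csv_folder, "*.csv"))
}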

Any help is appreciated.

antonioACR1
  • 1,303
  • 2
  • 15
  • 28
  • 1
    Possible duplicate of [Write single CSV file using spark-csv](http://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv) – T. Gawęda Apr 27 '17 at 15:17
  • 5
    Sorry but I think my question is different because I already know how to write a single CSV file but I don't want the folder that you get at the end and I want the CSV file called as I specified, not the folder – antonioACR1 Apr 27 '17 at 15:32
  • Still you can use `copyMerge`, as suggested in answers in that question to copy to one file in new directory – T. Gawęda Apr 27 '17 at 15:40
  • copyMerge is being removed in 3.0 lib. – woot Oct 18 '17 at 03:44
  • df.toPandas().to_csv("sample.csv",index=False) – Zerzavot Jul 22 '23 at 15:33

10 Answers

11

A possible solution could be to convert the Spark dataframe to a pandas dataframe and save it as CSV:

df.toPandas().to_csv("<path>/<filename>")

EDIT: As caujka and snark point out, this only works for small dataframes that fit into the driver's memory. It works for real cases where you want to save aggregated data or a sample of the dataframe. Don't use this method for big datasets.
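For example, a minimal sketch with a purely illustrative row-count guard, so the collect to the driver only happens when the data is plausibly small:

# Illustrative threshold: toPandas() collects the whole dataframe to the driver,
# so this path should only be taken when the data is known to be small.
MAX_ROWS = 1_000_000

if df.count() <= MAX_ROWS:
    df.toPandas().to_csv("<path>/<filename>", header=True, index=False)
else:
    raise ValueError("Dataframe is too large to collect to the driver as pandas")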

Paul Vbl
  • 147
  • 2
  • 8
  • 3
    I assume that works for Pyspark but what about Spark? – antonioACR1 Sep 10 '18 at 16:45
  • I've never tried but if your data is small you can collect it and after it save the array as a csv with pure scala methods as this questions shows: [How to write a file in Scala](https://stackoverflow.com/questions/43870910/how-to-store-an-arraystring-to-an-output-file) – Paul Vbl Sep 11 '18 at 08:36
  • Above command adds index column. so just in case if you need header and don't need index column and want to go by original schema, try this: df.toPandas().to_csv("/", header=True, index=False) – vmorusu Jan 24 '19 at 19:15
  • 10
    This solution is potentially dangerous as all of the data in the Spark DataFrame would be loaded into a single Pandas DataFrame on the Spark driver node. So you could run out of memory on the driver node if you have lots of data! See http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toPandas – snark Feb 13 '19 at 10:45
  • 1
    People with a real use case for using Spark likely don't have small enough data to use this method. I'd ignore this answer. – alex Oct 27 '20 at 15:48
5

If you want to use only the Python standard library, this is an easy function that will write to a single file. You don't have to mess with temp files or go through another directory.

import csv

def spark_to_csv(df, file_path):
    """ Converts spark dataframe to CSV file """
    with open(file_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=df.columns)
        writer.writeheader()
        for row in df.toLocalIterator():
            writer.writerow(row.asDict())
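For example, assuming df is the dataframe from the question:

spark_to_csv(df, "name.csv")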
smw
  • 447
  • 6
  • 14
3

If the result size is comparable to the Spark driver node's free memory, you may have problems converting the dataframe to pandas.

I would tell Spark to save to some temporary location, and then copy the individual CSV file into the desired folder. Something like this:

import os
import shutil

TEMPORARY_TARGET="big/storage/name"
DESIRED_TARGET="/export/report.csv"

df.coalesce(1).write.option("header", "true").csv(TEMPORARY_TARGET)

part_filename = next(entry for entry in os.listdir(TEMPORARY_TARGET) if entry.startswith('part-'))
temporary_csv = os.path.join(TEMPORARY_TARGET, part_filename)

shutil.copyfile(temporary_csv, DESIRED_TARGET)

If you work with Databricks, Spark operates on paths like dbfs:/mnt/..., and to use Python's file operations on them you need to change the path to /dbfs/mnt/..., or (more native to Databricks) replace shutil.copyfile with dbutils.fs.cp.
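A minimal sketch of that dbutils variant, assuming a Databricks notebook where dbutils is available and using hypothetical dbfs:/ paths:

TEMPORARY_TARGET = "dbfs:/mnt/big/storage/name"   # hypothetical path
DESIRED_TARGET = "dbfs:/mnt/export/report.csv"    # hypothetical path

df.coalesce(1).write.option("header", "true").csv(TEMPORARY_TARGET)

# Find the part file Spark produced inside the temporary directory
part_filename = next(f.name for f in dbutils.fs.ls(TEMPORARY_TARGET) if f.name.startswith("part-"))

dbutils.fs.cp(TEMPORARY_TARGET + "/" + part_filename, DESIRED_TARGET)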

caujka
  • 121
  • 4
  • 1
    For a more databricks native approach, I replaced the use of `shutil` with `dbutils.fs.cp` so I could avoid messing with `/dbfs/` vs `dbfs:` – lukew Mar 31 '20 at 05:25
  • 1
    I ended up using this solution on a regular pysaprk install. One more line I added to the code at the bottom :) ```shutil.rmtree(TEMPORARY_TARGET)``` – Bikash Gyawali Dec 15 '20 at 14:14
2

There is no Spark DataFrame API that writes/creates a single file instead of a directory as the result of a write operation.

Both options below will create one single file inside the directory, along with the standard marker files (_SUCCESS, _committed, _started).

 1. df.coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("PATH/FOLDER_NAME/x.csv")

 2. df.repartition(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("PATH/FOLDER_NAME/x.csv")

If you don't use coalesce(1) or repartition(1) and instead take advantage of Spark's parallelism for writing files, then it will create multiple data files inside the directory.

You then need to write a function in the driver that combines all the data file parts into a single file (cat part-00000* singlefilename) once the write operation is done.
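A minimal sketch of such a driver-side merge, assuming the part files are on the driver's local filesystem and each part was written with its own header (only the first header is kept):

import glob

def merge_part_files(folder, output_path):
    """Concatenate all part-* CSV files in `folder` into a single CSV at `output_path`."""
    part_paths = sorted(glob.glob(folder + "/part-*"))
    with open(output_path, "w") as out:
        for i, part_path in enumerate(part_paths):
            with open(part_path) as part:
                header = part.readline()
                if i == 0:
                    out.write(header)  # write the header only once
                out.write(part.read())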

vikrant rana
  • 4,509
  • 6
  • 32
  • 72
Ravi
  • 88
  • 7
2

A more Databricks-y solution is here:

import os

TEMPORARY_TARGET="dbfs:/my_folder/filename"
DESIRED_TARGET="dbfs:/my_folder/filename.csv"

spark_df.coalesce(1).write.option("header", "true").csv(TEMPORARY_TARGET)

temporary_csv = os.path.join(TEMPORARY_TARGET, dbutils.fs.ls(TEMPORARY_TARGET)[3][1])

dbutils.fs.cp(temporary_csv, DESIRED_TARGET)

Note that if you are working with a Koalas dataframe, you can replace spark_df with koalas_df.to_spark().

UsAndRufus
  • 385
  • 5
  • 14
2

For PySpark, you can convert to a pandas dataframe and then save it.

df.toPandas().to_csv("<path>/<filename.csv>", header=True, index=False)

Rohit Anil
  • 236
  • 1
  • 11
1

I had the same problem and used Python's NamedTemporaryFile (from the tempfile module) to solve this.

import boto3
from tempfile import NamedTemporaryFile

s3 = boto3.resource('s3')

with NamedTemporaryFile() as tmp:
  df.coalesce(1).write.format('csv').options(header=True).save(tmp.name)
  s3.meta.client.upload_file(tmp.name, S3_BUCKET, S3_FOLDER + 'name.csv')

See this documentation for more info on the upload_file() method.
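As the comment below points out, Spark treats the save path as a directory, so a variant of this idea (a sketch assuming boto3 and that Spark writes to the driver's local filesystem) is to write into a temporary directory, locate the part file, and upload just that file:

import glob
import os
from tempfile import TemporaryDirectory

import boto3

s3 = boto3.resource('s3')

with TemporaryDirectory() as tmp_dir:
    target = os.path.join(tmp_dir, 'out')  # Spark creates this directory itself
    df.coalesce(1).write.format('csv').options(header=True).save(target)
    part_file = glob.glob(os.path.join(target, 'part-*'))[0]
    s3.meta.client.upload_file(part_file, S3_BUCKET, S3_FOLDER + 'name.csv')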

UsAndRufus
  • 385
  • 5
  • 14
renzo
  • 23
  • 6
  • I don't think this will work, as `tmp.name` will be the _folder_ in which the CSV is output, and that _folder_ will then be moved to S3, yielding `S3_BUCKET/S3_FOLDER/tmp.name/part-0000-some-hash.csv`. – ijoseph Apr 30 '20 at 20:41
1

Create a temp folder inside the output folder, copy the part-00000* file to the output folder under the desired file name, and then delete the temp folder. Python code snippet to do the same in Databricks:

fpath=output+'/'+'temp'

def file_exists(path):
  try:
    dbutils.fs.ls(path)
    return True
  except Exception as e:
    if 'java.io.FileNotFoundException' in str(e):
      return False
    else:
      raise

if file_exists(fpath):
  dbutils.fs.rm(fpath, True)

df.coalesce(1).write.option("header", "true").csv(fpath)

fname=([x.name for x in dbutils.fs.ls(fpath) if x.name.startswith('part-00000')])
dbutils.fs.cp(fpath+"/"+fname[0], output+"/"+"name.csv")
dbutils.fs.rm(fpath, True) 

UsAndRufus
  • 385
  • 5
  • 14
nl09
  • 93
  • 1
  • 9
0

You can go with pyarrow, as it provides a file handle for the HDFS file system. You can write your content to the file handle as with usual file writing. Code example:

import pyarrow.fs as fs


HDFS_HOST: str = 'hdfs://<your_hdfs_name_service>'
FILENAME_PATH: str = '/user/your/hdfs/file/path/<file_name>'


hadoop_file_system = fs.HadoopFileSystem(host=HDFS_HOST)

with hadoop_file_system.open_output_stream(path=FILENAME_PATH) as f:
    f.write("Hello from pyarrow!".encode())

This will create a single file with the specified name. To initialize pyarrow you should define the CLASSPATH environment variable properly: set it to the output of `hadoop classpath --glob`.
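To write the dataframe itself rather than a literal string, a minimal sketch (assuming the data is small enough to stream through the driver) could build the CSV with Python's csv module and push the bytes through the same output stream:

import csv
import io

import pyarrow.fs as fs

hadoop_file_system = fs.HadoopFileSystem(host=HDFS_HOST)

# Build the CSV content on the driver, then write the bytes through the pyarrow stream
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=df.columns)
writer.writeheader()
for row in df.toLocalIterator():
    writer.writerow(row.asDict())

with hadoop_file_system.open_output_stream(path=FILENAME_PATH) as f:
    f.write(buffer.getvalue().encode("utf-8"))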

UsAndRufus
  • 385
  • 5
  • 14
-6
df.write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("PATH/FOLDER_NAME/x.csv")

You can use this, and if you don't want to give the name of the CSV every time, you can write a UDF or create an array of the CSV file names and pass it in; it will work.

learner
  • 344
  • 2
  • 22
  • 7
    This still writes the dataframe into a folder and the name of the actual CSV file is still called `part-00000-c5f99bbc-f9a8-4fe6-bb47-9413f1fb4591.csv` – antonioACR1 Apr 28 '17 at 18:56