15

I use this method to write a CSV file, but it generates a directory containing multiple part files. That is not what I want; I need a single file. I also found another post that uses Scala to force everything to be computed on one partition, which yields one file.

First question: how to achieve this in Python?

The second post also says that a Hadoop function can merge multiple files into one.

Second question: is it possible to merge two files in Spark?

Community
sydridgm

5 Answers

34

You can use:

df.coalesce(1).write.csv('result.csv')

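Note: coalesce(1) puts all the data into a single partition, so the write runs as a single task and you lose parallelism. The path result.csv is created as a directory containing one part file.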
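A minimal sketch of the same call with a header row and overwrite mode, assuming `df` is an existing Spark DataFrame. The helper name `write_single_part_csv` is hypothetical and only wraps the calls for illustration.

```python
def write_single_part_csv(df, path):
    """Write `df` as one CSV part file inside the directory `path`.

    Hypothetical helper; `df` is assumed to be a pyspark.sql.DataFrame.
    """
    (
        df.coalesce(1)              # collapse to a single partition
          .write
          .mode("overwrite")        # replace any existing output directory
          .option("header", True)   # emit column names as the first row
          .csv(path)
    )
```

Usage would look like `write_single_part_csv(df, 'result.csv')`. Spark still chooses the name of the part file inside that directory.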

Mohamed Thasin ah
6

You can do this with the cat command, as below. This concatenates all of the part files into one CSV file, so there is no need to repartition down to one partition.

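import os

# Write the DataFrame as a directory of part files
test.write.csv('output/test')
# Concatenate the part files into a single CSV file
os.system("cat output/test/part-* > output/test.csv")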
David
  • I think this will not help when the job runs in cluster mode; there will still be different files on each executor. – RockStar Oct 31 '18 at 17:15
  • This will not work on cloud blobs such as AWS S3. https://stackoverflow.com/a/32467122/1001015 – SantiagoRodriguez May 05 '20 at 11:30
  • 1
    Watch out that if you choose to save the header this saves it for all the parts, so when you concatenate them together you will have headers that are now part of the data. – Nic Scozzaro Mar 19 '21 at 03:19
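One way to handle the repeated-header problem raised in the comments is to merge the parts in Python and keep only the first header. This is a sketch under stated assumptions: the part files sit on a local filesystem, the header occupies exactly one line, and the function name `merge_csv_parts` is hypothetical.

```python
import glob
import os
import shutil


def merge_csv_parts(parts_dir, target_path, has_header=True):
    """Concatenate Spark part files into one CSV, keeping a single header."""
    # Sort so that rows appear in part-file order.
    part_paths = sorted(glob.glob(os.path.join(parts_dir, "part-*")))
    header_written = False
    with open(target_path, "wb") as target:
        for part_path in part_paths:
            with open(part_path, "rb") as part:
                if has_header:
                    # Consume this part's header; write it only once.
                    header = part.readline()
                    if header and not header_written:
                        target.write(header)
                        header_written = True
                # Copy the remaining rows unchanged.
                shutil.copyfileobj(part, target)
```

For example, `merge_csv_parts('output/test', 'output/test.csv')` would replace the `cat` call when the DataFrame was written with the header option enabled.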
1

The requirement is to save an RDD as a single CSV file by bringing the whole RDD to one executor. This means the RDD partitions spread across executors are moved to a single executor. We can use coalesce(1) or repartition(1) for this purpose. In addition, one can add a column header to the resulting CSV file. First, we define a utility function that makes the data CSV-compatible.

def toCSVLine(data):
    return ','.join(str(d) for d in data)
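A plain `','.join` does not quote fields that themselves contain commas or quotes. A hedged alternative, assuming the standard `csv` module is acceptable on the executors, is sketched below; the name `to_csv_line` is hypothetical.

```python
import csv
import io


def to_csv_line(data):
    """Format one row as a CSV line, quoting fields where required."""
    buffer = io.StringIO()
    csv.writer(buffer).writerow(data)
    # Drop the line terminator; saveAsTextFile adds its own newline.
    return buffer.getvalue().rstrip("\r\n")


print(to_csv_line([1, "a,b", "x"]))  # prints 1,"a,b",x
```

It can be passed to `map` in the same way as `toCSVLine`.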

Let’s suppose MyRDD has five columns and needs 'ID', 'DT_KEY', 'Grade', 'Score', 'TRF_Age' as column headers. I create a header RDD and union it with MyRDD as below, which most of the time keeps the header at the top of the CSV file.

unionHeaderRDD = sc.parallelize( [( 'ID','DT_KEY','Grade','Score','TRF_Age' )])\
   .union( MyRDD )

unionHeaderRDD.coalesce( 1 ).map( toCSVLine ).saveAsTextFile("MyFileLocation" )

The RDD method saveAsPickleFile can be used to save the data in serialized form in order to save space. Use the SparkContext method pickleFile to read the pickled file back.

David
AK Gangoni
1

I needed my CSV output in a single file, with headers, saved to an S3 bucket under the filename I provided. The current accepted answer, when I run it (Spark 3.3.1 on a Databricks cluster), gives me a folder with the desired filename, and inside it there is one CSV file (due to coalesce(1)) with a random name and no headers.

I found that sending it to pandas as an intermediate step produced just a single file with headers, exactly as expected. Note that toPandas() collects the entire DataFrame onto the driver, so this only suits data that fits in driver memory.

my_spark_df.toPandas().to_csv('s3_csv_path.csv',index=False)
Chappy Hickens
0

I found this solution:

# Write a single part file into the directory test.csv
df.coalesce(1).write.mode('overwrite').csv('test.csv')

# Use the Hadoop FileSystem API through the JVM gateway to rename the part file
from py4j.java_gateway import java_import
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
part_name = fs.globStatus(spark._jvm.Path('test.csv/part*'))[0].getPath().getName()
fs.rename(spark._jvm.Path('test.csv/' + part_name), spark._jvm.Path('test2.csv'))
# Remove the leftover output directory (recursive delete)
fs.delete(spark._jvm.Path('test.csv'), True)
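When the output directory is on a local filesystem rather than HDFS or S3, the same rename-and-clean-up step can be sketched with the standard library alone. This is an illustration under that assumption, not a replacement for the Hadoop API call; the function name `promote_single_part` is hypothetical.

```python
import glob
import os
import shutil


def promote_single_part(output_dir, target_path):
    """Move the only part file out of `output_dir`, then remove the directory."""
    part_paths = glob.glob(os.path.join(output_dir, "part-*"))
    if len(part_paths) != 1:
        # coalesce(1) should leave exactly one part file behind.
        raise ValueError(f"expected exactly one part file, found {len(part_paths)}")
    shutil.move(part_paths[0], target_path)
    # Delete the directory together with marker files such as _SUCCESS.
    shutil.rmtree(output_dir)
```

For instance, `promote_single_part('test.csv', 'test2.csv')` mirrors the rename and delete calls above. The target path must lie outside the output directory.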