
Hi, I have the output of my Spark DataFrame, which creates a folder structure with many part files. Now I have to merge all the part files inside each folder and rename that single file after the folder path.

This is how I do the partitioning:

df.write.partitionBy("DataPartition","PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("header", "true")/
  .option("codec", "gzip")
  .save("hdfs:///user/zeppelin/FinancialLineItem/output")

It creates a folder structure like this:

hdfs:///user/zeppelin/FinancialLineItem/output/DataPartition=Japan/PartitionYear=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz
hdfs:///user/zeppelin/FinancialLineItem/output/DataPartition=Japan/PartitionYear=1971/part-00002-87a61115-92c9-4926-a803-b46315e55a08.c001.csv.gz

I have to create the final file like this:

hdfs:///user/zeppelin/FinancialLineItem/output/Japan.1971.currenttime.csv.gz

No part files here; both 001 and 002 are merged into one.

My data is very big (300 GB, 35 GB zipped), so coalesce(1) and repartition become very slow.

I have seen one solution here, Write single CSV file using spark-csv, but I am not able to implement it. Please help me with it.

Repartition throws an error:

error: value repartition is not a member of org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row]
       dfMainOutputFinalWithoutNull.write.repartition("DataPartition","StatementTypeCode")
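
As a side note, the error appears because repartition is a method on the DataFrame/Dataset itself, not on the DataFrameWriter returned by .write. Below is a minimal sketch of the corrected call order, reusing the dfMainOutputFinalWithoutNull frame and the write options from the question; the exact repartition columns are only an illustration.

import org.apache.spark.sql.functions.col

// repartition is a Dataset method, so it has to come before .write
dfMainOutputFinalWithoutNull
  .repartition(col("DataPartition"), col("StatementTypeCode"))
  .write
  .partitionBy("DataPartition", "PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("header", "true")
  .option("codec", "gzip")
  .save("hdfs:///user/zeppelin/FinancialLineItem/output")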
  • I suppose that your motivation to merge the files is to process them _outside_ of Spark. I would say in this case that the approach is to merge them _outside_ of Spark, since you are giving up on the distributed nature of your data, which is in essence the reason to process it with Spark. – Alexandre Dupriez Oct 18 '17 at 14:23
  • Why do you have to merge all the files? The files being split into parts is ideal for reading with Spark. Also, HDFS is not meant to hold single large files like this, so if you are going to do it, it should be saved to the head node of your cluster. Is that an option instead of HDFS? – Dan Ciborowski - MSFT Oct 18 '17 at 15:59
  • @Anupam ok - why do you want to have them merged in a single file? – Alexandre Dupriez Oct 18 '17 at 16:32
  • @DanCiborowski-MSFT I have to deliver these files to clients and they want them in this format. Can we at least control the number of files per partition, for example 5 files per partition? Currently it creates more than 200 files even for a partition that has only 1 GB of data. – Oct 18 '17 at 16:33
  • @AlexandreDupriez that's the requirement. Can we at least rename all the files, e.g. with a running number as a suffix? Also, if we cannot merge all the part files, can we at least control the number of files per partition? – Oct 18 '17 at 16:34
  • *Currently it creates more than 200 for the partition that has even 1 GB file also*: that might be because you are running a group by (shuffling) kind of transformation. You can certainly limit it with `rdd/df.repartition(x)`, where **x** is the number of files you want to create for that rdd/df (see the sketch after this comment thread). – mrsrinivas Oct 18 '17 at 16:39
  • @mrsrinivas here is the detailed question that my colleague has posted: https://stackoverflow.com/questions/46754432/how-to-tune-spark-job-on-emr-to-write-huge-data-quickly-on-s3?noredirect=1#comment80581459_46754432 – Oct 19 '17 at 05:37
  • @mrsrinivas will this `dfMainOutputFinalWithoutNull.coalesce(5).write.partitionBy("DataPartition","StatementTypeCode")` work? – Oct 19 '17 at 06:09
  • It should help. Make sure you use `s3a` and the **nearest region for the s3 bucket**. – mrsrinivas Oct 19 '17 at 06:15
  • I had a discussion with the EMR support team and they confirmed that s3a is deprecated now... we should use s3 only. – Oct 19 '17 at 06:20
  • check this answer on [removing empty or reducing partitions](https://stackoverflow.com/a/42049056/1592191) – mrsrinivas Oct 19 '17 at 06:23
  • @mrsrinivas repartition is not working in my case; I have updated my question, please suggest. – Oct 19 '17 at 07:09
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/157045/discussion-between-mrsrinivas-and-anupam). – mrsrinivas Oct 19 '17 at 07:10
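
To make the file-count suggestion from the comments concrete, here is a minimal sketch under the same assumptions as the question (same frame, same output path); the count of 5 simply mirrors the coalesce(5) idea from the thread.

// Round-robin repartition into 5 tasks before writing: each task writes at most
// one file per (DataPartition, PartitionYear) value it holds, so every output
// directory ends up with at most 5 part files instead of 200+.
dfMainOutputFinalWithoutNull
  .repartition(5)
  .write
  .partitionBy("DataPartition", "PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("header", "true")
  .option("codec", "gzip")
  .save("hdfs:///user/zeppelin/FinancialLineItem/output")

coalesce(5) would also cap the file count, but it narrows the final stage to 5 tasks, which can be much slower on 300 GB of input; repartition pays for a shuffle but keeps the upstream work parallel.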

1 Answer


Try this from the head node outside of Spark...

hdfs dfs -getmerge <src> <localdst>

https://hadoop.apache.org/docs/r1.2.1/file_system_shell.html#getmerge

"Takes a source directory and a destination file as input and concatenates files in src into the destination local file. Optionally addnl can be set to enable adding a newline character at the end of each file."

Dan Ciborowski - MSFT