
I am currently trying to write a large number (7.5 million) of JSON files from S3 into a much smaller set of Parquet files (targeting 128 MB, or about 3600 records, per file) with EMR. However, the process either takes too long or does not output the correct number of files.

The following is run on EMR with 12 c4.8xlarge core nodes and an r3.2xlarge master node:

df = spark.read.json("s3://BUCKET")

After reading the files into a dataframe, I attempted to write directly to S3:

df.write.parquet("s3://BUCKET")

However, this created 200,000 files with only about 36 records per file, over 3 hours.

I have also attempted to repartition and coalesce:

df.coalesce(2000).write.parquet("s3://BUCKET")
df.repartition(2000).write.parquet("s3://BUCKET")

However, this took around 10 hours.

The expected output

  1. a set number of records or a set file size per Parquet file (see the sketch after this list)
  2. the whole process should run in less than 3 hours
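
To make target 1 concrete, here is a hedged sketch of one way to cap records per output file, assuming Spark 2.2 or later (where the spark.sql.files.maxRecordsPerFile setting exists); the 3600-record target and bucket placeholder come from the question, and the partition count below is only illustrative:

# Sketch only: cap how many rows Spark writes into each Parquet file,
# independent of the number of partitions (assumes Spark 2.2+)
spark.conf.set("spark.sql.files.maxRecordsPerFile", 3600)
df.coalesce(2000).write.parquet("s3://BUCKET")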

Please suggest any EMR configuration or Spark code (Python or Scala solutions are welcome) that I should be using.

Also, if anyone has any idea why EMR is creating so many tasks to begin with, that would also be helpful.
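
For what it's worth, the partition count Spark created on read can be checked directly; this small snippet is illustrative only and not part of the original job:

# Spark groups small input files into read partitions based on
# spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes,
# so millions of tiny JSON files still yield a very large partition
# (and therefore task) count
print(df.rdd.getNumPartitions())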

Zillah
  • You have more than 7 million small JSON files? – eliasah Mar 29 '19 at 07:57
  • What's the time taken for coalesce and repartition separately? I hope you won't need both of them. – Jim Todd Mar 29 '19 at 08:17
  • You can try df.coalesce(2001). Spark uses a different data structure for bookkeeping during shuffles when the number of partitions is <= 2000 vs. more than 2000. – Gladiator Mar 29 '19 at 08:22
  • To avoid 7.5 million partitions, better to read with "sparkContext.wholeTextFiles" and then parse each record. – pasha701 Mar 29 '19 at 10:09
  • @JimTodd repartition currently takes 10 hours. I did try coalesce as well, but no tasks completed in the first hour, so I killed the process. – Zillah Mar 29 '19 at 17:01
  • @pasha701 currently it is not creating 7.5 million partitions; it is creating 200,000 partitions. Would you have any code samples on how I would use "sparkContext.wholeTextFiles" and parse each record? I do not have any experience with Spark. – Zillah Mar 29 '19 at 19:37
  • Some "sparkContext.wholeTextFiles" description here: https://stackoverflow.com/questions/47129950/spark-textfile-vs-wholetextfiles after reading, row can be parsed with "from_json" function, some examples here: https://stackoverflow.com/questions/48639535/how-to-parse-each-row-json-to-columns-of-spark-2-dataframe – pasha701 Mar 30 '19 at 19:03

0 Answers