
I'm running a Spark job on AWS Glue. The job transforms the data and saves the output to parquet files, partitioned by date (year, month, day directories). The job must be able to handle terabytes of input data and uses hundreds of executors, each with a 5.5 GB memory limit.

The input covers over 2 years of data. The output parquet files for each date should be as big as possible, optionally split into 500 MB chunks. Creating multiple small files for each day is not desired.
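For illustration, the desired layout for a single day would look roughly like this (assuming the partition columns are named year, month and day; file names and sizes are only indicative):

output_path/year=2020/month=05/day=26/part-00000-<uuid>.snappy.parquet   (~500 MB)
output_path/year=2020/month=05/day=26/part-00001-<uuid>.snappy.parquet   (~500 MB)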

A few tested approaches:

  • repartitioning by the same columns as used in the write results in Out Of Memory errors on executors:
df = df.repartition(*output_partitions)

(df
 .write
 .partitionBy(output_partitions)
 .parquet(output_path))
  • repartitioning with an additional column holding a random value results in multiple small output files being written (corresponding to the spark.sql.shuffle.partitions value):
df = df.repartition(*output_partitions, "random")

(df
 .write
 .partitionBy(output_partitions)
 .parquet(output_path))
  • setting the number of partitions in the repartition call, for example to 10, gives 10 quite big output files, but I'm afraid it will cause Out Of Memory errors when the actual data (TBs in size) is loaded:
df = df.repartition(10, *output_partitions, "random")

(df
 .write
 .partitionBy(output_partitions)
 .parquet(output_path))

(df in the code snippets is a regular Spark DataFrame)

I know I can limit the output file size with the maxRecordsPerFile write option. But this limits the output created from a single in-memory partition, so I would first need to have the partitions created by date.
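For reference, this is how I would apply that option; the row cap value here is only a placeholder that would have to be tuned so that one file comes out at around 500 MB:

(df
 .write
 .option("maxRecordsPerFile", 20_000_000)  # placeholder row cap; the option counts rows, not bytes
 .partitionBy(output_partitions)
 .parquet(output_path))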

So the question is how to repartition data in memory to:

  • split it over multiple executors to prevent Out Of Memory errors,
  • save the output for each day to a limited number of big parquet files,
  • write output files in parallel (using as many executors as possible)?
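Conceptually, I imagine something along the lines of the sketch below: repartition by the date columns plus a bounded random "salt" column so that each day is spread over a small, fixed number of in-memory partitions. But I don't know whether this actually satisfies all three points above (files_per_day is a made-up placeholder):

from pyspark.sql import functions as F

files_per_day = 6  # placeholder: target number of in-memory partitions (and files) per day

df = (df
      .withColumn("salt", (F.rand() * files_per_day).cast("int"))  # bounded random bucket per row
      .repartition(*output_partitions, "salt")                     # shuffle by date columns + salt
      .drop("salt"))

(df
 .write
 .partitionBy(output_partitions)
 .parquet(output_path))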

I've read a number of related sources but did not find a solution.

Comments:
  • Have you checked that you have enough disk storage on your nodes? This will help avoid OOM during repartition shuffle, allowing Spark to spill to disk. – Chris May 26 '20 at 13:06
  • From the metrics I have, I don't see any disk space usage. And I'm not sure whether the OOM doesn't actually happen after the repartition, during the write operation, since writing parquet requires a significant amount of memory as far as I know. – Radzikowski May 26 '20 at 14:25
  • How large is the partition size for the daily repartition? For 2 years this should be approximately 700 partitions, right? – abiratsis May 26 '20 at 14:46
  • Also, if your total data is a couple of terabytes, then one day's data should be a couple of GB, so from the given information it doesn't seem that repartitioning by day would create many small chunks. – abiratsis May 26 '20 at 15:09
  • One day's data can be up to 3 GB as parquet (with snappy). But to do the other transformations, the input data is split into many more partitions to speed up the mappings. I'm currently testing the process with 1 month of data, 200 executors, and 800 partitions, which are similar proportions to what I want to use for the full-size job. – Radzikowski May 26 '20 at 15:33

0 Answers