
I am using:

  • emr-5.20.0
  • Hadoop 2.8.5
  • Spark 2.4.0

One of my Spark jobs writes Parquet data to S3. About 90% of the processing finishes in 20-30 minutes, but the last 5-10% takes 2 hours. I have read on several forums that EMR uses an optimized output committer, but the job still takes far too long. I am trying to configure a custom committer, but the job always uses EmrOptimizedSparkSqlParquetOutputCommitter. How can I disable it and use a custom committer? The logs are below:

19/01/12 23:17:11 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/01/12 23:17:11 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
19/01/12 23:17:11 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
19/01/12 23:17:11 INFO EmrOptimizedParquetOutputCommitter: EMR Optimized Committer: ENABLED
19/01/12 23:17:11 INFO EmrOptimizedParquetOutputCommitter: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter
19/01/12 23:17:11 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/01/12 23:17:11 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
19/01/12 23:17:11 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/01/12 23:17:11 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
19/01/12 23:17:11 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
19/01/12 23:17:11 INFO FileSystemOptimizedCommitter: Nothing to setup as successful task attempt outputs are written directly.

How do I stop EMR from using its own optimized EmrOptimizedSparkSqlParquetOutputCommitter? I have tried the following settings:

--conf spark.hadoop.mapred.output.committer.class = ai.peritus.training.preprocess.PeritusS3PartitionedOutputFormat 
--conf spark.hadoop.mapreduce.use.directfileoutputcommitter=false 
--conf spark.hadoop.spark.sql.parquet.output.committer.class= com.netflix.bdp.s3.S3PartitionedOutputCommitter 
--conf mapreduce.fileoutputcommitter.algorithm.version=1


hong4rc
varshnes

1 Answer


I'm from the EMR team, so I'm a bit biased about this feature, though there is some evidence of it working well for other customers. See, for example, https://stackoverflow.com/a/54350777/2205987.

However, I'll first answer the question you asked. In order to use your own committer class, you need to set the spark.sql.parquet.output.committer.class property. In your example above, you are incorrectly using spark.hadoop.spark.sql.parquet.output.committer.class.
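As a rough illustration of the corrected property name, the sketch below assembles the `--conf` flags for `spark-submit` in Python. The committer class is the one from the question; `my_job.py` is a placeholder, and the `optimization-enabled` switch is an assumption taken from the EMR documentation for the EMRFS S3-optimized committer, so check it against your release before relying on it.

```python
import shlex


def build_spark_submit(app, conf):
    """Return a spark-submit argument list with one --conf flag per property."""
    args = ["spark-submit"]
    for key, value in conf.items():
        # Each flag is a single key=value token with no spaces around '='.
        args += ["--conf", f"{key}={value}"]
    args.append(app)
    return args


conf = {
    # Property named in this answer; note there is no "spark.hadoop." prefix.
    "spark.sql.parquet.output.committer.class":
        "com.netflix.bdp.s3.S3PartitionedOutputCommitter",
    # Assumption: switch described in the EMR docs for turning the
    # EMRFS S3-optimized committer off.
    "spark.sql.parquet.fs.optimized.committer.optimization-enabled": "false",
}

cmd = build_spark_submit("my_job.py", conf)  # "my_job.py" is a placeholder
print(" ".join(shlex.quote(arg) for arg in cmd))
```

Building the flags from a dictionary keeps each property as one token, which avoids the stray spaces around `=` that can make `spark-submit` misread a setting.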

That said, I think we should first confirm that your application is even making use of the EMRFS S3-optimized committer in the first place. If you are experiencing slowness at the end of a job, it is possible that either the optimized committer is not actually being used or that there is some other cause of slowness in your job, such as data skew causing a very long running task.
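One rough way to tell skew apart from a slow commit is to compare the slowest task in the final stage with the median task. The sketch below assumes you have copied the per-task durations out of the Spark UI by hand; the numbers are made up for illustration and the function name is hypothetical.

```python
from statistics import median


def straggler_ratio(durations_s):
    """Ratio of the slowest task to the median task in one stage."""
    if not durations_s:
        raise ValueError("no task durations given")
    return max(durations_s) / median(durations_s)


# Made-up durations in seconds: most tasks are short, one runs for two hours.
durations = [60, 62, 58, 65, 61, 7200]
ratio = straggler_ratio(durations)
print(f"slowest task is {ratio:.0f}x the median")
```

A ratio far above 1 suggests that a few tasks hold most of the data, in which case changing the committer is unlikely to help and repartitioning before the write may be worth trying.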

There are some instances in which the EMRFS S3-optimized committer will not actually be utilized even if it's enabled. It would help to know a little bit more information about your application, such as some example code. Also, if you are able to provide an example cluster id (j-ABC123), it would help somebody from EMR debug your issue.
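As a first check, the driver log can be scanned for the committer messages. The sketch below is a minimal example that matches only the message formats quoted in the question; other EMR releases may word them differently. It reports which committer classes were selected, which by itself does not prove that the optimized code path was used for every write.

```python
import re

# Patterns based on the log lines quoted in the question.
ENABLED = re.compile(r"EMR Optimized Committer: (\w+)")
COMMITTER = re.compile(r"Using (?:user defined )?output committer class (\S+)")


def summarize_committer(log_lines):
    """Collect the optimized-committer states and committer classes seen."""
    states, classes = set(), []
    for line in log_lines:
        match = ENABLED.search(line)
        if match:
            states.add(match.group(1))
        match = COMMITTER.search(line)
        if match and match.group(1) not in classes:
            classes.append(match.group(1))
    return states, classes


sample = [
    "19/01/12 23:17:11 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter",
    "19/01/12 23:17:11 INFO EmrOptimizedParquetOutputCommitter: EMR Optimized Committer: ENABLED",
    "19/01/12 23:17:11 INFO EmrOptimizedParquetOutputCommitter: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter",
]

states, classes = summarize_committer(sample)
print("states:", sorted(states))
for name in classes:
    print("committer:", name)
```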

Jonathan Kelly
  • Also, please see https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html for more information about the EMRFS S3-optimized committer. – Jonathan Kelly Jan 25 '19 at 19:37
  • We actually kill those clusters after successful or failed completions, but I have made sure that EMRFS S3-optimized committer was used and it was outputted in logs: – varshnes Mar 13 '19 at 06:59
  • @JonathanKelly, is there any way to check whether EMRFS S3-optimized committer is utilized or not? – Bruce Apr 26 '20 at 09:26
  • @JonathanKelly can you please help at https://stackoverflow.com/questions/64993446/s3guard-and-parquet-magic-commiter-for-s3a-on-emr-6-x I also try to disable EmrOptimizedSparkSqlParquetOutputCommitter and add custom – Valeriy Solovyov Nov 24 '20 at 19:52