I am using:
- emr-5.20.0
- Hadoop 2.8.5
- Spark 2.4.0
One of my Spark jobs writes Parquet data to S3. About 90% of the processing finishes in 20-30 minutes, but the last 5-10% takes roughly 2 hours. From reading various forums I learned that EMR uses an optimized output committer, yet the job still takes far too long. I have tried to configure a custom committer, but the job always uses EmrOptimizedSparkSqlParquetOutputCommitter. How can I disable it and plug in a custom committer? Below are the logs:
```
19/01/12 23:17:11 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/01/12 23:17:11 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
19/01/12 23:17:11 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
19/01/12 23:17:11 INFO EmrOptimizedParquetOutputCommitter: EMR Optimized Committer: ENABLED
19/01/12 23:17:11 INFO EmrOptimizedParquetOutputCommitter: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter
19/01/12 23:17:11 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/01/12 23:17:11 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
19/01/12 23:17:11 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/01/12 23:17:11 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
19/01/12 23:17:11 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
19/01/12 23:17:11 INFO FileSystemOptimizedCommitter: Nothing to setup as successful task attempt outputs are written directly.
```
How do I stop EMR from using its own EmrOptimizedSparkSqlParquetOutputCommitter? These are the settings I have tried:
```
--conf spark.hadoop.mapred.output.committer.class=ai.peritus.training.preprocess.PeritusS3PartitionedOutputFormat
--conf spark.hadoop.mapreduce.use.directfileoutputcommitter=false
--conf spark.hadoop.spark.sql.parquet.output.committer.class=com.netflix.bdp.s3.S3PartitionedOutputCommitter
--conf mapreduce.fileoutputcommitter.algorithm.version=1
```
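For reference, on EMR 5.19+ the EMRFS S3-optimized committer is toggled by the Spark property `spark.sql.parquet.fs.optimized.committer.optimization-enabled`; setting it to `false` should make Spark fall back to whatever committer `spark.sql.parquet.output.committer.class` names. A sketch of the invocation, assuming the Netflix committer JAR is on the classpath and `your-job.jar` is a placeholder for the actual application JAR:

```shell
# Disable the EMRFS S3-optimized committer (EMR 5.19+), then point the
# Parquet committer at the custom class instead. The application JAR
# and committer class below are placeholders for this sketch.
spark-submit \
  --conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=false \
  --conf spark.sql.parquet.output.committer.class=com.netflix.bdp.s3.S3PartitionedOutputCommitter \
  --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 \
  your-job.jar
```

Note that `spark.sql.parquet.output.committer.class` is a plain Spark SQL property, so it does not need the `spark.hadoop.` prefix; likewise, `mapreduce.fileoutputcommitter.algorithm.version` is a Hadoop property and does need the prefix when passed via `--conf`.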