
I have the following piece of code to save a file to S3:

rdd
        // drop the header row (first line of the first partition)
        .mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
        // assign the key for partitionBy;
        // if the key column doesn't exist (colIndex == -1), use a null key so all the data goes to the part-00000 file
        .map(line => if (colIndex == -1) (null, line) else (line.split(TILDE)(colIndex), line))
        .partitionBy(customPartitioner)
        .map { case (_, line) => line }
        // add the empty columns, change the order, and get the modified string
        .map(line => addEmptyColumns(line, schemaIndexArray))
        .saveAsTextFile(s"s3a://$bucketName/$serviceName/$folderPath")

When writing to HDFS instead of an S3 path, the same code takes about 1/5th of the time. Are there any other approaches for fixing this? I am setting the Hadoop configuration in Spark.
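For illustration, the S3A-related Hadoop configuration is set roughly like this (the fs.s3a.* keys are standard hadoop-aws properties; the credential variables are placeholders, not the actual values):

    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", awsAccessKey) // placeholder variable
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", awsSecretKey) // placeholder variable
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")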

user3897533

1 Answer


S3 is an object store, not a file system, hence the issues arising from eventual consistency and non-atomic rename operations. Every time the executors write the result of the job, each of them writes to a temporary directory outside the main directory (on S3) where the files are supposed to land, and once all the executors are done a rename is performed to get atomic exclusivity. This is fine on a standard filesystem like HDFS, where renames are instantaneous, but on an object store like S3 it is not, because renames on S3 proceed at roughly 6 MB/s.

To overcome the above problem, make sure the following two configuration parameters are set (see the sketch after this list):

  • spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2

  • spark.speculation=false
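A minimal sketch of one way to apply them, assuming they are supplied when the SparkSession is built (the application name is just a placeholder); equivalently, they can be passed as --conf options to spark-submit:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("s3a-write") // placeholder application name
      // commit task output directly to the destination (fewer renames at job commit)
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      // avoid duplicate speculative task attempts writing to S3
      .config("spark.speculation", "false")
      .getOrCreate()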

For more details on the above parameters, refer to the following answer:

How to tune spark job on EMR to write huge data quickly on S3

Reference article:

https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98

Harsh Bafna
  • Thanks for the response. I am now getting this error in the task: java.io.InvalidClassException: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 6155820641931972169, local class serialVersionUID = -3720498261147521051. I have the following set – user3897533 Nov 21 '19 at 05:38
  • sparkSession.sparkContext.hadoopConfiguration.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version","2") sparkSession.sparkContext.hadoopConfiguration.set("spark.speculation","false") sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.fast.upload","true") – user3897533 Nov 21 '19 at 05:38
  • It was a Spark version conflict, which I fixed, and I no longer see the error. But setting these is not having an effect; there is no performance gain – user3897533 Nov 21 '19 at 18:21
  • Well, that's strange; these parameters should have provided a performance boost. Could you try setting fs.trash.interval to 0 in the Hadoop configuration? – Harsh Bafna Nov 21 '19 at 18:44
  • I did set all the values, no luck. Performance is just the same -- sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.fast.upload","true") sparkSession.sparkContext.hadoopConfiguration.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version","2") sparkSession.sparkContext.hadoopConfiguration.set("fs.trash.interval", "0") sparkSession.sparkContext.hadoopConfiguration.set("spark.speculation","false") – user3897533 Nov 26 '19 at 23:49