
I am running a Spark job written in Scala. As expected, all jobs finish on time, but somehow INFO logs keep being printed for 20-25 minutes before the job stops.

I am posting a few UI screenshots that may help to understand the problem.

  1. Time taken by the 4 stages:

     [screenshot: time taken by 4 stages]

  2. Time between consecutive job IDs:

     [screenshot: time between consecutive job IDs]

I don't understand why so much time is spent between the two job IDs.

Following is my code snippet:

// Note: getFilesList, titan2, process, createTimeCombiner, timeCombiner, timeMerger,
// averagingFunction, way_source and lin are defined elsewhere in the job.
val sc = new SparkContext(conf)
for (x <- 0 to 10) {
  val zz = getFilesList(lin)
  val links = zz._1
  val path = zz._2
  lin = zz._3
  // Parse the tab-separated input, keep the relevant rows and key them by column 34
  val z = sc.textFile(links.mkString(","))
    .map(t => t.split('\t')).filter(t => t(4) == "xx" && t(6) == "x")
    .map(t => titan2(t)).filter(t => t.length > 35)
    .map(t => (t(34), (t(35), t(5), t(32), t(33))))
  val way_nodes = sc.textFile(way_source).map(t => t.split(";")).map(t => (t(0), t(1)))
  // Join with the way -> node mapping, aggregate, average and write out per iteration
  val t = z.join(way_nodes)
    .map(t => (t._2._1._2, Array(Array(t._2._1._2, t._2._1._3, t._2._1._4, t._2._1._1, t._2._2))))
    .reduceByKey((t, y) => t ++ y).map(t => process(t)).flatMap(t => t)
    .combineByKey(createTimeCombiner, timeCombiner, timeMerger)
    .map(averagingFunction).map(t => t._1 + "," + t._2)
  t.saveAsTextFile(path)
}
sc.stop()

Some more follow-up: spark-1.4.1 saveAsTextFile to S3 is very slow on emr-4.0.0

Harshit
  • I would generally recommend using the spark-csv package from Databricks rather than saveAsTextFile, but aside from that, which version of Spark are you running? – Glennie Helles Sindholt Jan 26 '16 at 07:17
  • The advantage of saveAsTextFile is that I can save everything directly to S3; I'm not sure how the Databricks spark-csv package works. Thanks for the direction, I will look into it anyway. Spark 1.4.1, Scala 2.10.6 – Harshit Jan 26 '16 at 15:15

3 Answers


As I put in a comment, I recommend using the spark-csv package instead of saveAsTextFile; there are no problems with writing directly to S3 using that package :)
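To illustrate, here is a minimal sketch of what a spark-csv write to S3 could look like on Spark 1.x; the package version, column names and bucket path below are placeholders, not taken from the question:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch only: assumes spark-csv is on the classpath, e.g.
//   spark-submit --packages com.databricks:spark-csv_2.10:1.3.0 ...
val sc = new SparkContext(new SparkConf().setAppName("csv-to-s3"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Hypothetical data standing in for the question's (key, average) results
val df = sc.parallelize(Seq(("a", 1.0), ("b", 2.0))).toDF("key", "avg")

df.write
  .format("com.databricks.spark.csv")
  .save("s3://my-bucket/output/")  // placeholder S3 path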

I don't know whether you use s3 or s3n, but maybe try to switch. I have experienced problems using s3a on Spark 1.5.2 (EMR-4.2), where writes timed out all the time, and switching back to s3 solved the problem, so it's worth a try.
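Switching the scheme is just a matter of the output URI; a tiny sketch, assuming sc is the existing SparkContext and the bucket/path are placeholders:

// If writes hang with one filesystem scheme on EMR, try another for the same location
val out = "s3://my-bucket/output/"  // e.g. instead of "s3n://my-bucket/output/" or "s3a://..."
sc.parallelize(Seq("a", "b")).saveAsTextFile(out)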

A couple of other things that should speed up writes to S3 are using the DirectOutputCommitter:

conf.set("spark.hadoop.mapred.output.committer.class","com.appsflyer.spark.DirectOutputCommitter")

and disabling generation of _SUCCESS files:

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Note that the _SUCCESS setting has to go on the Hadoop configuration of the SparkContext and not on the SparkConf.
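To make the distinction concrete, here is a minimal sketch of how both settings could be wired together (app name and output path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// The committer class goes on the SparkConf (spark.hadoop.* is forwarded to the Hadoop config)
val conf = new SparkConf()
  .setAppName("s3-write-example")  // placeholder app name
  .set("spark.hadoop.mapred.output.committer.class",
    "com.appsflyer.spark.DirectOutputCommitter")

val sc = new SparkContext(conf)

// The _SUCCESS flag is disabled on the SparkContext's Hadoop configuration
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

sc.parallelize(Seq("a", "b", "c")).saveAsTextFile("s3://my-bucket/output/")  // placeholder path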

I hope this helps.

Glennie Helles Sindholt
  • can you please suggest how to handle this https://stackoverflow.com/questions/62036791/while-writing-to-hdfs-path-getting-error-java-io-ioexception-failed-to-rename – BdEngineer May 27 '20 at 08:00

I ended up upgrading my Spark version and the issue was resolved.

Harshit

I had the same kind of problem when writing files to S3. I use Spark 2.0, so here is updated code for the accepted answer.

In Spark 2.0 you can use:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("App_name")
  .getOrCreate()

spark.conf.set("spark.hadoop.mapred.output.committer.class", "com.appsflyer.spark.DirectOutputCommitter")
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

This solved my problem of the job getting stuck.
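If the runtime spark.conf.set call does not take effect for the Hadoop option, an alternative sketch is to apply the same settings through the underlying SparkContext, mirroring the note in the accepted answer (app name is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3-write-spark2")  // placeholder app name
  .config("spark.hadoop.mapred.output.committer.class",
    "com.appsflyer.spark.DirectOutputCommitter")
  .getOrCreate()

// Same _SUCCESS setting, applied on the SparkContext's Hadoop configuration
spark.sparkContext.hadoopConfiguration
  .set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")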

Aravind Krishnakumar