I am building a processing pipeline with Spark SQL 1.6.0. The pipeline consists of steps/transformations, and the output of one step is forwarded to the next. After the last step, the resulting DataFrame is saved to HDFS. I also need to save the result at some intermediate steps. The code that does this is:

saveDataFrame(flushPath, flushFormat, isCoalesce, flushMode, previousDataFrame, sqlContext)
previousDataFrame

Here, previousDataFrame is the result of the preceding step, and saveDataFrame just saves the DataFrame to the given location. previousDataFrame is then used by the next steps/transformations, and finally, after the last step, the result is saved to HDFS. The code for saveDataFrame is:

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

implicit def saveDataFrame(path: String, format: String, isCoalesce: Boolean, saveMode: SaveMode, dataFrame: DataFrame, sqlContext: SQLContext): Unit = {
  // Optionally collapse to a single partition so that one output file is written
  val source = if (isCoalesce) dataFrame.coalesce(1) else dataFrame
  if (format.equalsIgnoreCase("csv")) {
    // CSV goes through the external spark-csv package
    source
      .write
      .mode(saveMode)
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .save(path)
  }
  else if (format.equalsIgnoreCase("parquet") || format.equalsIgnoreCase("json")) {
    // Note: this branch always overwrites and ignores the saveMode argument
    source
      .write
      .mode(SaveMode.Overwrite)
      .format(format)
      .save(path)
  }
  else {
    throw new Exception("%s input format is not supported".format(format))
  }
}

This works, but the Spark application takes much longer than usual. Without saving the intermediate output the application runs in 20 minutes; with this code it took 1 hour. According to the Spark UI the jobs and tasks complete in 20 minutes, but the spark-submit process keeps running for the full hour.
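One way to narrow down a gap like this is to time each save call on the driver and compare the totals with what the Spark UI reports. The following is only a minimal sketch: the `Timing` object and `timed` helper are hypothetical names, not part of Spark or of the pipeline above.

```scala
// Hypothetical helper (not a Spark API): measures wall-clock time on the driver.
object Timing {
  // Runs `block`, prints how long it took, and returns the result with the elapsed milliseconds.
  def timed[A](label: String)(block: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = block
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    println(s"$label took $elapsedMs ms")
    (result, elapsedMs)
  }
}
```

Wrapping each call, for example `Timing.timed("save step 3") { saveDataFrame(...) }`, should show whether one particular output location accounts for the extra time.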

Please help me figure out the cause. I have also tried the following 2 possible solutions:

  • Using a Future to call saveDataFrame from a separate thread.
  • Caching previousDataFrame before saving and reusing it in the next step.
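For reference, the two attempts combined look roughly like the sketch below. It is an illustration under assumptions, not the exact code from the pipeline: `IntermediateSave` and `saveAsync` are hypothetical names, and the storage level is an arbitrary choice.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.storage.StorageLevel
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper: cache the intermediate result, write it on another thread,
// and hand the cached DataFrame back so the next step can reuse it.
object IntermediateSave {
  def saveAsync(df: DataFrame, path: String, format: String, mode: SaveMode)
               (implicit ec: ExecutionContext): (DataFrame, Future[Unit]) = {
    // Persist so the write and the next step do not both recompute the lineage
    val cached = df.persist(StorageLevel.MEMORY_AND_DISK)
    // The write runs as its own Spark job, submitted from a separate driver thread
    val write = Future {
      cached.write.mode(mode).format(format).save(path)
    }
    (cached, write)
  }
}
```

The returned Future has to be awaited (for example with `scala.concurrent.Await.result`) before the SparkContext is stopped, otherwise the write may be cut short. Note that this only overlaps the write with later steps; it does not make a slow output location any faster.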
Mukul Garg
  • Yes, saving DataFrames is for some reason painfully slow in Spark. You may want to look at the answer I have given [here](http://stackoverflow.com/questions/35002184/spark-jobs-finishes-but-application-takes-time-to-close/35032070#35032070). Although that complaint is about writing to S3, the advice about the `OutputCommitter` and the `_SUCCESS` file should, I guess, still hold for HDFS. If you are running on AWS, you could drop the RDDs that need to be persisted onto Kinesis and have a Firehose store them. That way your Spark application will not be slowed down by writes. – Glennie Helles Sindholt Jun 24 '16 at 07:39
  • If you're not running on AWS, maybe you still have access to a queue of some sort? Or you could implement one. The idea is to get the writes processed outside your core application. – Glennie Helles Sindholt Jun 24 '16 at 07:42
  • Thanks, with the help of your reply I found the issue: it was AWS S3. One of the many outputs was being stored on S3, which was slowing down the entire execution. After moving that output to HDFS, the execution completed in 10 minutes. – Mukul Garg Jun 29 '16 at 08:26
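For readers who have to keep writing to S3, the committer-related settings that are commonly tried look something like the sketch below. This is an assumption about what such tuning usually involves, not a quote from the linked answer. `CommitterConf` and `withFasterCommit` are hypothetical names; the two property names are standard Hadoop settings, passed through Spark's `spark.hadoop.` prefix.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper: applies Hadoop output-committer settings via the spark.hadoop.* prefix.
object CommitterConf {
  def withFasterCommit(conf: SparkConf): SparkConf =
    conf
      // Skip writing the _SUCCESS marker file at the end of each job
      .set("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
      // Assumption: Hadoop 2.7+ on the cluster; version 2 moves task output
      // to its final location at task commit instead of at job commit
      .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
}
```

Whether either setting helps depends on the Hadoop version and the file system in use, so it is worth measuring before and after.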

1 Answer

The issue was the AWS S3 path, which was delaying execution. Once I started saving that output to HDFS, the execution time dropped.
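With many output paths it is easy to miss one that points at S3. A small check like the following can flag them before the pipeline runs. It is only a sketch: `OutputPathCheck` is a hypothetical helper, and the set of schemes is an assumption about how S3 paths are written in your configuration.

```scala
import java.net.URI

// Hypothetical helper: flags output paths whose URI scheme points at S3.
object OutputPathCheck {
  // Assumption: S3 locations use one of these Hadoop file system schemes
  private val s3Schemes = Set("s3", "s3n", "s3a")

  // Paths without a scheme (e.g. "/data/out") are treated as non-S3
  def isS3Path(path: String): Boolean =
    Option(new URI(path).getScheme).exists(s => s3Schemes.contains(s.toLowerCase))

  def s3Outputs(paths: Seq[String]): Seq[String] = paths.filter(isS3Path)
}
```

For example, `OutputPathCheck.s3Outputs(allFlushPaths)` returns only the S3 destinations, where `allFlushPaths` stands for whatever list of flush paths the pipeline is configured with.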

Mukul Garg