I am building a processing pipeline using Spark SQL 1.6.0. The pipeline consists of steps/transformations, and the output of one step is forwarded to the next. After the last step the resulting DataFrame is saved to HDFS. I also need to save the result at some intermediate steps. The code that does this is:
saveDataFrame(flushPath, flushFormat, isCoalesce, flushMode, previousDataFrame, sqlContext)
previousDataFrame
Here, previousDataFrame is the result of the last step, and saveDataFrame simply saves the DataFrame at the given location; previousDataFrame is then used by the next steps/transformations. Finally, after the last step, the result is saved to HDFS. The code for saveDataFrame is:
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

def saveDataFrame(path: String, format: String, isCoalesce: Boolean,
                  saveMode: SaveMode, dataFrame: DataFrame,
                  sqlContext: SQLContext): Unit = {
  // Optionally collapse to a single partition so the output is one file.
  val source = if (isCoalesce) dataFrame.coalesce(1) else dataFrame
  if (format.equalsIgnoreCase("csv")) {
    source
      .write
      .mode(saveMode)
      .format("com.databricks.spark.csv") // spark-csv package (Spark 1.x)
      .option("header", "true")
      .save(path)
  } else if (format.equalsIgnoreCase("parquet") || format.equalsIgnoreCase("json")) {
    source
      .write
      .mode(saveMode)
      .format(format)
      .save(path)
  } else {
    throw new Exception("%s input format is not supported".format(format))
  }
}
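For context, the driver loop that chains the steps looks roughly like this (a minimal sketch; `steps`, `step.transform`, `step.shouldFlush`, and the flush fields are illustrative names, not the actual code):

```scala
// Hypothetical sketch of the pipeline: each step transforms the previous
// DataFrame, and some steps also flush their result to HDFS on the way.
var previousDataFrame: DataFrame = initialDataFrame
for (step <- steps) {
  previousDataFrame = step.transform(previousDataFrame)
  if (step.shouldFlush) {
    saveDataFrame(step.flushPath, step.flushFormat, step.isCoalesce,
      step.flushMode, previousDataFrame, sqlContext)
  }
}
// After the last step, write the final result to HDFS.
saveDataFrame(finalPath, finalFormat, isCoalesce = false,
  SaveMode.Overwrite, previousDataFrame, sqlContext)
```

Each intermediate save triggers its own Spark action, so without caching every flush recomputes the lineage up to that point.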
This works, except that the Spark application takes much longer than usual. Without saving the intermediate output the application runs in 20 minutes; with this code it takes 1 hour. The jobs and tasks complete in 20 minutes according to the Spark UI, but the spark-submit process keeps running until the 1-hour mark.
Please help me figure out the cause. I have also tried the following two possible solutions:
- Using a Future to call saveDataFrame on a separate thread.
- Caching previousDataFrame before saving and reusing it in the next step.
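Roughly, the two attempts looked like this (a sketch under the same assumptions as above; variable names are illustrative):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Attempt 1: fire the intermediate save on a separate thread so the
// main pipeline does not block on the write.
Future {
  saveDataFrame(flushPath, flushFormat, isCoalesce, flushMode,
    previousDataFrame, sqlContext)
}

// Attempt 2: cache the DataFrame so that the save and the next step
// both reuse the computed result instead of recomputing the lineage.
val cached = previousDataFrame.cache()
saveDataFrame(flushPath, flushFormat, isCoalesce, flushMode,
  cached, sqlContext)
// cached is then passed on to the next step/transformation.
```

Neither attempt reduced the total runtime of the spark-submit process.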