
I am running a Spark (2.0.1) job with multiple stages. I noticed that when I insert a cache() in one of the later stages, it changes the execution time of earlier stages. Why? I've never encountered such a case in the literature on caching.

Here is my DAG with cache():

[screenshot: DAG with cache()]

And here is my DAG without cache(). All remaining code is the same.

[screenshot: DAG without cache()]

I have a cache() after a sort-merge join in Stage 10. If the cache() is present in Stage 10, then Stage 8 takes nearly twice as long (20 min vs. 11 min) as it does when there is no cache() in Stage 10. Why?

My Stage 8 contains two broadcast joins with small DataFrames and a shuffle on a large DataFrame in preparation for the merge join. Stages 8 and 9 are independent and operate on two different DataFrames.

Let me know if you need more details to answer this question.

UPDATE 8/2/2018: Here are the details of my Spark script:

I am running my job on a cluster via spark-submit. Here is my Spark session:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("myJob")
  .config("spark.executor.cores", 5)
  .config("spark.driver.memory", "300g")
  .config("spark.executor.memory", "15g")
  .getOrCreate()

This creates a job with 21 executors with 5 CPU cores each.

Load 4 DataFrames from parquet files:

val dfT = spark.read.format("parquet").load(filePath1) // 3 TB in 3185 partitions
val dfO = spark.read.format("parquet").load(filePath2) // ~700 MB
val dfF = spark.read.format("parquet").load(filePath3) // ~800 MB
val dfP = spark.read.format("parquet").load(filePath4) // 38 GB
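
(For reference, the partition counts in the comments above can be checked directly; this is just a sanity check, not part of the job itself:)

// Quick check of how each input is partitioned
println(dfT.rdd.getNumPartitions) // expect 3185 for dfT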

Preprocessing on each of the DataFrames consists of column selection, dropDuplicates, and possibly a filter, like this:

val dfT1 = dfT.filter(...)
val dfO1 = dfO.select(columnsToSelect2).dropDuplicates(Array("someColumn2"))
val dfF1 = dfF.select(columnsToSelect3).dropDuplicates(Array("someColumn3"))
val dfP1 = dfP.select(columnsToSelect4).dropDuplicates(Array("someColumn4"))

Then I left-broadcast-join the first three DataFrames together:

import org.apache.spark.sql.functions.{broadcast, col}

val dfTO = dfT1.join(broadcast(dfO1), Seq("someColumn5"), "left_outer")
val dfTOF = dfTO.join(broadcast(dfF1), Seq("someColumn6"), "left_outer")

Since dfP1 is large, that join has to be a sort-merge join, and I can't afford to do it yet; I need to limit the size of dfTOF first. To do that, I add a new timestamp column via withColumn with a UDF that converts a string into a timestamp (the input column name below is illustrative):

val dfTOF1 = dfTOF.withColumn("TransactionTimestamp", myStringToTimestampUDF(col("rawTimestampString"))) // input column name is illustrative
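
For completeness, a minimal sketch of what such a UDF could look like (the format pattern here is an assumption, not the actual code):

import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.spark.sql.functions.udf

val myStringToTimestampUDF = udf { (s: String) =>
  // Hypothetical parser; the actual format pattern in my job may differ
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ")
  new Timestamp(fmt.parse(s).getTime)
}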

Next I filter on a new timestamp column:

val dfTrain = dfTOF1.filter(dfTOF1("TransactionTimestamp").between("2016-01-01 00:00:00+000", "2016-05-30 00:00:00+000"))

Now I am joining the last DataFrame:

val dfTrain2 = dfTrain.join(dfP1, Seq("someColumn7"), "left_outer")

And lastly, the column selection with the cache() that is puzzling me:

import org.apache.spark.sql.functions.sum

val dfTrain3 = dfTrain2.select(columnsToSelect5).cache()
dfTrain3.agg(sum(col("someColumn7"))).show()

It looks like the cache() is useless here, but there will be further processing and modelling of the DataFrame, where the cache() will be necessary.
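
To sketch the intent (the later steps are hypothetical placeholders):

// Hypothetical later use: the cached DataFrame is reused several times,
// then released once it is no longer needed
dfTrain3.count()     // any action materializes the cache
// ... further transformations / model training on dfTrain3 ...
dfTrain3.unpersist() // free the cached blocks afterwards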

Should I give more details? Would you like to see the execution plan for dfTrain3?
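
If it helps, I can dump the plan with explain (extended mode includes the logical plans as well):

dfTrain3.explain(true) // parsed, analyzed, and optimized logical plans plus the physical plan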

astro_asz
  • We definitely need more details here. In general this is not unusual, but the details depend on the particular scenario. See https://stackoverflow.com/q/48427185/6910411 for hints on how to create useful examples. – zero323 Feb 07 '18 at 16:43
  • @user6910411 Added more details about the job as requested. Thanks. – astro_asz Feb 08 '18 at 08:58
