
I have a data frame with 20 columns and 25 records (small, standard data; file size = 7 KB). I need to perform multiple operations on the data frame in a loop. The loop itself finishes within a few seconds, as expected. The problem is that when it finishes and I try to show() or write the data to disk, my CPU stays high for many minutes (15-20 minutes) with high memory usage, and I often get a StackOverflowError or OutOfMemoryError.

My main() method looks like:

val spark = get_spark()
val i_file = args(1)
val df = spark.read
  .format("csv")
  .option("delimiter", ",")
  .option("header", true)
  .option("inferSchema","true")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .load(i_file)

var s_df = something.calculate(spark, df)


/////////problem starts here ///////////
s_df.repartition(col("cluster_id"))    //tried without repartition also
  .write.mode("overwrite")
  .option("header", "true").csv("C:\\Workspace\\data\\out.csv")    

And my calculate() method in something:

def calculate(spark: SparkSession, df: DataFrame): DataFrame = {
  var newdf = init(spark, df) //some initialization on the dataframe
  val t_count = 100
  var t_i = 0
  newdf.rdd.cache()
  while (t_i < t_count) {
  if (some condition) {
      newdf = calculate_type1(spark, newdf)
    }else{
      newdf = calculate_type2(spark, newdf)
    }
    t_i = t_i + 1
    if (t_i == 50) {
      newdf.rdd.checkpoint()
    }
  }
  return newdf
}

My analysis:

  1. I noticed that it works with fewer loops; e.g. with t_count = 2 everything works fine.

  2. I believe the problem is that Spark keeps the entire lineage graph in memory and has to process that graph to generate the final data frame.

  3. I am using var, which is probably incorrect; I should presumably use val with foldLeft or zip to update the original RDD/DataFrame, but I am struggling with that (see the sketch after this list). Can someone help please? Many thanks!

  4. Do I need a checkpoint? I don't see any effect from it.
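
Regarding item 3, here is roughly what I have in mind, as an untested sketch only: someCondition is a placeholder for my real loop condition, init/calculate_type1/calculate_type2 are the same methods as above, and localCheckpoint() (Spark 2.3+) is used just as one way to cut the lineage; checkpoint(eager = true) with a checkpoint directory should work too.

def calculate(spark: SparkSession, df: DataFrame): DataFrame = {
  val t_count = 100
  // cache the DataFrame itself (not .rdd) so the base of the plan is reused
  val initial = init(spark, df).cache()

  // thread the DataFrame through the iterations as a val instead of a var
  (0 until t_count).foldLeft(initial) { (acc, t_i) =>
    val next =
      if (someCondition) calculate_type1(spark, acc)
      else calculate_type2(spark, acc)

    // every 10 iterations, truncate the accumulated plan;
    // localCheckpoint() returns a new DataFrame whose lineage is cut
    if ((t_i + 1) % 10 == 0) next.localCheckpoint() else next
  }
}

The point (if I understand correctly) is that the value returned by localCheckpoint()/checkpoint() is what must be carried into the next iteration; checkpointing without re-assigning does not shrink the plan.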
  • Checkpoint is good. Also, a DataFrame (which is nothing but Dataset[Row]) checkpoint returns the checkpointed DataFrame back, so catch it: something like `df = df.checkpoint(eager = true)` – user238607 Nov 02 '18 at 20:18
  • Not convinced this is recursive. – thebluephantom Nov 02 '18 at 22:22
  • Thanks @user238607, will try that out and update. – Eyedia Tech Nov 02 '18 at 23:03
  • @thebluephantom - type more... You mean it should have been iterative? – Eyedia Tech Nov 02 '18 at 23:03
  • @user238607 - Seems like after reassigning the df back, I am not getting any OOM or stackoverflow errors. But saving checkpoint taking about 10-12 minutes with 75 records with t_count as 100 – Eyedia Tech Nov 07 '18 at 20:00
  • @EyediaTech : It seems that your calculation graph is accumulating a lot. Hence you might be getting OOM or SO errors. What you can do is trigger an action after 10 loops or so, i.e. println(newdf.count()). This way Spark is forced to calculate the updated df and then it proceeds from there. Remove the checkpoint, as it saves the df to disk, which could take minutes as you are suggesting. – user238607 Nov 08 '18 at 15:40
  • Thanks again @user238607. Thats exactly what I am doing now, I wrote my own checkpoint save where I am trying 3 possible ways : (1) action method count(), (2) write the df into the disk and read it back (3) spark checkpoint as you suggested df = df.checkpoint(eager = true) – Eyedia Tech Nov 08 '18 at 16:01
  • Interesting...I am calling newdf.count() after every 10 loops: ` Duration :6.441859463 Duration :12.071247208 Duration :23.943826849 Duration :35.756145541 Duration :52.909446527 ` Please note that my dataframe's total record will remain constant, I am not adding new records. This is really strange!! – Eyedia Tech Nov 08 '18 at 16:26
  • @EyediaTech : Try to print out the plans for the dataframes and see what is going on. Is it recomputing the whole thing again and again. That could be the issue. https://stackoverflow.com/questions/50430233/spark-physical-plan-logical-plan – user238607 Nov 08 '18 at 17:53
  • Yes, something is getting recomputed. Here are the first 4 loops: `*(72) Project [...nested info deleted] Duration :8.745540408` `*(122) Project [...nested info deleted (bigger than the previous)] Duration :15.959967573` `*(172) Project [...nested info deleted (bigger than the previous)] Duration :26.937254949` `*(222) Project [...nested info deleted (bigger than the previous)] Duration :40.952499263` – Eyedia Tech Nov 08 '18 at 18:24
  • Similar issue - https://stackoverflow.com/questions/39084739/evaluating-spark-dataframe-in-loop-slows-down-with-every-iteration-all-work-don – Eyedia Tech Nov 09 '18 at 11:58
  • @EyediaTech : try to cache your dataframe after computation. Here is more on that : https://stackoverflow.com/questions/37747712/spark-strange-behaviour-with-iterative-algorithms – user238607 Nov 09 '18 at 15:20
  • I had placed newdf.rdd.cache() before the while loop a few days ago. I updated the code in my original question. Stack Overflow is suggesting we move to a chat. Is that OK with you? – Eyedia Tech Nov 09 '18 at 17:12
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/183381/discussion-between-eyedia-tech-and-user238607). – Eyedia Tech Nov 09 '18 at 17:12
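
To summarize the pattern from the thread above as a runnable sketch (assumptions: the checkpoint directory path, someCondition, and the 10-iteration interval are placeholders; t_count, init, calculate_type1 and calculate_type2 are as in the question):

// inspect the plan as it grows and re-assign the checkpointed DataFrame so
// the old lineage can be dropped
spark.sparkContext.setCheckpointDir("C:\\Workspace\\data\\checkpoints")  // placeholder directory

var newdf = init(spark, df)
var t_i = 0
while (t_i < t_count) {
  newdf = if (someCondition) calculate_type1(spark, newdf)
          else calculate_type2(spark, newdf)
  t_i = t_i + 1

  if (t_i % 10 == 0) {
    newdf.explain()                          // watch whether the Project/plan keeps growing
    newdf = newdf.checkpoint(eager = true)   // checkpoint() returns a new DataFrame; must be re-assigned
  }
}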

0 Answers