I have a data frame with 20 columns and 25 records (small, standard data; file size = 7 KB). I need to perform multiple operations on the data frame in a loop. The loop itself finishes within a few seconds, as expected. The problem is that when it finishes and I try to show() or write the data to disk, the CPU stays high for many minutes (15-20 minutes) with high memory usage, and I often get a StackOverflowError or OutOfMemoryError.
My main() method looks like:
val spark = get_spark()
val i_file = args(1)
val df = spark.read
.format("csv")
.option("delimiter", ",")
.option("header", true)
.option("inferSchema","true")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
.load(i_file)
var s_df = something.calculate(spark,df)
/////////problem starts here ///////////
s_df.repartition(col("cluster_id")) //tried without repartition also
.write.mode("overwrite")
.option("header", "true").csv("C:\\Workspace\\data\\out.csv")
And my calculate() method in something:
def calculate(spark: SparkSession, df: DataFrame): DataFrame = {
  var newdf = init(spark, df) // some initialization on the dataframe
  val t_count = 100
  var t_i = 0
  newdf.rdd.cache()
  while (t_i < t_count) {
    if (some condition) {
      newdf = calculate_type1(spark, newdf)
    } else {
      newdf = calculate_type2(spark, newdf)
    }
    t_i = t_i + 1
    if (t_i == 50) {
      newdf.rdd.checkpoint()
    }
  }
  return newdf
}
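I also wonder whether the cache/checkpoint should apply to the DataFrame itself rather than to newdf.rdd. Something like the following is what I have in mind, but I have not verified that it helps (the checkpoint directory path is just an example):

// in main(), before calling calculate():
spark.sparkContext.setCheckpointDir("C:\\Workspace\\data\\checkpoints")

// inside the loop, instead of newdf.rdd.checkpoint():
if (t_i % 25 == 0) {
  newdf = newdf.checkpoint() // Dataset.checkpoint() is eager and returns a DataFrame with truncated lineage
}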
My analysis:
- It works with fewer loop iterations; e.g. with t_count = 2 everything runs fine.
- I believe the problem is that Spark keeps the whole lineage graph in memory and has to process that graph to produce the final data frame.
- I am using var, which is probably wrong; I suspect I should use a val and build the data frame with foldLeft or zip instead (see the sketch after this list), but I am struggling with that. Can someone help please? Many thanks!
- Do I need the checkpoint? I don't see it having any effect.
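For reference, this is roughly the val/foldLeft version I was trying to write (a sketch only; init, calculate_type1 and calculate_type2 are my existing methods, and some_condition is a placeholder):

import org.apache.spark.sql.{DataFrame, SparkSession}

def calculate(spark: SparkSession, df: DataFrame): DataFrame = {
  val t_count = 100
  // fold over the iteration indices instead of mutating a var
  (0 until t_count).foldLeft(init(spark, df)) { (acc, t_i) =>
    val next = if (some_condition) calculate_type1(spark, acc) else calculate_type2(spark, acc)
    // optionally truncate the lineage every 25 iterations, as in the snippet above
    if (t_i % 25 == 24) next.checkpoint() else next
  }
}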