In PySpark, I have a set of operations that run in a loop and repeatedly modify a certain DataFrame:
df = spark.sql("select * from my_table")

for i in range(num_iter):
    for v in var_list:
        # ...
        # Operations here that transform df
        df = f1(df)
        # ...
        df = df.join(another_dataset, on=join_key)
        df = f2(df)
        # ...
    # There is some action here (e.g. a count or a write) that should
    # conclude the DAG of this iteration
I notice that performance degrades as the loop progresses. Initially an iteration takes only seconds; after a couple of loops, each iteration takes minutes to hours. Checking in the YARN UI, it looks like the DAG keeps growing as the loop moves along. At some point the executors simply fail, with an error that shows a very long DAG.
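To confirm this from the driver side, I can print the plan at the end of each outer iteration (a sketch using the same df as above; the debug-string length is only a rough proxy for lineage depth):

# Print the current physical plan; the output gets noticeably longer each
# iteration, which matches what the YARN UI shows.
df.explain()
# Rough measure of lineage size via the underlying RDD's debug string.
print(len(df.rdd.toDebugString()))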
The DAG grows like this (example only; it is not solely the join operations that grow the DAG):
[image: DAG that keeps growing with each iteration]
Is there a way to improve performance in this situation? What is causing the performance hit? If it's objects hogging memory, is there a way to flush them after each iteration?
Is calling cache() (with a follow-up action) in each iteration a good workaround to avoid the DAG build-up? I did try caching, and performance still drags.
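For reference, this is roughly what I mean by cache() with a follow-up action (a sketch; count() as the forcing action and the unpersist() of the previous iteration's DataFrame are just illustrative, not necessarily exactly what I ran):

prev = None
for i in range(num_iter):
    for v in var_list:
        df = f1(df)
        df = df.join(another_dataset, on=join_key)
        df = f2(df)
    df = df.cache()
    df.count()           # action to force the cache to materialize
    if prev is not None:
        prev.unpersist() # release the previous iteration's cached data
    prev = df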