
In PySpark, I have some operations that need to run in a loop and repeatedly modify a certain DataFrame.

df = spark.sql("select * from my_table")
for i in range(num_iter):
    for v in var_list:
        # ...
        # Operations here that change df
        df = f1()
        # ...
        df = df.join(another_dataset)   # another_dataset is computed within the loop
        df = f2(df)
        # ...
        # There is an action here that should conclude the DAG of this loop

I notice that as the loop progresses, performance degrades. Initially an iteration takes only seconds; after a couple of loops, each iteration takes minutes to hours. Checking in YARN, it seems that the DAG grows as the loop moves along. At some point the executors just fail, with an error showing a very long DAG.

The DAG grows like this [example only; it's not solely join operations that grow the DAG]: [DAG]

  1. Is there a way to improve performance in this situation? What is causing the performance hit? If it's objects hogging memory, is there a way to flush them after each loop?

  2. Is cache() (with a follow-up action) at each loop a good work-around to avoid DAG build-up (see the sketch below)? I did try caching and the performance still drags.
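To make question 2 concrete, this is roughly what the cache-and-action attempt looks like (a sketch only; spark is the existing session, and f1, f2, another_dataset, num_iter and var_list are the same placeholders as in the snippet above):

df = spark.sql("select * from my_table")
prev = None
for i in range(num_iter):
    for v in var_list:
        df = f1()                       # placeholder transformations
        df = df.join(another_dataset)   # another_dataset is computed within the loop
        df = f2(df)
    df = df.cache()       # mark this iteration's result for caching
    df.count()            # follow-up action that actually materializes the cache
    if prev is not None:
        prev.unpersist()  # release the previous iteration's cached blocks
    prev = df

Even with the data confirmed to be in memory, the logical plan from earlier iterations is still carried into the next one, so this alone did not stop the DAG from growing.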

Kenny
  • how about making one big dataframe including all those datasets from the beginning, instead of joining (putting everything into one table and getting a df out of it once)? – mangusta Jul 31 '19 at 21:40
  • You can cache at the end of each iteration, force the evaluation and then unpersist the old df _(you need to save a reference to it)_. It may help, but forcing the computation may lead to more performance problems. – Luis Miguel Mejía Suárez Jul 31 '19 at 21:50
  • @LuisMiguelMejíaSuárez will not work unfortunately. Lineage is still kept even after computation. – simpadjo Jul 31 '19 at 22:40
  • Take a look at this question and answers: https://stackoverflow.com/questions/57265156/iterative-caching-vs-checkpointing-in-spark – simpadjo Jul 31 '19 at 22:40
  • 1. I am intrigued that lineage is still kept. Isn't cache A>B>C.cache()>D supposed to force evaluation at C and use that onwards, instead of going through A>B again? 2. https://stackoverflow.com/questions/36195105/what-happens-if-i-cache-the-same-rdd-twice-in-spark it seems that if **setname** is used, I don't need to unpersist the old df because it will overwrite the old df. Any thoughts? @mangusta as said, it's not just joining ready-to-join datasets; that would be too easy. df is already the big dataset, some columns are recomputed, and another_dataset is calculated within that loop. – Kenny Aug 01 '19 at 13:44
  • https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/Dataset.html#checkpoint-- ? – mazaneicha Aug 01 '19 at 14:33
  • @mazaneicha According to simpadjo's link, should I understand that **cache** = **checkpoint** + lineage still kept **in its knowledge** in case of fault, but not evaluated each time? In that case, is cache the better option, since it's also eager evaluation, with bonus fault tolerance? – Kenny Aug 01 '19 at 14:58
  • `cache` (or `persist` in general for that matter) is "lazy"; they are not evaluated until you perform an action. See the Spark docs or, TL;DR, this SO question: https://stackoverflow.com/questions/34438670/why-persist-are-lazily-evaluated-in-spark. Also `cache`'s fault tolerance = recompute the whole DAG, while `checkpoint()` does compute eagerly and truncates the DAG, which I thought was your issue, no? – mazaneicha Aug 01 '19 at 15:19
  • By saying cache() I meant cache() plus an action to trigger it. Otherwise most of the questions on Stack Overflow regarding transformations would be invalid, wouldn't they, since none of them are evaluated or do anything. So to rephrase the question: is cache + trigger action = checkpoint, with the bonus of the DAG kept in history? As I understand it, even when cache is evaluated, the DAG is still kept in its knowledge. Fault tolerance seems to be for emergencies only, when the cached objects fail etc. – Kenny Aug 01 '19 at 16:08
  • what was the final conclusion, if I may ask? – thebluephantom Apr 27 '20 at 22:19
  • @thebluephantom cache does not truncate the DAG. My [not-so-clean] solution was to write the result out and read it back (sketched after these comments). That ensures a clean DAG. Any recommendation is welcome. – Kenny Apr 28 '20 at 19:42
  • I find this area hard to follow: we hear that Spark is great for this and that compared to Hadoop, and then we had Spark Streaming / checkpointing and watermark bla bla alleviation, but for iterative work, due to immutable RDDs, we get into a strange situation in which Spark is NOT so good. Most machine learning is iterative. From reading around I find the issue not well described, not even in Holden Karau. I will get back, as due to corona I have some extra time to spare! – thebluephantom Apr 28 '20 at 20:12
  • I did see some comments elsewhere that Spark is not so great for loops like that. I was sure that cache [then an action] did not solve the issue: I tested, checked that the objects were cached in memory, and nevertheless always got a built-up lineage + stack overflows. Not sure how ML algorithms are implemented. If you happen to have any insights on the tricks MLlib uses, I'm happy to hear them. – Kenny May 16 '20 at 18:09
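For completeness, the two lineage-truncating approaches discussed in the comments look roughly like this (a sketch only; the checkpoint directory and the output path are placeholder values):

# Option 1: checkpoint() is eager by default, materializes the data and truncates lineage.
# It needs a checkpoint directory on a reliable filesystem (e.g. HDFS).
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")   # placeholder path
df = df.checkpoint()   # the returned DataFrame starts from a fresh, truncated plan

# Option 2: write out and read back (the "not-so-clean" solution mentioned above).
# The round trip through storage guarantees a DataFrame with a clean DAG.
df.write.mode("overwrite").parquet("/tmp/df_checkpoint")        # placeholder path
df = spark.read.parquet("/tmp/df_checkpoint")

Either of these, placed at the end of each outer iteration, keeps the plan from accumulating across loops, at the cost of materializing the intermediate result every time.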

0 Answers