0

I am writing a PySpark implementation of an algorithm that is iterative in nature. Part of the algorithm involves iterating a strategy until no more improvements can be made (i.e., a local maximum has been greedily reached).

The function optimize returns a three-column dataframe that looks as follows:

id current_value best_value
0 1 1
1 0 1

This function is used in a while loop until current_value and best_value are identical (meaning that no more optimizations can be made).

# Init while loop
iterate = True

# Start iterating until optimization yields same result as before
while iterate:

    # Create (or overwrite) `df`
    df = optimizeAll(df2) # Uses `df2` as input
    df.persist().count()

    # Check stopping condition
    iterate = df.where('current_value != best_value').count() > 0

    # Update `df2` with latest results
    if iterate:
        df2 = df2.join(other=df, on='id', how='left') # <- Should I persist this?

This function runs very quickly when I pass it the inputs manually. However, I have noticed that the time it takes for the function to run increases exponentially as it iterates. That is, the first iteration runs in milliseconds, the second one in seconds and eventually it takes up to 10 minutes per pass.

This question suggests that if df isn't cached, the while loop will start running from scratch on every iteration. Is this true?

If so, which objects should I persist? I know that persisting df will be triggered by the count when defining iterate. However, df2 has no action, so even if I persist it, will it make the while loop start from scratch every time? Likewise, should I unpersist either table at some point in the loop?

Arturo Sbr
  • 5,567
  • 4
  • 38
  • 76
  • i'm afraid caching, in your case, might make it worse given it will store a dataframe with every iteration – samkart Jun 08 '23 at 16:09
  • What if I unpersist? – Arturo Sbr Jun 08 '23 at 18:08
  • un-persisting the dataframe before your final action (i.e. writing the df2 from final iteration) will result in loss of cached data for the whole process. – samkart Jun 09 '23 at 04:43
  • What does the `optimizeAll` actually do? For each row Is there a dependency on other rows to make the optimization? or each row is independent? – Islam Elbanna Jun 11 '23 at 12:30
  • Each row is independent. `optimizeAll` essentially does a ton of groupBy, join and row_number operations. – Arturo Sbr Jun 11 '23 at 22:14

1 Answers1

1

Both df and df2 should be persisted in your case if your resources are sufficient.

In your above example, as the df2 is not persisted, when you call df.persist().count() every time, since df2 is not persisted and only lineage information left, it will start joining from the first df2 in the 1st iteration, which is obviously not an efficient approach. Even you can persist either df or df2 only, you should persist df2 first in your example provided. It should give you a more consistent run time.

Beside, not sure which Spark version are you using, if you're using Spark >= 3.0.0, you should enable spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled since AQE will optimize the query plan (https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution and https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html). If you're using Spark < 3.0.0, you should check the joining strategy and data skew condition in your log when you want to optimize the joining and grouping strategy.

Jonathan Lam
  • 1,761
  • 2
  • 8
  • 17
  • 1
    this is not a very good suggestion, IMO. even if resources are sufficient, caching everything will only increase the overhead. – samkart Jun 12 '23 at 15:41
  • @samkart Yes you're right, caching will do increase overhead. On the other hand, based on the information provided, in my perspective, if you don't cache the `df2`, the processing time will be very long when the iteration increase, especially when you use blob cloud storage and store a lot of small chunk of file. It's a huge burden in file scanning and it takes time. For `df`, since it calls two action, I believe caching should help the processing time. I agree that the caching will increase overhead, but it should be relatively minimal comparing to the iterative processing in the example. – Jonathan Lam Jun 13 '23 at 01:14
  • https://stackoverflow.com/a/61079688/10445333 – Jonathan Lam Jun 13 '23 at 01:14
  • I think it wouldn't be relatively minimal once you start the action(s). calling cache doesn't cache the data, the action does. once, it starts caching everything from every iteration, your memory space keeps reducing and overhead increases, thus slowing down the whole process. – samkart Jun 13 '23 at 01:33