While working on improving code performance, after having many jobs fail (aborted), I thought about using the persist() function on a Spark DataFrame whenever I need to reuse that same DataFrame in many other operations. But while following the jobs and stages in the Spark application UI, I felt that persisting is not always optimal: it seems to depend on the number of partitions and the data size. I wasn't sure until a job was aborted because of a failure in the persist stage.
I'm questioning whether the best practice of using persist() whenever many operations will be performed on a DataFrame is always valid. If not, when is it not valid, and how do I judge?
To be more concrete, I will present my code and the details of the aborted job:
# create a dataframe from another dataframe, df_transf_1, on which I made a lot of transformations but no actions
spark_df = df_transf_1.select('user_id', 'product_id').dropDuplicates()
# persist the deduplicated dataframe since it will be reused below
spark_df.persist()
products_df = spark_df[['product_id']].distinct()
df_products_indexed = products_df.rdd.map(lambda r: r.product_id).zipWithIndex().toDF(['product_id', 'product_index'])
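A side note on the persist() call above: as far as I understand, plain persist() on a DataFrame defaults to the MEMORY_AND_DISK storage level, and an explicit StorageLevel can be passed instead. The commented-out DISK_ONLY line below is only an illustration, not what I actually ran:

from pyspark import StorageLevel

# plain persist() on a DataFrame defaults to MEMORY_AND_DISK
spark_df.persist()

# an explicit storage level could be passed instead (illustration only):
# spark_df.persist(StorageLevel.DISK_ONLY)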
You may ask why I persisted spark_df. It's because I'm going to use it multiple times, as with products_df above and also in joins, e.g.:
spark_df = spark_df.join(df_products_indexed, "product_id")
Details of the failure reason in Stage 3:
Job aborted due to stage failure: Task 40458 in stage 3.0 failed 4 times, most recent failure: Lost task 40458.3 in stage 3.0 (TID 60778, xx.xx.yyyy.com, executor 91): ExecutorLostFailure (executor 91 exited caused by one of the running tasks) Reason: Slave lost Driver stacktrace:
The size of the input data (4 TB) is huge. Before calling persist, is there a way to check the size of the data? Is it a parameter to consider when deciding whether to persist or not? What about the number of partitions (tasks) for the persist stage, which here is over 100,000?
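For context, a rough sketch of the kind of checks I have in mind. The partition count uses the standard RDD API; the sampling-based size estimate is hypothetical (I have not run it on the 4 TB input), and note that the count() it uses is itself a full scan:

# number of partitions the persist stage would have to write
print(spark_df.rdd.getNumPartitions())

# very rough size estimate (hypothetical): serialize a small sample of rows
# as JSON, take the average size, and extrapolate by the total row count
sample_rows = spark_df.limit(1000).toJSON().collect()
bytes_per_row = sum(len(r) for r in sample_rows) / max(len(sample_rows), 1)
approx_total_bytes = bytes_per_row * spark_df.count()  # count() scans everything
print(approx_total_bytes)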