1

After Spark upgrade from 2.1 to 2.3 I have issues with cached PySpark dataframes. In Spark 2.1 cache() method worked for me as deep copy even though it shouldn't worked like that based on the documentation.

Example:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext
from pyspark.sql import functions as spark_func
from pyspark.sql import Window 

sparkSession = (SparkSession
               .builder
               .appName('process_name')
               .enableHiveSupport()
               .getOrCreate())
src_tbl = sparkSession.sql("SELECT * FROM src_tbl") 
dst_tbl = sparkSession.sql("SELECT * FROM snpsht_tbl") 
delta = src_tbl.subtract(dst_tbl)  # find the difference 

# find new records based on delta
new_records = delta.join(dst_tbl, how='left_anti', on=join_field).cache()
# create snpsht df
snpsht_tbl = dst_tbl.union(new_records) 
# create incremental df
snpsht_tbl_tmp = snpsht_tbl.withColumn("row_nbr", spark_func.row_number(). \                                        
  over(Window.partitionBy(join_field). \                                                    
  orderBy(spark_func.desc("last_modified_date"))))
inc_tbl = snpsht_tbl_tmp.filter("row_nbr = 1").drop("row_nbr")

inc_tbl.filter(spark_func.col("last_modified_date").isin(dt_today)).count() # 100 records    

# save final tables to DB
snpsht_tbl_name = 'snpsht'
snpsht_tbl.write.mode("overwrite").saveAsTable(snpsht_table_name_tmp)
sparkSession.sql("""INSERT OVERWRITE TABLE snpsht_tbl  + 
                    SELECT * FROM snpsht_table_name_tmp""")

inc_tbl.filter(spark_func.col("last_modified_date").isin(dt_today)).count()  # 0 records     

inc_tbl_name = 'inc'
inc_tbl.write.mode("overwrite").saveAsTable(inc_table_name_tmp)
sparkSession.sql("""INSERT OVERWRITE TABLE inc_tbl  + 
                    SELECT * FROM inc_table_name_tmp""")

This is a minimal example to produce my problem.

And now, in Spark 2.1 inc_tbl has been just saved to the inc_tbl with all new records (from the current day) with the data that was there in the moment of cache method usage and this is what I want to have. In Spark 2.3 there is something that calculates all transformations from the beginning again so checking that snpsht_tbl table already has records from the current date so just inserting records that were there before processing.

max04
  • 5,315
  • 3
  • 13
  • 21
  • 2
    Technically speaking cache doesn't work as copy at all (deep or shallow) and doesn't provide any strict guarantees. Additionally cache is lazy, so in this snippet data could be cached only when you call `saveAsTable`, so there is not enough information here to diagnose the problem. Could you please [edit] the question and include a [mcve]? – 10465355 Nov 21 '18 at 21:24
  • 1
    Thanks! I will edit my question tomorrow and add some details. For now, based on your comment, I do not understand then why it worked as deep copy in Spark 2.1. Tomorrow I will give you detailed example of it. – max04 Nov 21 '18 at 21:45
  • In meantime, @user10465355 if cache() doesn't work as copy at all do you know what method does? – max04 Nov 21 '18 at 22:03
  • [These are pretty good explanations](https://stackoverflow.com/q/28981359/10465355). In general it is an optimization tool, which reduces probability of repeated evaluation of a certain lineage. However caching might be partial, not happen at all, or [be evicted](https://stackoverflow.com/search?tab=votes&q=%5bapache-spark%5d%20cache%20lru). As far as I know caching mechanism haven't change between 2.1 and 2.3, which means you probably see one of these. – 10465355 Nov 21 '18 at 23:22
  • 1
    What I noticed after deep investigation is that probably in version 2.1 there was some bug in case of cache method usage that has been fixed now. I have changed my code to make it working. – max04 Nov 22 '18 at 21:08

0 Answers0