I ran into a strange problem where calling unpersist() on one Dataset affects the count of another Dataset in the same block of code. Unfortunately this happens during a complex, long-running job with many Datasets, so I can't summarize the whole thing here. I know this makes for a difficult question, but let me try to sketch it out. What I'm looking for is confirmation that this behavior is unexpected, and any ideas about why it may be occurring or how we can avoid it.
Edit: The problem as reported occurs on Spark 2.1.1 but does not occur on 2.1.0. It is 100% repeatable, but only in my project, with thousands of lines of code and data; I'm working to distill it down to a concise example but have not yet been able to, and I will post any updates or re-submit my question if I find something. The fact that the exact same code and data work on 2.1.0 but not on 2.1.1 leads me to believe it is due to something within Spark.
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.storage.StorageLevel

val claims: Dataset[Row] = // read claims from file
val accounts: Dataset[Row] = // read accounts from file
val providers: Dataset[Row] = // read providers from file
val payers: Dataset[Row] = // read payers from file

val claimsWithAccount: Dataset[Row] = // join claims and accounts
val claimsWithProvider: Dataset[Row] = // join claims and providers
val claimsWithPayer: Dataset[Row] = // join claimsWithProvider and payers

claimsWithPayer.persist(StorageLevel.MEMORY_AND_DISK)
log.info("claimsWithPayer = " + claimsWithPayer.count()) // 46

// claimsWithAccount is considered unnecessary intermediate data and can leave the cache
claimsWithAccount.unpersist()
log.info("claimsWithPayer = " + claimsWithPayer.count()) // 41
Essentially, calling unpersist() on one of the intermediate data sets in a series of joins affects the number of rows in one of the later data sets, as reported by Dataset.count().
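One way to see whether the cache is actually involved (a diagnostic sketch layered onto the code above, not part of the failing job; explain() and, if I'm not mistaken, Dataset.storageLevel are both available in 2.1.x) is to compare the physical plan and storage level around the unpersist() call:

// Before the unpersist: an InMemoryTableScan node in the physical plan
// means claimsWithPayer is being served from the cache.
claimsWithPayer.explain()
log.info("claimsWithPayer storage = " + claimsWithPayer.storageLevel)

claimsWithAccount.unpersist()

// After the unpersist: check whether the InMemoryTableScan is still present
// and whether the storage level has changed.
claimsWithPayer.explain()
log.info("claimsWithPayer storage = " + claimsWithPayer.storageLevel)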
My understanding is that unpersist() should remove data from the cache, but it shouldn't affect the count or contents of other data sets. This is especially surprising since I explicitly persist claimsWithPayer before I unpersist the other data.
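For reference, this is the shape of the distilled, self-contained example I've been attempting (dummy data and hypothetical column names, since I can't share the real inputs; as noted in the edit above, toy data like this has not yet reproduced the problem for me):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("unpersist-repro").master("local[*]").getOrCreate()
import spark.implicits._

// Dummy stand-ins for the real inputs; all column names are hypothetical.
val claims    = Seq((1, "a1", "p1", "y1"), (2, "a2", "p2", "y2"))
  .toDF("claimId", "accountId", "providerId", "payerId")
val accounts  = Seq(("a1", "acct one"), ("a2", "acct two")).toDF("accountId", "accountName")
val providers = Seq(("p1", "prov one"), ("p2", "prov two")).toDF("providerId", "providerName")
val payers    = Seq(("y1", "payer one"), ("y2", "payer two")).toDF("payerId", "payerName")

// Same persist/join/unpersist sequence as the real job.
val claimsWithAccount  = claims.join(accounts, "accountId").persist(StorageLevel.MEMORY_AND_DISK)
val claimsWithProvider = claims.join(providers, "providerId")
val claimsWithPayer    = claimsWithProvider.join(payers, "payerId")

claimsWithPayer.persist(StorageLevel.MEMORY_AND_DISK)
println("claimsWithPayer before unpersist = " + claimsWithPayer.count())

claimsWithAccount.unpersist()
println("claimsWithPayer after unpersist  = " + claimsWithPayer.count()) // expected to match the count above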