
I ran into a strange problem where calling unpersist() on one Dataset affects the count of another Dataset in the same block of code. Unfortunately this happens during a complex, long-running job with many Datasets, so I can't summarize the whole thing here. I know this makes for a difficult question; however, let me try to sketch it out. What I'm looking for is confirmation that this behavior is unexpected, and any ideas about why it may be occurring or how we can avoid it.

Edit: This problem as reported occurs on Spark 2.1.1, but does not occur on 2.1.0. The problem is 100% repeatable, but only in my project with thousands of lines of code and data. I'm working to distill it down to a concise example but have not yet been able to; I will post any updates or re-submit my question if I find something. The fact that the exact same code and data works in 2.1.0 but not in 2.1.1 leads me to believe it is due to something within Spark.

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.storage.StorageLevel

val claims: Dataset[Row] = // read claims from file
val accounts: Dataset[Row] = // read accounts from file
val providers: Dataset[Row] = // read providers from file
val payers: Dataset[Row] = // read payers from file

val claimsWithAccount: Dataset[Row] = // join claims and accounts
val claimsWithProvider: Dataset[Row] = // join claims and providers

val claimsWithPayer: Dataset[Row] = // join claimsWithProvider and payers

claimsWithPayer.persist(StorageLevel.MEMORY_AND_DISK)
log.info("claimsWithPayer = " + claimsWithPayer.count()) // 46

// This is considered unnecessary intermediate data and can leave the cache
claimsWithAccount.unpersist()

log.info("claimsWithPayer = " + claimsWithPayer.count()) // 41

Essentially, calling unpersist() on one of the intermediate data sets in a series of joins affects the number of rows in one of the later data sets, as reported by Dataset.count().

My understanding is that unpersist() should remove data from the cache, but that it shouldn't affect the count or contents of other data sets. This is especially surprising since I explicitly persist claimsWithPayer before I unpersist the other data.
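
A self-contained version of the same pattern, with made-up toy data standing in for the real inputs (all names and values below are hypothetical), looks roughly like this:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[*]").appName("unpersist-sketch").getOrCreate()
import spark.implicits._

// Toy stand-ins for the file reads above
val claims    = Seq((1, 100, "P1"), (2, 200, "P2"), (3, 100, "P1")).toDF("claimId", "accountId", "providerName")
val accounts  = Seq((100, "Acct A"), (200, "Acct B")).toDF("accountId", "accountName")
val providers = Seq(("P1", "Provider 1"), ("P2", "Provider 2")).toDF("providerName", "providerDesc")
val payers    = Seq(("P1", "Payer X"), ("P2", "Payer Y")).toDF("providerName", "payerName")

val claimsWithAccount  = claims.join(accounts, "accountId")
val claimsWithProvider = claims.join(providers, "providerName")
val claimsWithPayer    = claimsWithProvider.join(payers, "providerName")

claimsWithAccount.persist(StorageLevel.MEMORY_AND_DISK)
claimsWithPayer.persist(StorageLevel.MEMORY_AND_DISK)

println(claimsWithPayer.count())  // first count, with claimsWithAccount still cached

// The intermediate data is considered unnecessary and can leave the cache
claimsWithAccount.unpersist()

println(claimsWithPayer.count())  // I would expect the same number here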

Uncle Long Hair
    "Roughly like this" is not good enough. Please try to work on a [mcve] - it is either __a serious correctness bug__ (and [then should be reported](https://issues.apache.org/jira/projects/SPARK/summary) with enough information to determine the source of the problem) __or it is your mistake__ (like assuming deterministic behavior where Spark doesn't provide any guarantees) and working towards minimal example should give some hints. Also please make sure you're using the latest minor version. – zero323 Sep 24 '17 at 15:36
  • Like so many other Spark issues it only happens with certain data and circumstances. I've worked many hours to distill it down to a concise example but have not yet been able to. My question is about the semantics of Spark's cache, persist and unpersist, similar to this question: https://stackoverflow.com/questions/29903675/understanding-sparks-caching – Uncle Long Hair Sep 24 '17 at 17:16
  • So like I said: if every ancestor of `claimsWithPayer` is deterministic (enough) then persistence should have no effect at all, and this is a bug. Otherwise it is probably misunderstanding of semantics. It is up to you to figure out if this is the case or not. If you can reduce the problem to something manageable I'd recommend forwarding this to dev list. If something changed between 2.1.0 and 2.1.1 it is more likely that someone responsible will be able to recognize the problem. – zero323 Sep 24 '17 at 18:43
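
To illustrate the determinism point above: a hypothetical sketch, unrelated to the actual job, where an ancestor's result changes between executions, so a count taken from cached data and a count taken after the cache is dropped can legitimately differ:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// A filter whose outcome changes on every physical execution;
// Spark assumes the plan is deterministic and has no way to know otherwise.
val coinFlip = udf(() => scala.util.Random.nextDouble() < 0.5)

val unstable = spark.range(0, 1000).filter(coinFlip())
val joined   = spark.range(0, 1000).join(unstable, "id")

joined.persist()
println(joined.count())   // counts one materialized outcome

joined.unpersist()        // or a cascading unpersist drops the cached rows
println(joined.count())   // recomputes the filter, so the count will usually differ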

1 Answer


I believe the behaviour you experience is related to the change described as "UNCACHE TABLE should un-cache all cached plans that refer to this table".

I think you may find more information in SPARK-21478 "Unpersist a DF also unpersists related DFs", where Xiao Li said:

This is by design. We do not want to use the invalid cached data.
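
If the cascading unpersist causes claimsWithPayer to be recomputed, and that recomputation is not fully deterministic, one possible workaround (a sketch, assuming that explanation) is to eagerly checkpoint claimsWithPayer before unpersisting the intermediates, so later counts read materialized data instead of re-running the joins. The checkpoint directory below is a hypothetical path.

// Sketch of a possible workaround: Dataset.checkpoint() (eager by default in
// Spark 2.1+) materializes the data and truncates the lineage, so the result
// no longer refers to any cached intermediate plan.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  // hypothetical path

val pinnedClaimsWithPayer = claimsWithPayer.checkpoint()

log.info("claimsWithPayer = " + pinnedClaimsWithPayer.count())

claimsWithAccount.unpersist()  // no longer changes what the checkpointed Dataset returns

log.info("claimsWithPayer = " + pinnedClaimsWithPayer.count())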

Jacek Laskowski
  • Thank you, this does in fact look like the cause, though these bugs don't specifically mention affecting the count of the RDDs. I'll continue to work on distilling my example down. Incidentally, I think this is a pretty radical thing to change in an x.x.x release. The release notes say it contains "stability fixes" but this change is destabilizing. – Uncle Long Hair Sep 25 '17 at 13:11