
I am doing something like this:

import pandas as pd

pdf = pd.DataFrame({
    'a': [1, 2, 3],
    'b': ['a', 'b', 'c']
})

parent_df = spark.createDataFrame(pdf)
parent_df.cache().count()  

child_df = parent_df.replace('c', 'x')
child_df.cache().count()

parent_df.unpersist()

Essentially, I want to cache parent_df because I am doing some heavy transformations on it in the next steps. Once those are done and I have child_df, I no longer need parent_df and want to release it from the cache. However, doing this also unpersists the freshly cached child_df!
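
To make the effect visible, the cache status can be checked via the standard DataFrame.storageLevel property (assuming the snippet above has just been run):

print(parent_df.storageLevel.useMemory)  # False - released, as intended
print(child_df.storageLevel.useMemory)   # False - dropped from the cache too, even though it was never unpersisted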

So obviously, the questions are:

  • why does this happen?
  • how can I accomplish what I want (releasing parent_df from cache while keeping the new child_df in cache)?

Interestingly, the opposite scenario works: if I unpersist child_df instead of parent_df on the last line, parent_df remains cached as expected while child_df is released.

PS: I found a similar question here: Understanding Spark's caching. However, the answer there does not seem to apply in this case, as here we are already calling an action (.count()) right after caching.


2 Answers


This is a conscious design decision based on data consistency. One possible reason for unpersisting the parent is that you expect its source data to change. Having a parent with new data and an apparent child still using old data can cause unexpected and inconsistent results. As such, any cached children of a parent are invalidated when the parent is.

There's a little discussion in the PR that implemented this change and in this bug report after the change was introduced.

As mentioned in the second link, if you do need to persist the child, you can do so by materializing it as a table using saveAsTable.
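
A rough sketch of what that could look like (the table name child_snapshot is just a placeholder, and this assumes a default warehouse/catalog is available):

# materialize the child as a table, then read it back;
# the resulting DataFrame no longer depends on parent_df's cached plan
child_df.write.saveAsTable("child_snapshot")
child_df = spark.table("child_snapshot")
child_df.cache().count()

parent_df.unpersist()  # no longer touches child_df's cache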

    Thanks, this is what I was looking for - not sure I am completely on board with the reason, but good to know this is the way it is meant to work! – Ferrard Aug 06 '19 at 13:25

Ok, I think I found a solution:

  • First, my guess as to why this is happening is that the parent_df cache point is part of child_df's lineage. I.e. even though child_df is using a later cache point, its DAG still contains the earlier bit from parent_df. Thus removing that cache point somehow affects the later cache points.

  • As to how to prevent this from happening, doing the following works:

import pandas as pd

pdf = pd.DataFrame({
    'a': [1, 2, 3],
    'b': ['a', 'b', 'c']
})

parent_df = spark.createDataFrame(pdf)
parent_df.cache().count()  

# this is the relevant line: re-create the DataFrame from the underlying RDD to cut the lineage
child_df = spark.createDataFrame(parent_df.rdd, schema=parent_df.schema) 

child_df = child_df.replace('c', 'x')
child_df.cache().count()

parent_df.unpersist()

What happens at the relevant line (marked with the comment) is that the lineage of child_df is cut so that it no longer includes the section corresponding to parent_df, and instead starts from a "fresh" RDD. Unpersisting parent_df then leaves the lineage of child_df unaffected.
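
As a sanity check (using the standard storageLevel property), with this version the child stays cached after the parent is released:

print(parent_df.storageLevel.useMemory)  # False - released, as intended
print(child_df.storageLevel.useMemory)   # True  - still cached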

Again, even though this seems to work, I welcome more explanation/confirmation of this theory as an accepted answer!
