
I have a Spark application with several points where I would like to persist the current state. This is usually after a large step, or when caching a state that I would like to use multiple times. It appears that when I call cache on my dataframe a second time, a new copy is cached to memory. In my application, this leads to memory issues when scaling up. Even though a given dataframe is at most about 100 MB in my current tests, the cumulative size of the intermediate results grows beyond the allotted memory on the executor. See below for a small example that shows this behavior.

cache_test.py:

from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()

simple_data.csv:

1,2,3
4,5,6
7,8,9

Looking at the application UI, there is a copy of the original dataframe, in addition to the one with the new column. I can remove the original copy by calling df.unpersist() before the withColumn line. Is this the recommended way to remove a cached intermediate result (i.e., call unpersist() before every cache())?
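
To make that concrete, this is the pattern I have in mind, continuing from cache_test.py above (just a sketch; df_new is a name I made up so that I can still unpersist the previous step):

df.cache()
df.show()

df_new = df.withColumn('C1+C2', df['C1'] + df['C2'])
df_new.cache()     # cache the new intermediate result
df.unpersist()     # explicitly drop the previous cached copy
df_new.show()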

Also, is it possible to purge all cached objects? In my application, there are natural breakpoints where I can simply purge all memory and move on to the next file. I would like to do this without creating a new Spark application for each input file.

Thank you in advance!

– bjack3

3 Answers


Spark 2.x

You can use Catalog.clearCache:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
...
spark.catalog.clearCache()

Spark 1.x

You can use the SQLContext.clearCache method, which

Removes all cached tables from the in-memory cache.

from pyspark.sql import SQLContext
from pyspark import SparkContext

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()
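
For example, at one of those natural breakpoints you could do something like this (a minimal sketch using the Spark 2.x API shown above; the file list and processing steps are placeholders for your own logic):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('clear_cache_example').getOrCreate()

# placeholder list of input files; substitute your own
input_files = ['simple_data.csv']

for path in input_files:
    df = spark.read.csv(path)
    df.cache()
    df.show()                       # any actions that reuse the cached data
    spark.catalog.clearCache()      # drop everything cached before the next file

spark.stop()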
– zero323
  • This is a good solution for now, as it allows me to clear the full cache at reasonable breakpoints. I will incorporate this, but I'm worried that when I scale up and begin working with larger datasets, the old caches may start to grow out of control. If I want to clear old caches as I go, is the recommendation to create a new variable (or temp variables) and explicitly unpersist the old objects? Something like: `df.cache()`; `df_new = df.withColumn('C1+C2', df['C1'] + df['C2'])`; `df_new.cache()`; `df.unpersist()`. This seems a bit cumbersome if it's the only way... – bjack3 Apr 28 '16 at 15:46
  • Typically there is no need to explicitly clear cache. It is cleaned automatically when needed. – zero323 Apr 28 '16 at 15:55
  • I'm worried I'm doing something wrong, then. In my full application, my jobs will eventually crash due to out-of-memory errors. Each individual copy of a dataframe is reasonably small (under 100 MB), but the caches seem to live on forever, even after writing output to a file and moving on to the next steps. I'll see if I can generate a smaller working example to show this in action. – bjack3 Apr 28 '16 at 15:58
  • For now, this doesn't happen when I call clearCache() between major steps, but I'm worried about what happens when the data frames get much larger. – bjack3 Apr 28 '16 at 16:02
  • I think you have the wrong impression of how caching works. It may or may not happen, data can be cached only partially, and even if it is cached, it can be evicted from the cache without the user's knowledge. – zero323 Apr 28 '16 at 16:20
  • Thank you for the clarification. I'm not sure if my observations are consistent with the behavior you describe. In my tests, lots of small caches are persisting unless I explicitly free them with `clearCache()`. This leads to an out-of-memory error. If the caches were being freed behind the scenes, I wouldn't expect to saturate my allocated memory, even if I don't use `clearCache()`. Do you have ideas why the caches may not be evicted, even though the executor is low on memory? – bjack3 Apr 28 '16 at 17:49
  • It is really hard to tell without seeing your code, configuration and runtime data. – zero323 Apr 28 '16 at 18:10
  • I'm trying to update the example in the OP to show the behavior I'm seeing. I'm having issues running the example with less than 500 MB of memory (to read in a 4 kB file!). I'm going to post a new question to address this issue, and come back to this once I can generate a minimum working example. – bjack3 Apr 28 '16 at 20:58
  • Is `clearCache()` a blocking operation or is it lazy? – asmaier Nov 11 '17 at 13:58

We use this quite often:

for (id, rdd) in sc._jsc.getPersistentRDDs().items():
    rdd.unpersist()
    print("Unpersisted {} rdd".format(id))

where sc is the SparkContext variable.
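
If you are on Spark 2.x with a SparkSession, the same idea can be wrapped in a small helper (a sketch only; the function name is mine, and it relies on the same private _jsc handle as above):

from pyspark.sql import SparkSession

def unpersist_all(spark):
    # Iterate over everything currently persisted in this application and drop it.
    sc = spark.sparkContext
    for (rdd_id, rdd) in sc._jsc.getPersistentRDDs().items():
        rdd.unpersist()
        print("Unpersisted rdd {}".format(rdd_id))

spark = SparkSession.builder.getOrCreate()
# ... do work, cache things ...
unpersist_all(spark)   # call at a natural breakpoint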

– Tagar

When you call cache() on a DataFrame, it behaves like a transformation and is evaluated lazily: the data is only materialized in memory when you perform an action on it, such as count() or show().

In your case, after the first cache() you call show(), which is why the first dataframe gets cached in memory. You then apply another transformation to add a column, cache the new dataframe, and call show() again, which caches the second dataframe in memory as well. If there is only enough storage memory to hold one dataframe, caching the second one will evict the first from memory, because there is not enough space to hold both.

Thing to keep in mind: you should not cache a dataframe unless you are going to use it in multiple actions; otherwise caching is a net performance cost, since caching is itself an expensive operation.
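
For example, something along these lines (a minimal sketch, not your exact pipeline; the column names are mine) only pays the caching cost because the dataframe is reused by two actions, and releases it as soon as it is no longer needed:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# same sample file as in the question; toDF just names the columns
df = spark.read.csv('simple_data.csv').toDF('C0', 'C1', 'C2')

df_sum = df.withColumn('C1+C2', df['C1'] + df['C2'])

df_sum.cache()      # worth caching only because two actions below reuse it
df_sum.show()
print(df_sum.count())
df_sum.unpersist()  # release the cached data once it is no longer needed

spark.stop()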

– Nikunj Kakadiya