53

I am using Spark 1.3.0 with the Python API. While transforming huge DataFrames, I cache many of them for faster execution:

df1.cache()
df2.cache()

Once a DataFrame is no longer needed, how can I drop it from memory (i.e. un-cache it)?

For example, df1 is used throughout the code, while df2 is used for a few transformations and is never needed after that. I want to forcefully drop df2 to free up memory.

Shaido
ankit patel
  • 1
    @Paul For the record, the reason this is actually not a duplicate is because the DataFrame API is different from the RDD API. This method call in particular is shared between them, however. – JHixson Feb 09 '17 at 07:17

4 Answers

83

Just do the following:

df1.unpersist()
df2.unpersist()

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.
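
To make the difference between LRU eviction and a manual unpersist concrete, here is a toy model in plain Python. It is only a sketch: ToyBlockCache and its methods are made up for illustration and are not Spark's block manager or any Spark API. It just mimics the behaviour described above, with a capacity counted in partitions.

```python
from collections import OrderedDict


class ToyBlockCache:
    """Toy LRU cache: a simplified model, not Spark's actual block manager."""

    def __init__(self, capacity):
        self.capacity = capacity          # max number of cached partitions
        self._blocks = OrderedDict()      # least recently used entry comes first

    def put(self, key, data):
        # Insert or refresh a partition, then evict LRU entries if over capacity.
        self._blocks[key] = data
        self._blocks.move_to_end(key)
        while len(self._blocks) > self.capacity:
            self._blocks.popitem(last=False)

    def get(self, key):
        # A hit marks the partition as most recently used; a miss returns None.
        if key not in self._blocks:
            return None
        self._blocks.move_to_end(key)
        return self._blocks[key]

    def unpersist(self, name):
        # Manual removal: drop every partition of one dataset right away.
        for key in [k for k in self._blocks if k[0] == name]:
            del self._blocks[key]

    def keys(self):
        return list(self._blocks)


cache = ToyBlockCache(capacity=3)
cache.put(("df1", 0), "a")
cache.put(("df2", 0), "b")
cache.put(("df2", 1), "c")
cache.get(("df1", 0))          # touch df1 so it becomes most recently used
cache.put(("df1", 1), "d")     # over capacity: evicts ("df2", 0), the LRU entry
cache.unpersist("df2")         # explicitly drop what is left of df2
print(cache.keys())
```

Without the unpersist call, the remaining df2 partition would stay cached until something else pushed it out, which is why calling unpersist() yourself frees memory sooner.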

Alexander
  • 7
    Take care to unpersist the df only after the end of its lineage, i.e. after the last action that involves the cached df. – axlpado - Agile Lab Aug 26 '15 at 10:11
  • 2
    I tried this for one of my dataframes "df" and when I did df.show(), df was still displaying data. When does it actually unpersist? – spacedustpi Aug 10 '20 at 15:09
  • 3
    @spacedustpi It removes the dataframe from the cache (which lives in memory, or on disk if there is not enough space in memory). By calling show, you triggered an action, so the computation was done again from the beginning to show you the data. – Itération 122442 Apr 05 '22 at 07:31
41

If the DataFrame is registered as a table for SQL operations, e.g.

df.createGlobalTempView(tableName) // or some other way, depending on the Spark version

then the cache can be dropped with the following commands (of course, Spark also evicts it automatically):

Spark >= 2.x

Here spark is a SparkSession instance.

  • Drop a specific table/df from cache

     spark.catalog.uncacheTable(tableName)
    
  • Drop all tables/dfs from cache

     spark.catalog.clearCache()
    

Spark <= 1.6.x

  • Drop a specific table/df from cache

     sqlContext.uncacheTable(tableName)
    
  • Drop all tables/dfs from cache

     sqlContext.clearCache()
    
Community
mrsrinivas
2
  1. If you need to block until the data is removed => df2.unpersist(true) (in PySpark: df2.unpersist(True))
  2. Non-blocking removal => df2.unpersist()
Vinayak Mishra
0

Here is a simple utility context manager that takes care of that for you:

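import contextlib

@contextlib.contextmanager
def cached(df):
    # cache() marks the DataFrame for caching and returns it
    df_cached = df.cache()
    try:
        yield df_cached
    finally:
        # always release the cache, even if the block raised
        df_cached.unpersist()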
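
A quick way to see what the context manager buys you is to run it against a stand-in object. FakeDataFrame below is hypothetical (it is not a PySpark class); it only exposes cache() and unpersist() so the sketch runs without a cluster. With a real DataFrame you would use `with cached(df2) as df2_cached:` in the same way.

```python
import contextlib


@contextlib.contextmanager
def cached(df):
    df_cached = df.cache()
    try:
        yield df_cached
    finally:
        df_cached.unpersist()


class FakeDataFrame:
    """Hypothetical stand-in exposing only cache()/unpersist(), for illustration."""

    def __init__(self):
        self.is_cached = False

    def cache(self):
        self.is_cached = True
        return self

    def unpersist(self):
        self.is_cached = False
        return self


df2 = FakeDataFrame()
states = []
try:
    with cached(df2) as df2_cached:
        states.append(df2_cached.is_cached)   # cached inside the block
        raise ValueError("simulated failure in a transformation")
except ValueError:
    pass
states.append(df2.is_cached)                  # released even though the block raised
print(states)
```

The try/finally is the point of the design: the unpersist happens on every exit path, so a failed transformation does not leave df2 sitting in the cache.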
user344577