I've seen a few questions on SO about caching SQL tables, but none of them seems to answer my question exactly.

The dataframe resulting from a query (from sqlContext.sql("...")) does not seem to be cacheable like a regular dataframe.

Here is some example code (spark 2.2):

import org.apache.spark.sql._

def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined

val df = List("1", "2", "3").toDF.cache
df.show
isCached(df) // 1) Here, isCached returns 'true'

df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")

val df2 = spark.sqlContext.sql("select value, count(*) from myTable group by value").cache
df2.show
isCached(df2) // 2) (???) returns 'false'

val df3 = spark.sqlContext.sql("select value, 'a', count(*) from myTable group by value")
df3.createOrReplaceTempView("x") // registerTempTable is deprecated since Spark 2.0
spark.sqlContext.cacheTable("x")
df3.show
spark.catalog.isCached("x") // Returns 'true'
isCached(df3) // 3) (???) Returns 'false'

spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'
spark.catalog.isCached("x") // 4) (???) Returns 'false'

The Spark UI shows some storage associated with df2, but it seems to be tied to df. Usually, we call .cache() followed by .count() to materialize, and then unpersist the parent dataframe when it is no longer needed. In this example, when df is unpersisted, the storage shown in the Spark UI for df2 and df3 disappears as well.

So how do we get (2), (3) or, most importantly, (4) to return true?

Michel Lemay

1 Answer

After a while, I think it may be useful to post an answer to my question.

The trick is to truncate the relation lineage by creating a new dataframe.

To do that, I call spark.createDataFrame(df.rdd, df.schema).cache(). Others have suggested calling rdd.cache.count, but that seems much less efficient than creating a new dataframe without materializing the underlying rdd.
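
As a rough sketch, the same call can be wrapped in a small helper so it is reusable across queries. The helper name cacheWithTruncatedLineage, the local SparkSession settings and the column alias n are assumptions made for illustration and are not part of Spark or of the original code. It is meant to be pasted into spark-shell or run as a script.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Local session for illustration only; in spark-shell a `spark` value already exists.
val spark = SparkSession.builder().master("local[*]").appName("cache-sketch").getOrCreate()
import spark.implicits._

// Hypothetical helper (not a Spark API): rebuilds `df` on top of its RDD so the
// new logical plan no longer refers to the parent's cached relation, then caches it.
def cacheWithTruncatedLineage(spark: SparkSession, df: DataFrame): DataFrame = {
  val detached = spark.createDataFrame(df.rdd, df.schema).cache()
  detached.count() // run an action so the cache is actually populated
  detached
}

// Parent dataframe, cached and exposed as a temp view.
val parent = List("1", "2", "3").toDF.cache()
parent.count()
parent.createOrReplaceTempView("myTable")

// Child built from a SQL query on the parent, detached from it and cached.
val child = cacheWithTruncatedLineage(
  spark,
  spark.sql("select value, count(*) as n from myTable group by value"))
child.createOrReplaceTempView("x")

// Uncache the parent, then check whether the child is still registered as cached.
spark.catalog.uncacheTable("myTable")
val childStillCached = spark.catalog.isCached("x")
```

The full walk-through that follows performs the same steps inline and checks the cache state after each one.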

import org.apache.spark.sql._

def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined

val df = List("1", "2", "3").toDF.cache
df.count // run an action to materialize the cache
isCached(df) // 1) Here, isCached returns 'true'

df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")

val df2Temp = spark.sqlContext.sql("select value, count(*) from myTable group by value")
// truncate lineage and materialize new dataframe
val df2Cached = spark.createDataFrame(df2Temp.rdd, df2Temp.schema).cache
df2Cached.count
isCached(df2Cached) // 2) returns 'true'
df2Cached.createOrReplaceTempView("x")

// Still cached
isCached(df) 
spark.catalog.isCached("myTable")

// parent df not needed anymore
spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'

spark.catalog.isCached("x") // Still cached
Michel Lemay
  • Note: Starting with Spark 2.4, it is possible to unpersist the parent dataframe without affecting cached child dataframes! – Michel Lemay Nov 12 '18 at 21:44