
I am running into some issues using cache on a Spark dataframe. My expectation is that after a cache on a dataframe, the dataframe is created and cached the first time it is needed. Any further calls to the dataframe should then be served from the cache.

here's my code:

val mydf = spark.sql("read about 400 columns from a hive table").
  withColumn("newcol", someudf(col("existingcol"))).
  cache()

To test, I ran mydf.count() twice. I would expect the first count to take some time since the data is being cached, but shouldn't the second one be instantaneous?

What I am actually seeing is that both counts take the same time. The first one comes back pretty quickly, which I think tells me that the data was not cached. If I remove the withColumn part of the code and just cache the raw data, the second count is instantaneous.

Am I doing something wrong? How can I load raw data from Hive, add columns and then cache the dataframe for further use? I am using Spark 2.3.

Any help will be great!

Shay
  • Could you check explainPlan() for both and the Spark UI's Storage tab to see if the data is actually persisted? – Sai Oct 04 '19 at 17:25
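One way to run that check against the question's mydf, as a sketch (storageLevel and explain() are standard Dataset methods; the Storage tab in the Spark UI shows whether the data was actually materialized):

mydf.count()                 // first action; should populate the cache
println(mydf.storageLevel)   // shows the storage level requested via cache()/persist()
mydf.explain()               // a plan served from the cache contains an InMemoryRelation / InMemoryTableScan node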

3 Answers


As you are caching a dataset/dataframe, see the documented default behavior:

def cache(): Dataset.this.type

Persist this Dataset with the default storage level (MEMORY_AND_DISK).

So in your case you can try persist(MEMORY_ONLY) instead:

def persist(newLevel: StorageLevel): Dataset.this.type

Persist this Dataset with the given storage level.

newLevel One of: MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.
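A minimal sketch of that change applied to the question's code (someudf and the Hive query are the question's own placeholders):

import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

val mydf = spark.sql("read about 400 columns from a hive table").
  withColumn("newcol", someudf(col("existingcol"))).
  persist(StorageLevel.MEMORY_ONLY)   // instead of cache(), i.e. MEMORY_AND_DISK

mydf.count()   // first action materializes the cached data
mydf.count()   // should now be answered from memory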

Martijn Pieters

The problem in your case is that mydf.count() is not actually materializing the dataframe (i.e. not all columns are read, and your UDF will not be called). That is because count() is highly optimized.

To make sure the entire dataframe is cached into memory, you should repeat your experiment with mydf.rdd.count() or another query (e.g. using sorting and/or aggregation)

See e.g. this SO question
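A sketch of that experiment, reusing the question's mydf (the groupBy column is only illustrative):

mydf.rdd.count()   // forces every column, including the UDF column, to be evaluated and cached

// or an aggregation / sort that actually has to read the data:
mydf.groupBy("newcol").count().show()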

Raphael Roth
  • That does not make a difference. I tried running an agg too; the response time for multiple queries remains the same. The only way I can get it to work fast is by removing the withColumn, but I need it – Shay Oct 07 '19 at 16:07

If it's relevant:

.cache()/.persist() is lazily evaluated. To force eager caching you can use Spark SQL's CACHE TABLE statement, which can be switched from lazy to eager.

CACHE [ LAZY ] TABLE table_identifier
    [ OPTIONS ( 'storageLevel' [ = ] value ) ] [ [ AS ] query ]

Unless LAZY is specified, it runs in eager mode. You need to register a temp view before running it.

Pseudo code would be:

df.createOrReplaceTempView("dummyTbl")
spark.sql("cache table dummyTbl")

More in the documentation: https://spark.apache.org/docs/latest/sql-ref-syntax-aux-cache-cache-table.html
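For the question's dataframe the eager variant would look roughly like this (the view name dummyTbl is just illustrative, and the OPTIONS clause from the quoted syntax may not exist on older Spark releases, so it is left out):

mydf.createOrReplaceTempView("dummyTbl")
spark.sql("CACHE TABLE dummyTbl")                   // eager: materializes the cached table right away
spark.sql("SELECT count(*) FROM dummyTbl").show()   // served from the cached table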