
We have a fact table (30 columns) stored in Parquet files on S3. We also created a table on top of these files and cached it afterwards. The table is created using this code snippet:

import org.apache.spark.sql.SaveMode

val factTraffic = spark.read.parquet(factTrafficData)
factTraffic.write.mode(SaveMode.Overwrite).saveAsTable("f_traffic")

// in a separate %sql notebook cell:
%sql CACHE TABLE f_traffic

We run many different calculations on this table (files) and are looking for the best way to cache the data for faster access in subsequent calculations. The problem is that, for some reason, it's faster to read the data from Parquet and do the calculation than to access it from memory. One important note is that we do not utilize every column: usually around 6-7 columns per calculation, and different columns each time.

Is there a way to cache this table in memory so we can access it faster than reading from Parquet?

datahack

2 Answers


It sounds like you're running on Databricks, so your query might be automatically benefitting from the Databricks IO Cache. From the Databricks docs:

The Databricks IO cache accelerates data reads by creating copies of remote files in nodes’ local storage using fast intermediate data format. The data is cached automatically whenever a file has to be fetched from a remote location. Successive reads of the same data are then executed locally, which results in significantly improved reading speed.

The Databricks IO cache supports reading Parquet files from DBFS, Amazon S3, HDFS, Azure Blob Storage, and Azure Data Lake. It does not support other storage formats such as CSV, JSON, and ORC.

The Databricks IO Cache is supported on Databricks Runtime 3.3 or newer. Whether it is enabled by default depends on the instance type that you choose for the workers on your cluster: currently it is enabled automatically for Azure Ls instances and AWS i3 instances (see the AWS and Azure versions of the Databricks documentation for full details).
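
If your workers aren't one of the instance types where it's enabled automatically, the IO cache can be switched on via a Spark configuration flag. A minimal sketch, assuming a Databricks notebook where spark is in scope and that the spark.databricks.io.cache.enabled key described in the docs applies to your runtime version:

// Check whether the Databricks IO cache is currently enabled for this session.
println(spark.conf.get("spark.databricks.io.cache.enabled"))

// Enable it explicitly (only useful on worker instance types with local SSD storage).
spark.conf.set("spark.databricks.io.cache.enabled", "true")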

If this Databricks IO cache is taking effect then explicitly using Spark's RDD cache with an untransformed base table may harm query performance because it will be storing a second redundant copy of the data (and paying a roundtrip decoding and encoding in order to do so).

Explicit caching can still make sense if you're caching a transformed dataset, e.g. after filtering to significantly reduce the data volume, but if you only want to cache a large and untransformed base relation then I'd personally recommend relying on the Databricks IO cache and avoiding Spark's built-in RDD cache.
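
As a rough sketch of what caching a transformed subset could look like (the column names and the filter below are invented for illustration; substitute the handful of columns a given calculation actually uses):

import org.apache.spark.sql.functions.col

// Project and filter first so only a small slice of the 30-column table is cached.
val recentTraffic = spark.table("f_traffic")
  .select("event_date", "site_id", "clicks")       // hypothetical column names
  .filter(col("event_date") >= "2018-01-01")       // hypothetical filter
  .cache()

recentTraffic.count()  // force materialization of the cached subset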

See the full Databricks IO cache documentation for more details, including information on cache warming, monitoring, and a comparison of RDD and Databricks IO caching.

Josh Rosen

To materialize the dataframe in the cache, you should do:

import org.apache.spark.sql.SaveMode

val factTraffic = spark.read.parquet(factTrafficData)
factTraffic.write.mode(SaveMode.Overwrite).saveAsTable("f_traffic")
val df_factTraffic = spark.table("f_traffic").cache()
df_factTraffic.rdd.count  // force a full scan so the cache is actually populated
// now df_factTraffic is materialized in memory

See also https://stackoverflow.com/a/42719358/1138523

But it's questionable whether this makes sense at all, because Parquet is a columnar file format (meaning that projection is very efficient), and if you need different columns for each query the caching will not help you.
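
One way to see this is to look at the physical plan: the Parquet scan reads only the columns a query asks for, which is why a narrow read over the files can beat scanning a fully cached 30-column table. A sketch (column names are placeholders):

// The ReadSchema of the Parquet scan node should list only the selected columns,
// i.e. the projection is pushed down to the file format.
spark.read.parquet(factTrafficData)
  .select("col_a", "col_b")   // hypothetical columns
  .explain()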

Raphael Roth
  • Projection isn't as efficient against S3 or the other cloud stores as it is against HDFS, because seek is very expensive if you need to abort the HTTP GET and start a new one. If you aren't using the cache then you need to check whether the S3A client in Databricks supports the fs.s3a.experimental.fadvise=random option, then set it when working with Parquet data, *but not when working with CSV or .gz files* (a sketch of setting this option follows these comments). – stevel Mar 23 '18 at 10:39
  • is 'f_traffic' a file on disk? – sAguinaga Aug 21 '19 at 12:54
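
A sketch of setting the S3A option from stevel's comment above. The property name is taken verbatim from the comment; whether the S3A client in your Databricks runtime recognises it (and its exact spelling) varies by Hadoop release, so verify it against your version before relying on it:

// Hint the S3A client to use random IO, which suits Parquet's seek-heavy access pattern.
// Do not leave this set when reading CSV or .gz files (those are read sequentially).
spark.sparkContext.hadoopConfiguration
  .set("fs.s3a.experimental.fadvise", "random")

val df = spark.read.parquet(factTrafficData)  // subsequent Parquet reads pick up the hint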