
I have two scenarios in which I read a few columns from 23 GB of partitioned Parquet data and cache them upfront, so that I can fire a series of queries against them later on.

Setup:

  • Cluster: 12 Node EMR
  • Spark Version: 1.6
  • Spark Configurations: Default
  • Run Configurations: Same for both cases

Case 1:

val paths = Array("s3://my/parquet/path", ...)
val parqFile = sqlContext.read.parquet(paths:_*)
parqFile.registerTempTable("productViewBase")
val dfMain = sqlContext.sql("select guid,email,eventKey,timestamp,pogId from productViewBase")
dfMain.cache.count

From the Spark UI, the input data read is 6.2 GB and the cached object is 15.1 GB.

Case 2:

val paths = Array("s3://my/parquet/path", ...)
val parqFile = sqlContext.read.parquet(paths:_*)
parqFile.registerTempTable("productViewBase")
val dfMain = sqlContext.sql("select guid,email,eventKey,timestamp,pogId from productViewBase order by pogId")
dfMain.cache.count

From the Spark UI, the input data read is 6.2 GB and the cached object is 5.5 GB.

Is there any explanation of, or code reference for, this behavior?

Mohitt

1 Answer


It is actually relatively simple. As you can read in the SQL guide:

Spark SQL can cache tables using an in-memory columnar format ... Spark SQL will scan only required columns and will automatically tune compression

The nice thing about sorted columnar storage is that typical data compresses very easily. When you sort, you get blocks of similar records that can be squashed together using even very simple techniques like run-length encoding (RLE).
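To make the effect concrete, here is a minimal sketch in plain Scala (no Spark involved). The `rle` helper and the sample column are hypothetical and written only for illustration; this is not Spark's own RunLengthEncoding implementation. It counts how many (value, run length) pairs a naive run-length encoder would emit for the same values before and after sorting.

```scala
object RleDemo {
  // Naive run-length encoding: collapse consecutive equal values
  // into (value, runLength) pairs.
  def rle[A](xs: Seq[A]): List[(A, Int)] =
    xs.foldLeft(List.empty[(A, Int)]) {
      case ((v, n) :: tail, x) if v == x => (v, n + 1) :: tail // extend current run
      case (acc, x)                      => (x, 1) :: acc      // start a new run
    }.reverse

  // Hypothetical column: three distinct ids, interleaved.
  val unsorted: Seq[Int] = Seq(3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2)
  val sorted: Seq[Int]   = unsorted.sorted

  val unsortedRuns = rle(unsorted) // 12 runs of length 1: nothing to squash
  val sortedRuns   = rle(sorted)   // List((1,4), (2,4), (3,4))
}
```

With the interleaved input every value starts a new run, so the encoding is no smaller than the raw data. After sorting, the same twelve values collapse into three pairs.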

This property is used quite often in databases with columnar storage, because it makes not only storage but also aggregations very efficient.

Different aspects of Spark's columnar compression are covered by the sql.execution.columnar.compression package and, as you can see, RunLengthEncoding is indeed one of the available compression schemes.

So there are two pieces here:

  • Spark can adjust the compression method on the fly based on statistics:

    Spark SQL will automatically select a compression codec for each column based on statistics of the data.

  • Sorting can cluster similar records together, making compression much more efficient.

If there are some correlations between columns (and when is that not the case?), even a simple sort on a single column can have a relatively large impact and improve the performance of different compression schemes.
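The correlation point can be sketched the same way. The rows below are made up for illustration: a hypothetical `category` column whose value is fully determined by `pogId`. The sketch only shows the mechanism on toy data; it does not reproduce the sizes reported in the question.

```scala
object CorrelatedSortDemo {
  // Number of runs a run-length encoder would emit for one column.
  def runCount[A](column: Seq[A]): Int =
    if (column.isEmpty) 0
    else 1 + column.zip(column.tail).count { case (a, b) => a != b }

  // Hypothetical rows: (pogId, category); category depends on pogId.
  val rows: Seq[(Int, String)] = Seq(
    (2, "b"), (1, "a"), (3, "b"), (1, "a"),
    (2, "b"), (3, "b"), (1, "a"), (2, "b")
  )

  // Sort on pogId only; the category column is never used as a sort key.
  val sortedRows: Seq[(Int, String)] = rows.sortBy(_._1)

  val categoryRunsBefore = runCount(rows.map(_._2))       // 7
  val categoryRunsAfter  = runCount(sortedRows.map(_._2)) // 2
}
```

Sorting on `pogId` alone reduces the `category` column from seven runs to two, which is the sense in which a sort on one column can help the compression of others.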

zero323
  • Not an expert on columnar storage, but it looks like a miracle that ordered data on one column caused 3x compression. Can you share any specifics or code-reference which I can debug/experiment with? – Mohitt Mar 26 '16 at 17:13
  • @Mohitt is there any serialization involved? – eliasah Mar 26 '16 at 17:27
  • No explicit serialization. Everything is default. – Mohitt Mar 26 '16 at 17:29
  • Can you try experimenting with Kryo, for example? Raw caching comes with an overhead and can take 2-4 times more space when persisted. – eliasah Mar 26 '16 at 17:33
  • @Mohitt I've extended the answer a little, but these things depend on different factors. Personally I would recommend browsing some of the books about In-Memory Data Management written by Prof. Hasso Plattner. These are usually crazy expensive, but many chapters have been made available for free as course materials. – zero323 Mar 26 '16 at 18:00