
I am working with a large dataset that is partitioned by two columns, plant_name and tag_id. The second partition column, tag_id, has 200,000 unique values, and I mostly access the data by specific tag_id values. If I use the following Spark commands:

sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
val df = sqlContext.sql("select * from tag_data where plant_name='PLANT01' and tag_id='1000'")

I would expect a fast response, since the query resolves to a single partition. In Hive and Presto this takes seconds; in Spark, however, it runs for hours.

The actual data is held in an S3 bucket, and when I submit the SQL query, Spark first fetches all the partitions from the Hive metastore (200,000 of them), and then calls refresh() to force a full status listing of all these files in the S3 object store (actually calling listLeafFilesInParallel).

It is these two operations that are so expensive. Are there any settings that can get Spark to prune the partitions earlier, either during the call to the metastore or immediately afterwards?
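
For reference, here is a minimal sketch of addressing that single partition directly by path, bypassing the metastore entirely (the bucket name and table root are hypothetical):

// Sketch only: bucket name and table root are hypothetical.
// basePath keeps the partition columns (encoded in the path) in the result.
val onePartition = sqlContext.read
  .option("basePath", "s3://my-bucket/tag_data")
  .parquet("s3://my-bucket/tag_data/plant_name=PLANT01/tag_id=1000")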

asked by Euan; edited by Marco99
    I have also tried the above code with an additional config parameter: `sqlContext.setConf("spark.sql.hive.verifyPartitionPath", "false")` with no effect on performance – Euan May 13 '16 at 04:04
    It is an interesting question, but hard to answer because you do not describe how the DataFrame for `tag_data` is created. I think it would be a good idea to extend the question so that it is reproducible on its own. – Daniel Darabos Aug 31 '16 at 12:53
    If I knew more about Hive and Parquet, probably I would. As it is, I don't know how to create a (doubly) partitioned Parquet file. And it is unclear to me if you are just using the Parquet file directly or if Hive is involved here somehow. (Hive is mentioned several times, but I don't know what role it plays if this is just a Parquet file.) – Daniel Darabos Aug 31 '16 at 16:28
    Add your Spark version. I'm not sure, but creating an external table (search for it) might help (enable Hive support for this). As far as I understand, it will do this scan only once and afterwards save this data in the Hive metastore, so next time you won't pay this overhead. Once again, all of the above needs to be verified. – Igor Berman Aug 31 '16 at 17:35
    Related [Spark lists all leaf node even in partitioned data](http://stackoverflow.com/q/39513505/1560062) – zero323 Sep 15 '16 at 14:23
  • @Euan was the table created with Hive DDL or Spark DDL (can you share the create table statement)? There are no differences at runtime, but Spark DDL tables handle metadata differently: upon first query of a table, all files are listed and cached in the driver. – shoeboxer Sep 17 '16 at 22:26

3 Answers


Yes, Spark supports partition pruning.

Spark lists the partition directories (sequentially, or in parallel via listLeafFilesInParallel) to build a cache of all partitions the first time around. Queries in the same application that scan the data take advantage of this cache, so the slowness you see is most likely this initial cache build. Subsequent queries that scan data use the cache to prune partitions.
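
A rough way to observe this in your own application is to time the same kind of query twice in one session; the timing helper below is purely illustrative, not part of any Spark API:

// Illustrative helper for measuring elapsed time.
def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(s"$label took ${(System.nanoTime() - start) / 1e9} s")
  result
}

// The first query pays for listing all partitions and building the cache.
timed("first query") {
  sqlContext.sql("select * from tag_data where plant_name='PLANT01' and tag_id='1000'").count()
}

// A later query in the same application reuses the cache and prunes.
timed("second query") {
  sqlContext.sql("select * from tag_data where plant_name='PLANT01' and tag_id='2000'").count()
}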

These are the logs that show partitions being listed to populate the cache:

App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-01 on driver
App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-02 on driver
App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-03 on driver

These are the logs showing that pruning is happening:

App > 16/11/10 12:29:16 main INFO DataSourceStrategy: Selected 1 partitions out of 20, pruned 95.0% partitions.

Refer to convertToParquetRelation and getHiveQlPartitions in HiveMetastoreCatalog.scala.

swatisinghi

A similar question popped up here recently: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-reads-all-leaf-directories-on-a-partitioned-Hive-table-td35997.html#a36007

This question is old but I thought I'd post the solution here as well.

spark.sql.hive.convertMetastoreParquet=false

will use the Hive Parquet serde instead of Spark's built-in Parquet serde. Hive's Parquet serde will not do a listLeafFiles over all partitions, but reads directly from the selected partitions only. On tables with many partitions and files, this is much faster (and cheaper, too). Feel free to try it out! :)
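
In terms of the code from the question, that is simply:

// Fall back to Hive's Parquet serde instead of Spark's native Parquet path.
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")

val df = sqlContext.sql("select * from tag_data where plant_name='PLANT01' and tag_id='1000'")

The trade-off is that reads now go through the Hive serde, so some of Spark's native Parquet handling no longer applies.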

Clemens Valiente

Just a thought:

The Spark API documentation for HadoopFsRelation (https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/sources/HadoopFsRelation.html) says:

"...when reading from Hive style partitioned tables stored in file systems, it's able to discover partitioning information from the paths of input directories, and perform partition pruning before start reading the data..."

So, I guess "listLeafFilesInParallel" by itself should not be the problem.
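
For instance (with a hypothetical table root), Spark discovers the partition columns from the Hive-style directory names:

// Hypothetical table root; plant_name and tag_id are not stored in the
// Parquet files but are discovered from the directory names.
val t = sqlContext.read.parquet("s3://my-bucket/tag_data")
t.printSchema()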

A similar issue is already tracked in the Spark JIRA: https://issues.apache.org/jira/browse/SPARK-10673

Even though "spark.sql.hive.verifyPartitionPath" is set to false with no effect on performance, I suspect the issue might be caused by unregistered partitions. Please list out the partitions of the table and verify that all of them are registered. Otherwise, recover your partitions as shown in this link:

Hive doesn't read partitioned parquet files generated by Spark
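
As a sketch, checking and recovering the partitions through a HiveContext could look like this (MSCK REPAIR TABLE is the standard Hive recovery command; depending on your Spark version you may need to run it from the Hive CLI instead):

// See which partitions the metastore actually knows about.
sqlContext.sql("SHOW PARTITIONS tag_data").show(100, false)

// Register any partition directories the metastore has not seen yet.
sqlContext.sql("MSCK REPAIR TABLE tag_data")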

Update:

  1. Check that appropriate Parquet block size and page size were set while writing the data.

  2. Create a fresh Hive table with the partitions mentioned and Parquet as the file format, and load it from the non-partitioned table using the dynamic-partition approach ( https://cwiki.apache.org/confluence/display/Hive/DynamicPartitions ). Run a plain Hive query and then compare by running a Spark program; see the sketch after this list.
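
A rough sketch of step 2 through a HiveContext; the column names and the source table tag_data_flat are assumptions made for illustration:

// Enable dynamic partitioning for the load.
sqlContext.sql("SET hive.exec.dynamic.partition = true")
sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

// Hypothetical schema: ts and value are placeholders for the real columns.
sqlContext.sql("""
  CREATE TABLE tag_data_partitioned (ts TIMESTAMP, value DOUBLE)
  PARTITIONED BY (plant_name STRING, tag_id STRING)
  STORED AS PARQUET
""")

// Partition columns must come last in the SELECT for dynamic partitioning.
sqlContext.sql("""
  INSERT OVERWRITE TABLE tag_data_partitioned PARTITION (plant_name, tag_id)
  SELECT ts, value, plant_name, tag_id FROM tag_data_flat
""")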

Disclaimer: I am not a Spark/Parquet expert. The problem sounded interesting, hence the response.

Marco99