I am reading parquet data and I see that Spark is listing all the partition directories on the driver side:

Listing s3://xxxx/defloc/warehouse/products_parquet_151/month=2016-01 on driver
Listing s3://xxxx/defloc/warehouse/products_parquet_151/month=2014-12 on driver

I have specified month=2014-12 in my where clause. I have tried both Spark SQL and the DataFrame API, and it looks like neither is pruning partitions.

Using the DataFrame API:

df.filter("month='2014-12'").show()

Using Spark SQL:

sqlContext.sql("select name, price from products_parquet_151 where month = '2014-12'")

I have tried the above on Spark versions 1.5.1, 1.6.1 and 2.0.0.
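
(A minimal sketch of a setup that reproduces this, assuming the dataset is read straight from S3 with sqlContext.read.parquet and registered as a temporary table; the original table may well be defined differently.)

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)  # sc is the existing SparkContext

# Read the partitioned parquet dataset; the month=... directories are the partitions.
df = sqlContext.read.parquet("s3://xxxx/defloc/warehouse/products_parquet_151")
df.registerTempTable("products_parquet_151")

# Both queries above run against this df / temp table, and the driver log
# still shows every month=... directory being listed.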

swatisinghi
  • I had a similar problem and I solved it with backticks - try to use something like `sqlContext.sql("select name, price from products_parquet_151 where \`month\` = '2014-12'")` – VladoDemcak Oct 06 '16 at 14:04
  • 1
  • That doesn't solve it. The issue is that Spark issues a catalog call without pushing the filter down to the lower HDFS layer. – Gaurav Shah Oct 15 '16 at 06:37

2 Answers


Spark needs to load the partition metadata first in the driver to know whether the partition exists or not. Spark queries the directories to find the existing partitions, so that it knows which ones it can prune when scanning the data.

I've tested this on Spark 2.0 and you can see it in the log messages:

16/10/14 17:23:37 TRACE ListingFileCatalog: Listing s3a://mybucket/reddit_year on driver
16/10/14 17:23:37 TRACE ListingFileCatalog: Listing s3a://mybucket/reddit_year/year=2007 on driver

This doesn't mean that we're scanning the files in each partition, but Spark will store the locations of the partitions for future queries on the table.

You can see in the logs that it is actually passing in partition filters to prune the data:

16/10/14 17:23:48 TRACE ListingFileCatalog: Partition spec: PartitionSpec(StructType(StructField(year,IntegerType,true)),ArrayBuffer(PartitionDirectory([2012],s3a://mybucket/reddit_year/year=2012), PartitionDirectory([2010],s3a://mybucket/reddit_year/year=2010), ...PartitionDirectory([2015],s3a://mybucket/reddit_year/year=2015), PartitionDirectory([2011],s3a://mybucket/reddit_year/year=2011)))
16/10/14 17:23:48 INFO ListingFileCatalog: Selected 1 partitions out of 9, pruned 88.88888888888889% partitions.

You can see this in the logical plan if you run explain(True) on your query:

spark.sql("select created_utc, score, name from reddit where year = '2014'").explain(True)

This will show you the plan, where you can see the partition filter at the bottom:

+- *BatchedScan parquet [created_utc#58,name#65,score#69L,year#74] Format: ParquetFormat, InputPaths: s3a://mybucket/reddit_year, PartitionFilters: [isnotnull(year#74), (cast(year#74 as double) = 2014.0)], PushedFilters: [], ReadSchema: struct<created_utc:string,name:string,score:bigint>
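
If you want to see these listing and pruning messages in your own job, one option is to raise the log level for the file-catalog classes. A rough sketch from PySpark, assuming Spark 2.0 where ListingFileCatalog lives under org.apache.spark.sql.execution.datasources (the class was renamed in later releases, and _jvm is an internal handle, so treat this as an assumption rather than a stable API):

# Bump the datasources package to TRACE so the "Listing ... on driver" and
# partition-pruning messages show up; editing conf/log4j.properties works too.
log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getLogger("org.apache.spark.sql.execution.datasources").setLevel(log4j.Level.TRACE)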
MrChristine
  • In general this is not required. For example partitioned JSON can be loaded without scanning other partitions. –  Oct 14 '16 at 18:39
  • This is not a satisfying answer, but as the only one, the bounty is yours. –  Oct 17 '16 at 01:54
  • @LostInOverflow The initial question was regarding parquet, which is a binary format that supports partition pruning. I did a quick check with partitioned JSON files and it looks like it doesn't pass down any filters during the scan. For non-binary formats, it looks like a feature request would be needed for your specific use case. Hope that helps. – MrChristine Oct 17 '16 at 19:15
  • 1
  • A little. I have set a couple of bounties on similar topics so far, hoping someone will give a better answer, but nothing yet :( In my particular case plain formats with partitions are just significantly more efficient. Using a persistent metastore helps, but it is something I would like to avoid. Thanks for the answer, and I hope I didn't sound ungrateful. –  Oct 18 '16 at 16:27

Spark has opportunities to improve its partition pruning when going via Hive; see SPARK-17179.
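
If you do route the table through the Hive metastore, partition discovery comes from the catalog rather than a recursive object-store listing. A rough sketch (the table name products_hive is made up, the location reuses the question's path, Hive support must be enabled in your SQLContext/SparkSession, and DDL support varies by Spark/Hive version):

# Declare the existing layout as an external partitioned Hive table.
sqlContext.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS products_hive (name STRING, price DOUBLE)
    PARTITIONED BY (month STRING)
    STORED AS PARQUET
    LOCATION 's3a://xxxx/defloc/warehouse/products_parquet_151'
""")

# Register the existing month=... directories as partitions. MSCK REPAIR TABLE
# is Hive DDL; whether it can be issued through Spark SQL depends on the version,
# otherwise add partitions explicitly with ALTER TABLE ... ADD PARTITION.
sqlContext.sql("MSCK REPAIR TABLE products_hive")

# The where clause can now be answered from the metastore's partition list
# (with the caveats tracked in SPARK-17179).
sqlContext.sql("select name, price from products_hive where month = '2014-12'").show()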

If you are going direct to the object store, then the problem is that recursive directory operations against object stores are real performance killers. My colleagues and I have done work on this in the S3A client (HADOOP-11694), and now need to follow it up with changes to Spark to adopt the specific API calls we've been able to fix. For that, though, we need to make sure we are working with real datasets with real-world layouts, so that we don't optimise for specific examples/benchmarks.

For now, people should choose partition layouts which have shallow directory trees.
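
As an illustration (made-up paths, not from the question), a flat layout with a single partition column keeps the number of LIST calls during discovery small, while every extra nesting level multiplies them:

s3a://bucket/products/month=2016-01/part-00000.parquet                        <- shallow: one level to list
s3a://bucket/products/month=2016-02/part-00000.parquet

s3a://bucket/products/year=2016/month=01/day=05/hour=17/part-00000.parquet    <- deep: four levels to walk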

stevel