
I have Parquet files partitioned by date, stored in directories like:

/activity
    /date=20180802

I'm using Spark 2.2 and there are 400+ partitions. My understanding is that predicate pushdown should allow me to run a query like the one below and get quick results.

spark.read.parquet(".../activity")
    .filter($"date" === "20180802" && $"id" === "58ff800af2")
    .show()

However, the query above is taking around 90 seconds while the query below takes around 5 seconds. Am I doing something wrong or is this expected behavior?

spark.read.parquet(".../activity/date=20180802")
    .filter($"id" === "58ff800af2")
    .show()
Duke Silver
  • Have you tried running the query several times in a row? The first run is much slower because Spark needs to scan all the partitions. It may also be worth calling .explain() to see the plan and verify that the filter is indeed pushed down as expected. – Denis Makarenko Aug 15 '18 at 06:39
  • I misinterpreted this question initially; too hasty. I am wondering whether predicate pushdown actually works here. This is not Spark SQL but spark.read, non-JDBC. – thebluephantom Aug 15 '18 at 09:55
  • Interesting question with some interesting statements on SO. I think Denis has the answer, as you are not using a SQL statement but spark.read. That said, one would expect smarter performance, but ... – thebluephantom Aug 15 '18 at 10:21
  • https://stackoverflow.com/questions/37180073/does-spark-support-partition-pruning-with-parquet-files Please note the example given and Denis's answer; I think this explains it. – thebluephantom Aug 15 '18 at 10:39
  • I think we are also talking about partition pruning here. – thebluephantom Aug 15 '18 at 10:44
  • Thanks, @DenisMakarenko and thebluephantom. I ran explain to verify that the filter is pushed down, and running the query several times did help. Filtering on the date still took around 10 seconds vs. 5 seconds when including it in the path, but that's more in line with what I'd expect. – Duke Silver Aug 15 '18 at 10:57

2 Answers


I noticed this too and talked about it at a Spark Summit presentation.

Spark performs an expensive file listing operation that can really slow things down. I've compared Spark's file listing times with the AWS CLI, and I don't know why it takes Spark so long to list the same files.

You should rephrase "My understanding is that predicate pushdown ..." to "my understanding is that partition filters ...". Predicate pushdown filtering is a different mechanism: partition filters prune whole directories based on the keys encoded in the paths, while predicate pushdown hands filters on regular columns to the Parquet reader.
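
To make the distinction concrete, here is a minimal sketch reusing the question's path and columns: a filter on the partition key date can prune directories before any file is opened, while a filter on the data column id is what predicate pushdown applies inside the files.

    // Assumes the usual `import spark.implicits._` for the $"..." syntax.

    // Partition filter: `date` exists only in the directory names
    // (.../date=20180802), so matching directories can be pruned at
    // planning time, before any Parquet file is opened.
    spark.read.parquet(".../activity")
        .filter($"date" === "20180802")

    // Predicate pushdown: `id` is stored inside the files, so the filter
    // is handed to the Parquet reader, which can skip row groups using
    // their min/max statistics; the files still have to be listed first.
    spark.read.parquet(".../activity")
        .filter($"id" === "58ff800af2")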

This is also an issue with Delta data lakes, and it's actually worse there, because the workaround you mentioned to avoid the file listing doesn't work with Delta.

In short, you're not doing anything wrong and this is the expected behavior. You only have 400 partitions, so the unnecessary file listing isn't so bad in your case. Imagine how slow this gets when you have 20,000 partitions!
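
For plain Parquet (not Delta), a variant of that workaround keeps the date column available while still listing only a single directory: set the basePath option so Spark recovers the partition column from the directory name. A minimal sketch, with the path abbreviated as in the question:

    spark.read
        .option("basePath", ".../activity")       // root of the partitioned table
        .parquet(".../activity/date=20180802")    // only this directory is listed and read
        .filter($"id" === "58ff800af2")
        .show()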

Powers

Try this and see whether predicate pushdown and partition pruning are working:

val df = spark.read.parquet(".../activity")
df.filter($"date" === "20180802" && $"id" === "58ff800af2").explain(true)

Look for PushedFilters: [...] and PartitionFilters: [...] in the generated physical plan. This will tell you why the first query is not performing as expected. I am not sure how to resolve it, though, as I am facing something similar and weird that is not yet resolved.
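
For reference, when both mechanisms kick in, the FileScan node of the physical plan looks roughly like this (an illustrative sketch, not captured output; the exact formatting varies across Spark versions):

    +- *FileScan parquet [id#0,date#1] Batched: true, Format: Parquet,
         Location: InMemoryFileIndex[.../activity],
         PartitionCount: 1,
         PartitionFilters: [isnotnull(date#1), (date#1 = 20180802)],
         PushedFilters: [IsNotNull(id), EqualTo(id,58ff800af2)],
         ReadSchema: struct<id:string>

An empty PartitionFilters: [] here would mean the date filter is not pruning directories at all.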

Nitin Kumar