
I'm trying to query MIN(dt) on a table partitioned by the dt column, using the following query in both Spark 2 and Spark 3:

SELECT MIN(dt) FROM table_name

The table is stored as Parquet in S3, where each dt value is a separate folder, so this seems like a pretty simple operation. There are about 3,200 days of data.
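
For reference, the table definition is roughly along these lines (the column list, types, and S3 path are simplified placeholders, not the real ones):

CREATE TABLE table_name (
  id BIGINT,
  dt STRING
)
USING parquet
PARTITIONED BY (dt)
LOCATION 's3://some-bucket/table_name/'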

In Spark 2, this query completes in ~1 minute, while in Spark 3 it takes over an hour (not sure exactly how long, since it hasn't finished yet).

In Spark 3, the execution plan is:

AdaptiveSparkPlan (10)
+- == Current Plan ==
   HashAggregate (6)
   +- ShuffleQueryStage (5)
      +- Exchange (4)
         +- * HashAggregate (3)
            +- * ColumnarToRow (2)
               +- Scan parquet table_name (1)
+- == Initial Plan ==
   HashAggregate (9)
   +- Exchange (8)
      +- HashAggregate (7)
         +- Scan parquet table_name (1)

It's confusing to me how this could take so long, as the data is already partitioned by dt: Spark only needs to determine which partitions contain any rows and return the min of those.
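
In other words, I'd expect the cost to be closer to just listing the partitions from the metastore and taking the smallest one, roughly (assuming the table is registered in the catalog):

SHOW PARTITIONS table_name
-- then take the smallest dt from the returned partition list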


1 Answer


What you're suggesting was implemented once as the OptimizeMetadataOnlyQuery optimizer rule, via JIRA SPARK-15752, "Optimize metadata only query that has an aggregate whose children are deterministic project or filter operators".

However, it was found to sometimes cause correctness issues when some of the partitions contained zero-row files; see JIRA SPARK-26709, "OptimizeMetadataOnlyQuery does not correctly handle the files with zero record".

Along with the fix, an internal Spark config, spark.sql.optimizer.metadataOnly, was added to provide a way to circumvent full-table scans "at your own risk", i.e. when you are certain that none of your partitions are empty. Possibly your Spark 2 has it set to true (or doesn't include the fix at all). See also SPARK-34194 for additional discussion around it.

Spark 3.0 deprecated this config (SPARK-31647), so most likely it is set to false in your environment, which causes Spark to scan all table partitions before aggregating the result to find the min. For the time being, you can still try setting it to true to speed up your query; just beware of the consequences.
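
If you do decide to try it, a minimal way to flip it for the current session is plain Spark SQL (it can also be passed with --conf at submit time); the table name here is just the one from your question:

SET spark.sql.optimizer.metadataOnly=true;
SELECT MIN(dt) FROM table_name;  -- should now be answered from partition metadata instead of a full scan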

– mazaneicha
  • Thanks for the detailed answer! I guess we can set this to true for now, but I'm still a bit confused why the query is _so_ slow. Surely the Spark job could just read a single record from each partition and still be fast? – RyanCheu Jan 24 '23 at 05:56
  • Oh nvm, I guess this is because Parquet is a columnar format, so reading a single row still requires decompressing the entire Parquet file. – RyanCheu Jan 24 '23 at 06:06
  • Parquet stores the number of rows in its footer (file-level metadata), Avro in a header, and CSV not at all. So I guess instead of dealing with all these format differences, Spark takes the honest route and materializes the entire dataset before applying the aggregate function, the same way it would for any "normal" (non-partition) column. – mazaneicha Jan 24 '23 at 13:53