16

The Parquet files contain a per-block row count field. Spark seems to read it at some point (SpecificParquetRecordReaderBase.java#L151).

I tried this in spark-shell:

sqlContext.read.load("x.parquet").count

And Spark ran two stages, showing various aggregation steps in the DAG. I figure this means it reads through the file normally instead of using the row counts. (I could be wrong.)

The question is: Is Spark already using the row count fields when I run count? Is there another API to use those fields? Is relying on those fields a bad idea for some reason?

Daniel Darabos

2 Answers

19

That is correct; Spark is already using the row count fields when you run count.

Diving into the details a bit, SpecificParquetRecordReaderBase.java comes from the "Improve Parquet scan performance when using flat schemas" commit, part of [SPARK-11787] Speed up parquet reader for flat schemas. Note that this commit was included in the Spark 1.6 branch.

If the query is a row count, it pretty much works the way you described it (i.e. reading the metadata). If the predicates are fully satisfied by the min/max values, that should work as well, though it is not as fully verified. It's not a bad idea to rely on those Parquet fields, but as implied in the previous statement, the key issue is to ensure that the predicate filtering matches the metadata so you get an accurate count.
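
As an illustration of what reading the metadata means, the per-row-group row counts can be summed straight from the Parquet footer without touching any column data. This is only a minimal sketch using the parquet-hadoop API (the file name is assumed; readFooter is deprecated in newer parquet releases in favour of ParquetFileReader.open):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

// Read only the footer and sum the per-row-group (block) row counts;
// no column data is scanned.
val footer = ParquetFileReader.readFooter(new Configuration(), new Path("x.parquet"))
val rowCount = footer.getBlocks.asScala.map(_.getRowCount).sum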

To help understand why there are two stages, here's the DAG created when running the count() statement.

[DAG visualization of the count() query]

When digging into the two stages, notice that the first one (Stage 25) is running the file scan while the second stage (Stage 26) runs the shuffle for the count.

[Stage details: Stage 25 runs the file scan, Stage 26 runs the shuffle for the count]
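
The same two-step structure is visible without the UI by looking at the physical plan. A sketch, assuming a Spark 2.x spark session (use sqlContext on the 1.6 line); groupBy().count() produces the same plan shape as Dataset.count():

// The plan shows a partial count per partition, an exchange (the shuffle),
// and a final count -- matching the two stages above.
spark.read.parquet("x.parquet").groupBy().count().explain()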

Thanks to Nong Li (the author of the SpecificParquetRecordReaderBase.java commit) for validating!

 

Updated

To provide additional context on the bridge between Dataset.count() and Parquet, the flow of the internal logic is:

  • Spark does not read any Parquet columns to calculate the count
  • The Parquet schema passed to the VectorizedParquetRecordReader is actually an empty Parquet message
  • The count is computed using the metadata stored in the Parquet file footers

To work with the Parquet file format, Apache Spark internally wraps the logic in an iterator that returns an InternalRow; more information can be found in InternalRow.scala. Ultimately, the count() aggregate function interacts with the underlying Parquet data source through this iterator. By the way, this is true for both the vectorized and non-vectorized Parquet readers.
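
A user-level way to see that no Parquet columns are needed for the count is to run it over an empty projection; this is just an illustrative sketch and the file name is assumed:

val df = spark.read.parquet("x.parquet")
df.select().count() // zero-column projection, still returns the full row count
df.count()          // same result; no column data is read in either case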

Therefore, to bridge the Dataset.count() with the Parquet reader, the path is:

  • The Dataset.count() call is planned into an aggregate operator with a single count() aggregate function.
  • Java code is generated at planning time for the aggregate operator as well as the count() aggregate function.
  • The generated Java code interacts with the underlying data source ParquetFileFormat through a RecordReaderIterator, which is used internally by the Spark data source API; a way to peek at this generated code is sketched below.
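
For the curious, the generated Java mentioned in the second step can be printed from the shell. A sketch, assuming the Spark 2.x debug helpers (the import adds debugCodegen to a Dataset):

import org.apache.spark.sql.execution.debug._

// Prints the whole-stage generated Java, including the count() aggregate.
spark.read.parquet("x.parquet").groupBy().count().debugCodegen()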

For more information, please refer to Parquet Count Metadata Explanation.

Denny Lee
  • Thanks for the very detailed answer! When you say _"it pretty much works the way you described it"_ do you mean it does a scan through the file, or does it look at the metadata in the Parquet file? (I'm afraid I described both possibilities in the question...) Same goes for the _"That is correct"_ at the start :). Thanks for any clarification! – Daniel Darabos Nov 22 '16 at 14:22
  • Sorry about that - I meant that it reads the metadata of the Parquet file. I'll edit the answer shortly. :). – Denny Lee Nov 22 '16 at 14:46
  • Thank you for a nice answer, but I am not convinced yet. You are linking to `totalRowCount`, but it looks like it is used only by `getProgress()`, `checkEndOfRowGroups()` and `nextBatch()`, and I cannot find any package-level access. Could you add a reference showing how it is used in combination with `Dataset.count`? TIA –  Nov 22 '16 at 16:22
  • Good point - I thought I had tracked it down correctly but I may be missing something. Here's the path from `org.apache.spark.sql.execution.datasources.parquet` in this [gist](https://gist.github.com/dennyglee/d427594b3a0a58890631e9b020418625) - does this look right? – Denny Lee Nov 23 '16 at 02:41
  • I am not sure. It looks like the thing that could be used but I still cannot figure out if it is or not. Not enough understanding on my side :( –  Nov 23 '16 at 12:34
  • Just updated my response which hopefully helps address the bridge between ParquetFileFormat and `dataset.count()`. HTH! – Denny Lee Dec 06 '16 at 17:03
-1

We can also use

java.text.NumberFormat.getIntegerInstance.format(sparkdf.count)
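
This only changes how the number is displayed (using the default locale's grouping separators, e.g. 1,234,567); the count itself is still computed by Spark in the same way described in the answer above.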

RajenDharmendra