
I've been exploring query optimizations in a recent Spark SQL 2.3.0-SNAPSHOT build and noticed that semantically identical queries can produce different physical plans.

Let's assume I've got to count the number of rows in the following dataset:

val q = spark.range(1)

I could count the number of rows as follows:

  1. q.count
  2. q.collect.size
  3. q.rdd.count
  4. q.queryExecution.toRdd.count

My initial thought was that all of these would be nearly constant-time operations (surely, given a local dataset), that Spark SQL would optimize them and return a result almost immediately, especially the first one, where Spark SQL is in full control of the query execution.

A look at the physical plans of the queries led me to believe that the most efficient query would be the last one:

q.queryExecution.toRdd.count

The reasons being that:

  1. It avoids deserializing rows from their InternalRow binary format
  2. The query is codegened
  3. There's only one job with a single stage

The physical plan is as simple as that:

(screenshot: Details for Job in the Spark UI)
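
For reference, the plans can also be inspected without the UI. A minimal sketch (plain Spark 2.x Dataset/QueryExecution calls, same q as above):

// Inspecting the plans behind each variant
val q = spark.range(1)

q.explain()                                   // physical plan of the Dataset itself
q.groupBy().count().explain()                 // roughly the plan that q.count executes
println(q.queryExecution.toRdd.toDebugString) // the RDD[InternalRow] used by option 4
println(q.rdd.toDebugString)                  // the deserialized RDD used by option 3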

Is my reasoning correct? If so, would the answer be different if I read the dataset from an external data source (e.g. files, JDBC, Kafka)?

The main question is: what factors should one take into account to decide whether one query is more efficient than another (as in this example)?


The other execution plans for completeness (each followed by its Details for Job screenshot):

q.count

q.collect.size

q.rdd.count

Jacek Laskowski
  • How did you get these DAG graphics with the additional parameters (number of rows etc.)? I've never seen that. Is this new in Spark 2.3? – Raphael Roth May 08 '17 at 09:19
  • @JacekLaskowski Benchmark? :) 2-node cluster, 10000000 rows, 10 iterations - should answer your question. Also you can launch Java Mission Control to measure GC, in line code compilation – T. Gawęda May 08 '17 at 09:36
  • @RaphaelRoth Don't remember when it was added, but think it's in 2.1 already. They're SQLMetrics (based on task metrics that are in turn based on accumulators). – Jacek Laskowski May 08 '17 at 09:55
  • @T.Gawęda I'd like to get a general answer that would teach me how to look at a query and know a probable answer. Perhaps some high-level tips and tricks. If what you said were so simple, you'd have answered my question already with **the** winning query, wouldn't you? ;-) – Jacek Laskowski May 08 '17 at 09:57
  • No, I wouldn't answer it then - it's simple, but very time-consuming :) There's no other way to measure it than benchmarking the given example; a different entity in the Dataset can cause different times due to serialization, etc. Look at the times, and maybe the compiled code will also give some hints – T. Gawęda May 08 '17 at 09:58
  • Just a hunch but `q.count` seems to be the only reasonable choice here. This is the only one where source-specific optimizations could be applied (related question by Daniel Darabos: http://stackoverflow.com/q/40629435/1560062). `q.queryExecution.toRdd.count` might be quite fast (after all it is just a naive `while` with a mutable accumulator so the JVM should like it) but it is completely unaware of the context. For example if you run this over JDBC it will just fetch all the rows, instead of a bunch of ones. – zero323 May 08 '17 at 09:59
  • @zero323 Could you elaborate on _"if you run this over JDBC it will just fetch all the rows, instead of a bunch of ones"_? I think I'd disagree, as all the optimizations are taken care of earlier in the query execution pipeline. The only difference from the other queries is the lack of deserialization from InternalRow to rows. Would you disagree? Want to hear more. Thanks! – Jacek Laskowski May 08 '17 at 10:06
  • Just because you ask so nicely ;-) `SELECT COUNT(*) FROM table` is rewritten as `SELECT SUM(1) FROM table` and as a result the query, which will be pushed over JDBC, is `SELECT 1 FROM table`. So effectively Spark has to fetch N * 1. This is what happens with `q.count`. When you use `q.queryExecution.toRdd.count` it will be `SELECT * FROM table`. Keeping `InternalRow` won't help you with that ∎ – zero323 May 08 '17 at 10:12
  • It is of course not limited to JDBC. Cassandra should behave similarly, same as advanced file based sources. So there is a huge potential difference in terms of IO. – zero323 May 08 '17 at 10:36
  • @zero323 Despite this, is there any benefit to running an application not through the main interface? I think that we shouldn't use such queries directly, but via Datasets – T. Gawęda May 08 '17 at 10:48
  • @zero323 Disagree strongly (but very nicely ;-)). You're talking about logical plan optimization that **is supposed to** be applied before `toRdd` (which is one of the last phases in a query execution, just before `rdd`, which deserializes InternalRows). No change here, I'm sure. – Jacek Laskowski May 08 '17 at 10:54
  • @T.Gawęda We're talking about optimizing Spark SQL that's in heavy development as far as optimizations are concerned and yes I **do** risk breaking the application between Spark versions. Let's put it aside for now as I'm asking strictly about how to fine tune the query to get the best performance. – Jacek Laskowski May 08 '17 at 10:57
  • Jacek, it wouldn't be fun if you just agreed with me :) Luckily for us it is trivial to check, and since I am fairly confident about the outcome I'll skip to the conclusion - no optimization should (we can argue about that) or can happen here (given the current API). Once you call `toRdd` and get `RDD[InternalRow]` it is just a black box with a blob of bytecode and the optimizer is completely indifferent about it. You could eat puppies alive, try to take over the world, or just `count` - it is all the same. It is essentially the same behavior which makes the "statically" typed `Dataset` so disappointing. – zero323 May 08 '17 at 11:14
  • Any other 'count' methods y'all want benchmarked? Compiling timings of different data sizes (1M, 1B, 1T), methods, and data sources (range, parquet, textfile) for the 4 listed – James Tobin May 08 '17 at 15:46
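
To make the JDBC point from the comments concrete, here is a sketch of what the two variants would push to the database (the connection details and table name below are purely illustrative):

// A hypothetical JDBC source, only to illustrate the pushdown difference
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/db")
  .option("dbtable", "t")
  .load()

// Dataset.count(): the aggregate is planned by Spark SQL, so the source only has to
// ship something like SELECT 1 FROM t -- one constant per row.
jdbcDF.count()

// toRdd.count(): the plan below the RDD boundary is the plain scan, so every column
// of every row is fetched (SELECT * FROM t) before the rows are counted by Spark.
jdbcDF.queryExecution.toRdd.count()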

2 Answers


I did some testing on val q = spark.range(100000000):

  1. q.count: ~50 ms
  2. q.collect.size: I stopped the query after a minute or so...
  3. q.rdd.count: ~1100 ms
  4. q.queryExecution.toRdd.count: ~600 ms

Some explanation:

Option 1 is by far the fastest because it uses both partial aggregation and whole stage code generation. The whole stage code generation allows the JVM to get really clever and do some drastic optimizations (see: https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html).

Option 2 is just slow and materializes everything on the driver, which is generally a bad idea.

Option 3 is like option 4, but it first converts an InternalRow to a regular Row, and that is quite expensive.

Option 4 is about as fast as you will get without whole-stage code generation.
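
To make the whole-stage code generation point a bit more tangible: for option 1 the Range operator and the partial count are fused, so the generated code per partition boils down to roughly the loop below (a paraphrased sketch, not the actual generated class):

// Paraphrased shape of the fused Range + partial_count loop for spark.range(n):
// no row objects are created, just a counter being bumped in a tight loop.
val n = 100000000L
var count = 0L
var i = 0L
while (i < n) {
  count += 1
  i += 1
}
println(count) // the per-partition partial counts are then combined by the final aggregate

This is exactly the kind of loop the JVM can optimize aggressively.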

  • BTW, why is option 4 without codegen? `QueryExecution.preparations` physical plan preparation rules are executed just before `toRdd` (as `executedPlan`), so codegen should be part of option 4. It was only today that I explored the area, so I might be confusing things. I'd appreciate your answer. Thanks. – Jacek Laskowski May 08 '17 at 19:34
  • Ok, lemme rephrase this. Option 4 is partially code generated. The range operator is code generated and the aggregation is not. This has two drawbacks: we need to materialize a row and (more importantly) it is much harder for the JVM to optimize this. – Herman van Hovell May 08 '17 at 19:44
  • Nice rationale with the answers, I just finished compiling a set of benchmarks for different file types and sizes and your rationale seems to follow (for the most part) what I see in the data, I'll post my findings when I can get to my personal computer – James Tobin May 08 '17 at 20:01

Formatting stuff is terrible, oh well

/*note: I'm using Spark 1.5.2, so some of these might be different than what you might find in a newer version
 *note: these were all done using a 25-node cluster, 40 cores/node, and started with --num-executors 64 --executor-cores 1 --executor-memory 4g
 *note: the displayed values are the mean from 10 runs
 *note: the spark-shell was restarted every time I noticed any spikes intra-run
 *
 *million = sc.parallelize(1 to 1000000).toDF("col1")
 *billion = sc.parallelize(1 to 1000000000).toDF("col1")
 *
 *val s0 = sc.parallelize(1 to 1000000000)
 *//s1 through s9 built the same way; had to use a union to get around the maxInt constraint on a single Seq
 *billion10 = sc.union(s0,s1,s2,s3,s4,s5,s6,s7,s8,s9).toDF("col1")
 *
 *for parquet files
 *compression=uncompressed
 *written with:    million/billion/billion10.write.parquet
 *read with:       sqlContext.read.parquet
 *
 *for text files
 *written with:    million/billion/billion10.map(x => x.mkString(",")).saveAsTextFile
 *read with:       sc.textFile.toDF("col1")
 *
 *excluded the collect() because that would have murdered my machine
 *made them all DataFrames for consistency
 */
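
The exact timing harness isn't shown; a minimal sketch of how each mean could be produced (Spark 1.5-era API; the helper and path below are illustrative only):

// Hypothetical timing helper, not from the original benchmark
def time[A](body: => A): Double = {
  val start = System.nanoTime()
  body
  (System.nanoTime() - start) / 1e9 // seconds
}

val df = sqlContext.read.parquet("/path/to/billion") // illustrative input
val runs = 10
println("count:          " + (1 to runs).map(_ => time(df.count())).sum / runs)
println("queryExecution: " + (1 to runs).map(_ => time(df.queryExecution.toRdd.count())).sum / runs)
println("rdd.count:      " + (1 to runs).map(_ => time(df.rdd.count())).sum / runs)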


size       type     query              mean time (s)
billion10  text     count              81.594582
                    queryExecution     81.949047
                    rdd.count         119.710021
           Seq      count              18.768544
                    queryExecution     14.257751
                    rdd.count          36.404834
           parquet  count              12.016753
                    queryExecution     24.305452
                    rdd.count          41.932466
billion    text     count              14.120583
                    queryExecution     14.346528
                    rdd.count          22.240026
           Seq      count               2.191781
                    queryExecution      1.655651
                    rdd.count           2.831840
           parquet  count               2.004464
                    queryExecution      5.010546
                    rdd.count           7.815010
million    text     count               0.975095
                    queryExecution      0.113718
                    rdd.count           0.184904
           Seq      count               0.192044
                    queryExecution      0.029069
                    rdd.count           0.036061
           parquet  count               0.963874
                    queryExecution      0.217661
                    rdd.count           0.262279

Observations:

  • For the million records, the Seq was the fastest, but differences of tenths of a second are hard to gauge reliably across a cluster
  • Storing it as text was slow in general; it could be how I was reading it in. I work almost exclusively with Parquet, so I could easily be missing something there
  • count and queryExecution are faster than rdd.count for every case (as Herman rationalized in his answer)
  • count and queryExecution take turns being faster and vary between datatypes:
    • count is faster for parquet
    • queryExecution is faster for Seq
    • for Text they are nearly identical
  • the speeds are non-linear as size increases

If anyone would like a different storage type, different dtypes, compression, or more columns benchmarked, go ahead and comment or message me and I'll see what I can do

James Tobin
  • Thanks James. It looks like the question requires checking all the different file formats, data sources and sizes. Lots of work, and it certainly depends on the version of Spark (as things change every release). – Jacek Laskowski May 09 '17 at 19:25