Is there a reliable way to predict which Spark dataframe operations will preserve partitioning and which won't?

Specifically, let's say my dataframes are all partitioned with .repartition(500,'field1','field2'). Can I expect an output with 500 partitions arranged by these same fields if I apply:

  1. select()
  2. filter()
  3. groupBy() followed by agg() when grouping happens on 'field1' and 'field2' (as in the above)
  4. join() on 'field1' and 'field2' when both dataframes are partitioned as above

Given the special way my data is pre-partitioned, I'd expect no extra shuffling to take place. However, I always seem to end up with at least a few stages whose number of tasks equals spark.sql.shuffle.partitions. Any way to avoid that extra shuffling?
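
For concreteness, here is a minimal sketch (in Scala) of the kind of pipeline I mean; df1, df2, the field names and the value column are just placeholders for my real data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("partitioning").getOrCreate()
import spark.implicits._

// dummy inputs standing in for my real tables
val df1 = Seq(("a", 1, 10), ("b", 2, 20)).toDF("field1", "field2", "value")
val df2 = Seq(("a", 1, 100), ("b", 2, 200)).toDF("field1", "field2", "other")

// both sides pre-partitioned the same way
val a = df1.repartition(500, $"field1", $"field2")
val b = df2.repartition(500, $"field1", $"field2")

a.select("field1", "field2", "value").filter($"value" > 0)  // 1. and 2., narrow
a.groupBy("field1", "field2").agg(sum("value"))             // 3., grouped on the partitioning fields
a.join(b, Seq("field1", "field2"))                          // 4., joined on the partitioning fields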

Thanks

Go Erlangen
  • `select` and `filter` will not cause a shuffle as these are narrow transformations. `groupBy` will cause a shuffle unless you have bucketing. A join will cause a shuffle unless you do a broadcast join – Raphael Roth Sep 15 '18 at 09:13

1 Answer


This is a well-known issue with Spark. Even if you have re-partitioned the data, Spark will shuffle the data again.

What is the Problem

The re-partition ensures that all rows with a given value of the partitioning column end up in the same partition.

A good example:

// toDF and the $-column syntax need the implicits from an active SparkSession
import spark.implicits._

val people = List(
  (10, "blue"),
  (13, "red"),
  (15, "blue"),
  (99, "red"),
  (67, "blue")
)
val peopleDf = people.toDF("age", "color")
val colorDf = peopleDf.repartition($"color")
Partition 00091
13,red
99,red

Partition 00168
10,blue
15,blue
67,blue

However, Spark does not remember this information for subsequent operations, and no total ordering of the data across partitions is maintained. That is, Spark knows that a single partition holds the data for a given key, but it does not keep track of how the keys are spread over the other partitions. On top of that, the data also has to be sorted before a shuffle can be avoided for some operations.
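
You can check this yourself by inspecting the physical plan. A minimal sketch, reusing colorDf from the example above:

// If Spark cannot prove that the existing distribution already satisfies the
// aggregation, the physical plan printed here will contain an Exchange
// (shuffle) node with spark.sql.shuffle.partitions output partitions.
colorDf.groupBy("color").count().explain()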

How can you solve it

You need to use the Spark bucketing feature to ensure that there is no shuffle in subsequent stages.

I found this wiki to be pretty detailed about the bucketing feature.

Bucketing is an optimization technique in Spark SQL that uses buckets and bucketing columns to determine data partitioning.

The motivation is to optimize performance of a join query by avoiding shuffles (aka exchanges) of tables participating in the join. Bucketing results in fewer exchanges (and so stages).
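
Roughly, with the DataFrame writer API it looks like the sketch below. The table names, bucket count and columns are only placeholders, df1 and df2 stand for the two dataframes from the question, and bucketBy is used together with saveAsTable:

// Write both sides bucketed (and sorted) by the join/grouping keys.
df1.write
  .bucketBy(500, "field1", "field2")
  .sortBy("field1", "field2")
  .saveAsTable("table1_bucketed")

df2.write
  .bucketBy(500, "field1", "field2")
  .sortBy("field1", "field2")
  .saveAsTable("table2_bucketed")

// Read them back; joins and aggregations on the bucketing columns can then
// reuse the bucketing instead of adding another shuffle.
val t1 = spark.table("table1_bucketed")
val t2 = spark.table("table2_bucketed")
t1.join(t2, Seq("field1", "field2")).explain()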

Avishek Bhattacharya
  • I thought that was about writing out and reading in. Fine, but what about doing many transformations after reading in? – thebluephantom Sep 15 '18 at 10:57
  • When you do many transformations back to back, that would probably require a shuffle. Plus you have to ensure all the joins/aggregations are on the bucketed columns. It is pretty hard, and currently Spark doesn't have great support for bucketing, so you will probably still end up with some shuffle – Avishek Bhattacharya Sep 15 '18 at 11:06
  • Cool, but I think that is the thrust of this question. – thebluephantom Sep 15 '18 at 11:19
  • Not sure how the question relates to the answer – thebluephantom Sep 15 '18 at 22:31
  • Hi and thanks for the answer. So, the only way to avoid shuffling for join and groupBy is to switch to table views and do bucketing while paying extra cost for sorting (that seems to be a precondition for bucketing)? In other words, if I only want to join/group my dataframe once, the effort is likely not worth it? – Go Erlangen Sep 19 '18 at 18:19