I need to do a lot of aggregations (around 9-10) on a large dataset in my PySpark code. I can approach it in two ways:

Single group by:

df.groupBy("col1", "col2").agg({"col3": "sum", "col4": "avg", "col5": "min", "col6": "sum", "col7": "max", "col8": "avg", "col9": "sum"})

Group by and join:

temp1 = df.groupBy("col1", "col2").agg({"col3": "sum"})
temp2 = df.groupBy("col1", "col2").agg({"col4": "avg"})
temp3 = df.groupBy("col1", "col2").agg({"col5": "min"})
.
.
.
temp9 = df.groupBy("col1", "col2").agg({"col9": "sum"})

And then join all these 9 dataframes to get the final output.
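In other words, something roughly like the sketch below (same illustrative column names as above): each per-column aggregate gets its own groupBy, and the pieces are then stitched back together with equi-joins on the grouping keys.

from functools import reduce

# One groupBy per aggregate (sketch; extend the list up to col9).
aggs = [{"col3": "sum"}, {"col4": "avg"}, {"col5": "min"}]
parts = [df.groupBy("col1", "col2").agg(a) for a in aggs]

# Join all partial results back on the grouping keys.
result = reduce(lambda left, right: left.join(right, ["col1", "col2"]), parts)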

Which one would be more efficient?

1 Answer

TL;DR Go with the first one.

It is not even a competition. Readability alone should be enough to reject the second solution, which is verbose and convoluted.

Not to mention that the execution plan is a monstrosity (and here with only three of the nine aggregations!):

== Physical Plan ==
*Project [col1#512L, col2#513L, sum(col3)#597L, avg(col4)#614, min(col5)#631L]
+- *SortMergeJoin [col1#512L, col2#513L], [col1#719L, col2#720L], Inner
   :- *Project [col1#512L, col2#513L, sum(col3)#597L, avg(col4)#614]
   :  +- *SortMergeJoin [col1#512L, col2#513L], [col1#704L, col2#705L], Inner
   :     :- *Sort [col1#512L ASC NULLS FIRST, col2#513L ASC NULLS FIRST], false, 0
   :     :  +- *HashAggregate(keys=[col1#512L, col2#513L], functions=[sum(col3#514L)])
   :     :     +- Exchange hashpartitioning(col1#512L, col2#513L, 200)
   :     :        +- *HashAggregate(keys=[col1#512L, col2#513L], functions=[partial_sum(col3#514L)])
   :     :           +- *Project [_1#491L AS col1#512L, _2#492L AS col2#513L, _3#493L AS col3#514L]
   :     :              +- *Filter (isnotnull(_1#491L) && isnotnull(_2#492L))
   :     :                 +- Scan ExistingRDD[_1#491L,_2#492L,_3#493L,_4#494L,_5#495L,_6#496L,_7#497L,_8#498L,_9#499L,_10#500L]
   :     +- *Sort [col1#704L ASC NULLS FIRST, col2#705L ASC NULLS FIRST], false, 0
   :        +- *HashAggregate(keys=[col1#704L, col2#705L], functions=[avg(col4#707L)])
   :           +- Exchange hashpartitioning(col1#704L, col2#705L, 200)
   :              +- *HashAggregate(keys=[col1#704L, col2#705L], functions=[partial_avg(col4#707L)])
   :                 +- *Project [_1#491L AS col1#704L, _2#492L AS col2#705L, _4#494L AS col4#707L]
   :                    +- *Filter (isnotnull(_2#492L) && isnotnull(_1#491L))
   :                       +- Scan ExistingRDD[_1#491L,_2#492L,_3#493L,_4#494L,_5#495L,_6#496L,_7#497L,_8#498L,_9#499L,_10#500L]
   +- *Sort [col1#719L ASC NULLS FIRST, col2#720L ASC NULLS FIRST], false, 0
      +- *HashAggregate(keys=[col1#719L, col2#720L], functions=[min(col5#723L)])
         +- Exchange hashpartitioning(col1#719L, col2#720L, 200)
            +- *HashAggregate(keys=[col1#719L, col2#720L], functions=[partial_min(col5#723L)])
               +- *Project [_1#491L AS col1#719L, _2#492L AS col2#720L, _5#495L AS col5#723L]
                  +- *Filter (isnotnull(_1#491L) && isnotnull(_2#492L))
                     +- Scan ExistingRDD[_1#491L,_2#492L,_3#493L,_4#494L,_5#495L,_6#496L,_7#497L,_8#498L,_9#499L,_10#500L]

compared to plain aggregation (for all columns):

== Physical Plan ==
*HashAggregate(keys=[col1#512L, col2#513L], functions=[max(col7#518L), avg(col8#519L), sum(col3#514L), sum(col6#517L), sum(col9#520L), min(col5#516L), avg(col4#515L)])
+- Exchange hashpartitioning(col1#512L, col2#513L, 200)
   +- *HashAggregate(keys=[col1#512L, col2#513L], functions=[partial_max(col7#518L), partial_avg(col8#519L), partial_sum(col3#514L), partial_sum(col6#517L), partial_sum(col9#520L), partial_min(col5#516L), partial_avg(col4#515L)])
      +- *Project [_1#491L AS col1#512L, _2#492L AS col2#513L, _3#493L AS col3#514L, _4#494L AS col4#515L, _5#495L AS col5#516L, _6#496L AS col6#517L, _7#497L AS col7#518L, _8#498L AS col8#519L, _9#499L AS col9#520L]
         +- Scan ExistingRDD[_1#491L,_2#492L,_3#493L,_4#494L,_5#495L,_6#496L,_7#497L,_8#498L,_9#499L,_10#500L]
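For comparison, here is a minimal sketch of how to inspect the plan yourself, assuming a DataFrame df with the columns above (names are illustrative). The single-pass version can also be written with explicit functions from pyspark.sql.functions, which lets you alias the output columns, and .explain() prints the physical plan:

from pyspark.sql import functions as F

# Single-pass aggregation with explicit functions (equivalent to the dict
# form, but with readable output column names).
single = df.groupBy("col1", "col2").agg(
    F.sum("col3").alias("sum_col3"),
    F.avg("col4").alias("avg_col4"),
    F.min("col5").alias("min_col5"),
    F.sum("col6").alias("sum_col6"),
    F.max("col7").alias("max_col7"),
    F.avg("col8").alias("avg_col8"),
    F.sum("col9").alias("sum_col9"),
)

# Prints a plan like the second one above: a partial/final HashAggregate
# pair around a single Exchange (shuffle), and no joins.
single.explain()
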
Alper t. Turker
  • I've recently started out in PySpark, so I don't know if this is valid or not. Wouldn't the second method (group by and join) make use of parallelism across several nodes, whereas it would be difficult to do so in the first one, considering that we have *sum* as well as *avg* as aggregations? – pri Jan 25 '18 at 13:50
  • @user8371915 what do you think about this https://stackoverflow.com/questions/40888946/spark-dataframe-count-distinct-values-of-every-column/40889920#40889920 ? – eliasah Jan 25 '18 at 14:03
  • @eliasah Looks good, but not a duplicate, if that is what you ask. – Alper t. Turker Jan 25 '18 at 14:14