
I have written PySpark code that uses groupBy and a sum aggregation. I suspect the groupBy is hurting performance, so I would like to use reduceByKey instead, but I am new to this area. My scenario is below:

Step 1: Read Hive table join-query data through sqlContext and store it in a DataFrame (a minimal sketch of this step follows the list below).

Step 2: There are 15 input columns in total; 5 of them are key fields and the remaining ones are numeric values.

Step 3: Along with the input columns above, a few more columns need to be derived from the numeric columns, and a few columns get default values.

Step 4: I have used groupBy and the sum function. How do I write the same logic the Spark way, with map and reduceByKey?
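
A minimal sketch of Step 1, for context (the database, table, and column names below are placeholders for my actual Hive objects):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext.getOrCreate()
sqlContext = HiveContext(sc)

# Placeholder join query -- the real one returns 5 key fields and 10 numeric fields.
input_df = sqlContext.sql("""
    SELECT a.key1, a.key2, a.key3, a.key4, a.key5,
           a.amt1, a.amt2, b.amt3
    FROM   db.table_a a
    JOIN   db.table_b b
      ON   a.key1 = b.key1
""")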

from pyspark.sql.functions import col, when, lit, concat, round, sum

#sample data
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"])

#populate col5, col6, col7
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col7 = col('col2')
df1 = df.withColumn("col5", col5).\
    withColumn("col6", col6).\
    withColumn("col7", col7)

#populate col8, col9, col10
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col10= concat(col('col2'), lit("_NEW"))
df2 = df.withColumn("col5", col8).\
    withColumn("col6", col9).\
    withColumn("col7", col10)

#final dataframe
final_df = df1.union(df2)
final_df.show()

#groupBy calculation
final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5"), ..., sum("coln")).show()
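
The sum("col5"), ..., sum("coln") above is shorthand; in practice I build the aggregation expressions programmatically. A sketch, assuming col1-col4 are the keys and every remaining numeric column should be summed:

key_cols = ["col1", "col2", "col3", "col4"]

# Sum every non-key numeric column instead of spelling out sum("col5") ... sum("coln") by hand.
value_cols = [c for c, t in final_df.dtypes if c not in key_cols and t != "string"]
agg_exprs = [sum(c).alias(c) for c in value_cols]

final_df.groupBy(*key_cols).agg(*agg_exprs).show()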

1 Answer


There's no reduceByKey in Spark SQL.

groupBy + an aggregation function works almost identically to RDD.reduceByKey. Spark automatically chooses whether the aggregation should behave like RDD.groupByKey (e.g. for collect_list) or like RDD.reduceByKey.
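
For reference, the RDD version the question asks about would look roughly like this (a sketch only, keyed on the four grouping columns and summing a single value column; I would not expect it to beat groupBy + agg):

# Rough RDD equivalent of groupBy("col1", ..., "col4").agg(sum("col5")).
# reduceByKey pre-aggregates on each partition before the shuffle -- the same
# map-side combine that Spark SQL's partial aggregation already does for you.
rdd_result = (final_df.rdd
    .map(lambda row: ((row["col1"], row["col2"], row["col3"], row["col4"]), row["col5"]))
    .reduceByKey(lambda a, b: a + b))
rdd_result.take(10)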

The performance of Dataset.groupBy + an aggregation function should be better than or equal to RDD.reduceByKey. The Catalyst optimizer takes care of how the aggregation is performed in the background.
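
You can see this in the physical plan; for a sum-style aggregation Spark inserts a partial aggregate before the shuffle and a final aggregate after it, for example:

# The physical plan shows a partial HashAggregate before the Exchange (shuffle)
# and a final HashAggregate after it, i.e. map-side combining happens automatically.
final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")).explain()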

T. Gawęda
  • As far as I remember, Spark SQL's groupBy + aggregation will only add an additional step of final aggregation on the executor instead of the driver. – T. Gawęda Sep 20 '17 at 20:23
  • Thank you for your reply. Can't we apply reduceByKey over a DataFrame? Many articles say reduceByKey is faster than groupBy for large datasets, since groupBy only reduces the number of rows at the final stage. – user3150024 Sep 20 '17 at 21:00
  • @user3150024 Those articles are about RDDs - Datasets have an abstraction layer, and the Catalyst optimizer optimizes the queries :) – T. Gawęda Sep 20 '17 at 21:10
  • Is there any other way to improve performance? I tried increasing the number of executors but it has no effect, and only two cores are running out of 8 vcores. Shall I convert the DataFrame into an RDD and apply reduceByKey? Will that work? – user3150024 Sep 21 '17 at 03:17
  • @user3150024 A DataFrame's `groupBy` and `agg` should be at least as fast as `reduceByKey`. It sounds as if you have some other problem related to the cluster settings. – Shaido Sep 21 '17 at 04:20
  • My script runs in 5 minutes in the PySpark shell, but the same script takes 15 minutes with spark-submit from the CLI. I tried both YARN client and cluster mode, and I am still wondering why there is such a big difference; I don't have any clue. Also posted at https://stackoverflow.com/questions/46307302/how-pyspark-shell-is-faster-than-cli – user3150024 Sep 21 '17 at 06:08
  • @Shaido Thanks. @user3150024 Try setting `spark.sql.shuffle.partitions` to a lower number; by default it's 200. – T. Gawęda Sep 21 '17 at 10:39
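
For completeness, a sketch of that last suggestion (the value 8 is only an example, roughly matching the 8 vcores mentioned above):

# Default is 200 shuffle partitions; on a small cluster that mostly means many
# tiny tasks. Lower it to something close to the number of available cores.
sqlContext.setConf("spark.sql.shuffle.partitions", "8")

# or at submit time:
#   spark-submit --conf spark.sql.shuffle.partitions=8 your_script.py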