I have written PySpark code with groupBy and the sum function. I feel the performance is affected by the groupBy, so I want to use reduceByKey instead. But I am new to this area. Please find my scenario below:
Step 1: Read the Hive table join query data through sqlContext and store it in a DataFrame (a rough sketch follows the steps below).
Step 2: There are 15 input columns in total; 5 of them are key fields and the remaining ones are numeric values.
Step 3: Along with the above input columns, a few more columns need to be derived from the numeric columns, plus a few columns with default values.
Step 4: I have used groupBy and the sum function. How can I do the same logic the Spark way, with the map and reduceByKey option?
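For context, step 1 looks roughly like this (the database, table, and join column names below are placeholders, not my real ones):
#step 1 sketch: read the join result of two Hive tables through sqlContext
df = sqlContext.sql("""
    SELECT a.*, b.*
    FROM my_db.table_a a
    JOIN my_db.table_b b
      ON a.join_key = b.join_key
""")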
from pyspark.sql.functions import col, when, lit, concat, round, sum
#sample data
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"])
#populate col5, col6, col7
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4') / col('col3'), 2)).otherwise(0)
#note: when col1 == 0 the division below is by zero, which Spark evaluates to null
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4')) / col('col1'), 2)).otherwise(0)
col7 = col('col2')
df1 = (df.withColumn("col5", col5)
         .withColumn("col6", col6)
         .withColumn("col7", col7))
#populate col8, col9, col10
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4') / col('col3'), 2)).otherwise(0)
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4')) / col('col1'), 2)).otherwise(0)
col10 = concat(col('col2'), lit("_NEW"))
df2 = (df.withColumn("col5", col8)
         .withColumn("col6", col9)
         .withColumn("col7", col10))
#final dataframe
final_df = df1.union(df2)
final_df.show()
#groupBy calculation
final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5"), ..., sum("coln")).show()
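For comparison, here is a minimal sketch of the same aggregation with map and reduceByKey, assuming (per step 2) that the key fields come first and every remaining column is numeric. Note that in the toy data above col7 becomes a string after the union, so it would have to be dropped or moved into the key; the slice indices below are placeholders to adjust for the real 5 key fields:
#switch to the underlying RDD and pair each row as (key_tuple, value_tuple)
rdd = final_df.rdd.map(lambda row: (tuple(row[:4]), tuple(row[4:])))
#sum the value tuples element-wise per key (reduceByKey combines map-side before shuffling)
summed = rdd.reduceByKey(lambda a, b: tuple(x + y for x, y in zip(a, b)))
#flatten each (key, values) pair back into a flat row and rebuild a DataFrame
result = summed.map(lambda kv: kv[0] + kv[1]).toDF(final_df.columns)
result.show()
That said, DataFrame groupBy(...).agg(...) already performs partial (map-side) aggregation through the Catalyst optimizer, so it is usually at least as fast as reduceByKey; dropping to the RDD API also gives up that optimizer, so it is worth benchmarking both before rewriting.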