
I am using Spark SQL 2.4.1 with Java 8.

I have the following scenario:

import org.apache.spark.sql.types._
import spark.implicits._   // assuming a spark-shell style SparkSession named `spark`

val df = Seq(
  ("0.9192019",  "0.1992019",  "0.9955999"),
  ("0.9292018",  "0.2992019",  "0.99662018"),
  ("0.9392017",  "0.3992019",  "0.99772000")).toDF("item1_value","item2_value","item3_value")
  .withColumn("item1_value", $"item1_value".cast(DoubleType))
  .withColumn("item2_value", $"item2_value".cast(DoubleType))
  .withColumn("item3_value", $"item3_value".cast(DoubleType))

df.show(20)

I need the output to look something like this:

------------------------------------------------------------------------------
col_name      | sum_of_column      | avg_of_column      | vari_of_column
------------------------------------------------------------------------------
"item1_value" | sum("item1_value") | avg("item1_value") | variance("item1_value")
"item2_value" | sum("item2_value") | avg("item2_value") | variance("item2_value")
"item3_value" | sum("item3_value") | avg("item3_value") | variance("item3_value")
------------------------------------------------------------------------------

How can I achieve this dynamically? Tomorrow I may have more columns.

BdEngineer

1 Answer


Here is sample code that achieves this. You can make the column list dynamic and add more functions if needed; a sketch of both follows the output below.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import spark.implicits._   // assuming a SparkSession named `spark`, as in spark-shell

val df = Seq(
  ("0.9192019",  "0.1992019",  "0.9955999"),
  ("0.9292018",  "0.2992019",  "0.99662018"),
  ("0.9392017",  "0.3992019",  "0.99772000")).
  toDF("item1_value","item2_value","item3_value").
  withColumn("item1_value", $"item1_value".cast(DoubleType)).
  withColumn("item2_value", $"item2_value".cast(DoubleType)).
  withColumn("item3_value", $"item3_value".cast(DoubleType))

val aggregateColumns = Seq("item1_value","item2_value","item3_value")

// Build one single-row aggregate DataFrame per column, tagged with the column name
val aggDFs = aggregateColumns.map { c =>
    df.groupBy().agg(lit(c).as("col_name"), sum(c).as("sum_of_column"), avg(c).as("avg_of_column"), variance(c).as("var_of_column"))
}

// Union the per-column aggregates into a single result DataFrame
val combinedDF = aggDFs.reduce(_ union _)

This returns the following output:

scala> df.show(10,false)
+-----------+-----------+-----------+
|item1_value|item2_value|item3_value|
+-----------+-----------+-----------+
|0.9192019  |0.1992019  |0.9955999  |
|0.9292018  |0.2992019  |0.99662018 |
|0.9392017  |0.3992019  |0.99772    |
+-----------+-----------+-----------+


scala> combinedDF.show(10,false)
+-----------+------------------+------------------+---------------------+
|col_name   |sum_of_column     |avg_of_column     |var_of_column        |
+-----------+------------------+------------------+---------------------+
|item1_value|2.7876054         |0.9292018         |9.999800000999957E-5 |
|item2_value|0.8976057000000001|0.2992019         |0.010000000000000002 |
|item3_value|2.9899400800000002|0.9966466933333334|1.1242332201333484E-6|
+-----------+------------------+------------------+---------------------+
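
To make the column list dynamic, one option is to derive it from the schema instead of hard-coding it, and to keep the aggregate functions in a list so new ones are a one-line addition. This is only a minimal sketch, assuming every DoubleType column should be aggregated (the aggFuncs name is illustrative):

import org.apache.spark.sql.Column

// Pick up every DoubleType column from the schema instead of hard-coding names
val aggregateColumns = df.schema.fields
  .filter(_.dataType == DoubleType)
  .map(_.name)
  .toSeq

// Aggregate functions keyed by output column name; adding e.g. stddev later
// is a one-line change here
val aggFuncs: Seq[(String, String => Column)] = Seq(
  "sum_of_column" -> (c => sum(c)),
  "avg_of_column" -> (c => avg(c)),
  "var_of_column" -> (c => variance(c))
)

val aggDFs = aggregateColumns.map { c =>
  val exprs = aggFuncs.map { case (alias, f) => f(c).as(alias) }
  df.groupBy().agg(lit(c).as("col_name"), exprs: _*)
}

val combinedDF = aggDFs.reduce(_ union _)

With this, tomorrow's extra columns need no code change: any new DoubleType column is picked up automatically.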
Ramdev Sharma
  • What was the dynamic all about then? – thebluephantom Jan 16 '20 at 18:37
  • 1
    @BdEngineer There is nothing specifc here for 'var'. aggDFs.reduce(_ union _) is doing union of all Data frames per column aggregation. – Ramdev Sharma Jan 16 '20 at 19:24
  • 1
    @BdEngineer I couldn't check code in java but you make simple union. Spark will optimize it while running it. Regarding each column in Seq, it is only transformation stage. When action will called then only all transformation happens, spark will run them in parallel for each column. You can validate same in Spark UI - SQL tab – Ramdev Sharma Jan 21 '20 at 18:46
  • @Ramdev Sharma can you tell me what is wrong with this broadcast variable accessing ? https://stackoverflow.com/questions/64003697/spark-broadcast-variable-map-giving-null-value – BdEngineer Sep 22 '20 at 05:50
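
A quick way to verify the point in the comments about Spark planning the union (a sketch, assuming the combinedDF built above):

// Print the physical plan; the per-column aggregates should appear under
// a single Union, which Spark can schedule in parallel once an action runs
combinedDF.explain()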