In my code, table_df has columns on which I compute statistics such as min, max, mean, etc., and I want to build new_df with the schema new_df_schema. In my logic, I run a Spark SQL query for each column and append the resulting row to an initially empty new_df, so that at the end new_df holds the calculated values for all columns.
The problem is that when there are many columns this leads to a performance issue. Can this be done without the union() function, or is there another approach that performs better?
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import sparkSession.sqlContext.implicits._
val table_df = Seq(
(10, 20, 30, 40, 50),
(100, 200, 300, 400, 500),
(111, 222, 333, 444, 555),
(1123, 2123, 3123, 4123, 5123),
(1321, 2321, 3321, 4321, 5321)
).toDF("col_1", "col_2", "col_3", "col_4", "col_5")
table_df.show(false)
table_df.createOrReplaceTempView("table_df")
val new_df_schema = StructType(
  StructField("Column_Name", StringType, false) ::
  StructField("number_of_values", LongType, false) ::
  StructField("number_of_distinct_values", LongType, false) ::
  StructField("distinct_count_with_nan", LongType, false) ::
  StructField("distinct_count_without_nan", LongType, false) ::
  StructField("is_unique", BooleanType, false) ::
  StructField("number_of_missing_values", LongType, false) ::
  StructField("percentage_of_missing_values", DoubleType, false) ::
  StructField("percentage_of_unique_values", DoubleType, false) ::
  StructField("05_PCT", DoubleType, false) ::
  StructField("25_PCT", DoubleType, false) ::
  StructField("50_PCT", DoubleType, false) ::
  StructField("75_PCT", DoubleType, false) ::
  StructField("95_PCT", DoubleType, false) ::
  StructField("max", DoubleType, false) ::
  StructField("min", DoubleType, false) ::
  StructField("mean", DoubleType, false) ::
  StructField("std", DoubleType, false) ::
  StructField("skewness", DoubleType, false) ::
  StructField("kurtosis", DoubleType, false) ::
  StructField("range", DoubleType, false) ::
  StructField("variance", DoubleType, false) :: Nil
)
var new_df = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], new_df_schema)
for (c <- table_df.columns) {
  val num = sparkSession.sql(
    s"""SELECT
       |  '$c' AS Column_Name,
       |  COUNT($c) AS number_of_values,
       |  COUNT(DISTINCT $c) AS number_of_distinct_values,
       |  COUNT(DISTINCT $c) AS distinct_count_with_nan,
       |  (COUNT(DISTINCT $c) - 1) AS distinct_count_without_nan,
       |  (COUNT($c) == COUNT(DISTINCT $c)) AS is_unique,
       |  (COUNT(*) - COUNT($c)) AS number_of_missing_values,
       |  ((COUNT(*) - COUNT($c)) / COUNT(*)) AS percentage_of_missing_values,
       |  (COUNT(DISTINCT $c) / COUNT(*)) AS percentage_of_unique_values,
       |  APPROX_PERCENTILE($c, 0.05) AS 05_PCT,
       |  APPROX_PERCENTILE($c, 0.25) AS 25_PCT,
       |  APPROX_PERCENTILE($c, 0.50) AS 50_PCT,
       |  APPROX_PERCENTILE($c, 0.75) AS 75_PCT,
       |  APPROX_PERCENTILE($c, 0.95) AS 95_PCT,
       |  MAX($c) AS max,
       |  MIN($c) AS min,
       |  MEAN($c) AS mean,
       |  STD($c) AS std,
       |  SKEWNESS($c) AS skewness,
       |  KURTOSIS($c) AS kurtosis,
       |  (MAX($c) - MIN($c)) AS range,
       |  VARIANCE($c) AS variance
       |FROM
       |  table_df""".stripMargin)
    .toDF()
  new_df = new_df.union(num) // this union causes the performance issue when table_df has many columns
}
new_df.show(false)
==================================================
table_df:
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10 |20 |30 |40 |50 |
|100 |200 |300 |400 |500 |
|111 |222 |333 |444 |555 |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+
new_df:
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
|Column_Name|number_of_values|number_of_distinct_values|distinct_count_with_nan|distinct_count_without_nan|is_unique|number_of_missing_values|percentage_of_missing_values|percentage_of_unique_values|05_PCT|25_PCT|50_PCT|75_PCT|95_PCT|max |min |mean |std |skewness |kurtosis |range |variance |
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
|col_1 |5 |5 |5 |4 |true |0 |0.0 |1.0 |10.0 |100.0 |111.0 |1123.0|1321.0|1321.0|10.0|533.0 |634.0634826261484 |0.4334269738367067 |-1.7463346405299973|1311.0|402036.5 |
|col_2 |5 |5 |5 |4 |true |0 |0.0 |1.0 |20.0 |200.0 |222.0 |2123.0|2321.0|2321.0|20.0|977.2 |1141.1895986206673|0.4050513738738682 |-1.799741951675132 |2301.0|1302313.7 |
|col_3 |5 |5 |5 |4 |true |0 |0.0 |1.0 |30.0 |300.0 |333.0 |3123.0|3321.0|3321.0|30.0|1421.4|1649.399072389699 |0.3979251063785061 |-1.8119558312496054|3291.0|2720517.3 |
|col_4 |5 |5 |5 |4 |true |0 |0.0 |1.0 |40.0 |400.0 |444.0 |4123.0|4321.0|4321.0|40.0|1865.6|2157.926620624529 |0.39502047381456235|-1.8165124206347685|4281.0|4656647.3 |
|col_5 |5 |5 |5 |4 |true |0 |0.0 |1.0 |50.0 |500.0 |555.0 |5123.0|5321.0|5321.0|50.0|2309.8|2666.59027598917 |0.3935246673563026 |-1.8186685628112493|5271.0|7110703.699999999|
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
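For context on what I have tried so far: the per-column loop can in principle be collapsed into a single job by building all aggregate expressions up front, running one agg(...) over table_df, and reshaping the resulting single wide row on the driver, so no union() is needed at all. A minimal sketch of that idea, covering only a subset of the statistics (count, distinct count, min, max, mean; the remaining measures would be added as further expressions in the same way):

```scala
import org.apache.spark.sql.functions._

// One aggregate expression per (column, statistic) pair, so Spark
// scans table_df once instead of once per column.
val aggExprs = table_df.columns.flatMap { c =>
  Seq(
    count(col(c)).as(s"${c}__count"),
    countDistinct(col(c)).as(s"${c}__distinct"),
    min(col(c)).cast("double").as(s"${c}__min"),
    max(col(c)).cast("double").as(s"${c}__max"),
    avg(col(c)).as(s"${c}__mean")
  )
}

// Single job producing one wide row with every statistic for every column.
val statsRow = table_df.agg(aggExprs.head, aggExprs.tail: _*).collect()(0)

// The wide row is tiny, so it can be pivoted into one row per column
// locally on the driver instead of via a distributed union.
val perColumn = table_df.columns.toSeq.map { c =>
  (c,
   statsRow.getAs[Long](s"${c}__count"),
   statsRow.getAs[Long](s"${c}__distinct"),
   statsRow.getAs[Double](s"${c}__min"),
   statsRow.getAs[Double](s"${c}__max"),
   statsRow.getAs[Double](s"${c}__mean"))
}

val stats_df = perColumn.toDF(
  "Column_Name", "number_of_values", "number_of_distinct_values",
  "min", "max", "mean")
stats_df.show(false)
```

Whether this reshaping step is the right approach for all 22 statistics above, or whether there is a cleaner built-in way, is part of what I am asking.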