
I have a DataFrame with a high volume of data and "n" columns.

df_avg_calc: org.apache.spark.sql.DataFrame = [col1: double, col2: double ... 4 more fields]
+------------------+-----------------+------------------+-----------------+-----+-----+
|              col1|             col2|              col3|             col4| col5| col6|
+------------------+-----------------+------------------+-----------------+-----+-----+
|              null|             null|              null|             null| null| null|
|              14.0|              5.0|              73.0|             null| null| null|
|              null|             null|             28.25|             null| null| null|
|              null|             null|              null|             null| null| null|
|33.723333333333336|59.78999999999999|39.474999999999994|82.09666666666666|101.0|53.43|
|             26.25|             null|              null|              2.0| null| null|
|              null|             null|              null|             null| null| null|
|             54.46|           89.475|              null|             null| null| null|
|              null|            12.39|              null|             null| null| null|
|              null|             58.0|             19.45|              1.0| 1.33|158.0|
+------------------+-----------------+------------------+-----------------+-----+-----+

I need to compute a row-wise average, keeping in mind that cells containing "null" must not be counted in the average.

This needs to be implemented in Spark / Scala. I've tried to illustrate the expected result in the attached image:

[image: row-wise average]

What I have tried so far:

By referring to Calculate row mean, ignoring NAs in Spark Scala:

val df = df_raw.schema.fieldNames.filter(f => f.contains("colname"))
val rowMeans = df_raw.select(df.map(f => col(f)).reduce(_ + _) / lit(df.length) as "row_mean")

Here df_raw contains the columns that need to be aggregated (row-wise, of course). There are more than 80 columns, and they arbitrarily contain data and nulls; the null count needs to be excluded from the denominator when calculating the average. This works fine when all the columns contain data, but even a single null in a column makes the result null, because adding null to anything yields null.
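
A minimal sketch demonstrating that propagation:

import org.apache.spark.sql.functions.col

// 14.0 + null evaluates to null, so the whole row sum (and hence the mean) becomes null
Seq((Option(14.0), Option.empty[Double]))
  .toDF("col1", "col2")
  .select((col("col1") + col("col2")).as("sum"))
  .show()
// +----+
// | sum|
// +----+
// |null|
// +----+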

Edit:

I've tried to adapt this answer by Terry Dactyl:

def average(l: Seq[Double]): Option[Double] = {
  val nonNull = l.flatMap(i => Option(i))
  if(nonNull.isEmpty) None else Some(nonNull.reduce(_ + _).toDouble / nonNull.size.toDouble)
}

val avgUdf = udf(average(_: Seq[Double]))

val rowAvgDF = df_avg_calc.select(avgUdf(array($"col1",$"col2",$"col3",$"col4",$"col5",$"col6")).as("row_avg"))
rowAvgDF.show(10,false)

rowAvgDF: org.apache.spark.sql.DataFrame = [row_avg: double]
+------------------+
|row_avg           |
+------------------+
|0.0               |
|15.333333333333334|
|4.708333333333333 |
|0.0               |
|61.58583333333333 |
|4.708333333333333 |
|0.0               |
|23.989166666666666|
|2.065             |
|39.63             |
+------------------+
Sandy

2 Answers


Spark >= 2.4

It is possible to use aggregate:

val row_mean = expr("""aggregate(
  CAST(array(_1, _2, _3) AS array<double>), 
  -- Initial value
  -- Note that aggregate is picky about types
  CAST((0.0 as sum, 0.0 as n) AS struct<sum: double, n: double>), 
  -- Merge function
  (acc, x) -> (
    acc.sum + coalesce(x, 0.0), 
    acc.n + CASE WHEN x IS NULL THEN 0.0 ELSE 1.0 END), 
  -- Finalize function
  acc -> acc.sum / acc.n)""")

Usage:

df.withColumn("row_mean", row_mean).show

Result:

+----+----+----+--------+
|  _1|  _2|  _3|row_mean|
+----+----+----+--------+
|null|null|null|    null|
| 2.0|null|null|     2.0|
|50.0|34.0|null|    42.0|
| 1.0| 2.0| 3.0|     2.0|
+----+----+----+--------+
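
With 80+ columns, as mentioned in the question, the column list inside aggregate does not have to be typed out by hand. A sketch under the same Spark >= 2.4 assumption, applied to the question's df_avg_calc and assuming every one of its columns should take part in the average:

import org.apache.spark.sql.functions.expr

// Splice every column name of df_avg_calc into the array(...) call
val allCols = df_avg_calc.columns.mkString(", ")

val row_mean = expr(s"""aggregate(
  CAST(array($allCols) AS array<double>),
  CAST((0.0 as sum, 0.0 as n) AS struct<sum: double, n: double>),
  (acc, x) -> (
    acc.sum + coalesce(x, 0.0),
    acc.n + CASE WHEN x IS NULL THEN 0.0 ELSE 1.0 END),
  acc -> acc.sum / acc.n)""")

df_avg_calc.withColumn("row_mean", row_mean).show()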

Version independent

Compute the sum and the count of NOT NULL columns and divide one by the other:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

def row_mean(cols: Column*) = {
  // Sum of values ignoring nulls
  val sum = cols
    .map(c => coalesce(c, lit(0)))
    .foldLeft(lit(0))(_ + _)
  // Count of not null values
  val cnt = cols
    .map(c => when(c.isNull, 0).otherwise(1))
    .foldLeft(lit(0))(_ + _)
  sum / cnt
}

Example data:

val df = Seq(
  (None, None, None), 
  (Some(2.0), None, None),
  (Some(50.0), Some(34.0), None),
  (Some(1.0), Some(2.0), Some(3.0))
).toDF

Result:

df.withColumn("row_mean", row_mean($"_1", $"_2", $"_3")).show
+----+----+----+--------+
|  _1|  _2|  _3|row_mean|
+----+----+----+--------+
|null|null|null|    null|
| 2.0|null|null|     2.0|
|50.0|34.0|null|    42.0|
| 1.0| 2.0| 3.0|     2.0|
+----+----+----+--------+
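
The same helper also avoids listing the question's 80+ columns by hand; a sketch assuming every column of df_avg_calc is numeric and should be averaged:

import org.apache.spark.sql.functions.col

// Pass every column of df_avg_calc; nulls count neither towards the sum nor the denominator
df_avg_calc
  .withColumn("row_mean", row_mean(df_avg_calc.columns.map(col): _*))
  .show()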
zero323
def average(l: Seq[Integer]): Option[Double] = {
  // Drop nulls (boxed Integers can be null), then average whatever is left
  val nonNull = l.flatMap(i => Option(i))
  if(nonNull.isEmpty) None else Some(nonNull.reduce(_ + _).toDouble / nonNull.size.toDouble)
}

val avgUdf = udf(average(_: Seq[Integer]))

val df = List((Some(1),Some(2)), (Some(1), None), (None, None)).toDF("a", "b")

val avgDf = df.select(avgUdf(array(df.schema.map(c => col(c.name)): _*)).as("average"))

avgDf.collect

res0: Array[org.apache.spark.sql.Row] = Array([1.5], [1.0], [null])

Testing on the data you supplied gives the correct result:

val df = List(
  (Some(10),Some(5), Some(5), None, None),
  (None, Some(5), Some(5), None, Some(5)),
  (Some(2), Some(8), Some(5), Some(1), Some(2)), 
  (None, None, None, None, None)
).toDF("col1", "col2", "col3", "col4", "col5")

Array[org.apache.spark.sql.Row] = Array([6.666666666666667], [5.0], [3.6], [null])

Note: if there are columns you do not want included, make sure they are filtered out when populating the array passed to the UDF, as sketched below.
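
A minimal sketch of that filtering, where "id" is just a hypothetical name for a column to exclude:

import org.apache.spark.sql.functions.{array, col}

// Keep only the columns that should participate in the average
val colsToAverage = df.columns.filter(_ != "id").map(col)

val avgDf = df.select(avgUdf(array(colsToAverage: _*)).as("average"))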

Finally:

val df = List(
  (Some(14), Some(5), Some(73), None.asInstanceOf[Option[Integer]], None.asInstanceOf[Option[Integer]], None.asInstanceOf[Option[Integer]])
).toDF("col1", "col2", "col3", "col4", "col5", "col6")

Array[org.apache.spark.sql.Row] = Array([30.666666666666668])

Which again is the correct result.

If you want to use Doubles...

def average(l: Seq[java.lang.Double]): Option[java.lang.Double] = {
  val nonNull = l.flatMap(i => Option(i))
  if(nonNull.isEmpty) None else Some(nonNull.reduce(_ + _) / nonNull.size.toDouble)
}

val avgUdf = udf(average(_: Seq[java.lang.Double]))

val df = List(
  (Some(14.0), Some(5.0), Some(73.0), None.asInstanceOf[Option[java.lang.Double]], None.asInstanceOf[Option[java.lang.Double]], None.asInstanceOf[Option[java.lang.Double]])
).toDF("col1", "col2", "col3", "col4", "col5", "col6")

val avgDf = df.select(avgUdf(array(df.schema.map(c => col(c.name)): _*)).as("average"))

avgDf.collect

Array[org.apache.spark.sql.Row] = Array([30.666666666666668]) 
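
Applied to the question's df_avg_calc (all six columns are double), a usage sketch of this Double-based UDF:

import org.apache.spark.sql.functions.{array, col}

// With Seq[java.lang.Double] the nulls arrive as null elements, so Option(i)
// drops them and they are not counted in the denominator
val rowAvgDF = df_avg_calc.select(
  avgUdf(array(df_avg_calc.columns.map(col): _*)).as("row_avg"))

rowAvgDF.show(10, false)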
Terry Dactyl
  • Thanks, it is not returning "null", but the average is not coming out right. It is counting even "null" values in the total number of columns – Sandy Nov 07 '18 at 16:08
  • Not in the example above it isn't. Can you post more details about your schema? – Terry Dactyl Nov 07 '18 at 16:09
  • I've attached the image of the dataframe and the outcome. For example, the 2nd data row should be (14+5+73)/3 = 30.66, but it is giving 15.33, i.e. (14+5+73)/6. In fact, it is using 3+3 in the denominator, while it should be only 3, as the other 3 are null counts – Sandy Nov 07 '18 at 16:49
  • Check your data. As you can see above, my code returns the correct result. In your example you are providing floating point numbers, which is different from your original example, which provided Integers. – Terry Dactyl Nov 07 '18 at 17:01
  • It's all "Double": df_avg_calc.printSchema gives root |-- col1: double (nullable = true) |-- col2: double (nullable = true) |-- col3: double (nullable = true) |-- col4: double (nullable = true) |-- col5: double (nullable = true) |-- col6: double (nullable = true) – Sandy Nov 07 '18 at 17:14