2

How can I create a UDF to programatically replace null values in a spark dataframe in each column with the column mean value. for instance in the example data col1 null value will have a value of ((2+4+6+8+5)/5) = 5.

Example data:

col1    col2    col3
2       null    3
4       3       3
6       5       null
8       null    2
null    6       4
5       2       8

Desired Data:

col1    col2    col3
2       4       3
4       3       3
6       5       4
8       4       2
5       6       4
5       2       8
Nazilla
  • 581
  • 1
  • 7
  • 17
  • in Pure SQl this could be accomplished by cross joining a table for each column and using coalesce(col1, crossJoinTBL.Col1Avg) but that's not really a UDF. if you were to pass in table column and use dynamic SQL to calculate the avg and use coalesce again that may work... – xQbert Jul 13 '16 at 16:00

1 Answers1

4

Generally speaking there is no need for UDF here. All you really is aggregated table:

val df = Seq(
  (Some(2), None, Some(3)), (Some(4), Some(3), Some(3)),
  (Some(6), Some(5), None), (Some(8), None, Some(2)),
  (None, Some(6), Some(4)), (Some(5), Some(2), Some(8))
).toDF("col1", "col2", "col3").alias("df")

val means = df.agg(df.columns.map(c => (c -> "avg")).toMap)

And broadcasted Cartesian with coalesce:

val exprs = df.columns.map(c => coalesce(col(c), col(s"avg($c)")).alias(c))

df.join(broadcast(means)).select(exprs: _*)
zero323
  • 322,348
  • 103
  • 959
  • 935
  • Excellent. That works perfectly. Thank you very much. had to add the following libraries. import sqlctx.implicits._ import org.apache.spark.sql.functions.{coalesce, lit, broadcast} – Nazilla Jul 14 '16 at 09:42
  • 1
    zero323 your Scala Spark skills are insane... A little more elaboration what how your super beautiful and compact code is actually working would be appreciated though. – Boern Nov 23 '16 at 13:37
  • Also, this line `df.join(broadcast(means)).select(exprs: _*)` does take very long compared to all other statements in my code. Is there maybe a better way to do this? Thanks in advance. – Boern Nov 29 '16 at 10:44
  • In Spark versions 2.0+, replace ``df.join` by `df.crossJoin` to avoid a `org.apache.spark.sql.AnalysisException` – Boern Feb 22 '17 at 11:22