I want to fill NaN values in Spark using the last known good observation (see: Spark / Scala: fill nan with last good observation).

My current solution uses window functions to accomplish the task. This is not great, because all values are mapped into a single partition. Using `val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }` should work a lot better. But strangely, my fill function is not executed. What is wrong with my code? The input data looks like this:

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

Here is the full example code:

import java.sql.Date

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

case class FooBar(foo: Date, bar: String)

object WindowFunctionExample extends App {

  Logger.getLogger("org").setLevel(Level.WARN)
  val conf: SparkConf = new SparkConf()
    .setAppName("foo")
    .setMaster("local[*]")

  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  val myDff = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"),
    ("2016-01-04", "lastAssumingSameDate"))
  val recordsDF = myDff
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
  recordsDF.show

  def notMissing(row: FooBar): Boolean = {
    row.foo != null
  }

  val toCarry = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }.collectAsMap
  println("###################### carry ")
  println(toCarry)
  toCarry.foreach(println)
  println("###################### carry ")
  val toCarryBd = spark.sparkContext.broadcast(toCarry)

  def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
    var lastNotNullRow: FooBar = toCarryBd.value(i).get
    iter.map(row => {
      if (!notMissing(row))
        FooBar(lastNotNullRow.foo, row.bar)
      else {
        lastNotNullRow = row
        row
      }
    })
  }

  // The algorithm does not step into the for loop for filling the null values. Strange
  val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
  val imputedDF = imputed.toDS()

  println(imputedDF.orderBy($"foo").collect.toList)
  imputedDF.show
  spark.stop
}

edit

I fixed the code as outlined in the comment. But toCarryBd still contains None values. How can this happen when I explicitly filter out the rows with a missing foo?

def notMissing(row: FooBar): Boolean = {row.foo != null}
iter.filter(notMissing(_)).toSeq.lastOption

(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)

This leads to NoSuchElementException: None.get when trying to access toCarryBd.
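
The None entries and the exception can be reproduced without Spark. The following is a minimal sketch, assuming an empty partition behaves like an empty iterator; the object name `LastOptionDemo` and the simplified `FooBar` (a String instead of a `java.sql.Date`) are made up for illustration and are not part of the original code.

```scala
import scala.util.Try

// Simplified stand-in for the question's case class (String instead of java.sql.Date).
case class FooBar(foo: String, bar: String)

object LastOptionDemo {
  def notMissing(row: FooBar): Boolean = row.foo != null

  // Same expression as in the question, applied to a single partition's iterator.
  def lastGood(iter: Iterator[FooBar]): Option[FooBar] =
    iter.filter(notMissing).toSeq.lastOption

  // A partition without rows, one with only missing rows, and one with a good row.
  val emptyPartition: Option[FooBar] = lastGood(Iterator.empty)
  val onlyMissing: Option[FooBar] = lastGood(Iterator(FooBar(null, "noValidFormat")))
  val withData: Option[FooBar] = lastGood(Iterator(FooBar("2016-01-01", "first")))

  // Calling .get on None throws NoSuchElementException; Try captures it here.
  val getOnEmpty: Try[FooBar] = Try(emptyPartition.get)
}
```

In this sketch the filter only removes rows; it cannot produce a value for a partition that has no good rows, so `lastOption` has nothing to return there.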

Georg Heiler

1 Answer

Firstly, if your foo field can be null, I would recommend creating the case class as:

case class FooBar(foo: Option[Date], bar: String)

Then, you can rewrite your notMissing function to something like:

def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined
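
As a rough illustration of how the fill step might look with an Option-based case class, here is a sketch that runs on a plain iterator. It is a variant, not the code from the question: `notMissing` takes a `FooBar` directly, `fill` takes a hypothetical `carry` argument (the last good row seen before this partition, if any) instead of reading a broadcast map, and the object name `OptionFillSketch` is made up.

```scala
import java.sql.Date

case class FooBar(foo: Option[Date], bar: String)

object OptionFillSketch {
  def notMissing(row: FooBar): Boolean = row.foo.isDefined

  // Forward-fill one partition. `carry` is the last good row seen before
  // this partition, or None if there is no such row.
  def fill(carry: Option[FooBar], iter: Iterator[FooBar]): Iterator[FooBar] = {
    var lastGood: Option[FooBar] = carry
    iter.map { row =>
      if (notMissing(row)) {
        lastGood = Some(row)
        row
      } else {
        // Copy the last known date; foo stays None if nothing was seen yet.
        row.copy(foo = lastGood.flatMap(_.foo))
      }
    }
  }
}
```

Because the carry is an `Option`, a partition without any earlier good row no longer needs `.get`; its leading missing rows simply stay unfilled.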
Daniel de Paula
  • Can you explain why the map yields a lot of `None` entries? – Georg Heiler Dec 18 '16 at 15:31
  • @GeorgHeiler `iter.filter(notMissing(_)).toSeq.lastOption` will return `None` if the Seq is empty. – Daniel de Paula Dec 19 '16 at 13:12
  • Thanks. And why is it executed 8 times when the original df only contains 4 rows? – Georg Heiler Dec 19 '16 at 13:14
  • @GeorgHeiler It's executed once per partition. You probably are running your code with 8 total executor cores. – Daniel de Paula Dec 19 '16 at 13:15
  • I built an example here: https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2 but None values are still put into the map for the "empty" partitions. Do you have any idea how to fix that? `FooBar(lastNotNullRow.getOrElse(FooBar(Option(Date.valueOf("2016-01-01")), "DUMMY")).foo, foo.bar)` will just use the default case. (edit: updated URL) – Georg Heiler Dec 21 '16 at 13:53
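
One possible way to deal with the "empty" partitions raised in the last comment is to resolve the carry values on the driver before broadcasting, so that each partition receives the most recent good value from any earlier partition. The sketch below is not from the thread; the names `CarrySketch` and `carryIn` are hypothetical, and it assumes the per-partition results are already sorted by partition index and that partition order matches row order.

```scala
object CarrySketch {
  // lastGoodPerPartition(i) is the last non-missing value found in partition i, if any.
  // The result at index i is the most recent good value from partitions 0 until i,
  // so None entries from empty partitions are skipped rather than carried.
  def carryIn[A](lastGoodPerPartition: Seq[Option[A]]): Seq[Option[A]] =
    lastGoodPerPartition
      .scanLeft(Option.empty[A])((prev, cur) => cur.orElse(prev))
      .init // drop the final element, which would be the carry for a non-existent next partition
}
```

With the map printed in the question, partitions 2 and 3 would receive the "first" row and partitions 4 to 7 the "second" row, while partitions 0 and 1 receive `None` because nothing precedes them. The fill step would then have to accept an `Option` instead of calling `.get`.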