
I am using Spark 2.0.1 and want to fill NaN values with the last known good value in the column.

The only references for Spark I could find are Spark / Scala: forward fill with last observation and Fill in null with previously known good value with pyspark, which seem to use RDDs.

I would rather stay in the DataFrame / Dataset world and possibly handle multiple NaN values. Is this possible?

My assumption is that the data (initially loaded e.g. from a CSV file) is ordered by time and that this order is preserved in the distributed setting, i.e. filling with the closest / last known good value is correct. Maybe filling with the previous value is enough, as for most records there are no 2 or more NaN records in a row. Does this actually hold? The point is that a

myDf.sort("foo").show

would destroy any order, e.g. all null values would come first.

A small example:

import java.sql.{ Date, Timestamp }
import spark.implicits._ // needed outside the spark-shell for toDF / 'col syntax

case class FooBar(foo: Date, bar: String)

val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
         .toDF("foo", "bar")
         .withColumn("foo", 'foo.cast("Date"))
         .as[FooBar]

Results in

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

I would like to fix the value with the last known good value. How can I achieve this? The desired result is:

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|2016-01-02|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

edit

In my case, it would be good enough to fill the value from the row above, as there are only a very limited number of faulty values.
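
The number of affected rows can be checked quickly, e.g. via

myDf.filter('foo.isNull).count

which returns 1 for the small example above.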

edit2

I tried to add an index column:

import org.apache.spark.sql.functions.monotonically_increasing_id

val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
    .withColumn("rowId", monotonically_increasing_id())

And then fill with the value from the previous row:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show

But that yields the following warning: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. How could I introduce meaningful partitions?

+----------+--------------------+-----+----------+
|       foo|                 bar|rowId|    fooLag|
+----------+--------------------+-----+----------+
|2016-01-01|               first|    0|      null|
|2016-01-02|              second|    1|2016-01-01|
|      null|       noValidFormat|    2|2016-01-02|
|2016-01-04|lastAssumingSameDate|    3|      null|
+----------+--------------------+-----+----------+
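
For completeness: if the data had a grouping column (say id, which this example does not have), the window could be partitioned by it and the warning would disappear, because each group is filled independently. A rough sketch, assuming a DataFrame df with such a hypothetical id column:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, monotonically_increasing_id}

// hypothetical grouping column "id": one independent time series per group,
// so the window no longer moves all data to a single partition
val byGroup = Window.partitionBy('id).orderBy('rowId)

df
  .withColumn("rowId", monotonically_increasing_id())
  .withColumn("fooLag", lag('foo, 1) over byGroup)
  .show

Without such a column a global order over all rows is required, and the single-partition window (or an RDD-level approach) seems hard to avoid.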
Georg Heiler
  • That is nicer but only compiles if set to `df.filter('foo.isNull).count`. Do you have any thoughts regarding the second part? – Georg Heiler Nov 14 '16 at 15:50
  • But that would use a fixed value / column. I would prefer to use the latest good / non-NaN value per column to fix the missing numbers. – Georg Heiler Nov 14 '16 at 15:55
  • I was not sure what is exact criteria to replace `NaN`/`null`. so pls check this link [pyspark - fillna()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.fillna) – mrsrinivas Nov 14 '16 at 15:59
  • @MRSrinivas sorry for not understanding your point here. But if I understand the documentation correctly, df.na.fill() either takes a `singleFixedScalarValue` or a dict with a mapping from colname -> `singleFixedScalarValue` and fills the NaN values with the respective `singleFixedScalarValue`. I would rather not specify such a value beforehand. Plus there will be multiple NaN values in this column and I would want to fill with the next / last good value. – Georg Heiler Nov 14 '16 at 16:02
  • We need `UDAF` to do that. – mrsrinivas Nov 14 '16 at 16:10
  • You mean a UDF? Why aggregate? I am thinking about using a window function. However, this would have the downside of a fixed window. – Georg Heiler Nov 14 '16 at 16:52
  • In a UDAF we get control to read the previous or next records, so we can edit the data and return it instead of aggregating it. – mrsrinivas Nov 14 '16 at 16:58
  • @MRSrinivas please see my last edit. I found some resources on SO dealing with this problem, but either I could not apply their solution or it did not work well in the DataFrame / Dataset world. – Georg Heiler Nov 15 '16 at 09:23

2 Answers


Filling null fields with the last known non-null value; I tried this and it actually worked:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{last, monotonically_increasing_id}

// read the tab-separated input and write it to an existing Hive table
val dftxt1 = spark.read
  .option("header", "true")
  .option("sep", "\t")
  .csv("/sdata/ph/com/r/ph_com_r_ita_javelin/inbound/abc.txt")
  .toDF("line_name", "merge_key", "line_id")
dftxt1.select("line_name", "merge_key", "line_id").write.mode("overwrite").insertInto("dbname.tablename")

val df = spark.sql("select * from dbname.tablename")

// add a row id to preserve the original order
val df1 = df.withColumn("rowId", monotonically_increasing_id())

val partitionWindow = Window.orderBy("rowId")

// last(..., ignoreNulls = true) over the running window carries the last non-null value forward
val df2 = df1.withColumn("line_id", last("line_id", ignoreNulls = true) over partitionWindow)

df2.show
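
Applied to the example data from the question, the same pattern would look roughly as follows (a sketch; it still uses a single-partition window ordered by rowId):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{last, monotonically_increasing_id}

// the default frame of an ordered window is unbounded preceding .. current row,
// so last(..., ignoreNulls = true) returns the most recent non-null value so far
val w = Window.orderBy("rowId")

myDf
  .withColumn("rowId", monotonically_increasing_id())
  .withColumn("foo", last("foo", ignoreNulls = true) over w)
  .show

Unlike lag(1), this should also handle several consecutive null values and produce the desired output from the question.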
Sanskar Suman

This is an intermediate answer. However, it is not great, as no partitioning is defined / only a single partition is used. I am still looking for a better way to solve the problem.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, monotonically_increasing_id, when}

df
    .withColumn("rowId", monotonically_increasing_id())
    .withColumn("replacement", lag('columnWithNull, 1) over Window.orderBy('rowId))
    .withColumn("columnWithNullReplaced",
      when($"columnWithNull".isNull, $"replacement").otherwise($"columnWithNull"))
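
The same replacement can also be written a bit more compactly with coalesce instead of when/otherwise (a sketch; it shares the single-null limitation of the lag(1) approach):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag, monotonically_increasing_id}

// coalesce returns the first non-null argument, which is equivalent to the
// when/otherwise construct above
df
    .withColumn("rowId", monotonically_increasing_id())
    .withColumn("columnWithNullReplaced",
      coalesce('columnWithNull, lag('columnWithNull, 1) over Window.orderBy('rowId)))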

edit

I am working on a better solution using mapPartitionsWithIndex (https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2), but it is not complete yet.

edit2

Adding

if (i == 0) {
  lastNotNullRow = toCarryBd.value.get(i + 1).get
} else {
  lastNotNullRow = toCarryBd.value.get(i - 1).get
}

will lead to the desired result.

Georg Heiler
  • Me too - There must be a simpler / cleaner way. – codeaperature Dec 15 '16 at 02:06
  • Although this is a nice solution, it only solves the problem for one None value after the last seen value. If you have two consecutive None values, it wouldn't work, hence I searched for a different approach. What I ended up with is described here: https://johnpaton.net/posts/forward-fill-spark – Jorrick Sleijster Feb 18 '20 at 13:45