
Is there a way to replace null values in a Spark DataFrame with the next non-null value from a following row? An additional row_count column has been added for window partitioning and ordering. More specifically, I'd like to achieve the following result:

      +---------+----+      +---------+----+
      |row_count|  id|      |row_count|  id|
      +---------+----+      +---------+----+
      |        1|null|      |        1| 109|
      |        2| 109|      |        2| 109|
      |        3|null|      |        3| 108|
      |        4|null|      |        4| 108|
      |        5| 108|  =>  |        5| 108|
      |        6|null|      |        6| 110|
      |        7| 110|      |        7| 110|
      |        8|null|      |        8|null|
      |        9|null|      |        9|null|
      |       10|null|      |       10|null|
      +---------+----+      +---------+----+

I tried the code below, but it does not give the proper result:

      import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.functions._

      val ss = dataframe.select($"*",
        sum(when(dataframe("id").isNull || dataframe("id") === "", 1).otherwise(0))
          .over(Window.orderBy($"row_count")) as "value")
      val window1 = Window.partitionBy($"value").orderBy("id").rowsBetween(0, Long.MaxValue)
      val selectList = ss.withColumn("id_fill_from_below", last("id").over(window1))
        .drop($"row_count").drop($"value")
xyz_scala
    Possible duplicate of [Spark / Scala: forward fill with last observation](https://stackoverflow.com/questions/33621319/spark-scala-forward-fill-with-last-observation) – user10938362 May 21 '19 at 11:30
  • @user10938362 this is a different one: that question fills nulls with the first available previous value, i.e. it uses the window first_value feature to fill data top to bottom, but here we need to fill bottom to top. – xyz_scala May 21 '19 at 14:43
  • you can change the order of your data to row_count desc and then apply the solution from https://stackoverflow.com/questions/33621319/spark-scala-forward-fill-with-last-observation (sketched after these comments) – abiratsis May 22 '19 at 15:21
  • I have added a solution using only window functions in another SO question: https://stackoverflow.com/a/58876725/2166220 – Brown nightingale Nov 15 '19 at 12:08
  • https://johnpaton.net/posts/forward-fill-spark/ This post helped me. This was the best solution I could find. – Jorrick Sleijster Feb 18 '20 at 13:44
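
For reference, the bottom-to-top fill suggested in the comments can be expressed with a single window: order by row_count, use a frame running from the current row to the end, and take the first non-null id in that frame. This is a minimal sketch, not the asker's code, assuming a DataFrame df with the row_count and id columns shown above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named spark is in scope

// Look ahead from the current row to the end of the frame and take the
// first non-null id; trailing nulls (rows 8-10) have nothing following,
// so they remain null.
val w = Window.orderBy($"row_count")
  .rowsBetween(Window.currentRow, Window.unboundedFollowing)

val filled = df.withColumn("id", first($"id", ignoreNulls = true).over(w))
filled.show()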

1 Answer

Here is an approach:

  1. Filter the non-null rows (dfNonNulls)
  2. Filter the null rows (dfNulls)
  3. Find the right value for each null id, using a join and a window function
  4. Fill the null DataFrame (dfNullFills)
  5. Union dfNonNulls and dfNullFills

data.csv

row_count,id
1,
2,109
3,
4,
5,108
6,
7,110
8,
9,
10,

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Read the sample data; inferSchema leaves the missing ids as null
var df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data.csv")

// Rows whose id is null; rename the columns so the join below is unambiguous
var dfNulls = df.filter(
  $"id".isNull
).withColumnRenamed(
  "row_count", "row_count_nulls"
).withColumnRenamed(
  "id", "id_nulls"
)

// Rows that already have an id
val dfNonNulls = df.filter(
  $"id".isNotNull
).withColumnRenamed(
  "row_count", "row_count_values"
).withColumnRenamed(
  "id", "id_values"
)

// Pair every null row with all non-null rows that come after it; the left
// join keeps null rows that have no later non-null value (rows 8-10)
dfNulls = dfNulls.join(
  dfNonNulls, $"row_count_nulls" < $"row_count_values", "left"
).select(
  $"id_nulls", $"id_values", $"row_count_nulls", $"row_count_values"
)

// Among all later non-null rows, keep the nearest one (smallest row_count_values)
val window = Window.partitionBy("row_count_nulls").orderBy("row_count_values")

val dfNullFills = dfNulls.withColumn(
  "rn", row_number().over(window)
).where($"rn" === 1).drop("rn").select(
  $"row_count_nulls".alias("row_count"), $"id_values".alias("id"))

dfNullFills.union(dfNonNulls).orderBy($"row_count").show()

which results in

+---------+----+
|row_count|  id|
+---------+----+
|        1| 109|
|        2| 109|
|        3| 108|
|        4| 108|
|        5| 108|
|        6| 110|
|        7| 110|
|        8|null|
|        9|null|
|       10|null|
+---------+----+
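
Note that the left join is what preserves rows 8-10: they have no later non-null row to pair with, so id_values stays null and they come through to the final result with a null id, exactly as in the desired output.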
Ranga Vure