
My DataFrame looks like this:

id  value  date    
1   100    2017 
1   null   2016 
1   20     2015 
1   100    2014

I would like to get the most recent previous value, ignoring nulls:

id  value  date   recent value
1   100    2017    20
1   null   2016    20
1   20     2015   100
1   100    2014   null

Is there any way to ignore null values while using the lead window function?


3 Answers


Is it possible to ignore null values when using lead window function in Spark

It is not.

I would like to get the most recent value, ignoring nulls

Just use last (or first) with ignoreNulls:

def last(columnName: String, ignoreNulls: Boolean): Column

Aggregate function: returns the last value of the column in a group.

The function by default returns the last values it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
// outside spark-shell you'll also need: import spark.implicits._

val df = Seq(
  (1, Some(100), 2017), (1, None, 2016), (1, Some(20), 2015),
  (1, Some(100), 2014)
).toDF("id", "value", "date")

df.withColumn(
  "last_value",
  // with orderBy and no explicit frame, the window runs from the start
  // of the partition up to the current row
  last("value", true).over(Window.partitionBy("id").orderBy("date"))
).show

+---+-----+----+----------+                                                     
| id|value|date|last_value|
+---+-----+----+----------+
|  1|  100|2014|       100|
|  1|   20|2015|        20|
|  1| null|2016|        20|
|  1|  100|2017|       100|
+---+-----+----+----------+
  • I would like to get the most recent previous value. I updated the post also. Sorry, I was not clear before – John Feb 09 '18 at 14:24
  • what if "most recent" is exclusive of the present – MichaelChirico May 02 '18 at 09:51
  • 2
    answering my own q: in this case, you need use (in SQL) `over(partition by id order by date rows between unbounded preceding and 1 preceding)`. not sure how to translate that to Scala – MichaelChirico May 02 '18 at 12:39
  • Following up again: see this question https://stackoverflow.com/questions/36019847/pyspark-forward-fill-with-last-observation-for-a-dataframe which shows how to use `rangeBetween` – MichaelChirico Sep 21 '18 at 07:09
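Following up on the comments above, that SQL frame translates to Scala via rowsBetween. A minimal sketch, using the same df as in this answer: restrict the window to the rows strictly before the current one, then take the last non-null value, which should yield exactly the "recent value" column from the question:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// "rows between unbounded preceding and 1 preceding"
val w = Window
  .partitionBy("id")
  .orderBy("date")
  .rowsBetween(Window.unboundedPreceding, -1)

df.withColumn("recent_value", last("value", true).over(w)).show
// +---+-----+----+------------+
// | id|value|date|recent_value|
// +---+-----+----+------------+
// |  1|  100|2014|        null|
// |  1|   20|2015|         100|
// |  1| null|2016|          20|
// |  1|  100|2017|          20|
// +---+-----+----+------------+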

Spark 3.2+ provides an ignoreNulls option inside lead and lag in Scala:

lead(e: Column, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column

lag(e: Column, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column


Test input:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq[(Integer, Integer, Integer)](
    (1, 100, 2017),
    (1, null, 2016),
    (1, 20, 2015),
    (1, 100, 2014)
).toDF("id", "value", "date")

lead:

val w = Window.partitionBy("id").orderBy(desc("date"))
val df2 = df.withColumn("lead_val", lead($"value", 1, null, true).over(w))
df2.show()
// +---+-----+----+--------+
// | id|value|date|lead_val|
// +---+-----+----+--------+
// |  1|  100|2017|      20|
// |  1| null|2016|      20|
// |  1|   20|2015|     100|
// |  1|  100|2014|    null|
// +---+-----+----+--------+

lag:

val w = Window.partitionBy("id").orderBy("date")
val df2 = df.withColumn("lag_val", lag($"value", 1, null, true).over(w))
df2.show()
// +---+-----+----+-------+
// | id|value|date|lag_val|
// +---+-----+----+-------+
// |  1|  100|2014|   null|
// |  1|   20|2015|    100|
// |  1| null|2016|     20|
// |  1|  100|2017|     20|
// +---+-----+----+-------+
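
The same option is also exposed through Spark SQL's IGNORE NULLS clause. A minimal sketch, assuming Spark 3.2+ and the df above registered as a temporary view:

df.createOrReplaceTempView("t")
spark.sql("""
  SELECT id, value, date,
         lead(value, 1) IGNORE NULLS
           OVER (PARTITION BY id ORDER BY date DESC) AS lead_val
  FROM t
""").show()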

You could do it in two steps:

  1. Create a table containing only the non-null values.
  2. Join it back to the original table.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = Seq(
  (1, Some(100), 2017),
  (1, None, 2016),
  (1, Some(20), 2015),
  (1, Some(100), 2014)
).toDF("id", "value", "date")

// Step 1
val filledDf = df
  .where($"value".isNotNull)
  .withColumnRenamed("value", "recent_value")

// Step 2
val window: WindowSpec = Window.partitionBy("l.id", "l.date").orderBy($"r.date".desc)

val finalDf = df.as("l")
  .join(filledDf.as("r"), $"l.id" === $"r.id" && $"l.date" > $"r.date", "left")
  .withColumn("rn", row_number().over(window))
  .where($"rn" === 1)
  .select("l.id", "l.date", "value", "recent_value")

finalDf.orderBy($"date".desc).show

+---+----+-----+------------+
| id|date|value|recent_value|
+---+----+-----+------------+
|  1|2017|  100|          20|
|  1|2016| null|          20|
|  1|2015|   20|         100|
|  1|2014|  100|        null|
+---+----+-----+------------+