Spark 3.2+ provides an `ignoreNulls` parameter inside `lead` and `lag` in Scala:

lead(e: Column, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column
lag(e: Column, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column
Test input:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._ // desc, lead, lag
// NOTE(review): assumes a SparkSession named `spark` is in scope — required
// for `.toDF` and the `$"col"` syntax used below.
import spark.implicits._

// Sample data: `Integer` (boxed) is used instead of `Int` so that a
// null "value" can be represented in the tuple.
val df = Seq[(Integer, Integer, Integer)](
  (1, 100, 2017),
  (1, null, 2016),
  (1, 20, 2015),
  (1, 100, 2014)
).toDF("id", "value", "date")
`lead`:
// Order newest-first so lead(1) looks at the next-older row within each id.
val w = Window.partitionBy("id").orderBy(desc("date"))
// With ignoreNulls = true, null "value" rows are skipped when resolving the lead.
val nextNonNull = lead($"value", 1, null, ignoreNulls = true).over(w)
val df2 = df.withColumn("lead_val", nextNonNull)
df2.show()
// +---+-----+----+--------+
// | id|value|date|lead_val|
// +---+-----+----+--------+
// | 1| 100|2017| 20|
// | 1| null|2016| 20|
// | 1| 20|2015| 100|
// | 1| 100|2014| null|
// +---+-----+----+--------+
`lag`:
// Order oldest-first so lag(1) looks at the previous (older) row within each id.
val w = Window.partitionBy("id").orderBy("date")
// Fixed column name: this is a lag, not a lead ("lead_val" was a copy-paste slip).
// ignoreNulls = true skips null "value" rows when resolving the lag.
val df2 = df.withColumn("lag_val", lag($"value", 1, null, ignoreNulls = true).over(w))
df2.show()
// +---+-----+----+-------+
// | id|value|date|lag_val|
// +---+-----+----+-------+
// |  1|  100|2014|   null|
// |  1|   20|2015|    100|
// |  1| null|2016|     20|
// |  1|  100|2017|     20|
// +---+-----+----+-------+