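As blackbishop said, you can't use the lag function to retrieve the changing value of a column. lag only reads values that already exist in the input column, while here each row's A is the E computed for the previous row. As you're using the Scala API, you can instead develop your own User-Defined Aggregate Function (UDAF).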
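With rows indexed by n in Id order (n is notation introduced here, read off the reduce method of the aggregator below), the recurrence to compute is:

```latex
\begin{aligned}
A_1 &= \text{input } A \text{ of the first row} \\
A_n &= E_{n-1}, \quad n > 1 \\
E_n &= A_n - \left(\max(B_n, C_n) + D_n\right)
\end{aligned}
```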
You create the following case classes, representing the row you're currently reading and your aggregator's buffer:
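```scala
// One input row: the columns passed to the aggregator, in order
case class InputRow(A: Integer, B: Integer, C: Integer, D: Integer)
// Mutable buffer carried from row to row; java.lang.Integer so fields can start as null
case class Buffer(var E: Integer, var A: Integer)
```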
Then you use them to define your custom aggregator, RecursiveAggregator:
```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

object RecursiveAggregator extends Aggregator[InputRow, Buffer, Buffer] {
  // Empty buffer: no E has been computed yet
  override def zero: Buffer = Buffer(null, null)

  // Called once per row, in window order
  override def reduce(buffer: Buffer, currentRow: InputRow): Buffer = {
    // First row: take A from the input; later rows: A is the previous row's E
    buffer.A = if (buffer.E == null) currentRow.A else buffer.E
    // E = A - (max(B, C) + D); the Integers are unboxed, so a null B, C or D would throw
    buffer.E = buffer.A - (math.max(currentRow.B, currentRow.C) + currentRow.D)
    buffer
  }

  // Partial buffers cannot be combined: each row depends on the previous one
  override def merge(b1: Buffer, b2: Buffer): Buffer = {
    throw new NotImplementedError("should be used only over ordered window")
  }

  override def finish(reduction: Buffer): Buffer = reduction
  override def bufferEncoder: Encoder[Buffer] = Encoders.product[Buffer]
  override def outputEncoder: Encoder[Buffer] = Encoders.product[Buffer]
}
```
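To see what reduce does row by row without starting Spark, here is a minimal plain-Scala sketch of the same update rule. RecurrenceSketch, Row, Buf and sample are hypothetical names introduced only for this illustration; Option stands in for the null checks, and scanLeft plays the role of the ordered window by emitting one buffer per row. The first A and the B, C, D values come from the result table; the A values of later rows are placeholders, since the rule only reads A on the first row.

```scala
// Spark-free sketch mirroring RecursiveAggregator.reduce on a plain collection.
// Row and Buf are hypothetical stand-ins for InputRow and Buffer.
object RecurrenceSketch {
  final case class Row(id: Int, a: Int, b: Int, c: Int, d: Int)
  // Option replaces the null checks used in the Spark version
  final case class Buf(e: Option[Int], a: Option[Int])

  val zero: Buf = Buf(None, None)

  // Same rule as reduce: A is the input A on the first row, otherwise the
  // previous E; then E = A - (max(B, C) + D)
  def reduce(buf: Buf, row: Row): Buf = {
    val a = buf.e.getOrElse(row.a)
    Buf(Some(a - (math.max(row.b, row.c) + row.d)), Some(a))
  }

  // One buffer per row (drop the initial zero), like a growing ordered window
  def run(rows: Seq[Row]): Seq[Buf] = rows.scanLeft(zero)(reduce).tail

  // First A and B, C, D taken from the result table; later A values are
  // placeholders (0) because reduce ignores them
  val sample: Seq[Row] = Seq(
    Row(1, 100, 10, 20, 5),
    Row(2, 0, 5, 10, 5),
    Row(3, 0, 7, 2, 3),
    Row(4, 0, 1, 3, 7)
  )
}
```

Running `RecurrenceSketch.run(RecurrenceSketch.sample)` gives E values 75, 60, 50 and 40 in Id order. Feeding the rows in a different order changes the result, which is why merge cannot combine partial buffers and the window has to be ordered.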
Finally, you turn RecursiveAggregator into a user-defined aggregate function with udaf (available since Spark 3.0) and apply it over a window ordered by Id on your input dataframe:
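```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, udaf}

// Wrap the Aggregator so it can be used as a column function (Spark 3.0+)
val recursiveAggregator = udaf(RecursiveAggregator)

// Ordered window; with no partitionBy, Spark moves all rows to a single partition
val window = Window.orderBy("Id")

val result = input
  // "computed" is a struct column with the Buffer fields E and A
  .withColumn("computed", recursiveAggregator(col("A"), col("B"), col("C"), col("D")).over(window))
  .select("Id", "computed.A", "B", "C", "D", "computed.E")
```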
If you take your question's dataframe as input, you get the following result:
```
+---+---+---+---+---+---+
|Id |A  |B  |C  |D  |E  |
+---+---+---+---+---+---+
|1  |100|10 |20 |5  |75 |
|2  |75 |5  |10 |5  |60 |
|3  |60 |7  |2  |3  |50 |
|4  |50 |1  |3  |7  |40 |
+---+---+---+---+---+---+
```
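Each E value in the table can be checked by hand from the B, C and D columns, with A taken from the previous row's E after the first row:

```latex
\begin{aligned}
E_1 &= 100 - (\max(10, 20) + 5) = 75 \\
E_2 &= 75 - (\max(5, 10) + 5) = 60 \\
E_3 &= 60 - (\max(7, 2) + 3) = 50 \\
E_4 &= 50 - (\max(1, 3) + 7) = 40
\end{aligned}
```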