I'm attempting to create a new column in my Spark DataFrame that is based on:
- a previous value of this column (i.e. the new value in the column is based on the values above it, which in turn are based on...)
- a very complex conditional statement (24 different conditions) depending on the values of other columns (and the lagged value of the variable itself)
For example, something like the logic in this loop:
for row in df:
    if row.col1 == "a":
        row.col4 = row.col1 + row.col3
        row.col5 = 11
    if row.col1 == "b":
        if row.col3 == 1:
            row.col4 = lag(row.col4) + row.col1 + row.col2
            row.col5 = 14
        if row.col3 == 0:
            row.col4 = lag(row.col4) + row.col1 + row.col3
            row.col5 = 17
    if row.col1 == "d":
        if row.col3 == 1:
            row.col4 = 99
            row.col5 = 19
    if lag(row.col4) == 99:
        row.col4 = lag(row.col4) + row.col5
        row.col5 = etc...
(...plus another 21 possible values of `c` and `d`)
Example
I want to convert this:
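from pyspark.sql import Window
from pyspark.sql.functions import col

# Window over which lag() would be evaluated
w = Window.orderBy(col("col1").asc())

# `spark` is the active SparkSession (predefined in the pyspark shell)
df = spark.createDataFrame([
    ("a", 2, 0),
    ("b", 3, 1),
    ("b", 4, 0),
    ("d", 5, 1),
    ("e", 6, 0),
    ("f", 7, 1)
], ["col1", "col2", "col3"])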
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   2|   0|
|   b|   3|   1|
|   b|   4|   0|
|   d|   5|   1|
|   e|   6|   0|
|   f|   7|   1|
+----+----+----+
...into this:
+----+----+----+------+-----------------------------------------------------+------+-------------------------+
|col1|col2|col3|col4  |(explanation)                                        |col5  |(also uses complex logic)|
+----+----+----+------+-----------------------------------------------------+------+-------------------------+
|   a|   2|   0|a0    |(because (col1==a) ==> col1+col3)                    |11    |                         |
|   b|   3|   1|a0b3  |(because (col1==b & col3==1) ==> lag(col4)+col1+col2)|14    |                         |
|   b|   4|   0|a0b3b0|(because (col1==b & col3==0) ==> lag(col4)+col1+col3)|17    |                         |
|   d|   5|   1|99    |(because (col1==d) ==> 99)                           |19    |                         |
|   e|   6|   0|9919  |(because (lag(col4)==99) ==> lag(col4)+col5)         |e6    |                         |
|   f|   7|   1|etc...|etc...                                               |etc...|etc...                   |
+----+----+----+------+-----------------------------------------------------+------+-------------------------+
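To pin down the recurrence itself, here is a minimal plain-Python sketch (no Spark) that walks the rows in order and carries the previous row's col4 and col5 forward. The names `Row` and `fill_sequentially` are hypothetical. It covers only the branches spelled out above, and it makes two assumptions: the "+ col5" in the `lag(col4) == 99` branch is read as the previous row's col5 (that reading gives 9919), and col5 in that branch is left as None because its rule is elided. In Spark, logic like this would need to see all rows in order in one place, for instance on the driver after `collect()` or inside `rdd.mapPartitions` on a single sorted partition; that wiring is not shown here.

```python
from typing import Iterable, Iterator, NamedTuple, Optional


class Row(NamedTuple):
    """Hypothetical stand-in for one DataFrame row."""
    col1: str
    col2: int
    col3: int
    col4: Optional[str] = None
    col5: Optional[str] = None


def fill_sequentially(rows: Iterable[Row]) -> Iterator[Row]:
    """Compute col4/col5 row by row, carrying the previous row's results."""
    prev_col4: Optional[str] = None
    prev_col5: Optional[str] = None
    for row in rows:
        col4: Optional[str] = None
        col5: Optional[str] = None
        if row.col1 == "a":
            col4, col5 = row.col1 + str(row.col3), "11"
        if row.col1 == "b" and row.col3 == 1:
            col4, col5 = (prev_col4 or "") + row.col1 + str(row.col2), "14"
        if row.col1 == "b" and row.col3 == 0:
            col4, col5 = (prev_col4 or "") + row.col1 + str(row.col3), "17"
        if row.col1 == "d" and row.col3 == 1:
            col4, col5 = "99", "19"
        if prev_col4 == "99":
            # Assumption: "+ col5" means the previous row's col5.
            col4 = prev_col4 + (prev_col5 or "")
            # The col5 rule for this branch is elided in the question.
            col5 = None
        yield row._replace(col4=col4, col5=col5)
        prev_col4, prev_col5 = col4, col5


# First five rows of the example input, in the order shown in the table.
rows = [
    Row("a", 2, 0),
    Row("b", 3, 1),
    Row("b", 4, 0),
    Row("d", 5, 1),
    Row("e", 6, 0),
]
result = list(fill_sequentially(rows))
for r in result:
    print(r.col1, r.col2, r.col3, r.col4, r.col5)
# col4 values: a0, a0b3, a0b3b0, 99, 9919
```

The point of the sketch is that each row needs the already-computed col4 of the row before it, so the computation is a sequential scan rather than a per-row expression.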
Is this at all possible in Spark? Nothing I've tried has worked:
- I haven't found a way to feed the output of a UDF back into the next UDF calculation
- The conditional + self-reference makes storing previous values in temporary columns basically impossible.
- I tried using gigantic `when` clauses but I get tripped up referencing the lagged values of the column itself within the `withColumn()` statement. Another problem with the `when()` + `lag()` approach is that other variables are referencing the lagged variable, and the lagged variable is referencing other variables. (In other words, there is just one lagged value getting fed into each row, but that value interacts differently with other variables based on the conditions met by that row.)