I have a Spark DataFrame (in Scala) like this:
+---------+-------------+------+---------+------------+
|  user_id|      item_id|  mood|     time|sessionBegin|
+---------+-------------+------+---------+------------+
|        1|            A| Happy|        0|           0|
|        1|            B| Happy|        1|           0|
|        1|            C| Happy|        3|           0|
|        1|            D| Happy|        5|           0|
|        1|            C| Happy|        6|           0|
|        1|            D|   Sad|        6|           0|
|        1|            C|   Sad|       10|           0|
|        1|            A| Happy|       28|           0|
|        1|            E| Happy|       35|           0|
|        1|            E|   Sad|       60|           0|
|        2|            F| Happy|        6|           6|
|        2|            E| Happy|       17|           6|
|        2|            D| Happy|       20|           6|
|        2|            D|   Sad|       21|           6|
|        2|            E| Happy|       27|           6|
|        2|            G| Happy|       37|           6|
|        2|            H| Happy|       39|           6|
|        2|            G|   Sad|       45|           6|
+---------+-------------+------+---------+------------+
I have defined a window partitioned by (user_id, sessionBegin) and ordered by time:
import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy("user_id", "sessionBegin").orderBy("time")
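Note that when a window has orderBy but no explicit frame, Spark uses a default frame of RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so any aggregate over this window sees the rows up to the current one rather than the ones after it. A minimal sketch to observe that, assuming spark-sql is on the classpath (e.g. pasted into `spark-shell`) and using a hypothetical three-row toy DataFrame instead of the full data above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("default-frame").getOrCreate()
import spark.implicits._

// Hypothetical toy input: one partition (user_id = 1, sessionBegin = 0), three rows
val toy = Seq((1, "A", 0, 0), (1, "B", 1, 0), (1, "C", 3, 0))
  .toDF("user_id", "item_id", "time", "sessionBegin")

// Same window as in the question: ordered, but with no explicit frame
val window = Window.partitionBy("user_id", "sessionBegin").orderBy("time")

// With the default frame, collect_set gathers items from the start of the
// partition up to (and including) the current row, not the rows after it
val seen: Map[Int, Set[String]] = toy
  .withColumn("seen", collect_set($"item_id").over(window))
  .collect()
  .map(r => r.getAs[Int]("time") -> r.getAs[Seq[String]]("seen").toSet)
  .toMap

seen.toSeq.sortBy(_._1).foreach { case (t, s) => println(s"time=$t seen=$s") }
```

This is why an explicit frame (rowsBetween or rangeBetween) is needed to look forward instead of backward.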
Now I want to add a column result which:
1) If the mood is Happy, collects all the item_id values from rows that come AFTER the current row AND have mood = Sad. Otherwise, if the mood is Sad, puts an empty array.
2) Does this over the window I specified above. (For example, this DataFrame has two partitions: the first is (user_id = 1, sessionBegin = 0) and the second is (user_id = 2, sessionBegin = 6).)
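To pin down the intended rule independently of Spark, here is a plain-Scala sketch over an in-memory list; the `Event` case class and `expectedResult` function are hypothetical names. Judging from the expected output below, "after" has to include rows with the same time: the Happy C at time 6 still gets D, whose Sad row is also at time 6. The sketch therefore keeps Sad rows whose time is >= the current row's time; since only Sad rows are collected, a Happy current row never counts itself.

```scala
// Hypothetical stand-in for one row of the DataFrame
final case class Event(userId: Int, itemId: String, mood: String, time: Int, sessionBegin: Int)

// Reference semantics: a Happy row gets the distinct item_ids of Sad rows in the
// same (userId, sessionBegin) partition; Sad rows get an empty list.
// Assumption (inferred from the expected output): rows tied on time count as "after".
def expectedResult(events: Seq[Event]): Seq[(Event, Seq[String])] =
  events.map { e =>
    if (e.mood == "Happy") {
      val sadLater = events
        .filter(o => o.userId == e.userId && o.sessionBegin == e.sessionBegin)
        .filter(o => o.time >= e.time && o.mood == "Sad")
        .sortBy(_.time) // stable sort: ties keep their input order
        .map(_.itemId)
        .distinct
      e -> sadLater
    } else e -> Seq.empty[String]
  }

// Usage on the user_id = 2 partition from the question
val user2 = Seq(
  Event(2, "F", "Happy", 6, 6), Event(2, "E", "Happy", 17, 6), Event(2, "D", "Happy", 20, 6),
  Event(2, "D", "Sad", 21, 6), Event(2, "E", "Happy", 27, 6), Event(2, "G", "Happy", 37, 6),
  Event(2, "H", "Happy", 39, 6), Event(2, "G", "Sad", 45, 6)
)

expectedResult(user2).foreach { case (e, r) =>
  println(s"${e.itemId} ${e.mood} ${e.time} -> ${r.mkString("[", ",", "]")}")
}
```

It is quadratic in the partition size, so it is only meant as an executable specification for checking a Spark version against.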
So the resulting DataFrame should be:
+---------+-------------+------+---------+------------+---------+
|  user_id|      item_id|  mood|     time|sessionBegin|   result|
+---------+-------------+------+---------+------------+---------+
|        1|            A| Happy|        0|           0|  [D,C,E]|
|        1|            B| Happy|        1|           0|  [D,C,E]|
|        1|            C| Happy|        3|           0|  [D,C,E]|
|        1|            D| Happy|        5|           0|  [D,C,E]|
|        1|            C| Happy|        6|           0|  [D,C,E]|
|        1|            D|   Sad|        6|           0|       []|
|        1|            C|   Sad|       10|           0|       []|
|        1|            A| Happy|       28|           0|      [E]|
|        1|            E| Happy|       35|           0|      [E]|
|        1|            E|   Sad|       60|           0|       []|
|        2|            F| Happy|        6|           6|    [D,G]|
|        2|            E| Happy|       17|           6|    [D,G]|
|        2|            D| Happy|       20|           6|    [D,G]|
|        2|            D|   Sad|       21|           6|       []|
|        2|            E| Happy|       27|           6|      [G]|
|        2|            G| Happy|       37|           6|      [G]|
|        2|            H| Happy|       39|           6|      [G]|
|        2|            G|   Sad|       45|           6|       []|
+---------+-------------+------+---------+------------+---------+
I tried collect_set with when(...).otherwise(...) over the window, but I could not figure out two things:
- How to consider ONLY the rows after the current row
- For the rows with mood = Happy, how to collect_set the item_id ONLY from rows where mood = Sad
Any pointers on how to approach this?
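One possible approach, sketched under the assumption of Spark 2.2 or later (for `typedLit`) and run e.g. in `spark-shell`; the names `afterWindow`, `sadItemsAfter` and `withResult` are only illustrative. (1) Give the window an explicit frame with `rangeBetween(Window.currentRow, Window.unboundedFollowing)`, so it covers the current row and every later row by time, including rows with the same time (needed for the Happy C at time 6, whose Sad D is also at time 6). (2) Aggregate `collect_set(when($"mood" === "Sad", $"item_id"))`: `when` without `otherwise` yields null for non-Sad rows, and `collect_set` skips nulls. An outer `when ... otherwise` then returns an empty array for Sad rows.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("sad-after-happy").getOrCreate()
import spark.implicits._

// Input from the question
val df = Seq(
  (1, "A", "Happy", 0, 0), (1, "B", "Happy", 1, 0), (1, "C", "Happy", 3, 0),
  (1, "D", "Happy", 5, 0), (1, "C", "Happy", 6, 0), (1, "D", "Sad", 6, 0),
  (1, "C", "Sad", 10, 0), (1, "A", "Happy", 28, 0), (1, "E", "Happy", 35, 0),
  (1, "E", "Sad", 60, 0), (2, "F", "Happy", 6, 6), (2, "E", "Happy", 17, 6),
  (2, "D", "Happy", 20, 6), (2, "D", "Sad", 21, 6), (2, "E", "Happy", 27, 6),
  (2, "G", "Happy", 37, 6), (2, "H", "Happy", 39, 6), (2, "G", "Sad", 45, 6)
).toDF("user_id", "item_id", "mood", "time", "sessionBegin")

// (1) Frame: the current row plus every row with a later or equal time in the partition
val afterWindow = Window.partitionBy("user_id", "sessionBegin").orderBy("time")
  .rangeBetween(Window.currentRow, Window.unboundedFollowing)

// (2) Non-Sad rows produce null inside when(...); collect_set ignores nulls
val sadItemsAfter = collect_set(when($"mood" === "Sad", $"item_id")).over(afterWindow)

// Happy rows get the Sad items; Sad rows get an empty array<string>
val withResult = df.withColumn("result",
  when($"mood" === "Happy", sadItemsAfter).otherwise(typedLit(Seq.empty[String])))

withResult.orderBy("user_id", "time").show(false)
```

collect_set does not guarantee element order, so [D,C,E] may print as e.g. [C, D, E]; wrapping it in array_sort gives a stable order if that matters. A row frame such as rowsBetween(1, Window.unboundedFollowing) matches "rows after the current row" literally, but with ties in time the order among peers is not deterministic, which is why the range frame is used in this sketch.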