I have a PySpark DataFrame in the following format:
+-------+----------+---------------------+
| event | consumer | timestamp           |
+-------+----------+---------------------+
| E     | 1        | 2020-09-09 13:15:00 |
| E     | 1        | 2020-09-09 13:30:00 |
| E     | 1        | 2020-09-09 14:20:00 |
| T     | 1        | 2020-09-09 14:35:00 |
| T     | 2        | 2020-09-09 13:20:00 |
| E     | 2        | 2020-09-09 13:25:00 |
| E     | 2        | 2020-09-09 14:45:00 |
| T     | 2        | 2020-09-09 14:50:00 |
+-------+----------+---------------------+
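In case it helps to reproduce the problem, this is how the sample DataFrame can be built (column types are my assumption from the table above):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Sample data taken from the table above
data = [
    ("E", 1, "2020-09-09 13:15:00"),
    ("E", 1, "2020-09-09 13:30:00"),
    ("E", 1, "2020-09-09 14:20:00"),
    ("T", 1, "2020-09-09 14:35:00"),
    ("T", 2, "2020-09-09 13:20:00"),
    ("E", 2, "2020-09-09 13:25:00"),
    ("E", 2, "2020-09-09 14:45:00"),
    ("T", 2, "2020-09-09 14:50:00"),
]
df = (
    spark.createDataFrame(data, ["event", "consumer", "timestamp"])
    .withColumn("timestamp", F.to_timestamp("timestamp"))
)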
Is there a way to iterate through each group, partitioned by consumer and ordered by timestamp, and set values in a new column? The new column will define the session_timestamp. This is the logic behind it:
- A session only starts with an event E.
- If a new event happens within one hour after the session starts, it belongs to that session.
- If an event happens more than one hour after the event that started the session, it belongs to another session (that's what happens between rows 2 and 3 in the DataFrame).
So the result for the DataFrame above is:
+-------+----------+---------------------+---------------------+
| event | consumer | timestamp           | session_timestamp   |
+-------+----------+---------------------+---------------------+
| E     | 1        | 2020-09-09 13:15:00 | 2020-09-09 13:15:00 |
| E     | 1        | 2020-09-09 13:30:00 | 2020-09-09 13:15:00 |
| E     | 1        | 2020-09-09 14:20:00 | 2020-09-09 14:20:00 |
| T     | 1        | 2020-09-09 14:35:00 | 2020-09-09 14:20:00 |
| T     | 2        | 2020-09-09 13:20:00 | Null                |
| E     | 2        | 2020-09-09 13:25:00 | 2020-09-09 13:25:00 |
| E     | 2        | 2020-09-09 14:45:00 | 2020-09-09 14:45:00 |
| T     | 2        | 2020-09-09 14:50:00 | 2020-09-09 14:45:00 |
+-------+----------+---------------------+---------------------+
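To make the rules concrete, here is a rough sketch of the per-consumer logic in plain pandas (only an illustration of the rules above, assuming the timestamps are already parsed as datetimes):

import pandas as pd

def assign_sessions(group: pd.DataFrame) -> pd.DataFrame:
    # One consumer's events, ordered by timestamp
    group = group.sort_values("timestamp")
    session_start = None
    starts = []
    for _, row in group.iterrows():
        ts = row["timestamp"]
        # Only an "E" event can start a session; anything within one hour of
        # the session start belongs to that session, later events need a new one.
        if session_start is None or ts > session_start + pd.Timedelta(hours=1):
            session_start = ts if row["event"] == "E" else None
        starts.append(session_start)
    group["session_timestamp"] = starts
    return group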
Is there a way to do this in PySpark?