3

I have a PySpark Dataframe in the following format:

+-------+----------+---------------------+
| event | consumer |      timestamp      |
+-------+----------+---------------------+
| E     |        1 | 2020-09-09 13:15:00 |
| E     |        1 | 2020-09-09 13:30:00 |
| E     |        1 | 2020-09-09 14:20:00 |
| T     |        1 | 2020-09-09 14:35:00 |
| T     |        2 | 2020-09-09 13:20:00 |
| E     |        2 | 2020-09-09 13:25:00 |
| E     |        2 | 2020-09-09 14:45:00 |
| T     |        2 | 2020-09-09 14:50:00 |
+-------+----------+---------------------+

Is there a way to iterate through a group partitioned by the consumer and ordered by the timestamp and set values to a new column?

The new column will define the session_timestamp. That is the logic behind it:

  • A session only starts with an event E.
  • If a new event happens in one hour after the session starts, it belongs to that session.
  • If an event happened more than one hour of the event who started the session, it belongs to another session (that's what happens between lines 2 and 3 in the DataFrame).

So the result for the Dataframe above is:

+-------+----------+---------------------+---------------------+
| event | consumer |      timestamp      |  session_timestamp  |
+-------+----------+---------------------+---------------------+
| E     |        1 | 2020-09-09 13:15:00 | 2020-09-09 13:15:00 |
| E     |        1 | 2020-09-09 13:30:00 | 2020-09-09 13:15:00 |
| E     |        1 | 2020-09-09 14:20:00 | 2020-09-09 14:20:00 |
| T     |        1 | 2020-09-09 14:35:00 | 2020-09-09 14:20:00 |
| T     |        2 | 2020-09-09 13:20:00 | Null                |
| E     |        2 | 2020-09-09 13:25:00 | 2020-09-09 13:25:00 |
| E     |        2 | 2020-09-09 14:45:00 | 2020-09-09 14:45:00 |
| T     |        2 | 2020-09-09 14:50:00 | 2020-09-09 14:45:00 |
+-------+----------+---------------------+---------------------+

Is there a way to do it on Pyspark?

  • Does this answer your question? [Spark Window Functions - rangeBetween dates](https://stackoverflow.com/questions/33207164/spark-window-functions-rangebetween-dates) – Ofek Hod Oct 15 '20 at 15:53
  • Hi @OfekHod thank you for the reply, but it does not answer since the unbounded preceding will get the previous lines with 1 hour difference. If we used this solution, row 3 from the DataFrame above would receive the timestamp from the previous row, what is wrong. – Rafael Castelao Oct 15 '20 at 17:14
  • If I get you right, you should just change `PRECEDING` to `FOLLOWING` and get the desired result, if not, please explain more precisely what you are looking for. – Ofek Hod Oct 15 '20 at 19:11
  • `FOLLOWING` won't help neither. In the example above, a session starts with an event `E`, so for consumer 1 it starts at 13:15. Since the next event is less than 1 hour from starting event (13:15), it is part of the same event. However, event 3 is more than 1 hour ahead of starting event, even though it is less than 1 hour from it's previous event, which is now part of 13:15 session. So event 3 starts a new session (14:20). This dynamics does not work with a window function using preceding or following and an interval range. – Rafael Castelao Oct 15 '20 at 21:53
  • You right, anyway, I think you can get it work with `PERCDEING`: firstly partition window by event and order by timestamp, then check for every current row X in window the earliest row from the last hour (Y), and check the following conditions: if no Y found- set `X.session_timestamp = X.timestamp`, if Y found and `Y.timestamp == Y.session_timestamp` (i.e Y is start of session)- set `X.session_timestamp = Y.session_timestamp`, if found Y and `Y.timestamp != Y.session_timestamp` (i.e Y is not a start of a session)- start a new session for X by assigning `X.session_timestamp = X.timestamp`. – Ofek Hod Oct 15 '20 at 22:36
  • If you struggle with this or the solution doesn't meet your requirements I can try to write the code myself and post an answer – Ofek Hod Oct 15 '20 at 22:37
  • @OfekHod thank you for the reply. I am having a hard time trying to code what you explained. If you could help me with the code, it would be awesome. – Rafael Castelao Oct 16 '20 at 00:00
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/223126/discussion-between-rafael-castelao-and-ofek-hod). – Rafael Castelao Oct 16 '20 at 01:44

1 Answers1

0

As @Ofek told in comments, window function will help you. Here give you an example with scala, you can rewrite it in python by yourself. (Consider user defined aggregate function in pyspark is not easy, here collect and use udf handle it)

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = <your-dataframe>

val findSessionStartTime = udf((rows: Seq[Seq[Any]]) => {
  val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  var result: Date = null
  for (row <- rows.reverse) {
    val event = row(0)
    val time = parser.parse(row(1).toString)
    if (event == "E") {
      if (result == null || result.getTime - time.getTime < 3600000) {
        result = time
      }
    }
  }
  if (result == null)
    null
  else
    parser.format(result)
})

df.withColumn("events", collect_list(array($"event", $"timestamp")).over(Window
  .partitionBy($"consumer")
  .orderBy($"timestamp")))
  .withColumn("session_timestamp", findSessionStartTime($"events"))
  .drop("events")
  .show(false)

And the result is following:

(Besides, your sample result in description is incorrect. The time between 2020-09-09 14:20:00 and 2020-09-09 13:30:00 is 50 minutes < 1 hour)

+-----+--------+-------------------+-------------------+
|event|consumer|timestamp          |session_timestamp  |
+-----+--------+-------------------+-------------------+
|E    |1       |2020-09-09 13:15:00|2020-09-09 13:15:00|
|E    |1       |2020-09-09 13:30:00|2020-09-09 13:15:00|
|E    |1       |2020-09-09 14:20:00|2020-09-09 13:15:00|
|T    |1       |2020-09-09 14:35:00|2020-09-09 13:15:00|
|T    |2       |2020-09-09 13:20:00|null               |
|E    |2       |2020-09-09 13:25:00|2020-09-09 13:25:00|
|E    |2       |2020-09-09 14:45:00|2020-09-09 14:45:00|
|T    |2       |2020-09-09 14:50:00|2020-09-09 14:45:00|
+-----+--------+-------------------+-------------------+
Dean Xu
  • 4,438
  • 1
  • 17
  • 44