
Let's say I have the following Spark frame:

+-------------------+--------+
|timestamp          |UserName|
+-------------------+--------+
|2021-08-11 04:05:06|A       |
|2021-08-11 04:15:06|B       |
|2021-08-11 09:15:26|A       |
|2021-08-11 11:04:06|B       |
|2021-08-11 14:55:16|A       |
|2021-08-13 04:12:11|B       |
+-------------------+--------+
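
For reference, a minimal sketch to reproduce this frame (assuming an active SparkSession named spark; this snippet is not part of the original pipeline):

from pyspark.sql import functions as F

data = [
    ("2021-08-11 04:05:06", "A"), ("2021-08-11 04:15:06", "B"),
    ("2021-08-11 09:15:26", "A"), ("2021-08-11 11:04:06", "B"),
    ("2021-08-11 14:55:16", "A"), ("2021-08-13 04:12:11", "B"),
]
df = spark.createDataFrame(data, ["timestamp", "UserName"]) \
    .withColumn("timestamp", F.to_timestamp("timestamp"))  # cast the strings to timestamps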

I want to build time-series data at a desired time resolution, based on event counts for each user.

  • Note1: obviously, after grouping by UserName and counting events per desired time frame/resolution, the time frames themselves need to be kept in the Spark frame (maybe using Event-time Aggregation and Watermarking in Apache Spark’s Structured Streaming).
  • Note2: missing time frames need to be filled in, with 0 where there are no events.
  • Note3: I'm not interested in using a UDF or hacking it via toPandas().

So, for a 24-hour (daily) time frame, the expected result after groupBy should look like this:

+------------------------------------------+-------------+-------------+
|window_frame_24_Hours                     | username A  | username B  |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 23:59:59}|3            |2            |
|{2021-08-12 00:00:00, 2021-08-12 23:59:59}|0            |0            |
|{2021-08-13 00:00:00, 2021-08-13 23:59:59}|0            |1            |
+------------------------------------------+-------------+-------------+

Edit1: in case of a 12-hour time frame/resolution:

+------------------------------------------+-------------+-------------+
|window_frame_12_Hours                     | username A  | username B  |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 11:59:59}|2            |2            |
|{2021-08-11 12:00:00, 2021-08-11 23:59:59}|1            |0            |
|{2021-08-12 00:00:00, 2021-08-12 11:59:59}|0            |0            |
|{2021-08-12 12:00:00, 2021-08-12 23:59:59}|0            |0            |
|{2021-08-13 00:00:00, 2021-08-13 11:59:59}|0            |1            |
|{2021-08-13 12:00:00, 2021-08-13 23:59:59}|0            |0            |
+------------------------------------------+-------------+-------------+
Mario

1 Answer


Group by the time window '1 day' plus UserName to count events, then group by the window frame and pivot on the user names:

from pyspark.sql import functions as F

# count events per (daily window, user), then pivot the user names into columns
result = df.groupBy(
    F.window("timestamp", "1 day").alias("window_frame_24_Hours"),
    "UserName"
).count().groupBy("window_frame_24_Hours").pivot("UserName").agg(
    F.first("count")
).na.fill(0)  # a user with no events in a returned window gets 0

result.show(truncate=False)

#+------------------------------------------+---+---+
#|window_frame_24_Hours                     |A  |B  |
#+------------------------------------------+---+---+
#|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0  |1  |
#|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3  |2  |
#+------------------------------------------+---+---+
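
For other resolutions (e.g. the 12-hour frames from Edit1), only the window duration changes; a minimal sketch assuming the same df (result_12h is just an illustrative name):

# same aggregation with a 12-hour window instead of a daily one
result_12h = df.groupBy(
    F.window("timestamp", "12 hours").alias("window_frame_12_Hours"),
    "UserName"
).count().groupBy("window_frame_12_Hours").pivot("UserName").agg(
    F.first("count")
).na.fill(0)

Like the daily version, this only returns windows that contain at least one event.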

If you need the missing dates, you'll have to generate all the dates using sequence between the min and max timestamps, then join with the original dataframe:

# truncate timestamps to the day, build the list of day boundaries with sequence,
# turn consecutive boundaries into {start, end} frames, and cross join with all
# user names so that every (frame, user) pair exists
intervals_df = df.withColumn(
    "timestamp",
    F.date_trunc("day", "timestamp")
).selectExpr(
    "sequence(min(timestamp), max(timestamp + interval 1 day), interval 1 day) as dates"
).select(
    F.explode(
        F.expr("transform(dates, (x, i) -> IF(i!=0, struct(date_trunc('dd', dates[i-1]) as start, dates[i] as end), null))")
    ).alias("frame")
).filter("frame is not null").crossJoin(
    df.select("UserName").distinct()
)

# left join so that frames without matching events are kept, then pivot and count;
# count() ignores the nulls coming from the left join, so empty frames yield 0
result = intervals_df.alias("a").join(
    df.alias("b"),
    F.col("timestamp").between(F.col("frame.start"), F.col("frame.end"))
    & (F.col("a.UserName") == F.col("b.UserName")),
    "left"
).groupBy(
    F.col("frame").alias("window_frame_24_Hours")
).pivot("a.UserName").agg(
    F.count("b.UserName")
)

result.show(truncate=False)

#+------------------------------------------+---+---+
#|window_frame_24_Hours                     |A  |B  |
#+------------------------------------------+---+---+
#|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0  |1  |
#|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3  |2  |
#|{2021-08-12 00:00:00, 2021-08-13 00:00:00}|0  |0  |
#+------------------------------------------+---+---+
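
The same gap-filling idea can be adapted to other resolutions by stepping the sequence with the desired interval instead of one day. A sketch for 12-hour frames anchored at midnight (the step value and the intervals_12h name are assumptions, not part of the original code):

step = "interval 12 hours"

intervals_12h = df.withColumn(
    "day", F.date_trunc("day", "timestamp")  # anchor frame boundaries at midnight
).selectExpr(
    f"sequence(min(day), max(day) + interval 1 day, {step}) as bounds"
).select(
    F.explode(
        F.expr("transform(bounds, (x, i) -> IF(i != 0, struct(bounds[i-1] as start, bounds[i] as end), null))")
    ).alias("frame")
).filter("frame is not null").crossJoin(
    df.select("UserName").distinct()
)

The left join / pivot / count step above can then be reused unchanged against intervals_12h; only the frame column alias differs.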
blackbishop
  • Alright, but based on *Note2*, what about the missing time frame 2021-08-12, which needs to be filled with 0 for the users? (maybe `crossJoin( )` & `join("left")` like [this](https://stackoverflow.com/a/69963120/10452700)). Is it possible to come up with an idea to extend this to other desired time resolutions like 12hrs or 8hrs, like [this](https://stackoverflow.com/a/69868036/10452700)? – Mario Jan 31 '22 at 13:06
  • Thanks for the update; however, I'm not sure `window("1 day")` would be the best practice since, as *Note1* mentioned, the aim is to reach any desired time frame/resolution when aggregating the time events, e.g. using `F.hour` & `cast()` on hours. I want a solution that does not *limit* the event aggregation to just *24hrs* but covers other time frames as well. Is it possible? – Mario Jan 31 '22 at 13:30
  • Another problem is that in the real scenario I have lots of `UserName`s, not just "A" & "B", and I need a general form of `F.sum(F.when(F.col("b.UserName") == "A", 1).otherwise(0)).alias("username_A")`, like `.agg().na.fill(0)`. Please let me know what the best approach is. – Mario Jan 31 '22 at 13:42
  • @Mario then use pivot. See the update. – blackbishop Jan 31 '22 at 13:48
  • What if the time frame was **not** 24hrs/daily once done by [Event-time Aggregation](https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html)? Then what is the best approach for reaching the correct time-series Spark frame in your updated solution, so that we can process the event count of every user within the related *timeframe*? Wouldn't it be better to [cast on the *hour*](https://stackoverflow.com/a/69868036/10452700)? I don't know what its consequences would be for the process of filling the missing intervals. – Mario Jan 31 '22 at 18:23
  • Please find *Edit1* at the end of the post. – Mario Jan 31 '22 at 19:39
  • I used the pivot method successfully and it works well for daily (24hrs). I'm not sure `F.window("timestamp", "12 hour").alias("window_frame_12_Hours"),` based on *Edit1* would solve the problem completely and keep the data sequenced, but it looks OK. – Mario Feb 01 '22 at 18:26
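
Regarding the last comment: F.window alone only emits windows that actually contain events, so the missing 12-hour frames still have to be generated and joined in. A rough sketch, assuming the result_12h and intervals_12h frames from the sketches above, and a session time zone where the window boundaries fall on 00:00/12:00 (as in the outputs shown):

# full set of 12-hour frames, one row per frame
all_frames = intervals_12h.select("frame").distinct()

# keep every frame via a left join and fill the counts of empty frames with 0
filled = all_frames.join(
    result_12h,
    F.col("frame.start") == F.col("window_frame_12_Hours.start"),
    "left"
).drop("window_frame_12_Hours").withColumnRenamed(
    "frame", "window_frame_12_Hours"
).na.fill(0).orderBy("window_frame_12_Hours.start")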