
How to find the time between events in a group?

For example, I have a streaming source (Kafka) from which I get many columns. This stream is read into Spark, preprocessed, and cleaned, and only these four columns are kept: "ClientTimestamp", "sensor_type", "activity", "User_detail".
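
The read side looks roughly like the sketch below. The broker address, topic name, payload schema, and timestamp format are placeholders, not the real job:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder.appName("critical-activity").getOrCreate()

# Assumed JSON payload carrying just the four fields of interest.
payload_schema = (StructType()
                  .add("ClientTimestamp", StringType())
                  .add("sensor_type", StringType())
                  .add("activity", StringType())
                  .add("User_detail", StringType()))

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")   # placeholder broker
      .option("subscribe", "sensor-events")                   # placeholder topic
      .load()
      .select(F.from_json(F.col("value").cast("string"), payload_schema).alias("e"))
      .select(F.to_timestamp("e.ClientTimestamp", "M/d/yyyy HH:mm:ss").alias("clientTimestamp"),
              "e.sensor_type", "e.activity", "e.User_detail"))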

Now, I want to calculate, for each user, the total time during which critical activity existed.

ClientTimestamp        sensor_type    activity    User_detail
4/11/2021 10:00:00     ultrasonic     critical    user_A
4/11/2021 10:00:00     ultrasonic     normal      user_B
4/11/2021 10:03:00     ultrasonic     normal      user_A
4/11/2021 10:05:00     ultrasonic     critical    user_B
4/11/2021 10:06:00     ultrasonic     critical    user_A
4/11/2021 10:07:00     ultrasonic     critical    user_A
4/11/2021 10:08:00     ultrasonic     critical    user_B
4/11/2021 10:09:00     ultrasonic     critical    user_B

So for user_A, the total critical time is calculated by taking the difference between consecutive critical events and summing those differences:

(10:06:00 - 10:00:00) + (10:07:00 - 10:06:00)
Therefore, for user_A, critical activity lasted for a total of (6 + 1) = 7 minutes.

Similarly, for user_B:

(10:08:00 - 10:05:00) + (10:09:00 - 10:08:00)
For user_B, critical activity lasted for a total of (3 + 1) = 4 minutes.
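
On a static DataFrame this arithmetic is easy to express with a lag window function; a minimal sketch (column names as above, durations in minutes) would be:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Static-DataFrame sketch only: lag() and other non-time-based window
# functions are not supported on streaming DataFrames.
w = Window.partitionBy("User_detail").orderBy("clientTimestamp")

per_user_total = (df
    .filter(F.col("activity") == "critical")
    .withColumn("prev_ts", F.lag("clientTimestamp").over(w))
    .withColumn("gap_min",
                (F.col("clientTimestamp").cast("long") - F.col("prev_ts").cast("long")) / 60)
    .groupBy("User_detail")
    .agg(F.sum("gap_min").alias("total_critical_minutes")))

# For the sample data above: user_A -> 7.0, user_B -> 4.0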

For each window, I want to call a custom function that will calculate the total time. How can I apply a function to each group when grouping by window?

df = df.withWatermark("clientTimestamp", "10 minutes") \
       .groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail'), col('activity')) \
       .apply(calculate_time)
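
For a static DataFrame, calculate_time would be a grouped-map function along these lines (a hypothetical sketch, not my actual code; as discussed in the comments below, grouped-map apply/applyInPandas is not supported on streaming DataFrames, which is exactly my problem):

import pandas as pd
from pyspark.sql.functions import col, window

# Hypothetical grouped-map function: each (window, User_detail, activity) group
# arrives as a pandas DataFrame; return one summary row per group.
def calculate_time(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf = pdf.sort_values("clientTimestamp")
    total_min = pdf["clientTimestamp"].diff().dt.total_seconds().sum() / 60
    return pd.DataFrame({"User_detail": [pdf["User_detail"].iloc[0]],
                         "total_minutes": [total_min]})

# On a static DataFrame it could be attached with applyInPandas:
result = (df.groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"),
                     col("User_detail"), col("activity"))
            .applyInPandas(calculate_time, schema="User_detail string, total_minutes double"))
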
  • see https://stackoverflow.com/questions/40006395/applying-udfs-on-groupeddata-in-pyspark-with-functioning-python-example – ggordon Apr 11 '21 at 14:16
  • @ggordon Hi, I checked that solution a while ago and tried it too (see the link here). The link you have provided works for a static DataFrame, but in the case of Structured Streaming, where we have an unbounded table, this does not work. Any suggestions? (https://stackoverflow.com/questions/67001737/pyspark-applying-custom-function-on-structured-streaming) – krish Apr 11 '21 at 16:35

1 Answer

It looks like this could be solved by taking the difference between the maximum and minimum time for each User_detail within the Window. Also, a filter on the activity can be applied to ignore "normal" rows.

I do not see a reason why applying a custom function such as "calculate_time" is required here. Please note that I am not completely familiar with Python syntax, but your code could look like the following:

# aliased imports so the aggregate functions do not shadow Python's builtins
from pyspark.sql.functions import col, window, max as max_, min as min_

df = df \
  .filter(df.activity == "critical") \
  .withWatermark("clientTimestamp", "10 minutes") \
  .groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail')) \
  .agg((max_("clientTimestamp") - min_("clientTimestamp")).alias("time_difference"))
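
Depending on the Spark version, subtracting two timestamps yields an interval type (or may not be supported at all). If a plain numeric duration is preferred, the same idea can be sketched with the timestamps cast to seconds, starting again from the cleaned input df, with a console sink for a quick test:

from pyspark.sql import functions as F

result = (df
    .filter(F.col("activity") == "critical")
    .withWatermark("clientTimestamp", "10 minutes")
    .groupBy(F.window("clientTimestamp", "10 minutes"), F.col("User_detail"))
    .agg(((F.max(F.col("clientTimestamp").cast("long"))
           - F.min(F.col("clientTimestamp").cast("long"))) / 60)
         .alias("critical_minutes")))

# With the sample data this should give user_A -> 7.0 and user_B -> 4.0
# for the 10:00-10:10 window, once the watermark passes the window end.
query = (result.writeStream
         .outputMode("append")   # append works here because of the watermark
         .format("console")
         .option("truncate", "false")
         .start())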