2

I have a log file composed of "Events", "Time", "UserId".

+------------+----------------+---------+
|   Events   |      Time      | UserId  |
+------------+----------------+---------+
| ClickA     | 7/6/16 10:00am | userA   |
+------------+----------------+---------+
| ClickB     | 7/6/16 12:00am | userA   |
+------------+----------------+---------+

I would like, for each user, to compute the average time between events. How would you solve this problem? In a traditional programming environment I would go through each event for a user, calculate the time delta between events n and n-1, and add this value to an array A. I would then compute the average of the values in A. How can I do this with Spark?

Ahmet

1 Answer

4

Ignoring date parsing, it looks like a job for a window function followed by a simple aggregation, so roughly you need something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, avg}

// Needed for toDF and the $"..." column syntax outside the shell
import spark.implicits._

val df = Seq(
  ("ClickA", "2016-06-07 10:00:00", "UserA"),
  ("ClickB", "2016-06-07 12:00:00", "UserA")
).toDF("events", "time", "userid").withColumn("time", $"time".cast("timestamp"))

// Partition by user and order events chronologically
val w = Window.partitionBy("userid").orderBy("time")

// Difference between consecutive events in seconds
val diff = $"time".cast("long") - lag($"time", 1).over(w).cast("long")

// Average gap per user; the first event in each partition has a null diff, which avg ignores
df.withColumn("diff", diff).groupBy("userid").agg(avg($"diff"))
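With the two sample rows above, the only gap for UserA is two hours (7200 seconds), so the final aggregation should produce a result roughly like:

+------+---------+
|userid|avg(diff)|
+------+---------+
| UserA|   7200.0|
+------+---------+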
zero323
  • Thank you zero323! Do you know how I can cast this string (5/1/2016 4:03:34 PM) to a timestamp? I couldn't find the proper way with pyspark. – Ahmet Jul 14 '16 at 22:06
  • Pretty much as shown here: http://stackoverflow.com/a/36095322/1560062 but you'll have to adjust the format (https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html) – zero323 Jul 14 '16 at 22:11
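For the "5/1/2016 4:03:34 PM" string mentioned in the comments, a minimal sketch of the parsing step (shown in Scala; unix_timestamp is also available in pyspark, and the DataFrame name raw and column name time here are just placeholders) might look like this:

import org.apache.spark.sql.functions.unix_timestamp

// "M/d/yyyy h:mm:ss a" follows SimpleDateFormat conventions:
// single-digit month/day/hour, 12-hour clock with an AM/PM marker
val parsed = raw.withColumn(
  "time",
  unix_timestamp($"time", "M/d/yyyy h:mm:ss a").cast("timestamp")
)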