
Consider the following PySpark data frame:

df = sqlContext.createDataFrame(
    [
        ('2019-05-08 11:00:00', 'a'),
        ('2019-05-08 11:02:12', 'b'),
        ('2019-05-08 11:04:24', 'a'),
        ('2019-05-08 11:06:36', 'c'),
        ('2019-05-08 11:08:48', 'c'),
        ('2019-05-08 11:11:00', 'a'),
        ('2019-05-08 11:13:12', 'v'),
        ('2019-05-08 11:23:34', 'd'),
        ('2019-05-08 11:26:24', 'e'),
        ('2019-05-08 11:28:36', 'c'),
        ('2019-05-08 11:30:48', 'b'),
        ('2019-05-08 11:35:12', 'b'),
        ('2019-05-08 11:37:24', 'b'),
        ('2019-05-08 11:44:00', 'a'),
        ('2019-05-08 11:48:24', 'x'),
        ('2019-05-08 11:50:36', 'k'),
        ('2019-05-08 11:55:00', 'b'),
        ('2019-05-08 12:01:36', 'c')
    ],
    ('datetime', 'value')
)

What I'm trying to do (efficiently) is find the rate of distinct values over time for 30-minute windows, one opening every 5 minutes. In other words, for each window I need the rate countDistinct(value) / (datetime.max() - datetime.min()), giving as a result:

  • 11:00 - 11:30 -- 6/1716 (6 distinct values a, b, c, d, e, v over 2019-05-08 11:28:36 - 2019-05-08 11:00:00 = 1716 seconds)
  • 11:05 - 11:35 -- 6/1452 (6 distinct values a, b, c, d, e, v over 2019-05-08 11:30:48 - 2019-05-08 11:06:36 = 1452 seconds)
  • 11:10 - 11:40
  • 11:15 - 11:45

and so on...
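
For concreteness, here is how the first window's value can be checked by hand with a plain filter (a throwaway sketch just to pin down the definition, not the windowed solution I am after):

from pyspark.sql import functions as F

# Rows falling in the 11:00 - 11:30 window.
w0 = df.filter(
    (F.col("datetime") >= "2019-05-08 11:00:00") &
    (F.col("datetime") < "2019-05-08 11:30:00")
)

# 6 distinct values / (11:28:36 - 11:00:00 = 1716 seconds) ≈ 0.0035
w0.agg(
    (F.countDistinct("value") /
     (F.unix_timestamp(F.max("datetime")) - F.unix_timestamp(F.min("datetime")))
    ).alias("ratio")
).show()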

I did try window functions and had some success with the distinct count (countDistinct is not supported over a window, so I went with F.size(F.collect_set('value').over(w))), but I could not make it work for a custom function. I also tried a UDF, but again no luck.
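
Roughly, the distinct-count part of my attempt looked like this (the window spec w below is a reconstruction of what I used: ordering on unix seconds with a 30-minute forward range):

from pyspark.sql import functions as F, Window

# countDistinct is not supported over a window, so use size(collect_set(...));
# the frame spans 30 minutes forward from each row.
w = Window.orderBy(F.unix_timestamp("datetime")).rangeBetween(0, 30 * 60 - 1)

df.withColumn("n_distinct", F.size(F.collect_set("value").over(w))).show()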

Sotos
  • This post may be useful: https://stackoverflow.com/questions/33207164/spark-window-functions-rangebetween-dates – pault May 10 '19 at 14:19
  • @pault thanks for the link. I did see that as well whilst trying to solve it but my problem is how to apply a custom function (either UDF or pure spark) – Sotos May 10 '19 at 14:28
  • Could you please post the expected output based on your sample ? It could help to understand exactly what you want – Steven May 10 '19 at 14:31
  • @Steven edited. Apologies for the poor format of the expected output - It is meant to be a data frame – Sotos May 10 '19 at 14:40
  • @sotos if it is supposed to be a dataframe, you could probably generate a table using any excel like soft and then post it here... – Steven May 10 '19 at 14:55

1 Answer


I am not sure this is the most optimized way of doing it, but here is one solution:

from pyspark.sql import functions as F, Window

# Assign each row to its 5-minute tumbling window; each window start is the
# opening of one 30-minute range. Note that the empty partitionBy() below
# moves all rows to a single partition.
df = df.withColumn("window", F.window("datetime", "5 minutes"))

# Window start in unix seconds, used as the ordering key for the range frames.
df = df.withColumn(
    "start",
    F.unix_timestamp(F.col('window.start'))
)

# Distinct values over the 30 minutes following the window start (0 to 1799
# seconds); countDistinct is not allowed over a window, hence size(collect_set()).
df = df.withColumn(
    "cnt",
    F.size(F.collect_set("value").over(Window.partitionBy().orderBy("start").rangeBetween(0,1799)))
)

# Last and first event actually observed inside each 30-minute range.
df = df.withColumn(
    "end",
    F.unix_timestamp(F.max("datetime").over(Window.partitionBy().orderBy("start").rangeBetween(0,1799)))
)

df = df.withColumn(
    "start",
    F.unix_timestamp(F.min("datetime").over(Window.partitionBy().orderBy("start").rangeBetween(0,1799)))
)


# One row per 30-minute range: its boundaries and distinct-count / observed
# span in seconds; distinct() collapses the duplicated per-event rows.
df.select(
    F.col("window.start").alias("range_start"),
    (F.unix_timestamp(F.col("window.start"))+1800).cast("timestamp").alias("range_end"),
    (F.col('cnt')/(F.col("end")-F.col("start"))).alias("ratio")
).distinct().show()

+-------------------+-------------------+--------------------+
|        range_start|          range_end|               ratio|
+-------------------+-------------------+--------------------+
|2019-05-08 11:00:00|2019-05-08 11:30:00|0.003496503496503...|
|2019-05-08 11:05:00|2019-05-08 11:35:00|0.004132231404958678|
|2019-05-08 11:10:00|2019-05-08 11:40:00|0.003787878787878788|
|2019-05-08 11:20:00|2019-05-08 11:50:00|0.004026845637583893|
|2019-05-08 11:25:00|2019-05-08 11:55:00|0.004132231404958678|
|2019-05-08 11:30:00|2019-05-08 12:00:00|0.002754820936639...|
|2019-05-08 11:35:00|2019-05-08 12:05:00|0.003156565656565...|
|2019-05-08 11:40:00|2019-05-08 12:10:00|0.004734848484848485|
|2019-05-08 11:45:00|2019-05-08 12:15:00|0.005050505050505051|
|2019-05-08 11:50:00|2019-05-08 12:20:00|0.004545454545454545|
|2019-05-08 11:55:00|2019-05-08 12:25:00|0.005050505050505051|
|2019-05-08 12:00:00|2019-05-08 12:30:00|                null|
+-------------------+-------------------+--------------------+

Here is another version that I find more coherent:

df = df.withColumn("window", F.window("datetime", "5 minutes"))

# All 30-minute ranges that open on an observed 5-minute boundary.
df_range = df.select(F.window("datetime", "5 minutes").getItem("start").alias("range_start"))
df_range = df_range.select(
    "range_start",
    (F.unix_timestamp(F.col("range_start"))+1800).cast("timestamp").alias("range_end")
).distinct()

# Attach every event to each range it falls into (one event can belong to up
# to six overlapping ranges).
df_ratio = df.join(
    df_range,
    how='inner',
    on=( (df.datetime >= df_range.range_start) & (df.datetime < df_range.range_end) )
)

# Per range: first and last observed event and the number of distinct values.
df_ratio = df_ratio.groupBy(
    "range_start",
    "range_end",
).agg(
    F.max("datetime").alias("max_datetime"),
    F.min("datetime").alias("min_datetime"),
    F.size(F.collect_set("value")).alias("nb")
)

# Distinct count divided by the observed span (in seconds) of each range.
df_ratio.select(
    "range_start",
    "range_end",
    (F.col('nb')/(F.unix_timestamp('max_datetime')-F.unix_timestamp('min_datetime'))).alias("ratio")
).show()

+-------------------+-------------------+--------------------+                  
|        range_start|          range_end|               ratio|
+-------------------+-------------------+--------------------+
|2019-05-08 11:00:00|2019-05-08 11:30:00|0.003496503496503...|
|2019-05-08 11:05:00|2019-05-08 11:35:00|0.004132231404958678|
|2019-05-08 11:10:00|2019-05-08 11:40:00|0.003787878787878788|
|2019-05-08 11:20:00|2019-05-08 11:50:00|0.004026845637583893|
|2019-05-08 11:25:00|2019-05-08 11:55:00|0.004132231404958678|
|2019-05-08 11:30:00|2019-05-08 12:00:00|0.002754820936639...|
|2019-05-08 11:35:00|2019-05-08 12:05:00|0.003156565656565...|
|2019-05-08 11:40:00|2019-05-08 12:10:00|0.004734848484848485|
|2019-05-08 11:45:00|2019-05-08 12:15:00|0.005050505050505051|
|2019-05-08 11:50:00|2019-05-08 12:20:00|0.004545454545454545|
|2019-05-08 11:55:00|2019-05-08 12:25:00|0.005050505050505051|
|2019-05-08 12:00:00|2019-05-08 12:30:00|                null|
+-------------------+-------------------+--------------------+
Steven
  • Thank you for the reply. I can not understand the `cnt` values. What I want to calculate is the rate of distinct. So for the first window (11 - 11:30), it should give 0.0035 (or 6 / 1716 which translates to N_distinct / (2019-05-08 11:28:36 - 2019-05-08 11:00:00)) – Sotos May 10 '19 at 14:27
  • Yes. That's what I am looking for. Thank you. I will let it run during the weekend and get the benchmarks on Monday. – Sotos May 10 '19 at 14:56
  • 1
    @Sotos updated with another version that i found more coherent – Steven May 10 '19 at 15:13
  • Thank you @Steven. I will check that one out as well and let you know about performance. – Sotos May 10 '19 at 15:38
  • Why does it seem to be skipping a range? (11:15 - 11:45 is missing) – Sotos May 13 '19 at 06:49
  • @Sotos I see .... I generate the ranges but if there is nothing for a range of 5 min, then I do not create the range. It means you have to generate lines which is a pain in the *ss on that kind of system. Maybe your real use case won't have this problem. – Steven May 15 '19 at 08:18
  • Ahh, ok I get it. There might be a case like that in real life but It will not affect it. Worst case scenario is to get the flag 5 minutes later. Well, that does it. Once again, thank you for the help. – Sotos May 15 '19 at 08:32
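
A possible workaround for the missing 11:15 - 11:45 range discussed in the comments above: generate every 5-minute range_start explicitly instead of deriving it from the data. This is only a sketch, assuming Spark 2.4+ for sequence(); the names are illustrative:

from pyspark.sql import functions as F

# Bounds of the data in unix seconds.
bounds = df.select(
    F.min(F.unix_timestamp("datetime")).alias("lo"),
    F.max(F.unix_timestamp("datetime")).alias("hi")
)

# One range_start every 300 seconds between the rounded-down minimum and the
# maximum, whether or not any event falls into that 5-minute slot.
df_range = bounds.select(
    F.explode(
        F.sequence(
            (F.floor(F.col("lo") / 300) * 300).cast("long"),
            F.col("hi").cast("long"),
            F.lit(300).cast("long")
        )
    ).alias("start_s")
).select(
    F.col("start_s").cast("timestamp").alias("range_start"),
    (F.col("start_s") + 1800).cast("timestamp").alias("range_end")
)

Left-joining df onto this df_range (instead of the inner join in the second version) should then keep the empty ranges as well, with a null ratio.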