Consider the following PySpark DataFrame:
df = sqlContext.createDataFrame(
    [
        ('2019-05-08 11:00:00', 'a'),
        ('2019-05-08 11:02:12', 'b'),
        ('2019-05-08 11:04:24', 'a'),
        ('2019-05-08 11:06:36', 'c'),
        ('2019-05-08 11:08:48', 'c'),
        ('2019-05-08 11:11:00', 'a'),
        ('2019-05-08 11:13:12', 'v'),
        ('2019-05-08 11:23:34', 'd'),
        ('2019-05-08 11:26:24', 'e'),
        ('2019-05-08 11:28:36', 'c'),
        ('2019-05-08 11:30:48', 'b'),
        ('2019-05-08 11:35:12', 'b'),
        ('2019-05-08 11:37:24', 'b'),
        ('2019-05-08 11:44:00', 'a'),
        ('2019-05-08 11:48:24', 'x'),
        ('2019-05-08 11:50:36', 'k'),
        ('2019-05-08 11:55:00', 'b'),
        ('2019-05-08 12:01:36', 'c')
    ],
    ('datetime', 'value')
)
What I'm trying to do (efficiently) is to find the rate of distinct values over time, for 30-minute windows sliding every 5 minutes. So basically I need to compute the rate countDistinct(value) / (datetime.max() - datetime.min()) over each time window (a rough sketch of what I mean follows the list below), giving as a result:
- 11:00 - 11:30 -- 6/1716 (6 distinct values a, b, c, d, e, v, divided by 2019-05-08 11:28:36 - 2019-05-08 11:00:00 = 1716 seconds)
- 11:05 - 11:35 -- 6/1452 (6 distinct values a, b, c, d, e, v, divided by 2019-05-08 11:30:48 - 2019-05-08 11:06:36 = 1452 seconds)
- 11:10 - 11:40
- 11:15 - 11:45
and so on...
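To make it concrete, something along these lines is roughly what I'm imagining, though it's only a sketch and I'm not at all sure a groupBy on F.window is the right or efficient way to do it:

from pyspark.sql import functions as F

# Sketch only: 30-minute windows sliding every 5 minutes via groupBy + F.window,
# then rate = distinct count / (max timestamp - min timestamp) in seconds.
result = (
    df
    .withColumn('ts', F.col('datetime').cast('timestamp'))
    .groupBy(F.window('ts', '30 minutes', '5 minutes').alias('win'))
    .agg(
        F.countDistinct('value').alias('n_distinct'),
        (F.unix_timestamp(F.max('ts')) - F.unix_timestamp(F.min('ts'))).alias('span_seconds')
    )
    .withColumn('rate', F.col('n_distinct') / F.col('span_seconds'))
    .orderBy('win')
)
result.show(truncate=False)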
I did try to go with a window function, with which I had some success for the distinct count (countDistinct is not supported over a window, so I went with F.size(F.collect_set('value').over(w))), but I could not make it work for a custom function. I also tried a UDF, but again with no luck.