
I have a PySpark dataframe containing logs, with each row recording the state of the system at the time it was logged, along with a group number. I would like to find the lengths of the time periods during which each group is in an unhealthy state.

For example, if this were my table:

TIMESTAMP | STATUS_CODE | GROUP_NUMBER
--------------------------------------
02:03:11  | healthy     | 000001
02:03:04  | healthy     | 000001
02:03:03  | unhealthy   | 000001
02:03:00  | unhealthy   | 000001
02:02:58  | healthy     | 000008
02:02:57  | healthy     | 000008
02:02:55  | unhealthy   | 000001
02:02:54  | healthy     | 000001
02:02:50  | healthy     | 000007
02:02:48  | healthy     | 000004

I would want to return Group 000001 having an unhealthy time period of 9 seconds (from 02:02:55 to 02:03:04).

Other groups could also have unhealthy time periods, and I would want to return those as well.

Due to the possibility of consecutive rows with the same status, and since rows of different groups are interspersed, I am struggling to find a way to do this efficiently.

I cannot convert the Pyspark dataframe to a Pandas dataframe, as it is much too large.

How can I efficiently determine the lengths of these time periods?

Thanks so much!

  • did any of the posted solutions work? – rogue-one May 23 '17 at 15:03
  • @rogue-one - thanks, your implementation was very well-explained and detailed! Walking through the steps, it seems quite brilliant and clever. An extension of this - is there a way to get the length of each time period for which a group is in the unhealthy state (e.g. if Group 000001 has multiple periods of unhealthy state, return the length of each)? –  May 23 '17 at 19:26
  • if you do a `t2.timestamp_value - t1.timestamp_value` without the sum and group by in my query, you should be able to get the duration. – rogue-one May 23 '17 at 19:53

2 Answers


One straightforward way (maybe not optimal) is:

  1. Map to [K,V] with GROUP_NUMBER as the Key K
  2. Use repartitionAndSortWithinPartitions, so you will have all data for every single group in the same partition and have them sorted by TIMESTAMP. A detailed explanation of how it works is in this answer: Pyspark: Using repartitionAndSortWithinPartitions with multiple sort Critiria
  3. Finally, use mapPartitions to get an iterator over the sorted data in a single partition, so you can easily compute the answer you need (see the sketch after this list; explanation for mapPartitions: How does the pyspark mapPartitions function work?)
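
A minimal sketch of these three steps (not from the original answer) might look like the following, assuming a dataframe `df` with the TIMESTAMP, STATUS_CODE, and GROUP_NUMBER columns from the question and timestamps that include a date so they can be parsed to epoch seconds; the helper names, timestamp format, and partition count are illustrative.

from datetime import datetime
from pyspark.rdd import portable_hash

def to_unix(ts):
    # assumes timestamps like '2017-01-01 02:03:11'; adjust the format to your data
    return int(datetime.strptime(ts, '%Y-%m-%d %H:%M:%S').timestamp())

def unhealthy_durations(rows):
    # rows arrive sorted by (group, timestamp); emit (group, seconds spent unhealthy)
    prev_group, prev_ts, prev_status = None, None, None
    for (group, ts), status in rows:
        if group == prev_group and prev_status == 'unhealthy':
            yield (group, ts - prev_ts)
        prev_group, prev_ts, prev_status = group, ts, status

# step 1: key each row by (GROUP_NUMBER, unix timestamp), keep the status as the value
keyed = df.rdd.map(lambda r: ((r['GROUP_NUMBER'], to_unix(r['TIMESTAMP'])), r['STATUS_CODE']))

# step 2: partition by the group part of the key, rows sorted by (group, timestamp)
sorted_rdd = keyed.repartitionAndSortWithinPartitions(
    numPartitions=8,
    partitionFunc=lambda k: portable_hash(k[0]))

# step 3: walk each sorted partition, then total the unhealthy seconds per group
totals = sorted_rdd.mapPartitions(unhealthy_durations).reduceByKey(lambda a, b: a + b)

The `reduceByKey` at the end gives one total per group; dropping it leaves the individual interval lengths.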
Timofey Chernousov

The PySpark with Spark SQL solution would look like this.

First we create the sample dataset. In addition to the dataset, we generate a row_number field, partitioned by group and ordered by the timestamp. Then we register the generated dataframe as a table, say table1.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import unix_timestamp

df = spark.createDataFrame([
('2017-01-01 02:03:11','healthy','000001'),
('2017-01-01 02:03:04','healthy','000001'),
('2017-01-01 02:03:03','unhealthy','000001'),
('2017-01-01 02:03:00','unhealthy','000001'),
('2017-01-01 02:02:58','healthy','000008'),
('2017-01-01 02:02:57','healthy','000008'),
('2017-01-01 02:02:55','unhealthy','000001'),
('2017-01-01 02:02:54','healthy','000001'),
('2017-01-01 02:02:50','healthy','000007'),
('2017-01-01 02:02:48','healthy','000004')
],['timestamp','state','group_id'])

df = df.withColumn('rownum', row_number().over(Window.partitionBy(df.group_id).orderBy(unix_timestamp(df.timestamp))))

df.registerTempTable("table1")

Once the dataframe is registered as a table (table1), the required data can be computed as below using Spark SQL:

>>> spark.sql("""
... SELECT t1.group_id,sum((t2.timestamp_value - t1.timestamp_value)) as duration
... FROM
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1 WHERE state = 'unhealthy') t1
... LEFT JOIN
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1) t2
... ON t1.group_id = t2.group_id
... AND t1.rownum = t2.rownum - 1
... group by t1.group_id
... """).show()
+--------+--------+
|group_id|duration|
+--------+--------+
|  000001|       9|
+--------+--------+

The sample dataset had unhealthy data for group_id 000001 only, but this solution also works for cases where other group_ids have unhealthy states.
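
Following up on the comment thread under the question about per-period lengths, a hedged variant of the same query without the sum and group by should return the length of each interval from an unhealthy row to the next row; the start_time alias below is illustrative, not from the original answer.

spark.sql("""
SELECT t1.group_id,
       t1.timestamp_value AS start_time,
       (t2.timestamp_value - t1.timestamp_value) AS duration
FROM
(SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1 WHERE state = 'unhealthy') t1
LEFT JOIN
(SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1) t2
ON t1.group_id = t2.group_id
AND t1.rownum = t2.rownum - 1
""").show()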

rogue-one