1

I have a column of binary sensor data. I want to identify each run of consecutive 1s, which denotes an event occurring, and also get the interval of time it lasted. Is this doable with Spark? Here is an example of the data I'm working with.

Screenshot

I would be able to do this if I could go through the data row by row, but that would require a collect() first, and then all my data would be on a single machine. Some ideas I had: is there a way to collect the data on the worker nodes, run an iterative algorithm there to generate the event information, and then get the results back on the driver? I also read that there is something called Structured Streaming in Spark 2.2, but I'm not sure whether that is what I'm looking for.

Any other ideas are welcome.

FYI, I'm working with PySpark, and I'm very new to this.

General Grievance
donut_person

2 Answers

1

I first came across your other question, which has since been flagged as a duplicate, so I will answer this one.

Yes, you can use a window function, much as you would in a SQL query with a PARTITION BY clause. See "How to find longest sequence of consecutive dates?"

Using a similar approach, but with dataframe transformations, you can achieve the same thing.

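from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import row_number

#reuse the active session (already available as `spark` in the pyspark shell)
spark=SparkSession.builder.getOrCreate()

lst=[[1,1],[2,1],[3,1],[4,1],[5,0],[6,0],[7,0],[8,1],[9,1],[10,1]]
df=spark.createDataFrame(lst,['ID','Sensor'])

#define the window specification: number the rows of each Sensor value in ID order
w=Window.partitionBy(df['Sensor']).orderBy(df['ID'])

#ID minus row number stays constant within a run of consecutive IDs with the same Sensor value
group_df=df.select('*',(df['ID']-row_number().over(w)).alias('grp')).orderBy('ID')
#group by Sensor as well as grp, because a run of 0s and a run of 1s can share a grp value
count_df=group_df.groupBy('Sensor','grp').count()

#get result by joining sequence counts df back to df containing original columns
group_df\
  .join(count_df,['Sensor','grp'])\
  .select('ID','Sensor','count')\
  .filter('Sensor=1')\
  .orderBy('ID')\
  .show()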

This yields, for each row where Sensor is 1, the length of the sequence it belongs to:

+---+------+-----+
| ID|Sensor|count|
+---+------+-----+
|  1|     1|    4|
|  2|     1|    4|
|  3|     1|    4|
|  4|     1|    4|
|  8|     1|    3|
|  9|     1|    3|
| 10|     1|    3|
+---+------+-----+
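To see why subtracting the row number groups consecutive rows, and to get the start and end of each event rather than only its length, the same bookkeeping can be replayed in plain Python. This is only a sketch for checking the logic by hand, not Spark code: summarize_runs is a hypothetical helper, and it assumes the IDs are consecutive integers with no gaps. In PySpark, the equivalent summary would presumably be an aggregation of min('ID'), max('ID') and a count per Sensor and grp.

```python
from collections import defaultdict

# Same sample data as above: (ID, Sensor) pairs.
rows = [(1, 1), (2, 1), (3, 1), (4, 1), (5, 0),
        (6, 0), (7, 0), (8, 1), (9, 1), (10, 1)]

def summarize_runs(rows):
    """Return (sensor, start_id, end_id, length) for every run, ordered by start_id.

    Assumes IDs are consecutive integers; summarize_runs is a hypothetical
    helper that mimics the window logic, not part of any library.
    """
    row_number = defaultdict(int)  # per-sensor counter, like row_number() over the window
    groups = defaultdict(list)     # (sensor, grp) -> IDs that belong to that run
    for row_id, sensor in sorted(rows):
        row_number[sensor] += 1
        # ID and row number advance together inside a run, so the difference is constant.
        grp = row_id - row_number[sensor]
        groups[(sensor, grp)].append(row_id)
    runs = [(sensor, ids[0], ids[-1], len(ids)) for (sensor, _), ids in groups.items()]
    return sorted(runs, key=lambda run: run[1])

# Keep only the runs of 1s, i.e. the events.
events = [run for run in summarize_runs(rows) if run[0] == 1]
print(events)  # [(1, 1, 4, 4), (1, 8, 10, 3)]
```

Each tuple gives the first ID, the last ID and the number of samples of one event. If ID is a timestamp, the difference between the last and first values is the interval the event lasted.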
DavidWayne
0

The workaround you describe (collect on nodes) can be achieved with RDD.mapPartitions. This allows you to provide a function which is mapped across an entire partition, which means you can iterate across consecutive samples within a subset of your data. Note that you would need to make sure that you identify when partitions start or end with a 1, and combine sequences between partitions. It might be a bit ugly, but should be possible.

Note that you will need to sort your data first, if it isn't already.
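A rough sketch of that idea follows, written in plain Python so the logic can be checked without a cluster. The names runs_in_partition and merge_adjacent are hypothetical, and the sketch assumes records are (ID, Sensor) pairs whose IDs are consecutive integers. With Spark, the first function would be what gets passed to RDD.mapPartitions on an RDD sorted by ID (for example via sortBy), and the second would run on the driver over the much smaller collected list of runs.

```python
def runs_in_partition(rows):
    """Yield (start_id, end_id) for each run of 1s in one sorted partition.

    `rows` is an iterator of (ID, Sensor) pairs, which is the shape of
    argument a function passed to mapPartitions receives.
    """
    start = end = None
    for row_id, sensor in rows:
        if sensor == 1:
            if start is None:
                start = row_id      # a new run begins here
            end = row_id
        elif start is not None:
            yield (start, end)      # a 0 closes the current run
            start = None
    if start is not None:
        yield (start, end)          # run still open at the partition boundary

def merge_adjacent(runs, step=1):
    """Join runs that were split by a partition boundary.

    Assumes consecutive IDs differ by `step`, so a gap of exactly `step`
    between two runs means no 0 sample lies between them.
    """
    merged = []
    for start, end in sorted(runs):
        if merged and start - merged[-1][1] <= step:
            merged[-1] = (merged[-1][0], end)
        else:
            merged.append((start, end))
    return merged

# Stand-in for three sorted partitions; the first event straddles a boundary.
partitions = [
    [(1, 1), (2, 1), (3, 1)],
    [(4, 1), (5, 0), (6, 0)],
    [(7, 0), (8, 1), (9, 1), (10, 1)],
]
partial = [run for part in partitions for run in runs_in_partition(iter(part))]
events = merge_adjacent(partial)
print(events)  # [(1, 4), (8, 10)]
```

The data has to be sorted before this step because each partition is scanned in order, and the merge relies on neighbouring partitions holding neighbouring ID ranges.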

timchap
  • Thanks for the answer. Why would I need to sort the data first? Also, do you think I could use Spark Structured Streaming to do windowing operations to achieve this task? – donut_person Aug 04 '17 at 13:53