
I'm trying to extract combined data intervals based on a time series in Scala and Spark.

I have the following data in a dataframe:

Id | State | StartTime           | EndTime
---+-------+---------------------+--------------------
 1 |   R   | 2019-01-01T03:00:00 | 2019-01-01T11:30:00
 1 |   R   | 2019-01-01T11:30:00 | 2019-01-01T15:00:00
 1 |   R   | 2019-01-01T15:00:00 | 2019-01-01T22:00:00
 1 |   W   | 2019-01-01T22:00:00 | 2019-01-02T04:30:00
 1 |   W   | 2019-01-02T04:30:00 | 2019-01-02T13:45:00
 1 |   R   | 2019-01-02T13:45:00 | 2019-01-02T18:30:00
 1 |   R   | 2019-01-02T18:30:00 | 2019-01-02T22:45:00

I need to extract the data into time intervals based on the id and state. The resulting data needs to look like:

Id | State | StartTime           | EndTime
---+-------+---------------------+--------------------
 1 |   R   | 2019-01-01T03:00:00 | 2019-01-01T22:00:00
 1 |   W   | 2019-01-01T22:00:00 | 2019-01-02T13:45:00
 1 |   R   | 2019-01-02T13:45:00 | 2019-01-02T22:45:00

Note that the first three records have been grouped together because the equipment is contiguously in an R state from 2019-01-01T03:00:00 to 2019-01-01T22:00:00. It then switches to a W state for the next two records, from 2019-01-01T22:00:00 to 2019-01-02T13:45:00, and then back to an R state for the last two records.
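
For reference, the input above can be reproduced with something like the following (a minimal sketch; the explicit timestamp format is an assumption about how the data is actually loaded):

import org.apache.spark.sql.functions.{col, to_timestamp}
import spark.implicits._

// Rebuild the sample data as a DataFrame with proper timestamp columns.
val df = Seq(
  (1, "R", "2019-01-01T03:00:00", "2019-01-01T11:30:00"),
  (1, "R", "2019-01-01T11:30:00", "2019-01-01T15:00:00"),
  (1, "R", "2019-01-01T15:00:00", "2019-01-01T22:00:00"),
  (1, "W", "2019-01-01T22:00:00", "2019-01-02T04:30:00"),
  (1, "W", "2019-01-02T04:30:00", "2019-01-02T13:45:00"),
  (1, "R", "2019-01-02T13:45:00", "2019-01-02T18:30:00"),
  (1, "R", "2019-01-02T18:30:00", "2019-01-02T22:45:00")
).toDF("Id", "State", "StartTime", "EndTime")
  .withColumn("StartTime", to_timestamp(col("StartTime"), "yyyy-MM-dd'T'HH:mm:ss"))
  .withColumn("EndTime", to_timestamp(col("EndTime"), "yyyy-MM-dd'T'HH:mm:ss"))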

Jeff Hornby
  • Possible duplicate of [Spark SQL window function with complex condition](https://stackoverflow.com/questions/42448564/spark-sql-window-function-with-complex-condition) – 10465355 Mar 08 '19 at 16:50
  • I looked at that question and it is a very different problem – Jeff Hornby Mar 08 '19 at 17:01
  • In that case, could you edit your question and explain the desired logic in detail? Additionally, we always welcome a [reproducible example](https://stackoverflow.com/q/48427185/10465355) in [tag:apache-spark]. Thank you in advance. – 10465355 Mar 08 '19 at 17:03
  • Turns out the solution is this: https://stackoverflow.com/questions/7420618/combine-rows-when-the-end-time-of-one-is-the-start-time-of-another-oracle translated to Spark – Jeff Hornby Mar 08 '19 at 19:44
  • hi @JeffHornby, did you manage to convert this to Spark code? – abiratsis Mar 15 '19 at 08:07

1 Answer


So it turns out that the answer to this is [Combine rows when the end time of one is the start time of another (Oracle)](https://stackoverflow.com/questions/7420618/combine-rows-when-the-end-time-of-one-is-the-start-time-of-another-oracle) translated to Spark. The trick is that the difference between a row number over the whole Id partition and a row number over the (Id, State) partition stays constant within a contiguous run of the same state, so that difference can serve as a grouping key.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, min, row_number}
import spark.implicits._

// Row number over all rows per Id, and per (Id, State):
// their difference only changes when the state changes.
val idSpec = Window.partitionBy('Id).orderBy('StartTime)
val idStateSpec = Window.partitionBy('Id, 'State).orderBy('StartTime)

val df2 = df
  .select('Id, 'State, 'StartTime, 'EndTime,
          row_number().over(idSpec).as("idRowNumber"),
          row_number().over(idStateSpec).as("idStateRowNumber"))
  // Group each contiguous run of the same state by the row-number difference.
  .groupBy('Id, 'State, 'idRowNumber - 'idStateRowNumber)
  .agg(min('StartTime).as("StartTime"), max('EndTime).as("EndTime"))
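
Against the sample data this should collapse to the three intervals from the question. A quick sanity check (output sketch; the exact show formatting may differ):

// Drop the synthetic grouping column and order the result for readability.
df2.select('Id, 'State, 'StartTime, 'EndTime)
   .orderBy('Id, 'StartTime)
   .show(false)

// +---+-----+-------------------+-------------------+
// |Id |State|StartTime          |EndTime            |
// +---+-----+-------------------+-------------------+
// |1  |R    |2019-01-01 03:00:00|2019-01-01 22:00:00|
// |1  |W    |2019-01-01 22:00:00|2019-01-02 13:45:00|
// |1  |R    |2019-01-02 13:45:00|2019-01-02 22:45:00|
// +---+-----+-------------------+-------------------+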
Jeff Hornby