I need advice on how to get the next 3 incidents for each event. Please see the input and expected output below.
Input:
+-------+-----+--------------------+--------------------+
|eventId|incId|           eventDate|             incDate|
+-------+-----+--------------------+--------------------+
|      1|  123|2018-02-09 10:01:...|2018-02-09 10:02:...|
|      2|    0|2018-02-09 10:02:...|                null|
|      3|  124|2018-02-09 10:03:...|2018-02-09 10:03:...|
|      4|    0|2018-02-09 10:04:...|                null|
|      5|  125|2018-02-09 10:05:...|2018-02-10 11:03:...|
|      6|    0|2018-02-09 10:06:...|                null|
|      7|  126|2018-02-09 10:07:...|2018-02-10 11:04:...|
|      8|  127|2018-02-09 10:08:...|2018-02-10 09:05:...|
|      9|    0|2018-02-09 10:09:...|                null|
|     10|    0|2018-02-10 11:30:...|                null|
|     11|    0|2018-02-10 11:40:...|                null|
+-------+-----+--------------------+--------------------+
The input can be created with:
// Already in scope in spark-shell; import explicitly in an application.
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark`

val df = sc.parallelize(Seq(
  (1, 123, "2/9/2018 10:01:00", "2/9/2018 10:02:00"),
  (2, 0, "2/9/2018 10:02:00", ""),
  (3, 124, "2/9/2018 10:03:00", "2/9/2018 10:03:00"),
  (4, 0, "2/9/2018 10:04:00", ""),
  (5, 125, "2/9/2018 10:05:00", "2/10/2018 11:03:00"),
  (6, 0, "2/9/2018 10:06:00", ""),
  (7, 126, "2/9/2018 10:07:00", "2/10/2018 11:04:00"),
  (8, 127, "2/9/2018 10:08:00", "2/10/2018 09:05:00"),
  (9, 0, "2/9/2018 10:09:00", ""),
  (10, 0, "2/10/2018 11:30:00", ""),
  (11, 0, "2/10/2018 11:40:00", "")
)).toDF("eventId", "incId", "eventDate1", "incDate1")
  // Parse the strings into timestamps; "M/d" matches the non-padded month and day.
  // The empty incident strings come out as null, as in the table above.
  .withColumn("eventDate", from_unixtime(unix_timestamp(col("eventDate1"), "M/d/yyyy HH:mm:ss")).cast("timestamp"))
  .withColumn("incDate", from_unixtime(unix_timestamp(col("incDate1"), "M/d/yyyy HH:mm:ss")).cast("timestamp"))
  .drop("eventDate1", "incDate1")
Expected output:
+-------+--------------------+-----+--------------------+----+----+----+
|eventId|           eventDate|incId|             incDate|inc1|inc2|inc3|
+-------+--------------------+-----+--------------------+----+----+----+
|      1|2018-02-09 10:01:...|  123|2018-02-09 10:02:...| 124| 127| 125|
|      2|2018-02-09 10:02:...|    0|                null| 124| 127| 125|
|      3|2018-02-09 10:03:...|  124|2018-02-09 10:03:...| 127| 125| 126|
|      4|2018-02-09 10:04:...|    0|                null| 125| 126|null|
|      5|2018-02-09 10:05:...|  125|2018-02-10 11:03:...| 125| 126|null|
|      6|2018-02-09 10:06:...|    0|                null| 125| 126|null|
|      7|2018-02-09 10:07:...|  126|2018-02-10 11:04:...| 125| 126|null|
|      8|2018-02-09 10:08:...|  127|2018-02-10 09:05:...| 125| 126|null|
|      9|2018-02-09 10:09:...|    0|                null| 125| 126|null|
+-------+--------------------+-----+--------------------+----+----+----+
Explanation of how to get the output:
For each eventId, get the next 3 incIds, i.e. the incidents whose incDate is later than the event's eventDate (eventDate < incDate). PS: I have tried Window.partitionBy etc., but could not get correct results.
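
For reference, a minimal sketch of a literal reading of that rule, assuming the df built above. The names incidents, paired, ranked, result, nextIncId, nextIncDate and rn are only illustrative. It pairs each event with every incident dated strictly after it, ranks the candidates per event by incident date, keeps the first three, and pivots them into inc1..inc3. It has not been checked row by row against the expected table, and the non-equi join may be too expensive on large data.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Incidents only: rows that actually carry an incident date.
val incidents = df.filter(col("incDate").isNotNull)
  .select(col("incId").as("nextIncId"), col("incDate").as("nextIncDate"))

// Pair each event with every incident dated strictly after it.
// Events with no later incident drop out of the inner join.
val paired = df.join(incidents, col("eventDate") < col("nextIncDate"), "inner")

// Rank the candidate incidents per event by incident date, keep the first three.
val w = Window.partitionBy("eventId").orderBy("nextIncDate")
val ranked = paired
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") <= 3)

// Turn ranks 1..3 into columns; a missing rank yields null.
val result = ranked
  .groupBy("eventId", "eventDate", "incId", "incDate")
  .pivot("rn", Seq(1, 2, 3))
  .agg(first("nextIncId"))
  .toDF("eventId", "eventDate", "incId", "incDate", "inc1", "inc2", "inc3")
  .orderBy("eventId")

result.show()
```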