The transformation you are looking for can be achieved in two steps:
- Generate all possible combinations where df["TimeStamp2"] >= df["TimeStamp1"] using a self join. This forms our candidate_df.
- Prune candidate_df to the expected rows by finding, for each TimeStamp1, the row with the minimum TimeStamp2. We do this by partitioning candidate_df by TimeStamp1, ordering by TimeStamp2 ascending, and returning the first row.
If you have a threshold for the "maximum nearness" (i.e. the maximum allowed difference between TimeStamp1 and its nearest TimeStamp2), the solution can be optimized to reduce the size of candidate_df.
Working Example
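from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

# Reuses the active session if one already exists (e.g. in the pyspark shell or a notebook).
spark = SparkSession.builder.getOrCreate()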
data = [(datetime.strptime("2021-11-01T01:55:29.473", "%Y-%m-%dT%H:%M:%S.%f"), 131, datetime.strptime("2021-11-01T01:55:28.205", "%Y-%m-%dT%H:%M:%S.%f"), "A"),
(datetime.strptime("2021-11-01T01:55:30.474", "%Y-%m-%dT%H:%M:%S.%f"), 3, datetime.strptime("2021-11-01T01:55:31.205", "%Y-%m-%dT%H:%M:%S.%f"), "B"),
(datetime.strptime("2021-11-01T05:01:55.247", "%Y-%m-%dT%H:%M:%S.%f"), 195, datetime.strptime("2021-11-01T03:44:14.208", "%Y-%m-%dT%H:%M:%S.%f"), "C"),
(datetime.strptime("2021-11-01T05:01:56.247", "%Y-%m-%dT%H:%M:%S.%f"), 67, datetime.strptime("2021-11-01T05:41:56.205", "%Y-%m-%dT%H:%M:%S.%f"), "D"),
(datetime.strptime("2021-11-01T09:41:30.264", "%Y-%m-%dT%H:%M:%S.%f"), 131, datetime.strptime("2021-11-01T09:41:29.405", "%Y-%m-%dT%H:%M:%S.%f"), "E"),
(datetime.strptime("2021-11-01T09:41:32.264", "%Y-%m-%dT%H:%M:%S.%f"), 67, datetime.strptime("2021-11-01T09:41:35.205", "%Y-%m-%dT%H:%M:%S.%f"), "F"),]
df = spark.createDataFrame(data, ("TimeStamp1", "Value1", "TimeStamp2", "Value2",))
# Step 1: self join that pairs every TimeStamp1 with each TimeStamp2 at or after it.
candidate_df = df.alias("l").join(df.alias("r"), F.col("r.TimeStamp2") >= F.col("l.TimeStamp1"))\
    .selectExpr("l.TimeStamp1 as TimeStamp1",
                "l.Value1 as Value1",
                "r.TimeStamp2 as TimeStamp2",
                "r.Value2 as Value2")

# Step 2: for each TimeStamp1, keep only the candidate with the earliest TimeStamp2.
window_spec = Window.partitionBy("TimeStamp1").orderBy("TimeStamp2")
candidate_df.withColumn("rn", F.row_number().over(window_spec))\
    .filter(F.col("rn") == 1)\
    .drop("rn", "TimeStamp2")\
    .show(200, False)
Output
+-----------------------+------+------+
|TimeStamp1             |Value1|Value2|
+-----------------------+------+------+
|2021-11-01 01:55:29.473|131   |B     |
|2021-11-01 01:55:30.474|3     |B     |
|2021-11-01 05:01:55.247|195   |D     |
|2021-11-01 05:01:56.247|67    |D     |
|2021-11-01 09:41:30.264|131   |F     |
|2021-11-01 09:41:32.264|67    |F     |
+-----------------------+------+------+