I'm looking for a way to join the following two Spark Datasets:
# city_visits:
person_id  city       timestamp
-----------------------------------------------
        1  Paris      2017-01-01 00:00:00
        1  Amsterdam  2017-01-03 00:00:00
        1  Brussels   2017-01-04 00:00:00
        1  London     2017-01-06 00:00:00
        2  Berlin     2017-01-01 00:00:00
        2  Brussels   2017-01-02 00:00:00
        2  Berlin     2017-01-06 00:00:00
        2  Hamburg    2017-01-07 00:00:00
# ice_cream_events:
person_id  flavour     timestamp
-----------------------------------------------
        1  Vanilla     2017-01-02 00:12:00
        1  Chocolate   2017-01-05 00:18:00
        2  Strawberry  2017-01-03 00:09:00
        2  Caramel     2017-01-05 00:15:00
The goal is that each row in city_visits is joined with the row in ice_cream_events that has the same person_id and the earliest timestamp strictly later than the visit's timestamp, leading to this output:
person_id  city       timestamp            ic_flavour  ic_timestamp
---------------------------------------------------------------------------
        1  Paris      2017-01-01 00:00:00  Vanilla     2017-01-02 00:12:00
        1  Amsterdam  2017-01-03 00:00:00  Chocolate   2017-01-05 00:18:00
        1  Brussels   2017-01-04 00:00:00  Chocolate   2017-01-05 00:18:00
        1  London     2017-01-06 00:00:00  null        null
        2  Berlin     2017-01-01 00:00:00  Strawberry  2017-01-03 00:09:00
        2  Brussels   2017-01-02 00:00:00  Strawberry  2017-01-03 00:09:00
        2  Berlin     2017-01-06 00:00:00  null        null
        2  Hamburg    2017-01-07 00:00:00  null        null
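For reproducibility, this is roughly how the datasets can be constructed (a sketch, assuming an existing SparkSession named spark and that the timestamp columns are strings cast to TimestampType):

import org.apache.spark.sql.functions.col
import spark.implicits._

val city_visits = Seq(
  (1, "Paris",     "2017-01-01 00:00:00"),
  (1, "Amsterdam", "2017-01-03 00:00:00"),
  (1, "Brussels",  "2017-01-04 00:00:00"),
  (1, "London",    "2017-01-06 00:00:00"),
  (2, "Berlin",    "2017-01-01 00:00:00"),
  (2, "Brussels",  "2017-01-02 00:00:00"),
  (2, "Berlin",    "2017-01-06 00:00:00"),
  (2, "Hamburg",   "2017-01-07 00:00:00")
).toDF("person_id", "city", "timestamp")
  .withColumn("timestamp", col("timestamp").cast("timestamp"))

val ice_cream_events = Seq(
  (1, "Vanilla",    "2017-01-02 00:12:00"),
  (1, "Chocolate",  "2017-01-05 00:18:00"),
  (2, "Strawberry", "2017-01-03 00:09:00"),
  (2, "Caramel",    "2017-01-05 00:15:00")
).toDF("person_id", "flavour", "timestamp")
  .withColumn("timestamp", col("timestamp").cast("timestamp"))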
The closest solution I've found so far is the following; however, it joins every row in ice_cream_events that matches the conditions, not just the first one:
val cv = city_visits.orderBy("person_id", "timestamp")
val ic = ice_cream_events.orderBy("person_id", "timestamp")

// Joins on person_id with any strictly-later event timestamp,
// so each visit matches every subsequent event, not only the first.
val result = cv.join(ic, ic("person_id") === cv("person_id")
                         && ic("timestamp") > cv("timestamp"))
Is there a (preferably efficient) way to specify that the join should only use the first matching ice_cream_events row, rather than all of them?