
Assume we have the PySpark DataFrame (df) below. How can I use PySpark to get the duration (in minutes) between each biking event and its corresponding driving event? Assume each biking event has only one corresponding driving event; however, within a day there can be multiple "biking - driving" pairs. Eventually, store the final results in a DataFrame, including biking_time, biking_event, driving_time, driving_event, and the duration of each pair.

Note: there can be other events between biking and driving; for example, a person can go from biking to running to swimming and then driving.

One example is shown in the table below:

The duration between biking and driving on 03/01/2018 is: 8:12 - 5:12 = 3 hours = 180 mins.

   TimeDetails       Event
1  3/1/2018 5:12     Biking
2  3/1/2018 6:12     Swimming
3  3/1/2018 7:12     Hiking
4  3/1/2018 8:12     Driving
5  3/2/2018 9:12     Biking
6  3/2/2018 10:12    Swimming
7  3/2/2018 11:12    Swimming
8  3/2/2018 12:12    Driving
9  3/2/2018 13:12    Swimming

Below is the sample output:

   biking_time     event_name1  driving_time     event_name2  durations_inMins
1  3/1/2018 5:12   biking       3/1/2018 8:12    driving      180
2  3/2/2018 9:12   biking       3/2/2018 12:12   driving      180

Below is some of my code:

biking_df = df.filter(df.Event == 'Biking')
driving_df = df.filter(df.Event == 'Driving')

Can someone please provide me with some code in PySpark? Thanks a lot

1 Answer


Your example (I added another day with a missing Driving record; the solution will now also handle that):

df = spark.createDataFrame(
  [
('1','3/1/2018 5:12','Biking')
,('2','3/1/2018 6:12','Swimming')
,('3','3/1/2018 7:12','Hiking')
,('4','3/1/2018 8:12','Driving')
,('5','3/2/2018 9:12','Biking')
,('6','3/2/2018 10:12','Swimming')
,('7','3/2/2018 11:12','Swimming')
,('8','3/2/2018 12:12','Driving')
,('9','3/2/2018 13:12','Swimming')
,('10','3/3/2018 9:10','Biking')
,('11','3/3/2018 9:50','Swimming')
,('12','3/3/2018 10:30','Swimming')
,('13','3/3/2018 11:12','Hiking')
  ], ['index','TimeDetails','Event']
)

Solution

from pyspark.sql import functions as F

df = df\
.withColumn('TimeDetails', F.to_timestamp('TimeDetails', 'M/d/y H:m'))\
.withColumn('date', F.to_date('TimeDetails'))
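
# At this point TimeDetails is a proper timestamp and date is its calendar day,
# e.g. '3/1/2018 5:12' is parsed to 2018-03-01 05:12:00 with date 2018-03-01.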

#Finding all possible dates in the original dataset:
date_interval = df\
                .agg(
                    F.date_trunc("dd", F.max(F.col("date"))).alias("max_date"),
                    F.date_trunc("dd", F.min(F.col("date"))).alias("min_date"))\
                .withColumn('date_interval', F.explode(F.expr('sequence(min_date, max_date, interval 1 day)')))\
                .select('date_interval')\
                .withColumn('date_interval', F.to_date('date_interval'))
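
# For the sample data this yields one row per calendar day between the min and
# max dates: 2018-03-01, 2018-03-02 and 2018-03-03.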

#Imputing those dates onto the biking and driving subsets

biking_df = date_interval\
                .join(df.filter(df.Event == 'Biking'),date_interval.date_interval == df.date,'left')\
                .withColumn('Event', F.coalesce(F.col('Event'), F.lit('Biking')))\
                .select('date_interval',F.col('TimeDetails').alias('biking_time'),F.col('Event').alias('event_name1'))

driving_df = date_interval\
                .join(df.filter(df.Event == 'Driving'),date_interval.date_interval == df.date,'left')\
                .withColumn('Event', F.coalesce(F.col('Event'), F.lit('Driving')))\
                .select('date_interval',F.col('TimeDetails').alias('driving_time'),F.col('Event').alias('event_name2'))
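
# For 2018-03-03 there is no Driving row, so the left join leaves driving_time
# null and the coalesce fills event_name2 with the literal 'Driving'.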

result = biking_df\
    .join(driving_df, 'date_interval')\
    .withColumn('durations_inMins',(F.unix_timestamp("driving_time") - F.unix_timestamp('biking_time'))/60)\
    .select('biking_time','event_name1','driving_time','event_name2','durations_inMins')
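    # unix_timestamp converts to seconds since the epoch, so dividing the
    # difference by 60 yields minutes; the result is null when either side is null.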


result.show()

The output:

+-------------------+-----------+-------------------+-----------+----------------+
|        biking_time|event_name1|       driving_time|event_name2|durations_inMins|
+-------------------+-----------+-------------------+-----------+----------------+
|2018-03-01 05:12:00|     Biking|2018-03-01 08:12:00|    Driving|           180.0|
|2018-03-02 09:12:00|     Biking|2018-03-02 12:12:00|    Driving|           180.0|
|2018-03-03 09:10:00|     Biking|               null|    Driving|            null|
+-------------------+-----------+-------------------+-----------+----------------+
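
Note that the per-day join above assumes one biking-driving pair per day, while the question allows several. A minimal window-based sketch of how multiple pairs could be handled (my own illustration, assuming the df with the date column built above; not necessarily the updated approach mentioned in the comments) pairs each Biking row with the earliest Driving row that follows it on the same day, leaving the duration null when no driving follows:

from pyspark.sql import Window

# Frame: the current row through the end of the same day, in time order
w = Window.partitionBy('date').orderBy('TimeDetails')\
          .rowsBetween(Window.currentRow, Window.unboundedFollowing)

result2 = df\
    .withColumn('driving_time',
                F.min(F.when(F.col('Event') == 'Driving',
                             F.col('TimeDetails'))).over(w))\
    .filter(F.col('Event') == 'Biking')\
    .select(F.col('TimeDetails').alias('biking_time'),
            F.col('Event').alias('event_name1'),
            'driving_time',
            F.lit('Driving').alias('event_name2'),
            ((F.unix_timestamp('driving_time')
              - F.unix_timestamp('biking_time')) / 60).alias('durations_inMins'))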
Luiz Viola
  • Thank you! On the other hand, regarding "if Biking is started and not finished by Driving. If that's the case, the window function used would need to handle an imputation": how can we implement that case? Thank you again. – Feb 25 '22 at 01:40
  • Then, if the corresponding driving is not present, what would define the duration? What would be the rule if biking starts and is not finished by driving? – Luiz Viola Feb 25 '22 at 07:12
  • We might just need to ignore that case: if there is no matching driving event, there will be no duration. Or we could just list the biking events that have no corresponding driving event. – Feb 25 '22 at 07:26
  • OK, and what would be the boundary for each calculation? The same day? So the possibilities are: biking is started on day D, and the calculation is done when one of these conditions happens: driving is found on day D, or day D ends with no 'driving' event. Is that correct? – Luiz Viola Feb 25 '22 at 07:54
  • Yes, they should happen within the same day. – Feb 25 '22 at 09:24
  • I updated with another approach that also covers that. – Luiz Viola Feb 25 '22 at 09:56
  • Nice! Thank you so much. Meanwhile, could you take a look at my other two questions related to this problem? https://stackoverflow.com/questions/71261876/how-to-use-pyspark-to-dealing-with-below-data and https://stackoverflow.com/questions/71263972/how-to-use-pyspark-to-handle-get-below-durations Really appreciated! – Feb 25 '22 at 10:17
  • OK! Please mark as an answer and upvote! – Luiz Viola Feb 25 '22 at 10:24
  • Thank you, I am looking into your solutions and will upvote for sure! Thank you again. – Feb 26 '22 at 00:59