
Let's assume we have a DataFrame (df) defined below in PySpark. How can I use PySpark to get the duration between the first biking action and the last biking action within the same day, and save the results into a DataFrame with columns such as first_biking_timedetails, last_biking_timedetails, duration_between_first_last, etc.? Notes: there can be other actions between the first and last biking action, and if there is only one biking action within a day we should not compute a duration (since the calculation is not possible, e.g. date 3/3/18).

Below is the example result for the date 3/01/2018:

duration_03_01 = 13:12 (last biking time) - 5:12 (first biking time) = 8 hours

Sample df below:

TimeDetails     Actions
3/1/18 5:12     Biking
3/1/18 6:12     Running
3/1/18 7:12     Swimming
3/1/18 8:12     Running
3/1/18 9:12     Swimming
3/1/18 10:12    Biking
3/1/18 11:12    Biking
3/1/18 12:12    Running
3/1/18 13:12    Biking
3/2/18 4:12     Biking
3/2/18 5:12     Swimming
3/2/18 6:12     Running
3/2/18 7:12     Biking
3/2/18 8:12     Running
3/3/18 4:16     Biking
3/4/18 5:13     Running
3/4/18 6:13     Biking
3/4/18 7:13     Running
3/4/18 8:13     Swimming
3/4/18 9:13     Running
3/4/18 10:13    Running
3/4/18 11:13    Biking

Some of my code:

df = spark.createDataFrame(
      [
    ('3/1/2018 5:12','Biking')
    ,('3/1/2018 6:12','Running')
    ,('3/1/2018 7:12','Swimming')
    ,('3/1/2018 8:12','Running')
    ,('3/1/2018 9:12','Swimming')
    ,('3/1/2018 10:12','Biking')
    ,('3/1/2018 11:12','Biking')
    ,('3/1/2018 12:12','Running')
    ,('3/1/2018 13:12','Biking')
    ,('3/2/2018 4:12','Biking')
    ,('3/2/2018 5:12','Swimming')
    ,('3/2/2018 6:12','Running')
    ,('3/2/2018 7:12','Biking')
    ,('3/2/2018 8:12','Running')
    ,('3/3/2018 4:16','Biking')
    ,('3/4/2018 5:13','Biking')
    ,('3/4/2018 6:13','Running')
    ,('3/4/2018 7:13','Running')
    ,('3/4/2018 8:13','Swimming')
    ,('3/4/2018 9:13','Running')
    ,('3/4/2018 10:13','Running')
    ,('3/4/2018 11:13','Biking')
      ], ['TimeDetails','Actions']
    )

And the expected output is below:

   First_Biking_time   action_1   Last_Biking_time   action_2   Durations_in_Hour
1  3/1/18 5:12         Biking     3/1/18 13:12       Biking     8
2  3/2/18 4:12         Biking     3/2/18 7:12        Biking     3
3  3/4/18 6:13         Biking     3/4/18 11:13       Biking     5

Can someone please provide me with some code in PySpark? Also, is there a way to solve the problem with Spark SQL as well?

Thank you

1 Answer


Your df:

df  = spark.createDataFrame(
      [
    ('3/1/2018 5:12','Biking')
    ,('3/1/2018 6:12','Running')
    ,('3/1/2018 7:12','Swimming')
    ,('3/1/2018 8:12','Running')
    ,('3/1/2018 9:12','Swimming')
    ,('3/1/2018 10:12','Biking')
    ,('3/1/2018 11:12','Biking')
    ,('3/1/2018 12:12','Running')
    ,('3/1/2018 13:12','Biking')
    ,('3/2/2018 4:12','Biking')
    ,('3/2/2018 5:12','Swimming')
    ,('3/2/2018 6:12','Running')
    ,('3/2/2018 7:12','Biking')
    ,('3/2/2018 8:12','Running')
    ,('3/3/2018 4:16','Biking')
    ,('3/4/2018 5:13','Biking')
    ,('3/4/2018 6:13','Running')
    ,('3/4/2018 7:13','Running')
    ,('3/4/2018 8:13','Swimming')
    ,('3/4/2018 9:13','Running')
    ,('3/4/2018 10:13','Running')
    ,('3/4/2018 11:13','Biking')
      ], ['TimeDetails','Actions']
    )

Using a window function (you can adapt this solution to other Actions as well):

from pyspark.sql import functions as F
from pyspark.sql import Window

# Parse the timestamp and extract the calendar date
df = df\
    .withColumn('TimeDetails', F.to_timestamp('TimeDetails', 'M/d/y H:m'))\
    .withColumn('date', F.to_date('TimeDetails'))

# One partition per action per day, ordered by time and spanning the whole day,
# so first()/last() return the earliest and latest records of that day
w = Window.partitionBy('Actions', 'date').orderBy('TimeDetails')\
          .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

generic = df\
            .withColumn('first_record', F.first(F.col('TimeDetails'), ignorenulls=True).over(w))\
            .withColumn('last_record', F.last(F.col('TimeDetails'), ignorenulls=True).over(w))\
            .withColumn('Durations_in_Hours', (F.unix_timestamp('last_record') - F.unix_timestamp('first_record'))/3600)\
            .orderBy('TimeDetails')

biking = generic\
            .filter(F.col('Actions') == 'Biking')\
            .select(F.col('first_record').alias('First_Biking_time'),
                    F.col('Actions').alias('action_1'),
                    F.col('last_record').alias('Last_Biking_time'),
                    F.col('Actions').alias('action_2'),
                    F.col('Durations_in_Hours'))\
            .dropDuplicates()\
            .filter(F.col('Durations_in_Hours') != 0)\
            .orderBy('First_Biking_time')
                

biking.show()

Output:

+-------------------+--------+-------------------+--------+------------------+
|  First_Biking_time|action_1|   Last_Biking_time|action_2|Durations_in_Hours|
+-------------------+--------+-------------------+--------+------------------+
|2018-03-01 05:12:00|  Biking|2018-03-01 13:12:00|  Biking|               8.0|
|2018-03-02 04:12:00|  Biking|2018-03-02 07:12:00|  Biking|               3.0|
|2018-03-04 05:13:00|  Biking|2018-03-04 11:13:00|  Biking|               6.0|
+-------------------+--------+-------------------+--------+------------------+
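
The question also asks whether the same can be done with Spark SQL. The answer above does not cover that part, so here is a minimal sketch of the same logic using a temporary view; the view name activities is an assumption, and it reuses the parsed df from above. Instead of a window, it groups per day and uses HAVING to drop days with a single Biking record:

# Not part of the original answer: a Spark SQL sketch of the same logic.
# Assumes df has already been parsed as above (TimeDetails as timestamp, date column added);
# the view name "activities" is made up for this example.
df.createOrReplaceTempView("activities")

biking_sql = spark.sql("""
    SELECT
        MIN(TimeDetails)  AS First_Biking_time,
        'Biking'          AS action_1,
        MAX(TimeDetails)  AS Last_Biking_time,
        'Biking'          AS action_2,
        (unix_timestamp(MAX(TimeDetails)) - unix_timestamp(MIN(TimeDetails))) / 3600
                          AS Durations_in_Hours
    FROM activities
    WHERE Actions = 'Biking'
    GROUP BY date
    HAVING COUNT(*) > 1          -- skip days with only one Biking record (e.g. 3/3/18)
    ORDER BY First_Biking_time
""")

biking_sql.show()

Grouping works here because only the earliest and latest Biking timestamps per day are needed; the window-function version above is more general if you also want to keep the intermediate rows.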
Luiz Viola
  • Could you take a look at the question below as well? Can I solve that problem by using the PySpark DataFrame API instead of SQL? Please let me know, thanks a lot! Link: https://stackoverflow.com/questions/71584013/how-to-use-sql-mysql-to-handle-null-values-and-certain-cases –  Mar 23 '22 at 08:38