
I have a DF with bookingDt and arrivalDt columns. I need to find all the dates between these two dates.

Sample code:

from pyspark.sql import Row
from pyspark.sql.functions import datediff

df = spark.sparkContext.parallelize(
    [Row(vyge_id=1000, bookingDt='2018-01-01', arrivalDt='2018-01-05')]).toDF()
diffDaysDF = df.withColumn("diffDays", datediff('arrivalDt', 'bookingDt'))
diffDaysDF.show()

code output:

+----------+----------+-------+--------+
| arrivalDt| bookingDt|vyge_id|diffDays|
+----------+----------+-------+--------+
|2018-01-05|2018-01-01|   1000|       4|
+----------+----------+-------+--------+

What I tried was finding the number of days between the two dates, computing all the dates in between with the timedelta function, and exploding the list:

dateList = [str(bookingDt + timedelta(i)) for i in range(diffDays + 1)]  # +1 to include arrivalDt
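A minimal, runnable sketch of that idea (assuming a Python UDF; date_range below is a hypothetical helper, not from the original post):

from datetime import datetime, timedelta

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# Hypothetical helper (not in the original post): build the inclusive
# date list on the workers, then explode it into one row per date.
@F.udf(returnType=ArrayType(StringType()))
def date_range(start, end):
    start_dt = datetime.strptime(start, '%Y-%m-%d').date()
    end_dt = datetime.strptime(end, '%Y-%m-%d').date()
    return [str(start_dt + timedelta(days=i))
            for i in range((end_dt - start_dt).days + 1)]

df.withColumn('txnDt', F.explode(date_range('bookingDt', 'arrivalDt'))).show()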

Expected output:

Basically, I need to build a DF with a record for each date in between bookingDt and arrivalDt, inclusive.

+----------+----------+-------+----------+
| arrivalDt| bookingDt|vyge_id|     txnDt|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-01|
|2018-01-05|2018-01-01|   1000|2018-01-02|
|2018-01-05|2018-01-01|   1000|2018-01-03|
|2018-01-05|2018-01-01|   1000|2018-01-04|
|2018-01-05|2018-01-01|   1000|2018-01-05|
+----------+----------+-------+----------+
– Shankar

4 Answers


For Spark 2.4+, the sequence function can be used to create an array containing all dates between bookingDt and arrivalDt. This array can then be exploded.

from pyspark.sql import functions as F

# sequence() requires date or timestamp columns, so cast the strings first
df = df \
  .withColumn('bookingDt', F.col('bookingDt').cast('date')) \
  .withColumn('arrivalDt', F.col('arrivalDt').cast('date'))

# generate the inclusive array of dates, then explode into one row per date
df.withColumn('txnDt', F.explode(F.expr('sequence(bookingDt, arrivalDt, interval 1 day)')))\
  .show()

Output:

+-------+----------+----------+----------+
|vyge_id| bookingDt| arrivalDt|     txnDt|
+-------+----------+----------+----------+
|   1000|2018-01-01|2018-01-05|2018-01-01|
|   1000|2018-01-01|2018-01-05|2018-01-02|
|   1000|2018-01-01|2018-01-05|2018-01-03|
|   1000|2018-01-01|2018-01-05|2018-01-04|
|   1000|2018-01-01|2018-01-05|2018-01-05|
+-------+----------+----------+----------+
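Equivalently, a sketch using the DataFrame-API wrapper pyspark.sql.functions.sequence (also Spark 2.4+) instead of the SQL expression; for date columns the step defaults to one day:

from pyspark.sql import functions as F

# columns must already be cast to date, as above
df.withColumn('txnDt', F.explode(F.sequence('bookingDt', 'arrivalDt'))).show()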
– werner

As long as you're using Spark version 2.1 or higher, you can exploit the fact that column values can be used as arguments to SQL functions inside pyspark.sql.functions.expr():

Code:

import pyspark.sql.functions as f

diffDaysDF.withColumn("repeat", f.expr("split(repeat(',', diffDays), ',')"))\
    .select("*", f.posexplode("repeat").alias("txnDt", "val"))\
    .drop("repeat", "val", "diffDays")\
    .withColumn("txnDt", f.expr("date_add(bookingDt, txnDt)"))\
    .show()
#+----------+----------+-------+----------+
#| arrivalDt| bookingDt|vyge_id|     txnDt|
#+----------+----------+-------+----------+
#|2018-01-05|2018-01-01|   1000|2018-01-01|
#|2018-01-05|2018-01-01|   1000|2018-01-02|
#|2018-01-05|2018-01-01|   1000|2018-01-03|
#|2018-01-05|2018-01-01|   1000|2018-01-04|
#|2018-01-05|2018-01-01|   1000|2018-01-05|
#+----------+----------+-------+----------+
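To see why this works, take diffDays = 4: repeat(',', 4) produces ',,,,', splitting on ',' gives an array of five empty strings, posexplode emits one row per element with its position 0..4, and date_add(bookingDt, pos) shifts the booking date by that many days. A small sketch of the intermediate step (assuming an active spark session):

import pyspark.sql.functions as f

# array of 5 empty strings -> rows (pos, col) with pos = 0..4
spark.range(1) \
    .select(f.expr("split(repeat(',', 4), ',')").alias("arr")) \
    .select(f.posexplode("arr")) \
    .show()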
– pault
  • This is exactly what I want, but is it possible to simplify it? It seems quite complex. – Shankar Aug 08 '18 at 15:42
  • One way may be to create one dataframe of dates to join with, like @Volodymyr suggested using this method. Essentially: select the min booking date and the max arrival date, compute the difference in days, and create one dataframe with all dates in between. – pault Aug 08 '18 at 16:47

Well, you can do the following.

Create a dataframe with dates only:

dates_df # with all days between first bookingDt and last arrivalDt

and then join that df to yours with a between condition:

from pyspark.sql.functions import col

df, dates_df = df.alias('df'), dates_df.alias('dates_df')  # alias so the qualified names resolve
df.join(dates_df, on=col('dates_df.dates').between(col('df.bookingDt'), col('df.arrivalDt'))) \
  .select('df.*', 'dates_df.dates')

It might even work faster than the solution with explode; however, you need to figure out the start and end dates for this df. A df spanning 10 years holds just ~3,650 rows, not that many to worry about.
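A minimal sketch of building such a dates_df (assuming Spark 2.4+ for sequence; on older versions the date list could be built on the driver and parallelized instead):

from pyspark.sql import functions as F

# one row holding the global min/max, exploded into one row per date
dates_df = (
    df.select(F.min(F.col('bookingDt').cast('date')).alias('start'),
              F.max(F.col('arrivalDt').cast('date')).alias('end'))
      .select(F.explode(F.sequence('start', 'end')).alias('dates'))
)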

– vvg

As @vvg suggested:

# I assume bookingDt's date range covers arrivalDt;
# otherwise you have to take the union of the unique dates of bookingDt and arrivalDt

dates_df = df.select('bookingDt').distinct()
dates_df = dates_df.withColumnRenamed('bookingDt', 'day_of_listing')

listing_days_df = df.join(dates_df, on=dates_df.day_of_listing.between(df.bookingDt, df.arrivalDt))

Output:

+----------+----------+-------+--------------+
| arrivalDt| bookingDt|vyge_id|day_of_listing|
+----------+----------+-------+--------------+
|2018-01-05|2018-01-01|   1000|    2018-01-01|
|2018-01-05|2018-01-01|   1000|    2018-01-02|
|2018-01-05|2018-01-01|   1000|    2018-01-03|
|2018-01-05|2018-01-01|   1000|    2018-01-04|
|2018-01-05|2018-01-01|   1000|    2018-01-05|
+----------+----------+-------+--------------+
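If bookingDt alone doesn't contain every date in the range, a hedged variant is to seed dates_df from both columns (this still assumes the combined values cover all dates in between):

from pyspark.sql import functions as F

dates_df = (df.select(F.col('bookingDt').alias('day_of_listing'))
              .union(df.select(F.col('arrivalDt').alias('day_of_listing')))
              .distinct())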
– Artem Zaika