4

How can I calculate the number of days between two dates, ignoring weekends, using pyspark?

This is the exact same question as here, only I need to do this with pyspark.

I tried using a udf:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def dateDiffWeekdays(end, start):
    return int(np.busday_count(start, end)) # numpy returns a `numpy.int64` type, so cast to a plain int
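
For context, the udf would be applied roughly like this (the DataFrame `df` and its date columns `start` and `end` are assumed):

df.withColumn('weekday_diff', dateDiffWeekdays(df.end, df.start)).show()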

When using this udf, I get an error message:

ModuleNotFoundError: No module named 'numpy'

Does anyone know how to solve this? Or, better yet, how to solve this without a udf, in native pyspark?

EDIT: I have numpy installed. Outside of a udf it works just fine.

Willem
  • `numpy` is a Python dependency, and it is not available by default with Spark. In your local setup you can do `pip install numpy`. On a cluster setup, it is a bit more complicated, and you will also need to have numpy available on the cluster. – SQL.injection Sep 28 '20 at 11:40
  • I have `numpy` installed in Python. I've used it countless times. But if I understand correctly, you're saying it also needs to be installed on the Spark cluster? – Willem Sep 28 '20 at 13:01
  • yes, all the cluster machines need to have `numpy` – SQL.injection Sep 28 '20 at 15:18
  • are you experiencing this error locally or on the cluster? (a quick check is sketched below) – SQL.injection Sep 28 '20 at 15:18
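
A quick, hedged way to check whether `numpy` is importable on the executors rather than only on the driver (a running SparkSession named `spark` is assumed; `_check_numpy` is just an illustrative helper):

def _check_numpy(_):
    # runs inside an executor and reports whether numpy can be imported there
    try:
        import numpy  # noqa: F401
        yield 'numpy available on executor'
    except ImportError:
        yield 'numpy missing on executor'

print(spark.sparkContext.parallelize(range(4), 4).mapPartitions(_check_numpy).distinct().collect())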

2 Answers

7

For Spark 2.4+ it is possible to get the number of days without using numpy or a udf; the built-in SQL functions are sufficient.

Following roughly this answer, we can:

  1. create an array of dates containing all days between begin and end by using sequence
  2. transform the single days into a struct holding the day and its day of week value
  3. filter out the days that are Saturdays and Sundays
  4. get the size of the remaining array
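
For reference, a minimal, hedged setup with the required import and a sample DataFrame matching the output below (the column names begin and end and an existing SparkSession named spark are assumed):

import pyspark.sql.functions as F

df = spark.createDataFrame(
    [('2020-09-19', '2020-09-20'),
     ('2020-09-21', '2020-09-24'),
     ('2020-09-21', '2020-09-25'),
     ('2020-09-21', '2020-09-26'),
     ('2020-09-21', '2020-10-02'),
     ('2020-09-19', '2020-10-03')],
    ['begin', 'end']) \
    .withColumn('begin', F.to_date('begin')) \
    .withColumn('end', F.to_date('end'))
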
#create an array containing all days between begin and end
(df.withColumn('days', F.expr('sequence(begin, end, interval 1 day)'))
#keep only days where the ISO day of week (dow_iso) <= 5 (Friday)
.withColumn('weekdays', F.expr('filter(transform(days, day->(day, extract(dow_iso from day))), day -> day.col2 <=5).day')) 
#count how many days are left
.withColumn('no_of_weekdays', F.expr('size(weekdays)')) 
#drop the intermediate columns
.select('begin', 'end', 'no_of_weekdays') 
.show(truncate=False))

Output:

+----------+----------+--------------+
|begin     |end       |no_of_weekdays|
+----------+----------+--------------+
|2020-09-19|2020-09-20|0             |
|2020-09-21|2020-09-24|4             |
|2020-09-21|2020-09-25|5             |
|2020-09-21|2020-09-26|5             |
|2020-09-21|2020-10-02|10            |
|2020-09-19|2020-10-03|10            |
+----------+----------+--------------+

For Spark <= 2.3 you would have to use a udf. If numpy is a problem, a solution inspired by this answer can be used.

from datetime import timedelta
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

@F.udf(returnType=IntegerType())
def dateDiffWeekdays(end, start):
    # generate every date from start to end (both inclusive)
    daygenerator = (start + timedelta(x) for x in range((end - start).days + 1))
    # count only the days whose ISO weekday is Monday (1) to Friday (5)
    return sum(1 for day in daygenerator if day.isoweekday() <= 5)

df.withColumn("no_of_weekdays", dateDiffWeekdays(df.end, df.begin)).show()
werner
  • It is worth adding the first part of this answer to [get all the dates between two dates in Spark DataFrame](https://stackoverflow.com/questions/51745007/get-all-the-dates-between-two-dates-in-spark-dataframe). Also note that this only works for Spark 2.4+. – pault Sep 28 '20 at 18:46
  • This is actually an important comment, as I am currently working with version 2.3 ... Is there any way to get something similar in spark 2.3? – Willem Sep 30 '20 at 08:00
  • @Willem I am afraid not. Most of the SQL functions are only available starting from version 2.4. I have added a numpy-free udf version to the answer that should work with 2.3. – werner Sep 30 '20 at 18:15
  • What exactly does the "day -> day.col2" do in this line?: .withColumn('weekdays', F.expr('filter(transform(days, day->(day, extract(dow_iso from day))), day -> day.col2 <=5).day')) – Hansanho Oct 07 '21 at 14:57
  • @Hansanho this is the second parameter of the [filter](https://spark.apache.org/docs/3.1.1/api/sql/index.html#filter) function. The first parameter of the function is the result of `transform(...)`. `transform` returns an array of structs. Each struct consists of two columns: `col1` (which is the original day) and `col2` (which is the day of week). The filter function `day -> day.col2 <=5` now keeps only the days from Monday to Friday. I admit that the naming of the variable `day` is here a bit misleading, as this variable is not related to the variable `day` inside of `transform(...)` – werner Oct 07 '21 at 18:03
  • The issue I am finding with this is that the `sequence` function in `step 1` creates an array which includes both the `begin` & `end` dates. What is the best way to make this exclusive of, say, the `begin` date, so that it doesn't include it? – Curious Dec 02 '21 at 14:39
  • Could we have something like `(df.withColumn('days', F.expr('sequence(date_add(begin, 1), end, interval 1 day)'))` Not sure if this makes sense but I don't want the array to contain the starting date since to me this would give the wrong interval. – Curious Dec 02 '21 at 14:54
  • @nishcs yes, your approach to skip the first day works for me. Another option would be to insert [slice](https://spark.apache.org/docs/3.1.1/api/sql/index.html#slice) after the first step: `.withColumn('days', F.expr('slice(days, 2, size(days))'))`. This removes the first element of the array created in step 1 (a consolidated sketch of both options follows after this comment thread). – werner Dec 04 '21 at 12:55
  • Cool & say I have a list of other dates (perhaps holidays etc) what is the best way of removing those dates from the selection? – Curious Dec 09 '21 at 09:40
  • Also I am coming across an error of `Illegal Sequence Boundary` when using this in a particular process, which (I think), stems from the fact that some end dates are `undefined/ null`. Has anyone else had this issue when using this method and or knows what could cause it and perhaps how to resolve? Not sure what the best way to handle this is? Could we add something to this which outputs say a particular value in this scenario? – Curious Dec 09 '21 at 16:09
  • @nishcs the solution for removing other dates would depend on how many days there are to be removed. If there aren't too many, you could filter them out in step 3. If there are many dates you could put them into a second dataframe and then use an anti join. But that's probably worth a new question. For your second comment maybe you could provide some data to reproduce the behaviour (maybe also in a new question)? – werner Dec 09 '21 at 19:59
  • @werner Just realised the issue: it's the fact that the end date is chronologically before the start date (the start and end dates are dataframe columns). What's the best way to deal with this? Would it make sense to create an intermediate column `df.withColumn('check', F.expr('case when start > end then 1 else 0 end'))` and then run the calculations if this is true (not sure if this works/ makes sense) `df.withColumn('days', F.expr('case when check == 0 then sequence(begin, end, interval 1 day else -10)'))` Want to make sure it runs for cases when `end > start` else default to some value. – Curious Dec 13 '21 at 19:29
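
Pulling together the exclusive-begin discussion from the comments, a hedged sketch of how step 1 could be adjusted so that the begin date itself is not counted (both variants assume end is not earlier than the adjusted begin, otherwise sequence raises the sequence-boundary error mentioned above):

#variant 1: start the sequence one day after begin
df.withColumn('days', F.expr('sequence(date_add(begin, 1), end, interval 1 day)'))

#variant 2: build the full sequence, then drop its first element
(df.withColumn('days', F.expr('sequence(begin, end, interval 1 day)'))
 .withColumn('days', F.expr('slice(days, 2, size(days))')))
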
0

Following @werner's approach, I got the result, but there were some discrepancies with the usage of the built-in DOW_ISO function.

"DAYOFWEEK_ISO",("DOW_ISO") - ISO 8601 based day of the week for datetime as Monday(1) to Sunday(7) (ps: ref)

Using weekday(date) - which returns the day of the week for a date/timestamp (0 = Monday, 1 = Tuesday, ..., 6 = Sunday) - met the requirement.

F.expr('filter(transform(days, day->(day, weekday(day))), day -> day.col2 <= 4).day')
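
A fuller, hedged sketch of this variant, mirroring the chain from the answer above (the same df with begin and end date columns, and a Spark version that provides weekday, are assumed):

(df.withColumn('days', F.expr('sequence(begin, end, interval 1 day)'))
#weekday(day) is 0 (Monday) .. 6 (Sunday), so <= 4 keeps Monday to Friday
.withColumn('weekdays', F.expr('filter(transform(days, day->(day, weekday(day))), day -> day.col2 <= 4).day'))
#count how many weekdays are left
.withColumn('no_of_weekdays', F.expr('size(weekdays)'))
.select('begin', 'end', 'no_of_weekdays')
.show(truncate=False))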
  • Please provide additional details in your answer. As it's currently written, it's hard to understand your solution. – Community Aug 27 '21 at 11:06