
This is an extension to an earlier question I raised here: How to calculate difference between dates excluding weekends in PySpark 2.2.0. My Spark dataframe looks like below and can be generated with the accompanying code:

df = spark.createDataFrame(
    [(1, "John Doe", "2020-11-30", 1), (2, "John Doe", "2020-11-27", 2),
     (4, "John Doe", "2020-12-01", 0), (5, "John Doe", "2020-10-02", 1),
     (6, "John Doe", "2020-12-03", 1), (7, "John Doe", "2020-12-04", 1)],
    ("id", "name", "date", "count"))

+---+--------+----------+-----+
| id|    name|      date|count|
+---+--------+----------+-----+
|  5|John Doe|2020-10-02|    1|
|  2|John Doe|2020-11-27|    2|
|  1|John Doe|2020-11-30|    1|
|  4|John Doe|2020-12-01|    0|
|  6|John Doe|2020-12-03|    1|
|  7|John Doe|2020-12-04|    1|
+---+--------+----------+-----+

I am trying to calculate cumulative sums over periods of 2, 3, 4, 5 & 30 days. Below is sample code for the 2-day window and the resulting table.

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# convert a number of days to seconds, since the ordering column is an epoch long
days = lambda i: i * 86400
windowval_2 = Window.partitionBy("name").orderBy(F.col("date").cast("timestamp").cast("long")).rangeBetween(days(-1), days(0))
windowval_3 = Window.partitionBy("name").orderBy(F.col("date").cast("timestamp").cast("long")).rangeBetween(days(-2), days(0))
windowval_4 = Window.partitionBy("name").orderBy(F.col("date").cast("timestamp").cast("long")).rangeBetween(days(-3), days(0))
df = df.withColumn("cum_sum_2d_temp", F.sum("count").over(windowval_2))


+---+--------+----------+-----+---------------+
| id|    name|      date|count|cum_sum_2d_temp|
+---+--------+----------+-----+---------------+
|  5|John Doe|2020-10-02|    1|              1|
|  2|John Doe|2020-11-27|    2|              2|
|  1|John Doe|2020-11-30|    1|              1|
|  4|John Doe|2020-12-01|    0|              1|
|  6|John Doe|2020-12-03|    1|              1|
|  7|John Doe|2020-12-04|    1|              2|
+---+--------+----------+-----+---------------+

What I am trying to do is exclude weekends when calculating the date range, i.e. in my table 2020-11-27 is a Friday and 2020-11-30 is a Monday, so the difference between them is 1 if we exclude Sat & Sun. I want the cumulative sum of the 2020-11-27 and 2020-11-30 values against 2020-11-30 in the 'cum_sum_2d_temp' column, which should be 3. I am looking to combine the solution to my earlier question with this date-range window.
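
To illustrate the weekday-only difference I am after, here is a minimal check using NumPy's np.busday_count (which by default counts only Mon-Fri):

import numpy as np

# Friday 2020-11-27 to Monday 2020-11-30: Sat & Sun are skipped, so the gap is 1 working day
np.busday_count("2020-11-27", "2020-11-30")  # returns 1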

  • Have you tried using `filter` with a custom predicate function to check for your specific needs? – OakenDuck Dec 04 '20 at 23:06
  • @OakenDuck I was using `df.withColumn("sum_2d", F.when(workdaysUDF(F.col("date"), F.lag(F.col("date"), 1).over(windowVal_gen)) == 1, F.col("sum_2d_temp")).otherwise(F.col("count")))` to check the datediff between two dates, but the drawback of this approach is that, as I do not have daily data for every id, I do not know how many consecutive dates to check for each cumulative sum. As I said in the OP, I am looking to sum over 2, 3, 4 & 5 days, so the datediff between two consecutive rows can be anything, like 3, 4 etc. – vagautam Dec 04 '20 at 23:35
  • @OakenDuck Sorry, I missed adding the UDF for business days: `workdaysUDF = F.udf(lambda date1, date2: int(np.busday_count(date2, date1)) if (date1 is not None and date2 is not None) else None, IntegerType())` – vagautam Dec 04 '20 at 23:35

1 Answer


Calculate the date_dif relative to the earliest date:

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

df = spark.createDataFrame(
    [(1, "John Doe", "2020-11-30", 1), (2, "John Doe", "2020-11-27", 2),
     (4, "John Doe", "2020-12-01", 0), (5, "John Doe", "2020-10-02", 1),
     (6, "John Doe", "2020-12-03", 1), (7, "John Doe", "2020-12-04", 1)],
    ("id", "name", "date", "count"))

# business-day difference between two dates (weekends excluded)
workdaysUDF = F.udf(lambda date1, date2: int(np.busday_count(date2, date1)) if (date1 is not None and date2 is not None) else None, IntegerType())

# business-day offset of each row from the earliest date per name
df = df.withColumn("date_dif", workdaysUDF(F.col('date'), F.first(F.col('date')).over(Window.partitionBy('name').orderBy('date'))))

# range window over the business-day offset; `days` controls the look-back size
windowval = lambda days: Window.partitionBy('name').orderBy('date_dif').rangeBetween(-days, 0)
df = df.withColumn("cum_sum", F.sum("count").over(windowval(2)))
df.show()

+---+--------+----------+-----+--------+-------+
| id|    name|      date|count|date_dif|cum_sum|
+---+--------+----------+-----+--------+-------+
|  5|John Doe|2020-10-02|    1|       0|      1|
|  2|John Doe|2020-11-27|    2|      40|      2|
|  1|John Doe|2020-11-30|    1|      41|      3|
|  4|John Doe|2020-12-01|    0|      42|      3|
|  6|John Doe|2020-12-03|    1|      44|      1|
|  7|John Doe|2020-12-04|    1|      45|      2|
+---+--------+----------+-----+--------+-------+
  • Thank you, this works perfectly for me. However, could you please elaborate on what the windowval lambda function is doing? – vagautam Dec 06 '20 at 04:54
  • @vagautam it just makes the number of days in the window dynamic – mck Dec 06 '20 at 07:01
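
In other words, windowval just parameterises the look-back size, so the other horizons from the question can reuse it. A minimal sketch (the cum_sum_3d/5d/30d column names are placeholders):

# reuse the windowval lambda from the answer with different look-back sizes
df = (df
      .withColumn("cum_sum_3d", F.sum("count").over(windowval(3)))
      .withColumn("cum_sum_5d", F.sum("count").over(windowval(5)))
      .withColumn("cum_sum_30d", F.sum("count").over(windowval(30))))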