
Is there a way to natively select the last day of each quarter in PySpark? For example, given a df containing two columns, yyyy_mm_dd and sum, how could I return sum for the last day of each quarter? For the current / in-progress quarter it would be good to show the max date instead.

I looked at this solution, Get First Date and Last Date of Current Quarter in Python?, and it works; however, I was wondering if there is a solution using PySpark syntax rather than a UDF?

  • does this help? https://stackoverflow.com/a/63821792/14165730 – mck Feb 08 '21 at 12:33
  • It helps a bit but seems to take the start date of the previous quarter whereas I need max date in current quarter. Similar to `.where(F.col('yyyy_mm_dd') == last day of q)` – stackq Feb 08 '21 at 12:39

2 Answers


Using a similar approach to the one in this answer:

from pyspark.sql import functions as F

# last day of the quarter = first day of the next quarter minus one day
df2 = df.withColumn(
    'last_day',
    F.expr("""
        to_date(
            date_trunc('quarter', to_date(input_date) + interval 3 months)
        ) - interval 1 day
    """)
)

df2.show()
+----------+----------+
|input_date|  last_day|
+----------+----------+
|2020-01-21|2020-03-31|
|2020-02-06|2020-03-31|
|2020-04-15|2020-06-30|
|2020-07-10|2020-09-30|
|2020-10-20|2020-12-31|
|2021-02-04|2021-03-31|
+----------+----------+

Then you can filter the rows where input_date == last_day.
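A minimal sketch of that filter, assuming the column names from the example above:

df2.filter(F.col('input_date') == F.col('last_day')).show()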


Edit: I might have misunderstood the question. You can try this approach, partitioning by year and quarter and selecting the last row in each quarter:

from pyspark.sql import functions as F, Window

# number the rows within each (year, quarter), latest date first
w = Window.partitionBy(
    F.year('input_date'), F.quarter('input_date')
).orderBy(F.desc('input_date'))

df2 = df.withColumn('rn', F.row_number().over(w))

df2.show()
+----------+---+
|input_date| rn|
+----------+---+
|2021-02-04|  1|
|2020-10-20|  1|
|2020-07-10|  1|
|2020-02-06|  1|
|2020-01-21|  2|
|2020-04-15|  1|
+----------+---+

And filter the rows with rn = 1, which should be the last day in each quarter.
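For instance, something like this, keeping only the original columns:

df2.filter(F.col('rn') == 1).drop('rn').show()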

mck
  • After filtering on `input_date == last_day` I think there would be no rows for incomplete quarters, though I still need to run it – stackq Feb 08 '21 at 12:44
  • I'm using Spark 2.2.0 so date_trunc() is actually unavailable unfortunately – stackq Feb 08 '21 at 12:50
  • @stackq see the edited answer? maybe it will do what you wanted – mck Feb 08 '21 at 12:51
  • Run into an illegal character `q` error. But yes, maybe the original Q wasn't too clear. I'm essentially trying to do the equivalent of `select * from t where yyyy_mm_dd in ('2020-01-31', '2020-06-30', '2020-09-30', '2020-12-31', '2021-03-31' ....)` but without hardcoding dates – stackq Feb 08 '21 at 13:16
  • modified to be hopefully compatible with spark 2.2 ... I used `quarter` and `year`. please try again? @stackq – mck Feb 08 '21 at 13:19

You want to deduplicate and keep the max date that exists in each quarter. You can use trunc with a Window function:

from pyspark.sql import Window
from pyspark.sql import functions as F

# partition by the first day of each quarter (trunc with 'quarter' requires Spark 3+,
# see the comments below) and order by date descending
w = Window.partitionBy(F.trunc('date_col', 'quarter')).orderBy(F.col('date_col').desc())

df1 = df.withColumn("rn", F.row_number().over(w)) \
        .filter("rn = 1") \
        .drop("rn")
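A minimal end-to-end sketch of this approach with made-up sample data (the column name date_col and the values are purely for illustration; trunc with 'quarter' needs Spark 3+):

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# hypothetical sample data: two dates in 2020 Q1, one in Q2, one in the in-progress quarter
df = spark.createDataFrame(
    [("2020-01-21", 5), ("2020-02-06", 3), ("2020-04-15", 7), ("2021-02-04", 2)],
    ["date_col", "sum"]
).withColumn("date_col", F.to_date("date_col"))

w = Window.partitionBy(F.trunc('date_col', 'quarter')).orderBy(F.col('date_col').desc())
df1 = df.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn")
# keeps 2020-02-06, 2020-04-15 and 2021-02-04 (the max available date in each quarter)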
blackbishop
  • trunc doesn't support quarter? – mck Feb 08 '21 at 14:43
  • @mck yes, it supports! why? – blackbishop Feb 08 '21 at 14:52
  • the docs you linked seem to suggest only year/month, and I tried quarter and I got null... not sure if it's a spark version problem though – mck Feb 08 '21 at 14:53
  • 1
    @mck you're right! the function exists since 1.5 but support for quarter was added in Spark 3... [spark 3](https://spark.apache.org/docs/latest/api/sql/#trunc) / [spark 2.4](https://spark.apache.org/docs/2.4.0/api/sql/#trunc). was looking at the wrong docs. – blackbishop Feb 08 '21 at 15:02