I need to calculate col(current_month) / col(previous_month), partitioned by id. The issue is that the data is not continuous, so I can't simply do lag(price) ordered by month: the row before 2018-04-01 is 2018-02-01, not 2018-03-01.

Instead of example_1 below (using a join) or example_2 (using nested when calls), I was hoping for a more elegant solution, something like lag('price').over(partitionBy('id').rangeBetween('1 month', 0)) <- pseudo code. Is this possible, or is there a third alternative I haven't thought of?

Input:
+----------+-----+--------+-------+
|     month|price|quantity|     id|
+----------+-----+--------+-------+
|2018-01-01| 3.96|    53.0|abc##10|
|2018-02-01| 3.96|    49.0|abc##10|
|2018-04-01| 3.81|   150.0|abc##10|
|2018-05-01| 3.81|    14.0|abc##10|
|2018-06-01| 3.73|    13.0|abc##10|
|2018-08-01| 2.97|    27.0|abc##10|
|2018-09-01| 2.97|    22.0|abc##10|
|2018-10-01| 2.97|    10.0|abc##10|
|2018-11-01| 2.97|    35.0|abc##10|
|2018-12-01| 2.97|    99.0|abc##10|
+----------+-----+--------+-------+

Output:
# I need the previous month's value so I can calculate col(current_month) / col(previous_month), partitioned by id
+----------+-----+--------+-------+----------------------+
|     month|price|quantity|     id| previous_months_price|
+----------+-----+--------+-------+----------------------+
|2018-01-01| 3.96|    53.0|abc##10|                  null|
|2018-02-01| 3.96|    49.0|abc##10|                  53.0|
|2018-04-01| 3.81|   150.0|abc##10|                  null|
|2018-05-01| 3.81|    14.0|abc##10|                 150.0|
|2018-06-01| 3.73|    13.0|abc##10|                  14.0|
|2018-08-01| 2.97|    27.0|abc##10|                  null|
|2018-09-01| 2.97|    22.0|abc##10|                  27.0|
|2018-10-01| 2.97|    10.0|abc##10|                  22.0|
|2018-11-01| 2.97|    35.0|abc##10|                  10.0|
|2018-12-01| 2.97|    99.0|abc##10|                  35.0|
+----------+-----+--------+-------+----------------------+
example_1:
# shift every row forward one month, then left-join back to pick up the previous month's value
lj = df_t.select(
    'id',
    F.add_months('month', 1).alias('month'),
    F.col('price').alias('previous_months_price'),
)

df_t.join(lj, ['id', 'month'], how='left')
example_2:
# keep the lagged value only when the previous row is exactly one month earlier
prev_dt = F.add_months('month', -1)
df_t.withColumn(
    'previous_month_price',
    F.when(
        prev_dt == F.expr('lag(month) over (partition by id order by month)'),
        F.expr('lag(quantity) over (partition by id order by month)'),
    )
)
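
For reference, once the previous_months_price column exists (e.g. on the result of the join in example_1), the ratio itself is trivial; a minimal sketch, with the output column name used only for illustration:

ratio = df_t.withColumn(
    'month_over_month',
    F.col('price') / F.col('previous_months_price'),
)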
Topde

1 Answer

When using the window function, also include the month of the previous row. When calculating the result column, set it to null unless the previous month is exactly one month before the current one.

from pyspark.sql import functions as F
from pyspark.sql import Window

df = spark.read.option("header", True).csv(...) \
    .withColumn("month", F.to_date("month"))

w = Window.partitionBy("id").orderBy("month")

# lag both quantity and month, then keep the lagged quantity only if the previous
# row is exactly one month earlier (last_day makes the check robust to differing days)
df.withColumn("prev_qty", F.lag("quantity").over(w)) \
    .withColumn("prev_month", F.lag("month").over(w)) \
    .withColumn("previous_months_qty", F.expr("case when last_day(add_months(prev_month,1)) = last_day(month) then prev_qty else null end")) \
    .drop("prev_qty", "prev_month") \
    .show()

prints

+----------+-----+--------+-------+-------------------+
|     month|price|quantity|     id|previous_months_qty|
+----------+-----+--------+-------+-------------------+
|2018-01-01| 3.96|    53.0|abc##10|               null|
|2018-02-01| 3.96|    49.0|abc##10|               53.0|
|2018-04-01| 3.81|   150.0|abc##10|               null|
|2018-05-01| 3.81|    14.0|abc##10|              150.0|
|2018-06-01| 3.73|    13.0|abc##10|               14.0|
|2018-08-01| 2.97|    27.0|abc##10|               null|
|2018-09-01| 2.97|    22.0|abc##10|               27.0|
|2018-10-01| 2.97|    10.0|abc##10|               22.0|
|2018-11-01| 2.97|    35.0|abc##10|               10.0|
|2018-12-01| 2.97|    99.0|abc##10|               35.0|
+----------+-----+--------+-------+-------------------+
werner
  • Thanks, what does the rowsBetween(-1,-1) do in this case? I can't see what difference it makes here – Topde Sep 10 '20 at 20:57
  • You are right, we don't need this part. I have removed the `rowsBetween` from the code – werner Sep 10 '20 at 21:06
  • `month(month) - month(prev_month) == 1` only works in the same year. use add_months instead – jxc Sep 10 '20 at 21:21
  • I was going to say the same, but it actually does work because of the orderBy; I feel a bit uneasy about relying on order in Spark imo – Topde Sep 10 '20 at 21:25
  • I've updated my answer just before your comment; I should've mentioned I already have some working code with your suggestion. I was hoping there would be a more elegant solution out there; in my mind rangeBetween would work if I could lag over a one-month rangeBetween window, but it's either impossible or I'm too stupid to figure it out – Topde Sep 10 '20 at 21:34
  • @jxc thanks for the hint, I have updated my answer. I have added another `month(...)` call around the `add_months` in case the day is not always 1. Now there might be an issue when there are 12 months between two rows – werner Sep 10 '20 at 21:35
  • I know I posted the question but if you look at my example_2 I don't use month() so it doesn't have that issue – Topde Sep 10 '20 at 21:37
  • @werner, I think `add_months(prev_month,1) = month` should work. add_months is the function to set the day correctly instead of just 30 days off. or `trunc()` the dates to 'Month' in case some of them are not the first day. – jxc Sep 10 '20 at 21:42
  • or `last_day(add_months(prev_month,1)) = last_day(month)` in case the days differ between the original months. – jxc Sep 10 '20 at 21:48
  • This brings me to the reason I asked this question: I'm running this on a 1B-row dataframe and these multiple nested function calls add overhead that is felt more than on smaller dfs. I can't help but feel there's an easier way to do it, perhaps with rangeBetween(1 month) – Topde Sep 10 '20 at 21:53
  • @Dee, if the months are all saved as the first day of each month, then add_months() should always keep the day to `1`, no need to take last_day() – jxc Sep 10 '20 at 22:00
  • @jxc thanks for the `last_day`. I didn't know that function – werner Sep 11 '20 at 16:46
  • @Dee I think at least with `Window` it is not possible to work with dates. If you check the code of [WindowSpec](https://github.com/apache/spark/blob/f6322d1cb149983fbcd5b90a804eeda0fe4e8a49/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala#L32) you can see that the boundaries are only based on row numbers – werner Sep 11 '20 at 16:56
  • https://stackoverflow.com/questions/33207164/spark-window-functions-rangebetween-dates it is possible – Topde Sep 11 '20 at 21:17
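
Following up on that last comment, a rough sketch of the range-based alternative (not taken from the answer; the month_idx name is illustrative). The idea is to convert each month to a running month index so that rangeBetween(-1, -1) covers exactly the row one calendar month back; since lag ignores window frames, an aggregate such as max is used instead:

from pyspark.sql import functions as F, Window

# running month index: one step == one calendar month (assumes month is a date column)
df_idx = df.withColumn("month_idx", F.year("month") * 12 + F.month("month"))

# the frame only covers rows whose month_idx equals the current value minus 1
w = Window.partitionBy("id").orderBy("month_idx").rangeBetween(-1, -1)

df_idx.withColumn("previous_months_qty", F.max("quantity").over(w)) \
    .drop("month_idx") \
    .show()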