
I want to create a monthly lag in order to calculate monthly returns.

This is my dataframe:

```
REG_DT_YYYYMM  TYPE_CD  production
202005         FC       412316860416
202005         LG       420906795008
202005         LK       429496729600
202006         FC       438086664192
202006         LG       446676598784
202006         LK       455266533376
202007         FC       463856467968
202007         LG       472446402560
202007         LK       481036337152
202008         FC       489626271744
202008         LG       498216206336
202008         LK       506806140928
202009         FC       515396075520
202009         LG       523986010112
202009         LK       532575944704
202010         FC       541165879296
202010         LG       549755813888
202010         LK       558345748480
202010         LT       566935683072
202011         FC       575525617664
202011         LG       584115552256
202011         LK       592705486848
202011         LT       601295421440
202012         FC       609885356032
202012         LG       618475290624
202012         LK       627065225216
202012         LT       635655159808
202101         FC       644245094400
202101         LG       652835028992
202101         LK       661424963584
202101         LT       670014898176
```
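(For anyone who wants to reproduce this: a minimal sketch that recreates the frame, assuming an active SparkSession; only a subset of the rows around LT's first appearance is included for brevity.)

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# subset of the sample rows, enough to show LT appearing from 202010
df = spark.createDataFrame(
    [
        ('202009', 'FC', 515396075520),
        ('202009', 'LG', 523986010112),
        ('202009', 'LK', 532575944704),
        ('202010', 'FC', 541165879296),
        ('202010', 'LG', 549755813888),
        ('202010', 'LK', 558345748480),
        ('202010', 'LT', 566935683072),
        ('202011', 'FC', 575525617664),
        ('202011', 'LG', 584115552256),
        ('202011', 'LK', 592705486848),
        ('202011', 'LT', 601295421440),
    ],
    ['REG_DT_YYYYMM', 'TYPE_CD', 'production'],
)
```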

I now want to create a lag so that I can compare monthly production values. My problem is that an extra TYPE_CD, LT, appears from 202010 onward. Also, in the future there could be even more types.

  • which values do you want to compare? – mck Feb 22 '21 at 14:54
  • The production numbers, sorry for not specifying this. – wokter Feb 22 '21 at 15:05
  • for each type_cd? – mck Feb 22 '21 at 15:05
  • Yes, that is correct. – wokter Feb 22 '21 at 15:09
  • Does this answer your question? [Applying a Window function to calculate differences in pySpark](https://stackoverflow.com/questions/36725353/applying-a-window-function-to-calculate-differences-in-pyspark) – mck Feb 22 '21 at 15:11
  • I don't think so, as that example works with a fixed lag. It only looks at the production column, while it also has to take the other two columns into account. – wokter Feb 22 '21 at 15:15
  • 1
    How about `df.withColumn('lag_production', F.lag('production').over(Window.partitionBy('TYPE_CD').orderBy('REG_DT_YYYYMM')))` – mck Feb 22 '21 at 15:16

1 Answer


You can get the previous row for each type by partitioning the lag window by TYPE_CD:

```python
from pyspark.sql import functions as F, Window

# lag of production within each TYPE_CD, ordered by month, so every
# type is compared only against its own previous month's value
df2 = df.withColumn(
    'lag_production',
    F.lag('production').over(
        Window.partitionBy('TYPE_CD')
              .orderBy('REG_DT_YYYYMM')
    )
)
```
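Since each TYPE_CD forms its own partition, a type that only appears later (like LT from 202010) simply gets a null lag for its first month, and any types added in the future are handled the same way. From there, the monthly return the question asks about can be taken as the relative change against the lagged value; a minimal sketch building on `df2` (the `monthly_return` column name is illustrative):

```python
# relative month-over-month change per TYPE_CD;
# the first month of each type (e.g. LT's 202010) has a null lag,
# so its return stays null instead of producing a spurious value
df3 = df2.withColumn(
    'monthly_return',
    (F.col('production') - F.col('lag_production')) / F.col('lag_production')
)
```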
mck