
I'm using PySpark to build a DataFrame in which the 'amt' column is replaced by the previous row's 'amt' value, but only when amt = 0.

For example, this is my DataFrame:

+---+-----+
| id|amt  |
+---+-----+
|  1|    5|
|  2|    0|
|  3|    0|
|  4|    6|
|  5|    0|
|  6|    3|
+---+-----+

Now I want to create the following DataFrame: whenever amt = 0, the modi_amt column should contain the most recent previous non-zero amt; otherwise it should equal amt.

+---+-----+----------+
| id|amt  |modi_amt  |
+---+-----+----------+
|  1|    5|         5|
|  2|    0|         5|
|  3|    0|         5|
|  4|    6|         6|
|  5|    0|         6|
|  6|    3|         3|
+---+-----+----------+

I can get the previous row's value, but I need help with runs of consecutive rows where amt = 0 (for example, id = 2 and 3).

The code I'm using:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

my_window = Window.partitionBy().orderBy("id")
# previous row's amt, ordered by id
DF = DF.withColumn("prev_amt", F.lag(DF.amt).over(my_window))
DF = DF.withColumn("modi_amt", F.when(DF.amt == 0, DF.prev_amt).otherwise(DF.amt)).drop("prev_amt")

This gives me the following DataFrame:

+---+-----+----------+
| id|amt  |modi_amt  |
+---+-----+----------+
|  1|    5|         5|
|  2|    0|         5|
|  3|    0|         0|
|  4|    6|         6|
|  5|    0|         6|
|  6|    3|         3|
+---+-----+----------+

Basically, id 3 should also have modi_amt = 5.
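To make the difference concrete, here is a plain-Python model (no Spark involved) of what a one-row lag does compared with carrying the last non-zero value forward. The list `amts` and the variable names are just for illustration; they are not part of the PySpark code above.

```python
# The amt column from the example, ordered by id.
amts = [5, 0, 0, 6, 0, 3]

# Model of lag(amt, 1): each row only sees the immediately preceding amt.
prev = [None] + amts[:-1]
lag_fill = [p if a == 0 else a for a, p in zip(amts, prev)]

# Model of the desired behaviour: remember the last non-zero amt seen so far.
carried = []
last_non_zero = None
for a in amts:
    if a != 0:
        last_non_zero = a
    carried.append(last_non_zero)

print(lag_fill)  # id 3 stays 0 because its previous row is also 0
print(carried)   # id 3 picks up the 5 from id 1
```

The lag version only looks one row back, so the second zero in a run sees another zero. Anything that fixes this has to look back an unbounded number of rows.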

ZygD
Nabarun Chakraborti
  • By your own logic id 3 should be zero because the original amt there is zero. – Prakash S Mar 05 '19 at 11:44
  • The logic is to replace any amt = 0 with the previous non-zero value. Here id 1 has amt = 5 and both id 2 and id 3 have amt = 0, so modi_amt for those two ids will be 5. – Nabarun Chakraborti Mar 05 '19 at 11:55
  • Possible duplicate of [Mapping timeseries data to previous datapoints and averages](https://stackoverflow.com/questions/32526328/mapping-timeseries-data-to-previous-datapoints-and-averages) – abiratsis Mar 05 '19 at 14:00

1 Answer


I used the approach below to get the output, and it works:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

my_window = Window.partitionBy().orderBy("id")
# hold the previous row's amt value
DF = DF.withColumn("prev_amt", F.lag(DF.amt).over(my_window))

# replace amt 0 with the previous row's value; on its own this does not handle consecutive rows with amt 0
DF = DF.withColumn("amt_adjusted", F.when(DF.amt == 0, DF.prev_amt).otherwise(DF.amt))

# set null where both amt and amt_adjusted are 0 (the consecutive-zero case)
DF = DF.withColumn("zeroNonZero", F.when((DF.amt == 0) & (DF.amt_adjusted == 0), F.lit(None)).otherwise(DF.amt_adjusted))

# replace each null with the last preceding non-null (non-zero) value
DF = DF.withColumn("modi_amt", F.last("zeroNonZero", ignorenulls=True).over(Window.orderBy("id").rowsBetween(Window.unboundedPreceding, 0)))

Is there a better approach?

Nabarun Chakraborti
  • I think you could just replace the 0s in the amt column with nulls straight away and then use last() as you have done. I don't see the use in getting the prev_amt value first. – fenrisulfr Mar 05 '19 at 15:25