I have a dataframe which looks like this:
Account | Contract Number | Reading Start Date | City | Value |
---|---|---|---|---|
123 | ABC123 | 19FEB2021:00:00:00.000000 | GREEN | 60115 |
123 | ABC123 | 20FEB2021:00:00:00.000000 | GREEN | 24621 |
123 | ABC123 | 22FEB2021:00:00:00.000000 | GREEN | 68816 |
567 | XYZ456 | 20FEB2021:00:00:00.000000 | GREEN | 71542 |
567 | XYZ456 | 21FEB2021:00:00:00.000000 | GREEN | 60115 |
567 | XYZ456 | 22FEB2021:00:00:00.000000 | GREEN | 24621 |
As you can see, a date is missing in between for contract number "ABC123" (21FEB2021), so I want to fill in the missing dates and leave the Value field null for them. I also want to extend the dates forward for all contracts up to 2 days before today. My desired output dataframe looks like this (please note that the first date differs between accounts):
Account | Contract Number | Reading Start Date | City | Value |
---|---|---|---|---|
123 | ABC123 | 19FEB2021:00:00:00.000000 | GREEN | 60115 |
123 | ABC123 | 20FEB2021:00:00:00.000000 | GREEN | 24621 |
123 | ABC123 | 21FEB2021:00:00:00.000000 | GREEN | Null |
123 | ABC123 | 22FEB2021:00:00:00.000000 | GREEN | 68816 |
123 | ABC123 | 23FEB2021:00:00:00.000000 | GREEN | Null |
567 | XYZ456 | 20FEB2021:00:00:00.000000 | GREEN | 71542 |
567 | XYZ456 | 21FEB2021:00:00:00.000000 | GREEN | 60115 |
567 | XYZ456 | 22FEB2021:00:00:00.000000 | GREEN | 24621 |
567 | XYZ456 | 23FEB2021:00:00:00.000000 | GREEN | Null |
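
To pin down the expected result independently of any dataframe library, the rule described above can be sketched in plain Python. This is only a reference sketch: the names `parse_reading_date` and `fill_contract_dates` are made up for illustration, it assumes Account and City are constant within a contract (as in the sample), and it uses a fixed end date instead of "2 days before today" so that the output matches the table above.

```python
from datetime import date, datetime, timedelta

# Timestamp layout as it appears in the sample rows,
# e.g. 19FEB2021:00:00:00.000000
DATE_FORMAT = "%d%b%Y:%H:%M:%S.%f"


def parse_reading_date(text):
    """Parse a sample-style timestamp string into a date."""
    return datetime.strptime(text, DATE_FORMAT).date()


def fill_contract_dates(rows, end_date):
    """rows: (account, contract, date_string, city, value) tuples.

    Returns one row per contract per day, from that contract's first
    reading through end_date, with value None on days without a reading.
    """
    by_contract = {}
    for account, contract, date_text, city, value in rows:
        # Assumption: account and city do not change within a contract.
        entry = by_contract.setdefault(
            contract, {"account": account, "city": city, "values": {}}
        )
        entry["values"][parse_reading_date(date_text)] = value

    filled = []
    for contract, entry in by_contract.items():
        day = min(entry["values"])
        # Never drop readings that are later than the requested end date.
        last = max(end_date, max(entry["values"]))
        while day <= last:
            filled.append(
                (entry["account"], contract, day, entry["city"],
                 entry["values"].get(day))
            )
            day += timedelta(days=1)
    return filled


rows = [
    (123, "ABC123", "19FEB2021:00:00:00.000000", "GREEN", 60115),
    (123, "ABC123", "20FEB2021:00:00:00.000000", "GREEN", 24621),
    (123, "ABC123", "22FEB2021:00:00:00.000000", "GREEN", 68816),
    (567, "XYZ456", "20FEB2021:00:00:00.000000", "GREEN", 71542),
    (567, "XYZ456", "21FEB2021:00:00:00.000000", "GREEN", 60115),
    (567, "XYZ456", "22FEB2021:00:00:00.000000", "GREEN", 24621),
]

# Fixed end date for reproducibility; for the real data this would be
# date.today() - timedelta(days=2).
result = fill_contract_dates(rows, end_date=date(2021, 2, 23))
for row in result:
    print(row)
```

A small check like this could be used to compare against whatever a distributed version produces on the same sample.
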
I did this in pandas with ffill and it worked. However, I want to be able to do the same in PySpark for distributed computing, and I don't know how. I tried various code snippets from Stack Overflow, but none of them worked.
My pandas code is below:
import pandas as pd
from datetime import date
# keep only the key columns used to build the full date range
df = pd.DataFrame(pandas_df, columns=['READING_START_DATE', 'CONTRACT_NUMBER'])
df.head()
def expand_dates(ser):
    # one row per day from the contract's first reading date through today
    return pd.DataFrame({'READING_START_DATE': pd.date_range(ser['READING_START_DATE'].min(), date.today(), freq='D')})
# build the full calendar per contract, then join the readings back on
newdf = df.groupby(['CONTRACT_NUMBER']).apply(expand_dates).reset_index()\
    .merge(df, how='left')[['CONTRACT_NUMBER', 'READING_START_DATE']].ffill()
dfmerge = pd.merge(newdf, pandas_df[['READING_START_DATE', 'CONTRACT_NUMBER', 'TOTAL_KWH']], on=['READING_START_DATE', 'CONTRACT_NUMBER'], how='left')