I am working on a personal PySpark project for learning purposes and I have a peculiar problem.
I have a dataframe (df) with N columns, and I want to subtract each column from the next one (e.g. `col1 - col2`, `col2 - col3`, ..., `col(N-1) - colN`) and save the resulting difference columns in another dataframe.
I generate this df by parsing a JSON, saving it to a pandas dataframe (schema: a dates column plus one column per item), transposing the columns to rows (so I end up with a single Items column and one column per date), and then converting it to a Spark df. I do this because row-by-row operations seem fairly difficult to implement in Spark.
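A minimal sketch of that preparation step, assuming the parsed JSON has already been loaded into a dict of one list per item (the item/date names and values below are just the ones from the example tables):

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# parsed JSON: one row per date and one column per item (toy values from the example)
pdf = pd.DataFrame(
    {"date": ["Date1", "Date2", "Date3"],
     "Item1": [104, 98, 98],
     "Item2": [223, 135, 80]}
).set_index("date")

# transpose so each item becomes a row and each date a column
pdf_t = pdf.T.reset_index().rename(columns={"index": "Items"})

# convert the transposed pandas dataframe into a Spark dataframe
df = spark.createDataFrame(pdf_t)
```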
I move the first column (the `Items` column) of the df to a new dataframe (ndf), so I am left with only the following schema (the header consists of dates and the data is only integers):
Date1 | Date2 | Date3 | ... | DateN |
---|---|---|---|---|
104 | 98 | 98 | ... | 0 |
223 | 135 | 80 | ... | 0 |
143 | 122 | 114 | ... | 0 |
91 | 79 | 73 | ... | 0 |
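The split itself is just a select/drop pair (a sketch, assuming the transposed column is literally named `Items`):

```python
# keep the item labels in their own dataframe, leave only the date columns in df
ndf = df.select('Items')
df = df.drop('Items')
```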
I want to subtract the integers in column Date2 from those in column Date1 (i.e. `df.Date1 - df.Date2`) and append the resulting column of values (under the header of the larger column, `Date1`) to the already existing ndf dataframe (the one I moved the Items column into earlier). Then move on to subtract column Date3 from column Date2 (`df.Date2 - df.Date3`), and so on until `Date(N-1) - DateN`, then stop.
The new Dataframe (ndf), created earlier from the Items column, would look like this:
Items | Date1 | Date2 | ... |
---|---|---|---|
Item1 | 6 | 0 | ... |
Item2 | 88 | 55 | ... |
Item3 | 21 | 8 | ... |
Item4 | 12 | 6 | ... |
In practice, I want to see by how much each item has increased from one date to the next.
I was thinking of doing it in a for loop, something like:
```python
from pyspark.sql import functions as F

# get list of column headers
dates = df.columns
# for index and header in list
for idx, date in enumerate(dates):
    if idx < len(dates) - 1:
        # calculate df column subtraction and add the differences column to ndf
        df = df.withColumn(f'diff-{date}',
                           F.when((df[date] - df[dates[idx + 1]]) < 0, 0)
                            .otherwise(df[date] - df[dates[idx + 1]]))
        ndf = ndf.join(df.select(f'diff-{date}'), how='full')
```
But this is very slow, and I have a feeling that a for loop doesn't really play to Spark's strengths and might be much slower than using map/lambda.
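For comparison, this is the kind of single-select construction I was also considering instead of the loop (a sketch with the same column names; I'm not sure whether it is actually faster, and it still leaves open how to attach the result to ndf):

```python
from pyspark.sql import functions as F

dates = df.columns
# build every difference column in one projection instead of chaining withColumn calls
diff_cols = [
    F.when((F.col(dates[i]) - F.col(dates[i + 1])) < 0, 0)
     .otherwise(F.col(dates[i]) - F.col(dates[i + 1]))
     .alias(f'diff-{dates[i]}')
    for i in range(len(dates) - 1)
]
diffs = df.select(*diff_cols)
```

Is there a more idiomatic (and faster) way to build these difference columns and attach them to ndf?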