
I am working on a personal PySpark project for learning purposes and I have a peculiar problem.

I have a dataframe (df) with N columns, in which I want to subtract each column from the previous one (e.g. col1 - col2, col2 - col3, ..., col(N-1) - colN) and save the resulting difference columns in another dataframe.

I generate this df by parsing a JSON into a pandas dataframe (schema: a dates column plus one column per item), transposing the columns to rows (to get a single Items column and one column per date), and then converting it to a Spark df. I do this because row-by-row operations seem fairly difficult to implement in Spark.
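For illustration, here is a minimal sketch of that preparation step (my own reconstruction, not the actual code; the names pdf and spark and the sample values are placeholders based on the tables below):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical pandas dataframe parsed from the JSON:
# one row per date, one column per item
pdf = pd.DataFrame({
    'Date': ['Date1', 'Date2', 'Date3'],
    'Item1': [104, 98, 98],
    'Item2': [223, 135, 80],
})

# transpose so each item becomes a row and each date becomes a column
tpdf = pdf.set_index('Date').T.reset_index().rename(columns={'index': 'Items'})

# convert the transposed pandas dataframe into a Spark dataframe
df = spark.createDataFrame(tpdf)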

I move the first column (the Items column) of the df to a new dataframe (ndf), so I am left with only the following schema (the header consists of dates and the data is integers only):

Date1  Date2  Date3  ...  DateN
104    98     98     ...  0
223    135    80     ...  0
143    122    114    ...  0
91     79     73     ...  0

I want to subtract the values of column Date2 from the values of column Date1 (i.e. df.Date1 - df.Date2) and have the resulting column (under the header of the larger column, Date1) saved/appended to the already existing ndf dataframe (the one into which I moved the Items column earlier). Then move on to subtracting column Date3 from column Date2 (df.Date2 - df.Date3), and so on until Date(N-1) - DateN, then stop.

The new Dataframe (ndf), created earlier from the Items column, would look like this:

Items  Date1  Date2  ...
Item1  6      0      ...
Item2  88     55     ...
Item3  21     8      ...
Item4  12     6      ...

In practice, I want to see by how much each item has increased from one date to the next.

I was thinking of doing it in a for loop, something like:

import pyspark.sql.functions as F

# get list of column headers
dates = df.columns
# for each header except the last one
for idx, date in enumerate(dates):
    if idx < len(dates) - 1:
        # subtract the next date column from the current one,
        # clamping negative differences to 0
        df = df.withColumn(f'diff-{date}', F.when((df[date] - df[dates[idx+1]]) < 0, 0)
                        .otherwise(df[date] - df[dates[idx+1]]))
        # note: this join has no key, so every row is paired with every row
        ndf = ndf.join(df.select(f'diff-{date}'), how='full')

But this is very slow, and I have a feeling that a for loop does not really take advantage of Spark; it might be much slower than using map/lambda.

derzemel
    This looks like an unnecessarily verbose approach. You can just use the dataframe without any transpose, and use window functions to get the difference from the previous row. See [this question](https://stackoverflow.com/questions/36725353/applying-a-window-function-to-calculate-differences-in-pyspark/) for an example. – mck Apr 03 '21 at 07:48
  • Thank you @mck . Your answer pointed me in a good direction and it helped me to figure it out. – derzemel Apr 05 '21 at 12:56

1 Answer


I found 2 solutions:

  1. For the transposed dataframe, as I had it in my question above, a user on reddit r/dataengineering helped me with the solution (a quick sanity check on the sample data is shown after the second solution):

    # list to save the column subtraction expressions
    col_defs = []
    # grab the date columns (skip the first, non-date column)
    date_cols = df.columns[1:]
    # for each date column after the first
    for i, date in enumerate(date_cols):
        if i > 0:
            # save the difference between each pair of adjacent columns,
            # aliased with the later date of the pair
            col_defs.append((df[date_cols[i - 1]] - df[date]).alias(date))
    # result df containing only the items column ('county' in my data)
    # and the differences for each date
    result = df.select('county', *col_defs)
    
  2. If I do not transpose the dataframe, I can apply Window functions instead, as recommended by @mck in a comment on the question. I prefer this way, as it avoids the transpose and the number of columns remains constant (see also the caveat about the empty partitionBy after the code). This resource on PySpark Window Functions was very helpful for understanding how they work:

Dates  Item1  Item2  ...
Date1  104    223    ...
Date2  98     135    ...
Date3  98     80     ...

from pyspark.sql import Window
import pyspark.sql.functions as F

# list to save the column subtraction expressions
colDiffs = []
# get only the item columns
itemCols = df.columns[1:]
# window spec spanning the entire df (no partitioning), sorted by Dates descending;
# no date appears more than once, so the ordering is unambiguous
windowSpec = Window.partitionBy().orderBy(F.col('Dates').desc())
# for each item column
for item in itemCols:
    # add a new column, <item>diff, containing the same numbers shifted up by one row
    # e.g. if a column X contains [1, 2, 3], lead(X, 1) over the window
    # produces an Xdiff column containing [2, 3, null]
    df = df.withColumn(f'{item}diff', F.lead(item, 1).over(windowSpec))
    # append the difference between the current and the lead column to the list
    colDiffs.append((df[item] - df[f'{item}diff']).alias(item))
# get the final df containing the subtraction results
result = df.select('Dates', *colDiffs)
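As a quick sanity check for the first solution (my own reconstruction, assuming the items column is named 'county' as in that snippet and plugging in the sample numbers from the question), result.show() would print each difference under the later date of the pair, so Date2 holds Date1 - Date2:

    +------+-----+-----+
    |county|Date2|Date3|
    +------+-----+-----+
    | Item1|    6|    0|
    | Item2|   88|   55|
    | Item3|   21|    8|
    | Item4|   12|    6|
    +------+-----+-----+

Note that these headers differ from the ndf layout in the question, which kept the earlier date's header; the values are the same.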
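One caveat with the Window approach: Window.partitionBy() with no columns moves the entire dataframe into a single partition, and Spark logs a warning about it ("No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation."). That is fine for a small learning dataset like this one, but it will not scale to large data.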
derzemel
  • How does it work for subtracting two [string columns within a single dataframe](https://stackoverflow.com/q/69540138/10452700) in PySpark? – Mario Oct 12 '21 at 12:03