
I have to implement the equivalent of pandas .apply(function, axis=1) (a row-wise function) in PySpark. As I am a novice, I am not sure whether it should be implemented through a map function or using UDFs. I have not been able to find any similar implementation anywhere.

Basically, all I want is to pass a row to a function, do some operations to create new columns that depend on the values of the current and previous rows, and then return the modified rows to create a new dataframe. One of the functions I use with pandas is given below:

import numpy as np
import pandas as pd

previous = 1

def row_operation(row):
    global previous
    if pd.isnull(row["PREV_COL_A"]) or row["COL_A"] != row["PREV_COL_A"]:
        current = 1
    elif row["COL_C"] > cutoff:
        current = previous + 1
    elif row["COL_C"] <= cutoff:
        current = previous
    else:
        current = np.nan
    previous = current
    return current

Here PREV_COL_A is nothing but COL_A lagged by 1 row.
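For context, here is a minimal sketch of how this is wired up on the pandas side. The cutoff value and the toy data are only assumptions for illustration; PREV_COL_A is built by shifting COL_A down by one row and the function is then applied row-wise:

import pandas as pd

cutoff = 10  # hypothetical cutoff value, for illustration only
df = pd.DataFrame({"COL_A": [1, 1, 2, 2], "COL_C": [5, 15, 3, 20]})  # toy data

# PREV_COL_A is COL_A lagged by one row
df["PREV_COL_A"] = df["COL_A"].shift(1)

# Apply the row-wise function; this is the behaviour I want to reproduce in PySpark
df["COL_NEW"] = df.apply(row_operation, axis=1)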

Please note that this function is the simplest one; it does not return rows, but others do. If anyone can guide me on how to implement row operations in PySpark, it would be a great help. TIA

1 Answer


You could use rdd.mapPartitions. It will give you an iterator over the rows, and you yield out the result rows you want to return. The iterator you are given won't let you jump forward or backward, only return the next row. However, you can save off rows as you process them to do whatever you need to do. For example:

def my_cool_function(rows):
    # Rows from this partition that we have already seen
    prev_rows = []

    for row in rows:
        # Do some processing with the current row (and prev_rows if needed),
        # then build the row you want to return as my_new_row
        yield my_new_row

        # Only keep the last two rows around
        if len(prev_rows) >= 2:
            prev_rows = prev_rows[1:]

        prev_rows.append(row)

updated_rdd = rdd.mapPartitions(my_cool_function)

Note, I used a list to track the previous rows for the sake of the example, but Python lists are really arrays, which don't have efficient head push/pop methods, so you will probably want to use an actual queue (e.g. collections.deque).
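Applied to the question's columns, a rough sketch might look like the following. This assumes the dataframe df is already sorted and partitioned so that related rows arrive in order within one partition, and that cutoff is defined; the running state does not carry across partition boundaries, and the column names mirror the question:

from pyspark.sql import Row

cutoff = 10  # hypothetical cutoff value

def row_operation_partition(rows):
    previous = 1
    prev_col_a = None  # plays the role of PREV_COL_A within this partition

    for row in rows:
        if prev_col_a is None or row["COL_A"] != prev_col_a:
            current = 1
        elif row["COL_C"] > cutoff:
            current = previous + 1
        else:
            current = previous

        previous = current
        prev_col_a = row["COL_A"]
        # Return the original columns plus the newly computed one
        yield Row(**row.asDict(), COL_NEW=current)

updated_df = df.rdd.mapPartitions(row_operation_partition).toDF()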

Ryan Widmaier