
I have two data sets with the following data:

[image: sample rows from the source file (transaction_id, buyer_name, interchange_rate) and the lookup table (buyer_name, rebate_rate), with the expected rebate_rate column highlighted in red]

Given the interchange_rate value in the source file, I need to get the closest match of rebate_rate from the lookup table. In the image you can see that transaction_id 2 has the highest interchange_rate, so it should get the highest rebate_rate; transaction_id 1 has the lowest interchange_rate, so it should get the lowest rebate_rate.

The red column is one I filled in manually in Excel as an example; that's the expected output.

My initial idea is to loop through the rows in the source file and, for each row, search for the closest match in the lookup table. But I'm not a very experienced PySpark developer, so I'm looking for help writing code to accomplish this task.

My first try was to use the foreach() method on the source-file dataframe, but I get a PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

def get_rebate(row):
    buyer_name = row["buyer_name"]
    df_buyer = df_lookup.where(f"buyer_name == '{buyer_name}'")
    row["rebate_rate"] = df_buyer.select("rebate_rate").first()

    return row

# df_final is the source file after a few cosmetic transformations. I need to add a new column "rebate_rate" to it
df_final.foreach(lambda x: get_rebate(x))
Croves
    maybe sort each dataframe and merge them? this only works for two dataframes of same size, but that might be the case for you (I'm not sure) – Anton May 15 '23 at 14:29
  • @Anton thank you for the insight, I was overthinking and didn't realize that I could just sort. I will double-check with the BAs to make sure the dataframes will always be of the same size – Croves May 15 '23 at 14:39

1 Answer


Iteration is very imperative and won't really work in a distributed environment like PySpark (unless the dataset is small enough to fit in a single node's memory).

Just as programming languages come in functional and imperative flavours, in PySpark you say what you want, not how to do it.

In PySpark, what you want is a join. There are quite a few different variants of joins, and I'm fairly confident one of them will give you what you want.
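
As a rough sketch of what such a join could look like, building on the sort idea from the comments: rank the rows of each dataframe within buyer_name by their rate, then join on buyer_name and rank, so the highest interchange_rate lines up with the highest rebate_rate. The column names (buyer_name, interchange_rate, rebate_rate) are taken from the question, and this assumes both dataframes have the same number of rows per buyer.

from pyspark.sql import Window
from pyspark.sql import functions as F

# Rank source rows by interchange_rate and lookup rows by rebate_rate,
# per buyer, from highest to lowest.
w_src = Window.partitionBy("buyer_name").orderBy(F.col("interchange_rate").desc())
w_lkp = Window.partitionBy("buyer_name").orderBy(F.col("rebate_rate").desc())

ranked_src = df_final.withColumn("rank", F.row_number().over(w_src))
ranked_lkp = df_lookup.withColumn("rank", F.row_number().over(w_lkp))

# Join on buyer_name + rank so each source row picks up the rebate_rate
# in the same relative position.
result = (
    ranked_src
    .join(ranked_lkp.select("buyer_name", "rank", "rebate_rate"),
          on=["buyer_name", "rank"], how="left")
    .drop("rank")
)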


Worry not: if PySpark notices that one side of a join is below a certain size threshold (controlled by spark.sql.autoBroadcastJoinThreshold), that side is turned into a lookup table and distributed to all the executor nodes in the cluster.

That's one of the many optimizations you don't even have to be aware of to get the most out of PySpark. Since you asked, the name of the fastest join is Broadcast Hash Join.
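
If you want to ask for that explicitly, for example when the lookup side is small but the optimizer doesn't pick it up on its own, there is a broadcast hint; a minimal sketch, reusing the ranked dataframes from above:

from pyspark.sql.functions import broadcast

# Hint that the lookup side is small enough to be replicated to every
# executor, so Spark can use a broadcast hash join instead of shuffling.
result = ranked_src.join(
    broadcast(ranked_lkp.select("buyer_name", "rank", "rebate_rate")),
    on=["buyer_name", "rank"], how="left"
).drop("rank")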

Jacek Laskowski