
I am trying to join two tables together in PySpark, and one join condition is dynamically determined by the contents of a column in the other table.

For example, table 1 looks like

+-----+-----------+
|Acct |Util_Change|
+-----+-----------+
|1    |0.5        |
+-----+-----------+
|2    |0.8        |
+-----+-----------+

table 2 looks like

+----------+-----------+-----------+
|Low_Change|High_Change|CLS        |
+----------+-----------+-----------+
|>0        |0.3        |T1         | # This means the util_change should be >0 and <=0.3
+----------+-----------+-----------+
|>0.3      |<0.7       |T2         | # This means the util_change should be >0.3 and <0.7
+----------+-----------+-----------+
|0.7       |1          |T3         | # This means the util_change should be >=0.7 and <=1
+----------+-----------+-----------+

I want to join table 1 and table 2 by matching table1.Util_Change against Low_Change and High_Change in table 2. As you can see, the comparison operators are defined in table 2.

What is the best way to code this in PySpark?

Here is the code to create the two tables:

product = [(1, 0.5), (2, 0.8)]
sp = sqlContext.createDataFrame(product, ["Acct", "Util_Change"])

grid = [('>0', '0.3', 'T1'), ('>0.3', '<0.7', 'T2'), ('0.7', '1', 'T3')]
sp2 = sqlContext.createDataFrame(grid, ["Low_Change", "High_Change", "CLS"])
  • do you have any code you've done so far? – iurii_n Oct 01 '18 at 16:02
  • I have updated the code to create the two tables. – Nanan Oct 01 '18 at 16:08
  • Related: [How to do range lookup and search in PySpark](https://stackoverflow.com/questions/51506913/how-to-do-range-lookup-and-search-in-pyspark/51522021#51522021) and [Spark Equivalent of IF Then ELSE](https://stackoverflow.com/questions/39048229/spark-equivalent-of-if-then-else). – pault Oct 01 '18 at 16:43
  • Thanks for sharing the link, but they are not related to my questions. – Nanan Oct 01 '18 at 16:47
  • @Nanan please look at the first link again. It does a join based on a range look up, which is precisely what you want to do here. (Granted there's also some other stuff in there that is unrelated to your question.) The main thing is that you can do a `crossJoin` and add a where clause like `.where("table1.Util_Change BETWEEN table2.Low_Change AND table2.High_Change")` – pault Oct 01 '18 at 17:52
  • Thanks, @pault! I was the person who posted the question in the first link. The difference between that post and this one is that here the comparison operators are specified in the table, whereas in the first link they were always >= lower_bound and < higher_bound. – Nanan Oct 01 '18 at 18:07
  • @Nanan wow I can't believe I did not notice you posted the first question. You can still use a combination of the two links I shared- use `pyspark.sql.functions.when` to specify what to do based on the presence of the comparison operator. I'll post a solution if I have time. – pault Oct 01 '18 at 18:14
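
Building on the crossJoin-plus-when approach pault outlines in the comments above, here is a minimal sketch. It assumes the rule implied by the comments in table 2 (a bare number means an inclusive bound, a leading > or < means a strict one), uses a SparkSession rather than the older sqlContext, and the derived column names (low_val, high_val, low_strict, high_strict) are purely illustrative:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

sp = spark.createDataFrame([(1, 0.5), (2, 0.8)], ["Acct", "Util_Change"])
sp2 = spark.createDataFrame(
    [(">0", "0.3", "T1"), (">0.3", "<0.7", "T2"), ("0.7", "1", "T3")],
    ["Low_Change", "High_Change", "CLS"])

# Split each bound into its numeric value and a flag recording whether the
# original string carried a strict operator ('>' on the low bound, '<' on the high bound).
bounds = (sp2
          .withColumn("low_val", F.regexp_replace("Low_Change", "[<>=]", "").cast("double"))
          .withColumn("high_val", F.regexp_replace("High_Change", "[<>=]", "").cast("double"))
          .withColumn("low_strict", F.col("Low_Change").startswith(">"))
          .withColumn("high_strict", F.col("High_Change").startswith("<")))

# when() picks a strict comparison if an operator was present, otherwise an inclusive one.
low_ok = (F.when(F.col("low_strict"), F.col("Util_Change") > F.col("low_val"))
           .otherwise(F.col("Util_Change") >= F.col("low_val")))
high_ok = (F.when(F.col("high_strict"), F.col("Util_Change") < F.col("high_val"))
            .otherwise(F.col("Util_Change") <= F.col("high_val")))

# Cross join the accounts against every rule, then keep only the rows whose condition holds.
result = sp.crossJoin(bounds).where(low_ok & high_ok).select("Acct", "Util_Change", "CLS")
result.show()
# Expected mapping: Acct 1 (0.5) -> T2, Acct 2 (0.8) -> T3

The same condition can also be passed directly as a join predicate, e.g. sp.join(bounds, low_ok & high_ok), which expresses it as a non-equi join instead of an explicit cross join followed by a filter.
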

0 Answers