I am trying to join two tables in PySpark, where one of the join conditions is determined dynamically by the contents of columns in the other table.
For example, table 1 looks like this:
+----+-----------+
|Acct|Util_Change|
+----+-----------+
|   1|        0.5|
|   2|        0.8|
+----+-----------+
and table 2 looks like this:
+----------+-----------+---+
|Low_Change|High_Change|CLS|
+----------+-----------+---+
|        >0|        0.3| T1|   # Util_Change should be > 0 and <= 0.3
|      >0.3|       <0.7| T2|   # Util_Change should be > 0.3 and < 0.7
|       0.7|          1| T3|   # Util_Change should be >= 0.7 and <= 1
+----------+-----------+---+
I want to join table 1 and table 2 by matching table1.Util_Change against the Low_Change and High_Change bounds in table 2. As you can see, the comparison operators themselves are defined by the data in table 2. What would be the best way to code this in PySpark?
Here is the code to create the two tables:
product = [(1, 0.5), (2, 0.8)]
sp = sqlContext.createDataFrame(product, ["Acct", "Util_Change"])
grid = [('>0', '0.3', 'T1'), ('>0.3', '<0.7', 'T2'), ('0.7', '1', 'T3')]
sp2 = sqlContext.createDataFrame(grid, ["Low_Change", "High_Change", "CLS"])
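
For illustration, here is a rough sketch of the kind of solution I can imagine, though I am not sure it is the best way (hence the question). It assumes a bare number means an inclusive bound, as in the comments above, parses the operator prefix out of Low_Change/High_Change with a helper I made up (split_bound), and builds the join condition with when(). It reuses sp and sp2 as created above:

import pyspark.sql.functions as F

def split_bound(col, default_op):
    # '>0.3' -> ('>', 0.3); a bare number like '0.7' -> (default_op, 0.7)
    op = F.when(col.rlike(r'^[<>]=?'), F.regexp_extract(col, r'^([<>]=?)', 1)) \
          .otherwise(F.lit(default_op))
    val = F.regexp_replace(col, r'^[<>]=?', '').cast('double')
    return op, val

# Bare numbers are treated as inclusive: >= for the low bound, <= for the high bound
low_op, low_val = split_bound(F.col('Low_Change'), '>=')
high_op, high_val = split_bound(F.col('High_Change'), '<=')

sp2_parsed = (sp2
    .withColumn('low_op', low_op).withColumn('low_val', low_val)
    .withColumn('high_op', high_op).withColumn('high_val', high_val))

# Turn the parsed operators into an actual comparison on Util_Change
u = sp['Util_Change']
low_cond = (F.when(sp2_parsed['low_op'] == '>',  u >  sp2_parsed['low_val'])
             .when(sp2_parsed['low_op'] == '>=', u >= sp2_parsed['low_val'])
             .otherwise(F.lit(False)))
high_cond = (F.when(sp2_parsed['high_op'] == '<',  u <  sp2_parsed['high_val'])
              .when(sp2_parsed['high_op'] == '<=', u <= sp2_parsed['high_val'])
              .otherwise(F.lit(False)))

result = sp.join(sp2_parsed, low_cond & high_cond, 'left')
result.select('Acct', 'Util_Change', 'CLS').show()

With the sample data above this should give Acct 1 -> T2 and Acct 2 -> T3, but I would like to know whether there is a cleaner or more idiomatic way to handle the operators being defined in the data.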