I am new to PySpark. I want to compare two tables and, for each row, record in a new column the names of the columns whose values do not match. Using the "Compare two dataframes Pyspark" link, I am able to get that result. Now I want to filter the new table on the newly created column so that only rows with at least one mismatch remain.
df1 = spark.createDataFrame([
[1, "ABC", 5000, "US"],
[2, "DEF", 4000, "UK"],
[3, "GHI", 3000, "JPN"],
[4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])
df2 = spark.createDataFrame([
[1, "ABC", 5000, "US"],
[2, "DEF", 4000, "CAN"],
[3, "GHI", 3500, "JPN"],
[4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])
# Explicit imports: a wildcard import would shadow Python builtins such as sum and max
from pyspark.sql.functions import array, array_remove, col, lit, when
# For every column except id: the column name if the values differ, otherwise ""
conditions_ = [when(df1[c] != df2[c], lit(c)).otherwise("") for c in df1.columns if c != 'id']
select_expr = [
    col("id"),
    *[df2[c] for c in df2.columns if c != 'id'],
    # Drop the "" placeholders so only the mismatching column names remain
    array_remove(array(*conditions_), "").alias("column_names")
]
df3 = df1.join(df2, "id").select(*select_expr)
df3.show()
Output of df3.show():
+---+-----+----+-------+------------+
| id| name| sal|Address|column_names|
+---+-----+----+-------+------------+
|  1|  ABC|5000|     US|          []|
|  2|  DEF|4000|    CAN|   [Address]|
|  3|  GHI|3500|    JPN|       [sal]|
|  4|JKL_M|4800|    CHN| [name, sal]|
+---+-----+----+-------+------------+
This is the step where I am getting an error message.
df3.filter(df3.column_names!="")
Error: cannot resolve '(column_names = '')' due to data type mismatch: differing types in '(column_names = '')' (array<string> and string).
I want the following result after filtering:
Desired df3:
+---+-----+----+-------+------------+
| id| name| sal|Address|column_names|
+---+-----+----+-------+------------+
|  2|  DEF|4000|    CAN|   [Address]|
|  3|  GHI|3500|    JPN|       [sal]|
|  4|JKL_M|4800|    CHN| [name, sal]|
+---+-----+----+-------+------------+