I have two dataframes. The first one contains data like this:
+-----+-----------+-----+-----------+
|value|mergeValues|table|columnName |
+-----+-----------+-----+-----------+
|1    |1,2,3      |     |columnName1|
|2    |4,5,6,7    |     |columnName1|
|3    |8,9        |     |columnName1|
|1    |1,2,3      |     |columnName4|
|2    |4,5,6,7    |     |columnName4|
|3    |8,9        |     |columnName4|
|1    |1,2,3      |     |columnName5|
|2    |4,5,6,7    |     |columnName5|
|3    |8,9        |     |columnName5|
|1    |1,2,3      |     |columnName6|
+-----+-----------+-----+-----------+
The structure of the second dataframe is shown below; the values listed are in columnName1:

columnName1 | columnName2 | columnName3 | columnName4 | columnName5 | columnName6
1
3
2
4
5
Now I have to create a mapped dataframe like the one below. The mapping logic is: take a value from the second dataframe and check whether it appears in the mergeValues list of the first dataframe for that column; if it does, map it to the corresponding value from the first dataframe. For example, the value 1 in columnName1 of the second dataframe appears in the mergeValues list 1,2,3, so it maps to the first dataframe's value 1; the same rule applies to 2, 3, 4, 5, 6, 7, and so on (a small sketch of this per-cell rule follows the expected output below).
columnName1 | columnName2 | columnName3 | columnName4 | columnName5 | columnName6
1
1
1
2
2
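To make the rule concrete, here is a minimal plain-Scala sketch of the per-cell logic (the function name mapCell and the comma-splitting of mergeValues are just my assumptions about the data format):

def mapCell(mergeValues: String, mappedValue: String, cell: String): String =
  // return the firstDF value when the cell is listed in mergeValues, otherwise keep the cell
  if (mergeValues.split(",").map(_.trim).contains(cell.trim)) mappedValue else cell

// e.g. mapCell("1,2,3", "1", "3") == "1" and mapCell("4,5,6,7", "2", "5") == "2"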
To do this I'm using a UDF, but it is failing. What is the correct way to create this dataframe? My code is:
val firstDF = sparkSession.read.load(first)
val testDF = sparkSession.read.load(test)

val populateColumn: ((String, String, String) => String) =
  (mergeValues: String, value: String, actualValue: String) => {
    if (mergeValues.contains(actualValue.trim)) {
      value
    } else {
      actualValue
    }
  }

val populateColumnUdf = udf(populateColumn)

val firstDFList = firstDF.collect
firstDFList.foreach(Case => {
  println(Case)
  testDF.withColumn(Case.getAs("columnName"), populateColumnUdf(Case.getAs("mergeValues"),
    Case.getAs("value"), col(Case.getAs("columnName"))))
})
testDF.show
This is the error I'm getting:
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Column
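From the error I suspect that the literal strings taken from the collected rows need to be wrapped with lit() before being passed to the UDF, and that the result of withColumn has to be reassigned because dataframes are immutable. Below is a rough sketch of what I think it might look like (using foldLeft over the collected rows is my own guess, and null cells would probably still need handling), but I'm not sure this is the right approach:

import org.apache.spark.sql.functions.{col, lit, udf}

// accumulate one withColumn per row of firstDF, starting from testDF
val mappedDF = firstDF.collect().foldLeft(testDF) { (df, row) =>
  val columnName = row.getAs[String]("columnName")
  // lit() turns the literal mergeValues/value strings into Columns for the UDF
  df.withColumn(columnName,
    populateColumnUdf(lit(row.getAs[String]("mergeValues")),
      lit(row.getAs[String]("value")),
      col(columnName)))
}
mappedDF.show()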