I was troubleshooting my Spark 2.3 application and noticed that I get incorrect results unless I cache the Dataset right after a crossJoin. Is caching a Dataset after crossJoin actually required?
Here's what my Dataset looks like:
+--------+------------+
| id| nameGroupId|
+--------+------------+
| joe san| 6302870528|
|john san|936302870528|
+--------+------------+
When I execute the code below with the .cache() call uncommented, the callUDF("leftNameTokenComplement") UDF is invoked with the following parameters, which is what I would expect:
df1.id="joe san" & df2.id="john san"
But when I execute the same block of code with the .cache() call commented out, the callUDF("leftNameTokenComplement") UDF is invoked with the following parameters. This should be impossible, because the filter("df1.id < df2.id") immediately before the UDF should have eliminated any pair where the two ids are equal:
df1.id="joe san" & df2.id="joe san"
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

Dataset<Row> connectedDf = sparkSession.read().json(this.workspaceDir + "connected-df");

connectedDf.as("df1")
    .crossJoin(connectedDf.as("df2"))
    //.cache() //correct results only when cached!
    .filter("df1.id < df2.id") // should drop pairs where both sides are the same id
    .withColumn("leftComp", functions.callUDF("leftNameTokenComplement", col("df1.id"), col("df2.id")))
    .filter(functions.size(col("leftComp")).equalTo(1))
    .withColumn("src1", functions.callUDF("firstInArray", col("leftComp")))
    .withColumn("matchedPhonic", functions.callUDF("isNameTokenPhonicMatch", col("src1")))
    .filter(col("matchedPhonic").equalTo(true))
    .show();
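For context, here is a minimal sketch of how I could register the UDF as non-deterministic, in case the optimizer's assumption that UDFs are deterministic (which lets it reorder or re-evaluate them around filters) is relevant here. This is a guess, not a confirmed fix; the lambda body below is a placeholder, not my real leftNameTokenComplement logic, and I am assuming the Spark 2.3 APIs functions.udf(UDF2, DataType) and UserDefinedFunction.asNondeterministic():

```java
import static org.apache.spark.sql.functions.udf;

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

public class NondeterministicUdfSketch {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
            .master("local[*]")
            .appName("nondeterministic-udf-sketch")
            .getOrCreate();

        // Placeholder body only -- the real token-complement logic is omitted.
        UDF2<String, String, List<String>> leftNameTokenComplement =
            (left, right) -> Arrays.asList(left);

        // asNondeterministic() tells Catalyst it must not re-evaluate the UDF
        // or move it relative to other operators such as the preceding filter.
        UserDefinedFunction f = udf(leftNameTokenComplement,
                DataTypes.createArrayType(DataTypes.StringType))
            .asNondeterministic();

        sparkSession.udf().register("leftNameTokenComplement", f);

        sparkSession.stop();
    }
}
```

I have not verified whether this makes the .cache() call unnecessary in my pipeline.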