In a DataFrame, I'm trying to identify the rows whose value in column C2 does not exist in column C1 of any other row. I tried the following code:
in_df = sqlContext.createDataFrame([[1,None,'A'],[2,1,'B'],[3,None,'C'],[4,11,'D']],['C1','C2','C3'])
in_df.show()
+---+----+---+
| C1|  C2| C3|
+---+----+---+
|  1|null|  A|
|  2|   1|  B|
|  3|null|  C|
|  4|  11|  D|
+---+----+---+
filtered = in_df.filter(in_df.C2.isNotNull())
filtered.show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
|  2|  1|  B|
|  4| 11|  D|
+---+---+---+
Applying a left_anti join should now return only row 4 (its C2 value, 11, does not appear anywhere in C1); however, I also get row 2:
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
|  2|  1|  B|
|  4| 11|  D|
+---+---+---+
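To make the expectation concrete, here is a minimal plain-Python sketch of the semantics I expect from the left_anti join. It does not involve Spark at all, and the helper name expected_left_anti is made up purely for illustration. Like the join against in_df, it compares each C2 against the C1 values of all rows.

```python
# Plain-Python sketch of the expected left_anti result (no Spark involved).
# Same data as in_df: tuples of (C1, C2, C3).
rows = [
    (1, None, 'A'),
    (2, 1, 'B'),
    (3, None, 'C'),
    (4, 11, 'D'),
]


def expected_left_anti(rows):
    """Keep rows whose non-null C2 does not occur anywhere in column C1."""
    c1_values = {c1 for c1, _, _ in rows}
    return [r for r in rows if r[1] is not None and r[1] not in c1_values]


print(expected_left_anti(rows))  # [(4, 11, 'D')]
```

Row 2 is dropped here because its C2 value (1) is the C1 value of row 1, which is exactly what I expected the Spark join to do.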
If I 'materialize' the filtered DataFrame by rebuilding it with toDF, the result is as expected:
filtered = filtered.toDF(*filtered.columns)
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
|  4| 11|  D|
+---+---+---+
Why is this .toDF call needed?