Update: the root issue was a bug which was fixed in Spark 3.2.0.
The input df structures are identical in both runs, but the outputs differ. Only the second run returns the desired result (df6). I know I can use dataframe aliases to get the desired result (see the sketch after the first run below).
The question: what are the underlying Spark mechanics in creating df3? Spark reads df1.c1 == df2.c2 in the join's on clause, but it evidently pays no attention to which dataframes the columns come from. What's under the hood there? How can such behaviour be anticipated?
First run (incorrect df3 result):
from pyspark.sql import functions as F

data = [
(1, 'bad', 'A'),
(4, 'ok', None)]
df1 = spark.createDataFrame(data, ['ID', 'Status', 'c1'])
df1 = df1.withColumn('c2', F.lit('A'))
df1.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 1| bad| A| A|
#| 4| ok|null| A|
#+---+------+----+---+
df2 = df1.filter((F.col('Status') == 'ok'))
df2.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 4| ok|null| A|
#+---+------+----+---+
# df2 is derived from df1, so this is effectively a self-join
df3 = df2.join(df1, (df1.c1 == df2.c2), 'full')
df3.show()
#+----+------+----+----+----+------+----+----+
#| ID|Status| c1| c2| ID|Status| c1| c2|
#+----+------+----+----+----+------+----+----+
#| 4| ok|null| A|null| null|null|null|
#|null| null|null|null| 1| bad| A| A|
#|null| null|null|null| 4| ok|null| A|
#+----+------+----+----+----+------+----+----+
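This is the alias workaround mentioned above; a minimal sketch against the same df1/df2 (df3_aliased is just an illustrative name), which gives the desired full-outer result:
a = df1.alias('a')
b = df2.alias('b')
df3_aliased = b.join(a, F.col('a.c1') == F.col('b.c2'), 'full')
df3_aliased.show()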
Second run (correct df6 result):
data = [
(1, 'bad', 'A', 'A'),
(4, 'ok', None, 'A')]
df4 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2'])
df4.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 1| bad| A| A|
#| 4| ok|null| A|
#+---+------+----+---+
df5 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2']).filter((F.col('Status') == 'ok'))
df5.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 4| ok|null| A|
#+---+------+----+---+
# df4 and df5 are built from separate createDataFrame calls, so the column references resolve unambiguously
df6 = df5.join(df4, (df4.c1 == df5.c2), 'full')
df6.show()
#+----+------+----+----+---+------+----+---+
#| ID|Status| c1| c2| ID|Status| c1| c2|
#+----+------+----+----+---+------+----+---+
#|null| null|null|null| 4| ok|null| A|
#| 4| ok|null| A| 1| bad| A| A|
#+----+------+----+----+---+------+----+---+
I can see the physical plans differ in that different join strategies are used internally (BroadcastNestedLoopJoin vs. SortMergeJoin). But that by itself does not explain why the results differ, since different internal join strategies should still produce the same result. Notably, in the first plan the join condition has been resolved to (c1#23335 = A), i.e. it references only the left side's c1 and a literal.
df3.explain()
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#23335 = A)
:- *(1) Project [ID#23333L, Status#23334, c1#23335, A AS c2#23339]
: +- *(1) Filter (isnotnull(Status#23334) AND (Status#23334 = ok))
: +- *(1) Scan ExistingRDD[ID#23333L,Status#23334,c1#23335]
+- BroadcastExchange IdentityBroadcastMode, [id=#9250]
+- *(2) Project [ID#23379L, Status#23380, c1#23381, A AS c2#23378]
+- *(2) Scan ExistingRDD[ID#23379L,Status#23380,c1#23381]
df6.explain()
== Physical Plan ==
SortMergeJoin [c2#23459], [c1#23433], FullOuter
:- *(2) Sort [c2#23459 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c2#23459, 200), ENSURE_REQUIREMENTS, [id=#9347]
: +- *(1) Filter (isnotnull(Status#23457) AND (Status#23457 = ok))
: +- *(1) Scan ExistingRDD[ID#23456L,Status#23457,c1#23458,c2#23459]
+- *(4) Sort [c1#23433 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(c1#23433, 200), ENSURE_REQUIREMENTS, [id=#9352]
+- *(3) Scan ExistingRDD[ID#23431L,Status#23432,c1#23433,c2#23434]
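To spot this earlier, it may help to print the full plan chain rather than only the physical plan, since the analyzed logical plan already shows what the on clause was resolved to. A minimal sketch using the standard DataFrame.explain API (the mode variant requires Spark 3.x):
df3.explain(True)            # parsed, analyzed, optimized and physical plans
df3.explain(mode='extended') # equivalent spelling on Spark 3.x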