Update: the root issue was a bug which was fixed in Spark 3.2.0.


The input df structures are identical in both runs, but the outputs differ. Only the second run returns the desired result (df6). I know I can use aliases for the dataframes, which would return the desired result.

The question: what are the underlying Spark mechanics in creating df3? Spark reads df1.c1 == df2.c2 in the join's on clause, but it evidently does not pay attention to which dataframe each column comes from. What's under the hood there? How can I anticipate such behaviour?

First run (incorrect df3 result):

from pyspark.sql import functions as F  # F is used below; an existing `spark` session is assumed

data = [
    (1, 'bad', 'A'),
    (4, 'ok', None)]
df1 = spark.createDataFrame(data, ['ID', 'Status', 'c1'])
df1 = df1.withColumn('c2', F.lit('A'))
df1.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  1|   bad|   A|  A|
#|  4|    ok|null|  A|
#+---+------+----+---+

df2 = df1.filter((F.col('Status') == 'ok'))
df2.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  4|    ok|null|  A|
#+---+------+----+---+

df3 = df2.join(df1, (df1.c1 == df2.c2), 'full')
df3.show()

#+----+------+----+----+----+------+----+----+
#|  ID|Status|  c1|  c2|  ID|Status|  c1|  c2|
#+----+------+----+----+----+------+----+----+
#|   4|    ok|null|   A|null|  null|null|null|
#|null|  null|null|null|   1|   bad|   A|   A|
#|null|  null|null|null|   4|    ok|null|   A|
#+----+------+----+----+----+------+----+----+

Second run (correct df6 result):

data = [
    (1, 'bad', 'A', 'A'),
    (4, 'ok', None, 'A')]
df4 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2'])
df4.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  1|   bad|   A|  A|
#|  4|    ok|null|  A|
#+---+------+----+---+

df5 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2']).filter((F.col('Status') == 'ok'))
df5.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  4|    ok|null|  A|
#+---+------+----+---+

df6 = df5.join(df4, (df4.c1 == df5.c2), 'full')
df6.show()

#+----+------+----+----+---+------+----+---+
#|  ID|Status|  c1|  c2| ID|Status|  c1| c2|
#+----+------+----+----+---+------+----+---+
#|null|  null|null|null|  4|    ok|null|  A|
#|   4|    ok|null|   A|  1|   bad|   A|  A|
#+----+------+----+----+---+------+----+---+

I can see that the physical plans differ: different joins are used internally (BroadcastNestedLoopJoin and SortMergeJoin). But this by itself does not explain why the results differ, as they should be the same regardless of the internal join type.

df3.explain()

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#23335 = A)
:- *(1) Project [ID#23333L, Status#23334, c1#23335, A AS c2#23339]
:  +- *(1) Filter (isnotnull(Status#23334) AND (Status#23334 = ok))
:     +- *(1) Scan ExistingRDD[ID#23333L,Status#23334,c1#23335]
+- BroadcastExchange IdentityBroadcastMode, [id=#9250]
   +- *(2) Project [ID#23379L, Status#23380, c1#23381, A AS c2#23378]
      +- *(2) Scan ExistingRDD[ID#23379L,Status#23380,c1#23381]

df6.explain()

== Physical Plan ==
SortMergeJoin [c2#23459], [c1#23433], FullOuter
:- *(2) Sort [c2#23459 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(c2#23459, 200), ENSURE_REQUIREMENTS, [id=#9347]
:     +- *(1) Filter (isnotnull(Status#23457) AND (Status#23457 = ok))
:        +- *(1) Scan ExistingRDD[ID#23456L,Status#23457,c1#23458,c2#23459]
+- *(4) Sort [c1#23433 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(c1#23433, 200), ENSURE_REQUIREMENTS, [id=#9352]
      +- *(3) Scan ExistingRDD[ID#23431L,Status#23432,c1#23433,c2#23434]
ZygD

2 Answers


Joins depend on the structure of the joined dataframes, but how you built those dataframes can have an influence too. If the two dataframes you join share the same lineage, you can run into ambiguous column issues in the join condition, leading to what you're describing in your question.

In your first run, since you built df2 from df1, the two dataframes share the same lineage. Joining them is actually a self-join, and Spark picks the wrong columns: both sides of the join condition end up belonging to only one of the joined dataframes, which results in a cartesian product followed by an always-false filter.

In your second run, since the two dataframes were built independently, the join condition is correctly defined as an equality between two columns, each belonging to a different dataframe. Thus Spark performs a classic join.


Detailed explanation

As pltc explains in his answer, in your first run Spark does not select the right columns for your join. Let's find out why.

What's under the hood?

Let's start by getting the physical plans of df1 and df2 using explain. Here is the physical plan for df1:

== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]

And here is the physical plan for df2:

== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
   +- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]

You can see in the first line, starting with (1) Project, that the two dataframes df1 and df2 have the same column names and ids: [ID#0L, Status#1, c1#2, A AS c2#6]. This is not surprising, because df2 was created from df1, so you can see df2 as df1 with additional transformations. So we have the following references:

  • df1.c1 <=> df2.c1 <=> c1#2
  • df1.c2 <=> df2.c2 <=> A AS c2#6

So when you join df1 and df2, you are doing a self-join. All of the following forms of your condition are translated to c1#2 = A AS c2#6, which leaves you with the simplified join condition c1#2 = A:

  • df1.c1 = df2.c2
  • df1.c2 = df2.c1
  • df2.c1 = df1.c2
  • df2.c2 = df1.c1

When you perform a self-join, Spark regenerates the column ids of the right dataframe to avoid having duplicate column ids in the final dataframe. In your case df1 is on the right, so its column ids are rewritten. As a result, column c1#2 now refers only to column c1 of df2.
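The id bookkeeping described above can be mimicked with a toy model in plain Python. This is only a sketch and not Spark's actual implementation: the helper names create_dataframe, derive and join_condition_sides are hypothetical, a "dataframe" is reduced to a mapping from column name to an integer id, and the ids will not match the ones in the plans.

```python
from itertools import count

# Toy model (NOT Spark's implementation): a "dataframe" is a mapping from
# column name to a globally unique expression id.
_next_id = count()


def create_dataframe(names):
    """A freshly created dataframe gets a new id for every column."""
    return {name: next(_next_id) for name in names}


def derive(df):
    """A filter-like transformation keeps the parent's column ids."""
    return dict(df)


def join_condition_sides(left, right, cond_ids):
    """Report which join side each id used in the condition resolves to."""
    # Self-join: the two sides share ids, so the right side gets fresh ids.
    if set(left.values()) & set(right.values()):
        right = {name: next(_next_id) for name in right}
    left_ids, right_ids = set(left.values()), set(right.values())
    sides = []
    for cid in cond_ids:
        if cid in left_ids:
            sides.append('left')
        elif cid in right_ids:
            sides.append('right')
        else:
            sides.append('unknown')
    return sides


cols = ['ID', 'Status', 'c1', 'c2']

# First run: df2 is derived from df1, so both share the same ids.
df1 = create_dataframe(cols)
df2 = derive(df1)
first_run = join_condition_sides(df2, df1, [df1['c1'], df2['c2']])

# Second run: df4 and df5 are created independently.
df4 = create_dataframe(cols)
df5 = create_dataframe(cols)
second_run = join_condition_sides(df5, df4, [df4['c1'], df5['c2']])

print(first_run)   # ['left', 'left']
print(second_run)  # ['right', 'left']
```

In the first run both ids in the condition resolve to the left side (df2), which is the situation the physical plan of df3 reflects. In the second run each id resolves to a different side.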

Your condition now contains no column from df1, so Spark chooses a cartesian product as the join strategy. As one of the two dataframes is small enough to be broadcast, the selected algorithm is BroadcastNestedLoopJoin. This is what the physical plan of df3 shows:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#2 = A)
:- *(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
:  +- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
:     +- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
+- BroadcastExchange IdentityBroadcastMode, [id=#75]
   +- *(2) Project [ID#46L, Status#47, c1#48, A AS c2#45]
      +- *(2) Scan ExistingRDD[ID#46L,Status#47,c1#48]

Note that the four new column ids of df1 are now [ID#46L, Status#47, c1#48, A AS c2#45].

When this plan is executed, the only row of df2 has null in c1, so the comparison c1 = A never evaluates to true and the join condition matches nothing. Because you chose a full outer join, you get three rows (one from df2, two from df1), each with null in the columns coming from the other dataframe:

+----+------+----+----+----+------+----+----+
|  ID|Status|  c1|  c2|  ID|Status|  c1|  c2|
+----+------+----+----+----+------+----+----+
|   4|    ok|null|   A|null|  null|null|null|
|null|  null|null|null|   1|   bad|   A|   A|
|null|  null|null|null|   4|    ok|null|   A|
+----+------+----+----+----+------+----+----+
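To make the row count concrete, here is a minimal pure-Python sketch of a nested-loop full outer join, using None for SQL null. It is an illustration under simplifying assumptions, not Spark code: sql_eq and full_outer_join are hypothetical helpers, and rows are plain dicts.

```python
def sql_eq(a, b):
    """SQL-style equality: comparing with NULL yields NULL (None), not True."""
    if a is None or b is None:
        return None
    return a == b


def full_outer_join(left, right, condition):
    """Naive nested-loop full outer join; only a True condition is a match."""
    out = []
    matched_right = set()
    for l in left:
        hit = False
        for j, r in enumerate(right):
            if condition(l, r) is True:
                out.append((l, r))
                matched_right.add(j)
                hit = True
        if not hit:
            out.append((l, None))  # unmatched left row, nulls on the right
    for j, r in enumerate(right):
        if j not in matched_right:
            out.append((None, r))  # unmatched right row, nulls on the left
    return out


df1_rows = [
    {'ID': 1, 'Status': 'bad', 'c1': 'A', 'c2': 'A'},
    {'ID': 4, 'Status': 'ok', 'c1': None, 'c2': 'A'},
]
df2_rows = [r for r in df1_rows if r['Status'] == 'ok']

# Condition as resolved in the first run: c1 of the left side (df2) = 'A'.
broken = full_outer_join(df2_rows, df1_rows,
                         lambda l, r: sql_eq(l['c1'], 'A'))

# Intended condition: df1.c1 == df2.c2, one column from each side.
intended = full_outer_join(df2_rows, df1_rows,
                           lambda l, r: sql_eq(r['c1'], l['c2']))


def ids(rows):
    """Show each joined pair as (left ID, right ID), None for a missing side."""
    return [(l and l['ID'], r and r['ID']) for l, r in rows]


print(ids(broken))    # [(4, None), (None, 1), (None, 4)]
print(ids(intended))  # [(4, 1), (None, 4)]
```

The first result has the three unmatched rows shown above. The second has the two rows of the desired output.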

Why does the second run give the desired output?

For the second run, you create two independent dataframes. So if we look at the physical plans of df4 and df5, we can see that the column ids are different. Here is the physical plan of df4:

== Physical Plan ==
*(1) Scan ExistingRDD[ID#98L,Status#99,c1#100,c2#101]

And here is the physical plan of df5:

== Physical Plan ==
*(1) Filter (isnotnull(Status#124) AND (Status#124 = ok))
+- *(1) Scan ExistingRDD[ID#123L,Status#124,c1#125,c2#126]

Your join condition is c1#100 = c2#126, where c1#100 is the c1 column from df4 and c2#126 is the c2 column from df5. Each side of the equality comes from a different dataframe, so Spark can perform the join as you expected.

Why is this not detected as an ambiguous self-join?

Since Spark 3.0, Spark checks that the columns you use in a join are not ambiguous. If you swapped the order of df2 and df1 when joining them, as follows:

df3 = df1.join(df2, (df1.c1 == df2.c2), 'full')

you would get the following error:

pyspark.sql.utils.AnalysisException: Column c2#6 are ambiguous.

So why don't we have this error when executing df2.join(df1, ...)?

The answer is in the file DetectAmbiguousSelfJoin in Spark's code:

// When self-join happens, the analyzer asks the right side plan to generate
// attributes with new exprIds. If a plan of a Dataset outputs an attribute which
// is referred by a column reference, and this attribute has different exprId than
// the attribute of column reference, then the column reference is ambiguous, as it
// refers to a column that gets regenerated by self-join.

This means that when doing df2.join(df1, ...), the columns used in the join condition are only checked against df1. In our case we didn't perform any transformation on df1, unlike df2, which was filtered. So the exprIds of the df1 columns didn't change and no ambiguous column error is raised.

I've created an issue on Spark Jira about this behaviour, see SPARK-36874 (the bug was fixed in version 3.2.0).

How to anticipate such behaviour?

You have to be very careful about whether your join is a self-join. If you start from a dataframe df1, perform some transformation on it to get df2, and then join df1 and df2, you risk getting this behaviour. To mitigate that, always put the original dataframe first when doing a join, i.e. df1.join(df2, ...) instead of df2.join(df1, ...). By doing so, you will get an AnalysisException: Column x are ambiguous if Spark doesn't manage to select the right columns.

Vincent Doba

Spark for some reason doesn't distinguish your c1 and c2 columns correctly. This is the fix for df3 to have your expected result:

df3 = df2.alias('df2').join(df1.alias('df1'), (F.col('df1.c1') == F.col('df2.c2')), 'full')
df3.show()

# Output
# +----+------+----+----+---+------+----+---+
# |  ID|Status|  c1|  c2| ID|Status|  c1| c2|
# +----+------+----+----+---+------+----+---+
# |   4|    ok|null|   A|  1|   bad|   A|  A|
# |null|  null|null|null|  4|    ok|null|  A|
# +----+------+----+----+---+------+----+---+
pltc