
I'm using PySpark to join two tables with a relatively complex join condition (using greater-than/less-than comparisons in the join conditions). The join works fine, but breaks down as soon as I add a fillna command before it.

The code looks something like this:

join_cond = [
    df_a.col1 == df_b.colx,
    df_a.col2 == df_b.coly,
    df_a.col3 >= df_b.colz
]

df = (
    df_a
    .fillna('NA', subset=['col1'])
    .join(df_b, join_cond, 'left')
)

This results in an error like this:

org.apache.spark.sql.AnalysisException: Resolved attribute(s) col1#4765 missing from col1#6488,col2#4766,col3#4768,colx#4823,coly#4830,colz#4764 in operator !Join LeftOuter, (((col1#4765 = colx#4823) && (col2#4766 = coly#4830)) && (col3#4768 >= colz#4764)). Attribute(s) with the same name appear in the operation: col1. Please check if the right attribute(s) are used.

It looks like Spark no longer recognizes col1 after performing the fillna. (The error does not come up if I comment out that line.) The problem is that I do need that statement. (And in general I've simplified this example a lot.)

I've looked at this question, but its answers do not work for me. Specifically, using .alias('a') after the fillna doesn't work, because then Spark does not recognize the a in the join condition.

Could someone:

  • Explain exactly why this is happening and how I can avoid it in the future?
  • Advise me on a way to solve it?

Thanks in advance for your help.

Willem
1 Answer


What is happening?

In order to "replace" empty values, a new dataframe is created that contains new columns. These new columns have the same names as the old ones, but they are effectively completely new Spark objects: in the underlying Scala code you can see that the "changed" columns are newly created while the original columns are dropped.
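For context, the plans shown below come from a small hypothetical dataframe; a sketch like the following (invented data, column names taken from the question) produces comparable output:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical data: the tuples become columns _1, _2, _3 and are renamed via toDF,
# which is why the scan in the plans below shows _1#0L, _2#1L, _3#2L
df_a = spark.createDataFrame([(1, 2, 3), (None, 5, 6)]).toDF("col1", "col2", "col3")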

A way to see this effect is to call explain on the dataframe before and after replacing the empty values:

df_a.explain()

prints

== Physical Plan ==
*(1) Project [_1#0L AS col1#6L, _2#1L AS col2#7L, _3#2L AS col3#8L]
+- *(1) Scan ExistingRDD[_1#0L,_2#1L,_3#2L]

while

df_a.fillna(42, subset=['col1']).explain()

prints

== Physical Plan ==
*(1) Project [coalesce(_1#0L, 42) AS col1#27L, _2#1L AS col2#7L, _3#2L AS col3#8L]
+- *(1) Scan ExistingRDD[_1#0L,_2#1L,_3#2L]

Both plans contain a column called col1, but in the first case the internal representation is called col1#6L while the second one is called col1#27L.

Since the join condition df_a.col1 == df_b.colx is still bound to the column col1#6L, the join fails: only the column col1#27L is part of the left table.

How can the problem be solved?

The obvious way would be to move the `fillna` operation before the definition of the join condition:

df_a = df_a.fillna('NA', subset=['col1'])
join_cond = [
    df_a.col1 == df_b.colx,
[...]
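Put together, a minimal runnable sketch of this first approach could look like this (the data is invented, the column names are taken from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical data; col1 is a string column so that fillna('NA') applies to it
df_a = spark.createDataFrame([("A", 1, 10), (None, 2, 20)], ["col1", "col2", "col3"])
df_b = spark.createDataFrame([("A", 1, 5), ("NA", 2, 15)], ["colx", "coly", "colz"])

# fillna first, so that df_a.col1 below already refers to the new column created by fillna
df_a = df_a.fillna('NA', subset=['col1'])

join_cond = [
    df_a.col1 == df_b.colx,
    df_a.col2 == df_b.coly,
    df_a.col3 >= df_b.colz
]

df = df_a.join(df_b, join_cond, 'left')
df.show()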

If this is not possible or not desired, you can change the join condition instead. Rather than using a column bound to a specific dataframe (df_a.col1), you can use a column that is not associated with any dataframe via the col function. Such a column is resolved purely by name, so it does not matter that the column object inside the dataframe has been replaced:

from pyspark.sql import functions as F
join_cond = [
    F.col("col1") == df_b.colx,
    df_a.col2 == df_b.coly,
    df_a.col3 >= df_b.colz
]

The downside of this second approach is that the referenced column name must be unique across both tables.
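For example, if df_b (hypothetically) also contained a column named col1, the unqualified F.col("col1") could no longer be resolved to one side of the join; renaming the clashing column on one side before building the condition is one way to keep this approach usable:

# hypothetical clash: give df_b a column whose name collides with df_a.col1
df_b_clash = df_b.withColumn("col1", F.lit("NA"))

# using F.col("col1") in the join condition would now fail with an
# AnalysisException about an ambiguous reference, since both sides expose a col1

# renaming the clashing column on the right-hand side restores unique names
df_b_renamed = df_b_clash.withColumnRenamed("col1", "b_col1")

join_cond = [
    F.col("col1") == df_b_renamed.colx,
    df_a.col2 == df_b_renamed.coly,
    df_a.col3 >= df_b_renamed.colz
]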

werner