53
val rdd = sc.parallelize(Seq(("vskp", Array(2.0, 1.0, 2.1, 5.4)),("hyd",Array(1.5, 0.5, 0.9, 3.7)),("hyd", Array(1.5, 0.5, 0.9, 3.2)),("tvm", Array(8.0, 2.9, 9.1, 2.5))))
val df1= rdd.toDF("id", "vals")
val rdd1 = sc.parallelize(Seq(("vskp","ap"),("hyd","tel"),("bglr","kkt")))
val df2 = rdd1.toDF("id", "state")
val df3 = df1.join(df2,df1("id")===df2("id"),"left")

The join operation works fine but when I reuse the df2 I am facing unresolved attributes error

val rdd2 = sc.parallelize(Seq(("vskp", "Y"),("hyd", "N"),("hyd", "N"),("tvm", "Y")))
val df4 = rdd2.toDF("id","existance")
val df5 = df4.join(df2,df4("id")===df2("id"),"left")

ERROR: org.apache.spark.sql.AnalysisException: resolved attribute(s)id#426

Rajita
  • 633
  • 1
  • 5
  • 8
  • This is most probably related to https://issues.apache.org/jira/browse/SPARK-10925, i.e. ambiguity in naming of the id column – Erik Schmiegelow Aug 16 '17 at 12:20
  • but in the first case its works fine. And I am mentioning the reference also. I tried renaming the id in df4 to id_new. and still am not able to resolve the error. Is it beacuse of some lineage issue with JAVARDD? I tried keeping checkpoints. But still the same error – Rajita Aug 16 '17 at 12:24
  • Also see: https://stackoverflow.com/questions/40062298/resolved-attributes-missing-when-performing-join-on-pyspark/46203915 - the full error message is "resolved attribute(s) missing ..." – Matthias May 07 '19 at 13:07
  • This might be the worst/most-hackish fix ever but aliasing the dataframes i.e. `df_alias = df.alias('df_alias)` and reversing the order of the join i.e. changing `df1_alias.join(df2_alias . . .)` to `df2_alias.join(df1_ailas . . .)` fixed the issue for me – akki Jul 24 '19 at 01:57

13 Answers13

27

As mentioned in my comment, it is related to https://issues.apache.org/jira/browse/SPARK-10925 and, more specifically https://issues.apache.org/jira/browse/SPARK-14948. Reuse of the reference will create ambiguity in naming, so you will have to clone the df - see the last comment in https://issues.apache.org/jira/browse/SPARK-14948 for an example.

Erik Schmiegelow
  • 2,739
  • 1
  • 18
  • 22
  • 8
    "last comment" was stated relatively... not sure it applies any longer. Please quote here – MichaelChirico Sep 19 '18 at 06:17
  • 3
    Correct, that would be this one : https://issues.apache.org/jira/browse/SPARK-14948?focusedCommentId=15497086&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15497086 – Erik Schmiegelow Sep 19 '18 at 11:15
  • Here is an explanation of the root cause for this error https://github.com/apache/spark/pull/25107 – SergiyKolesnikov Aug 30 '19 at 12:15
  • 1
    Is no longer the last comment, here it is in case you guys are looking for: https://issues.apache.org/jira/browse/SPARK-14948?focusedCommentId=15497086&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15497086 – JBoy Oct 22 '19 at 08:45
25

If you have df1, and df2 derived from df1, try renaming all columns in df2 such that no two columns have identical name after join. So before the join:

so instead of df1.join(df2...

do

# Step 1 rename shared column names in df2.
df2_renamed = df2.withColumnRenamed('columna', 'column_a_renamed').withColumnRenamed('columnb', 'column_b_renamed')

# Step 2 do the join on the renamed df2 such that no two columns have same name.
df1.join(df2_renamed)
Tomer Ben David
  • 8,286
  • 1
  • 43
  • 24
14

This issue really killed a lot of my time and I finally got an easy solution for it.

In PySpark, for the problematic column, say colA, we could simply use

import pyspark.sql.functions as F

df = df.select(F.col("colA").alias("colA"))

prior to using df in the join.

I think this should work for Scala/Java Spark too.

Vlad
  • 8,225
  • 5
  • 33
  • 45
Jason CHAN
  • 6,155
  • 1
  • 13
  • 11
  • Thankfully the derived table had only two columns so this workaround was easy to write but really awkward. Is this the same old SPARK-14948 bug from 2016, that was for Spark 1.6.0? – Pablo Adames Oct 02 '20 at 06:30
  • 1
    wow! this worked for me, can't imagine I wasted so much time trying to figure out the root cause for this issue :(( – SatZ Aug 05 '21 at 09:53
  • Very smart! Thank you! – Eb Abadi Sep 03 '21 at 17:29
9

just rename your columns and put the same name. in pyspark:

for i in df.columns:
    df = df.withColumnRenamed(i,i)
Rishabh Sahrawat
  • 2,437
  • 1
  • 15
  • 32
mhmdburton
  • 101
  • 1
  • 2
  • 2
    plz explain bit more and provide code to help your answer – Galzor Dec 05 '19 at 09:04
  • I ran through all the comments in different pages but I couldn’t find the answer. So first I tried to join the tables by duplicating the key and changing the column names. So my column x in table a corresponded on column y on table b and the error disappeared. Then I renamed my columns on both side just after the aggregator function and I put the same name as before. It works like magic. So if you have columns a,b,c after the aggregator function rename them to a,b,c Ps. I've already provided the code – mhmdburton Dec 06 '19 at 10:16
  • Worked for me. Thanks for this simple, yet very effective solution. – Siddharth Satpathy Aug 04 '20 at 08:56
  • This should be higher up! Very simple and working work-around for these time consuming issues. – AutomatedChaos Apr 14 '21 at 11:13
3

In my case this error appeared during self join of same table. I was facing the below issue with Spark SQL and not the dataframe API:

org.apache.spark.sql.AnalysisException: Resolved attribute(s) originator#3084,program_duration#3086,originator_locale#3085 missing from program_duration#1525,guid#400,originator_locale#1524,EFFECTIVE_DATETIME_UTC#3157L,device_timezone#2366,content_rpd_id#734L,originator_sublocale#2355,program_air_datetime_utc#3155L,originator#1523,master_campaign#735,device_provider_id#2352 in operator !Deduplicate [guid#400, program_duration#3086, device_timezone#2366, originator_locale#3085, originator_sublocale#2355, master_campaign#735, EFFECTIVE_DATETIME_UTC#3157L, device_provider_id#2352, originator#3084, program_air_datetime_utc#3155L, content_rpd_id#734L]. Attribute(s) with the same name appear in the operation: originator,program_duration,originator_locale. Please check if the right attribute(s) are used.;;

Earlier I was using below query,

    SELECT * FROM DataTable as aext
             INNER JOIN AnotherDataTable LAO 
ON aext.device_provider_id = LAO.device_provider_id 

Selecting only required columns before joining solved the issue for me.

      SELECT * FROM (
    select distinct EFFECTIVE_DATE,system,mso_Name,EFFECTIVE_DATETIME_UTC,content_rpd_id,device_provider_id 
from DataTable 
) as aext
         INNER JOIN AnotherDataTable LAO ON aext.device_provider_id = LAO.device_provider_id 
Jeevan
  • 8,532
  • 14
  • 49
  • 67
1

I got the same issue when trying to use one DataFrame in two consecutive joins.

Here is the problem: DataFrame A has 2 columns (let's call them x and y) and DataFrame B has 2 columns as well (let's call them w and z). I need to join A with B on x=z and then join them together on y=z.

(A join B on A.x=B.z) as C join B on C.y=B.z

I was getting the exact error that in the second join it was complaining "resolved attribute(s) B.z#1234 ...".

Following the links @Erik provided and some other blogs and questions, I gathered I need a clone of B.

Here is what I did:

val aDF = ...
val bDF = ...
val bCloned = spark.createDataFrame(bDF.rdd, bDF.schema)
aDF.join(bDF, aDF("x") === bDF("z")).join(bCloned, aDF("y") === bCloned("z"))
Iraj Hedayati
  • 1,478
  • 17
  • 23
1

@Json_Chans answer is pretty good because it does not require any resource intensive operation. Anyhow, when dealing with huge amounts of columns you need some generic function to handle that stuff on the fly and not code hundreds of columns manually.

Luckily, you can derive that function from the Dataframe itself so that you do not need any additional code except of a one-liner (at least in Python respectively pySpark):

import pyspark.sql.functions as f

df # Some Dataframe you have the "resolve(d) attribute(s)" error with

df = df.select([ f.col( column_name ).alias( column_name) for column_name in df.columns])

Since the correct string representation of a column is still stored in the columns-attribute of the Dataframe(df.columns: list), you can just reset it with itself - That's done with the .alias() (note: This still results in a new Dataframe since Dataframes are immutable, meaning they cannot be changed).

Markus
  • 2,265
  • 5
  • 28
  • 54
0

For java developpers, try to call this method:

private static Dataset<Row> cloneDataset(Dataset<Row> ds) {
    List<Column> filterColumns = new ArrayList<>();
    List<String> filterColumnsNames = new ArrayList<>();
    scala.collection.Iterator<StructField> it = ds.exprEnc().schema().toIterator();
    while (it.hasNext()) {
        String columnName = it.next().name();
        filterColumns.add(ds.col(columnName));
        filterColumnsNames.add(columnName);
    }
    ds = ds.select(JavaConversions.asScalaBuffer(filterColumns).seq()).toDF(scala.collection.JavaConverters.asScalaIteratorConverter(filterColumnsNames.iterator()).asScala().toSeq());
    return ds;
}

on both datasets just before the joining, it clone the datasets into new ones:

df1 = cloneDataset(df1); 
df2 = cloneDataset(df2);
Dataset<Row> join = df1.join(df2, col("column_name"));
// if it didn't work try this
final Dataset<Row> join = cloneDataset(df1.join(df2, columns_seq)); 
Abdennacer Lachiheb
  • 4,388
  • 7
  • 30
  • 61
0

It will work if you do the below.

suppose you have a dataframe. df1 and if you want to cross join the same dataframe, you can use the below

df1.toDF("ColA","ColB").as("f_df").join(df1.toDF("ColA","ColB").as("t_df"), 
   $"f_df.pcmdty_id" === 
   $"t_df.assctd_pcmdty_id").select($"f_df.pcmdty_id",$"f_df.assctd_pcmdty_id")
slfan
  • 8,950
  • 115
  • 65
  • 78
0

From my experience, we have 2 solutions 1) clone DF 2) rename columns that have ambiguity before joining tables. (don't forget to drop duplicated join key)

Personally I prefer the second method, because cloning DF in the first method takes time, especially if data size is big.

Zhenyi Lin
  • 111
  • 1
  • 5
  • "cloning DF in the first method takes time, especially if data size is big": discards cloning option to solve problem for big tables – PALEN Oct 08 '19 at 22:43
0

[TLDR]

Break the AttributeReference shared between columns in parent DataFrame and derived DataFrame by writing the intermediate DataFrame to file system and reading it again.

Ex:

val df1 = spark.read.parquet("file1")
df1.createOrReplaceTempView("df1")
val df2 = spark.read.parquet("file2")
df2.createOrReplaceTempView("df2")

val df12 = spark.sql("""SELECT * FROM df1 as d1 JOIN df2 as d2 ON d1.a = d2.b""")
df12.createOrReplaceTempView("df12")

val df12_ = spark.sql(""" -- some transformation -- """)
df12_.createOrReplaceTempView("df12_")

val df3 = spark.read.parquet("file3")
df3.createOrReplaceTempView("df3")

val df123 = spark.sql("""SELECT * FROM df12_ as d12_ JOIN df3 as d3 ON d12_.a = d3.c""")
df123.createOrReplaceTempView("df123")

Now joining with top level DataFrame will lead to "unresolved attribute error"

val df1231 = spark.sql("""SELECT * FROM df123 as d123 JOIN df1 as d1 ON d123.a = d1.a""") 

Solution: d123.a and d1.a share same AttributeReference break it by writing intermediate table df123 to file system and reading again. now df123write.a and d1.a does not share AttributeReference

val df123 = spark.sql("""SELECT * FROM df12 as d12 JOIN df3 as d3 ON d12.a = d3.c""")
df123.createOrReplaceTempView("df123")

df123.write.parquet("df123.par")
val df123write = spark.read.parquet("df123.par")
spark.catalog.dropTempView("df123")
df123write.createOrReplaceTempView("df123")

val df1231 = spark.sql("""SELECT * FROM df123 as d123 JOIN df1 as d1 ON d123.a = d1.a""") 

Long story:

We had complex ETLs with transformation and self joins of DataFrames, performed at multiple levels. We faced "unresolved attribute" error frequently and we solved it by selecting required attribute and performing join on the top level table instead of directly joining with the top level table this solved the issue temporarily but when we applied some more transformation on these DataFrame and joined with any top level DataFrames, "unresolved attribute" error raised its ugly head again.

This was happening because DataFrames in bottom level were sharing the same AttributeReference with top level DataFrames from which they were derived [more details]

So we broke this reference sharing by writing just 1 intermediate transformed DataFrame and reading it again and continuing with our ETL. This broke sharing AttributeReference between bottom DataFrames and Top DataFrames and we never again faced "unresolved attribute" error.

This worked for us because as we moved from top level DataFrame to bottom performing transformation and join our data shrank than initial DataFrames that we started, it also improved our performance as data size was less and spark didn't have to traverse back the DAG all the way to the last persisted DataFrame.

Jeevan
  • 8,532
  • 14
  • 49
  • 67
0

Thanks to Tomer's Answer

For scala - The issue came up when I tried to use the column in the self-join clause, to fix it use the method

// To `and` all the column conditions
def andAll(cols: Iterable[Column]): Column =
   if (cols.isEmpty) lit(true)
   else cols.tail.foldLeft(cols.head) { case (soFar, curr) => soFar.and(curr) }

// To perform join different col name
def renameColAndJoin(leftDf: DataFrame, joinCols: Seq[String], joinType: String = "inner")(rightDf: DataFrame): DataFrame = {

   val renamedCols: Seq[String]          = joinCols.map(colName => s"${colName}_renamed")
   val zippedCols: Seq[(String, String)] = joinCols.zip(renamedCols)

   val renamedRightDf: DataFrame = zippedCols.foldLeft(rightDf) {
     case (df, (origColName, renamedColName)) => df.withColumnRenamed(origColName, renamedColName)
   }

   val joinExpr: Column = andAll(zippedCols.map {
     case (origCol, renamedCol) => renamedRightDf(renamedCol).equalTo(rightDf(origCol))
   })

   leftDf.join(renamedRightDf, joinExpr, joinType)

}
0

In my case, Checkpointing the original dataframe fixed the issue.