I am seeing an org.apache.spark.sql.AnalysisException when joining DataFrames/Datasets which are themselves the results of previous joins
case class Left(a: Int, b: String, c: Boolean)
case class Right(a: Int, b: String, c: Int)

// Boilerplate
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val left = Seq(
  Left(1, "1", true),
  Left(2, "2", true),
  Left(3, "3", true)
).toDF
val right = Seq(
  Right(1, "1", 1),
  Right(2, "2", 2),
  Right(3, "3", 3)
).toDF

val joined1 = left
  .join(right, left("a") === right("a"), "inner")
  .select(left("a"), right("b"), left("c"))

// Fails at analysis time with the exception below
val joined2 = joined1
  .join(left, joined1("b") === left("b"))
The actual error message is:
Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) b#4 missing from b#49,b#15,c#5,a#48,c#50,a#3 in operator !Join Inner, (b#15 = b#4);;
!Join Inner, (b#15 = b#4)
:- Project [a#3, b#15, c#5]
: +- Join Inner, (a#3 = a#14)
: :- LocalRelation [a#3, b#4, c#5]
: +- LocalRelation [a#14, b#15, c#16]
+- LocalRelation [a#48, b#49, c#50]
So my guess is that the join condition in joined2 is resolved against the columns of the original parent tables: joined1("b") resolves to b#15, but left("b") resolves to b#4, and because left appears on both sides of this self-join, Spark re-aliases its attributes on the other side (to a#48, b#49, c#50), so b#4 no longer exists in the join's inputs.
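(For reference, the exprIds in that message can be inspected by printing the analyzed plan of each intermediate DataFrame; a minimal sketch, assuming the same session as above:)

// Print the analyzed plans to see which exprIds each DataFrame carries;
// left's b appears as b#4, while joined1 only retains right's b (b#15).
println(left.queryExecution.analyzed)
println(joined1.queryExecution.analyzed)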
I understand that this can be worked around by aliasing the tables and referring to columns via functions.column("leftAlias.column"); however, I want to pour the result back into a Dataset, so I would presumably have to rename the columns back before doing so.
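For concreteness, here is a minimal sketch of that aliasing workaround as I understand it, using functions.col (the short alias for functions.column); the alias names (l, r, j1, l2) are arbitrary, and the final select/rename back into the Left case class is exactly the extra step I would like to avoid:

import org.apache.spark.sql.functions.col

// Alias each input so every column reference is unambiguous
val j1 = left.as("l")
  .join(right.as("r"), col("l.a") === col("r.a"), "inner")
  .select(col("l.a"), col("r.b"), col("l.c"))
  .as("j1")

// The self-join now resolves, because both sides carry distinct aliases
val j2 = j1.join(left.as("l2"), col("j1.b") === col("l2.b"))

// Re-rename before pouring the result back into a Dataset
val ds = j2
  .select(col("j1.a").as("a"), col("j1.b").as("b"), col("j1.c").as("c"))
  .as[Left]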
Can anyone suggest a more elegant solution/workaround?
Many thanks
Terry