In standard SQL, when you join a table to itself, you can create aliases for the tables to keep track of which columns you are referring to:

SELECT a.column_name, b.column_name...
FROM table1 a, table1 b
WHERE a.common_field = b.common_field;

There are two ways I can think of to achieve the same thing using the Spark DataFrame API:

Solution #1: Rename the columns

Several methods for this are described in the answers to a related question. This one just renames all the columns with a specific suffix:

df.toDF(df.columns.map(_ + "_R"):_*)

For example, you can do:

df.join(df.toDF(df.columns.map(_ + "_R"):_*), $"common_field" === $"common_field_R")
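A minimal, self-contained sketch of the renaming approach follows. It assumes Spark 2.x or later (for `SparkSession`) and uses made-up sample data; the helper name `joinWithSuffix` and the `value` column are hypothetical and only there for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RenameSelfJoin {
  // Hypothetical helper: suffix every column of the right-hand copy so that
  // no column name is shared between the two sides of the join.
  def joinWithSuffix(df: DataFrame, key: String, suffix: String): DataFrame = {
    val right = df.toDF(df.columns.map(_ + suffix): _*)
    df.join(right, col(key) === col(key + suffix))
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumption: Spark 2.x+).
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rename-self-join")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data; `common_field` mirrors the name used above.
    val df = Seq((1, "a"), (1, "b"), (2, "c")).toDF("common_field", "value")

    val joined = joinWithSuffix(df, "common_field", "_R")
    joined.printSchema() // common_field, value, common_field_R, value_R
    joined.show()

    spark.stop()
  }
}
```

Because every column on the right-hand side has a distinct name, later `select` or `filter` calls can refer to either side by plain column name.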

Solution #2: Copy the reference to the DataFrame

Another simple solution is to just do this:

val df: DataFrame = ....
val df_right = df

df.join(df_right, df("common_field") === df_right("common_field"))

Both of these solutions work, and I could see each being useful in certain situations. Are there any internal differences between the two I should be aware of?

David Griffin

1 Answer

There are at least two different ways to approach this, either by aliasing:

df.as("df1").join(df.as("df2"), $"df1.foo" === $"df2.foo")

or using name-based equality joins:

// Note that it will result in ambiguous column names
// so using aliases here could be a good idea as well.
// df.as("df1").join(df.as("df2"), Seq("foo"))

df.join(df, Seq("foo"))  
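The aliasing variant can be sketched end to end as below. This is an illustration under assumptions, not a definitive implementation: it assumes Spark 2.x or later, made-up sample data, and the hypothetical names `pairs`, `left_value`, and `right_value`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object AliasSelfJoin {
  // Hypothetical helper: self-join `df` on `key`, addressing each side
  // through its alias, then rename the value columns on the way out.
  def pairs(df: DataFrame, key: String, value: String): DataFrame =
    df.as("df1")
      .join(df.as("df2"), col(s"df1.$key") === col(s"df2.$key"))
      .select(
        col(s"df1.$key").as(key),
        col(s"df1.$value").as("left_value"),
        col(s"df2.$value").as("right_value")
      )

  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumption: Spark 2.x+).
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("alias-self-join")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data; `foo` mirrors the column name used above.
    val df = Seq((1, "a"), (1, "b"), (2, "c")).toDF("foo", "bar")

    pairs(df, "foo", "bar").show()

    spark.stop()
  }
}
```

The qualified names (`df1.foo`, `df2.bar`) remain resolvable after the join, so the ambiguity can be removed in a single `select` rather than by renaming every column up front.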

In general, column renaming, while the ugliest, is the safest practice across all versions. There have been a few bugs related to column resolution (we found one on SO not so long ago), and some details may differ between parsers (HiveContext / standard SQLContext) if you use raw expressions.

Personally, I prefer using aliases because of their resemblance to idiomatic SQL and because they can be used outside the scope of a specific DataFrame object.

Regarding performance, unless you're interested in close-to-real-time processing, there should be no performance difference whatsoever. All of these should generate the same execution plan.
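One way to check the plans on your own Spark version is to print them with `explain()`. The sketch below assumes Spark 2.x or later and made-up sample data; the `variants` helper is hypothetical, and the printed plans may differ in detail between versions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object ComparePlans {
  // Hypothetical helper: build the self-join on column `foo` three ways.
  def variants(df: DataFrame): Seq[(String, DataFrame)] = Seq(
    "renamed" -> df.join(df.toDF(df.columns.map(_ + "_R"): _*), col("foo") === col("foo_R")),
    "aliased" -> df.as("df1").join(df.as("df2"), col("df1.foo") === col("df2.foo")),
    "by name" -> df.join(df, Seq("foo"))
  )

  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumption: Spark 2.x+).
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("compare-plans")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data.
    val df = Seq((1, "a"), (1, "b"), (2, "c")).toDF("foo", "bar")

    // Print the physical plan of each variant for side-by-side comparison.
    variants(df).foreach { case (label, joined) =>
      println(s"== $label ==")
      joined.explain()
    }

    spark.stop()
  }
}
```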

zero323
  • Is `DataFrame.as` relatively new? – David Griffin Mar 27 '16 at 18:34
  • No, it's been there at least since 1.3. It is just not so commonly used. It shouldn't be confused with `as[U]`, which is used with `Datasets`. Scala and Python have an alternative `alias` method which achieves the same thing. – zero323 Mar 27 '16 at 18:36
  • Strangely enough, I don't see any reference to it in the scaladocs, other than in the index. I see `as[U]`, but even when I go back to earlier versions I'm not seeing `as` in `DataFrame`. I see `as` in `Column`, just not in `DataFrame`. – David Griffin Mar 27 '16 at 18:43
  • Check _Language Integrated Queries_ section (8th position or so) or the source https://github.com/apache/spark/blob/ec2b807212e568c9e98cd80746bcb61e02c7a98e/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L703 :) – zero323 Mar 27 '16 at 18:55
  • Too much SO can do that to you :) – zero323 Mar 27 '16 at 19:14
  • I missed that function for literally a year! (Just passed 1 year anniversary of Spark 1.3.) – David Griffin Mar 27 '16 at 19:16
  • I can understand you've missed the function, but the fact you've missed my [brilliant answer](http://stackoverflow.com/a/31538114/1560062) is just unacceptable :-D If it makes you feel any better, I've learned today about [OneVsRest](http://stackoverflow.com/q/36243455/1560062). I actually coded something like this from scratch a few weeks ago :/ – zero323 Mar 27 '16 at 19:28
  • See that's when I was taking time off from SO. Had I been spending more time here... – David Griffin Mar 27 '16 at 19:31
  • Can we do self join on a streaming dataset using spark 2.2.0? – user1870400 Feb 18 '18 at 05:52