73

I have two dataframes with the following columns:

df1.columns
//  Array(ts, id, X1, X2)

and

df2.columns
//  Array(ts, id, Y1, Y2)

After I do

val df_combined = df1.join(df2, Seq(ts,id))

I end up with the following columns: Array(ts, id, X1, X2, ts, id, Y1, Y2). I would expect the common columns to be dropped. Is there something additional that needs to be done?

Jacek Laskowski
Neel

10 Answers

58

The simple answer (from the Databricks FAQ on this matter) is to perform the join where the joined columns are expressed as an array of strings (or one string) instead of a predicate.

Below is an example adapted from the Databricks FAQ but with two join columns in order to answer the original poster's question.

Here is the left dataframe:

val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10))

val left = llist.toDF("firstname","lastname","date","duration")

left.show()

/*
+---------+--------+----------+--------+
|firstname|lastname|      date|duration|
+---------+--------+----------+--------+
|      bob|       b|2015-01-13|       4|
|    alice|       a|2015-04-23|      10|
+---------+--------+----------+--------+
*/

Here is the right dataframe:

val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload")

right.show()

/*
+---------+--------+------+
|firstname|lastname|upload|
+---------+--------+------+
|    alice|       a|   100|
|      bob|       b|    23|
+---------+--------+------+
*/

Here is an incorrect solution, where the join columns are defined as the predicate left("firstname")===right("firstname") && left("lastname")===right("lastname").

The incorrect result is that the firstname and lastname columns are duplicated in the joined data frame:

left.join(right, left("firstname")===right("firstname") &&
                 left("lastname")===right("lastname")).show

/*
+---------+--------+----------+--------+---------+--------+------+
|firstname|lastname|      date|duration|firstname|lastname|upload|
+---------+--------+----------+--------+---------+--------+------+
|      bob|       b|2015-01-13|       4|      bob|       b|    23|
|    alice|       a|2015-04-23|      10|    alice|       a|   100|
+---------+--------+----------+--------+---------+--------+------+
*/

The correct solution is to define the join columns as an array of strings Seq("firstname", "lastname"). The output data frame does not have duplicated columns:

left.join(right, Seq("firstname", "lastname")).show

/*
+---------+--------+----------+--------+------+
|firstname|lastname|      date|duration|upload|
+---------+--------+----------+--------+------+
|      bob|       b|2015-01-13|       4|    23|
|    alice|       a|2015-04-23|      10|   100|
+---------+--------+----------+--------+------+
*/
stackoverflowuser2010
  • actually the output DF *does* have duplicates using the following: `val joined = sampledDF.join(idsDF, idColumns, "inner")`, where `idColumns` is a Seq[String] containing the join columns – WestCoastProjects Jul 26 '17 at 20:26
  • I don't think this works if the names of the columns in the two datasets are different. – sparkonhdfs Apr 17 '18 at 15:24
  • What to do when, out of 4 join exprs, 2 have different columns in both tables but 2 refer to the same columns in both tables? Rename? – nir Jul 02 '18 at 21:46
  • What do we do when join columns of two datasets are different? – Amar Gajbhiye Jan 18 '19 at 12:05
  • Suppose the **date** column is in the right dataframe too, with different values. How can I left_outer join while getting the latest date among both tables? @sparkonhdfs – CdVr Feb 04 '20 at 08:44
  • What needs to be imported to use Seq? I keep getting NameError: name 'Seq' is not defined – Haha Mar 03 '20 at 16:54
  • This will not work if columns have null values and it's a null comparison. – Murari Goswami May 18 '20 at 06:29
  • What if the common columns on which we join have different names in different dataframes? – Memphis Meng Aug 09 '21 at 01:19
  • What if the common columns on which we join have different names in different dataframes? – Topde Aug 31 '21 at 09:18
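Several of the comments above ask about join keys that have different names on each side. One option is to rename one side first so the duplicate-free `Seq`-of-strings form applies. This is only a sketch with hypothetical frames `left` and `right`, where the key is `id` on the left and `right_id` on the right:

```scala
// Rename the right-hand key so both sides agree, then join USING-style.
val renamed = right.withColumnRenamed("right_id", "id")

// The output contains a single "id" column instead of two.
val joined = left.join(renamed, Seq("id"))
```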
35

This is expected behavior. The DataFrame.join method is equivalent to a SQL join like this:

SELECT * FROM a JOIN b ON joinExprs

If you want to ignore duplicate columns, just drop them or select the columns of interest afterwards. If you want to disambiguate, you can access these using the parent DataFrames:

val a: DataFrame = ???
val b: DataFrame = ???
val joinExprs: Column = ???

a.join(b, joinExprs).select(a("id"), b("foo"))
// drop equivalent 
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))

or use aliases:

// As for now aliases don't work with drop
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")

For equi-joins there exists a special shortcut syntax which takes either a sequence of strings:

val usingColumns: Seq[String] = ???

a.join(b, usingColumns)

or a single string:

val usingColumn: String = ???

a.join(b, usingColumn)

which keeps only one copy of the columns used in the join condition.

Alper t. Turker
zero323
  • Instead of select, can I drop the duplicate column? – Neel Feb 07 '16 at 21:12
  • Yes, but only via parents not with aliases. – zero323 Feb 07 '16 at 21:31
  • How about an outer join? Any rows without a match will have a null in one of the table's key columns, but you don't know ahead of time which one to drop. Is there a way to handle that case elegantly? – Darryl Aug 18 '16 at 23:53
  • @Darryl `coalesce` and drop both. – zero323 Aug 19 '16 at 00:15
  • In the joined dataframe, i want the column name as something other than input table's column name. Is there any way to do this ?. For example : Instead of having the column name as "foo" which is being taken from "b" dataframe, I want to have the column name as "column_new". Something like this sql query : "select b.foo as column_new" – JKC Aug 23 '17 at 08:46
  • Can I do a full join in Scala? I tried this way but it always discards the entry if the key is not in both dataframes. So I did it using spark.sql, and then the KEY column gets redundant. Any suggestion? – Abu Shoeb Jan 30 '18 at 18:28
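For the outer-join question in these comments, the coalesce-and-drop approach zero323 points to might look like this. A sketch only, with hypothetical frames `a` and `b`, both keyed on `id`:

```scala
import org.apache.spark.sql.functions.coalesce

// Full outer join keeps both key columns; one of them is null for unmatched rows.
val joined = a.join(b, a("id") === b("id"), "full_outer")
  .withColumn("key", coalesce(a("id"), b("id")))  // picks whichever side is non-null
  .drop(a("id"))
  .drop(b("id"))
```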
10

I was stuck with this for a while, and only recently did I come up with a solution that is quite easy.

Say a is

scala> val a  = Seq(("a", 1), ("b", 2)).toDF("key", "vala")
a: org.apache.spark.sql.DataFrame = [key: string, vala: int]

scala> a.show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+

and b is

scala> val b  = Seq(("a", 1)).toDF("key", "valb")
b: org.apache.spark.sql.DataFrame = [key: string, valb: int]

scala> b.show
+---+----+
|key|valb|
+---+----+
|  a|   1|
+---+----+

and I can do this to select only the value in dataframe a:

scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+
tintin
  • what does `a.columns.map(a(_)) : _*` do? – Nick01 Apr 25 '18 at 03:21
  • @Nick01 it selects the columns from the 'a' data frame (see the answer here: https://stackoverflow.com/questions/39909863/spark-select-with-a-list-of-columns-scala) – Aris Dec 14 '20 at 22:59
7

You can simply use this

df1.join(df2, Seq("ts","id"),"TYPE-OF-JOIN")

Here TYPE-OF-JOIN can be

  • left
  • right
  • inner
  • fullouter

For example, I have two dataframes like this:

// df1
word   count1
w1     10   
w2     15  
w3     20

// df2
word   count2
w1     100   
w2     150  
w5     200

If you do a fullouter join, then the result looks like this:

df1.join(df2, Seq("word"),"fullouter").show()

word   count1  count2
w1     10      100
w2     15      150
w3     20      null
w5     null    200
Abu Shoeb
6

Try this:

val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))
Ray
2

This is normal behavior from SQL. What I do for this:

  • Drop or rename the source columns
  • Do the join
  • Drop the renamed column, if any

Here I am replacing the "fullname" column:

Some code in Java:

this
    .sqlContext
    .read()
    .parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day))
    .drop("fullname")
    .registerTempTable("data_original");

this
    .sqlContext
    .read()
    .parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day))
    .registerTempTable("data_v2");

this
    .sqlContext
    .sql(etlQuery)
    .repartition(1)
    .write()
    .mode(SaveMode.Overwrite)
    .parquet(outputPath);

Where the query is:

SELECT
    d.*,
   concat_ws('_', product_name, product_module, name) AS fullname
FROM
    {table_source} d
LEFT OUTER JOIN
    {table_updates} u ON u.id = d.id

This is something you can do only with Spark, I believe (drop a column from a list), and it is very helpful!
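The drop-from-a-list capability mentioned here relies on `DataFrame.drop` accepting multiple column names. A minimal sketch, assuming a frame `df` that contains the listed columns (the names here are illustrative):

```scala
// Drop every column in the list in a single call (varargs form of drop).
val dropList = Seq("fullname", "product_module")
val trimmed  = df.drop(dropList: _*)
```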

Thomas Decaux
2

If anyone is using Spark SQL and wants to achieve the same thing, you can use the USING clause in the join query.

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._

val df1 = List((1, 4, 3), (5, 2, 4), (7, 4, 5)).toDF("c1", "c2", "C3")
val df2 = List((1, 4, 3), (5, 2, 4), (7, 4, 10)).toDF("c1", "c2", "C4")

df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")

spark.sql("select * from table1  inner join  table2  using (c1, c2)").show(false)

/*
+---+---+---+---+
|c1 |c2 |C3 |C4 |
+---+---+---+---+
|1  |4  |3  |3  |
|5  |2  |4  |4  |
|7  |4  |5  |10 |
+---+---+---+---+
*/
Mohana B C
1

Best practice is to make the column names different in both DataFrames before joining them, and to drop accordingly.

df1.columns = [id, age, income]
df2.columns = [id, age_group]

df1.join(df2, on=df1.id == df2.id, how='inner').write.saveAsTable('table_name')

will return an error for duplicate columns.

Try this instead:

df2_id_renamed = df2.withColumnRenamed('id','id_2')
df1.join(df2_id_renamed, on=df1.id == df2_id_renamed.id_2, how='inner').drop('id_2')
Vaibhav
1

Inner join is the default join in Spark. Below is the simple syntax for it:

leftDF.join(rightDF, "Common Col Name")

For other joins, the join type is passed as a separate third argument:

leftDF.join(rightDF, Seq("Common Column 1", "Common Column 2"), "join type")

If the column names are not common, then:

leftDF.join(rightDF, leftDF.col("x") === rightDF.col("y"), "join type")
Manoj Kumar Dhakad
0

After I've joined multiple tables together, I run them through a simple function to rename columns in the DF if it encounters duplicates. Alternatively, you could drop these duplicate columns too.

Where Names is a table with columns ['Id', 'Name', 'DateId', 'Description'] and Dates is a table with columns ['Id', 'Date', 'Description'], the columns Id and Description will be duplicated after being joined.

Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = deDupeDfCols(NamesAndDates, '_')
NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")

Where deDupeDfCols is defined as:

def deDupeDfCols(df, separator=''):
    newcols = []

    for col in df.columns:
        if col not in newcols:
            newcols.append(col)
        else:
            for i in range(2, 1000):
                if (col + separator + str(i)) not in newcols:
                    newcols.append(col + separator + str(i))
                    break

    return df.toDF(*newcols)

The resulting data frame will contain columns ['Id', 'Name', 'DateId', 'Description', 'Id2', 'Date', 'Description2'].

Apologies this answer is in Python - I'm not familiar with Scala, but this was the question that came up when I Googled this problem and I'm sure Scala code isn't too different.
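The renaming logic in `deDupeDfCols` operates purely on the list of column names, so it can be checked without Spark. Below is a plain-Python sketch of the same loop (the function name `dedupe_col_names` is mine, not part of the answer above):

```python
def dedupe_col_names(columns, separator=''):
    """Same naming rule as deDupeDfCols: suffix repeated names with 2, 3, ..."""
    newcols = []
    for col in columns:
        if col not in newcols:
            newcols.append(col)
        else:
            # Find the first unused numeric suffix for this duplicate name.
            for i in range(2, 1000):
                candidate = col + separator + str(i)
                if candidate not in newcols:
                    newcols.append(candidate)
                    break
    return newcols

print(dedupe_col_names(['Id', 'Name', 'DateId', 'Description', 'Id', 'Date', 'Description']))
# ['Id', 'Name', 'DateId', 'Description', 'Id2', 'Date', 'Description2']
```

This reproduces the column list stated in the answer for the Names/Dates join.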

QA Collective