27

I've defined two tables like this:

val tableName = "table1"
val tableName2 = "table2"

val format = new SimpleDateFormat("yyyy-MM-dd")
val data = List(
  List("mike", 26, true),
  List("susan", 26, false),
  List("john", 33, true)
)
val data2 = List(
  List("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
  List("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
  List("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
  List("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
  List("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
)

val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
val rdd2 = sparkContext.parallelize(data2).map(Row.fromSeq(_))
val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("isBoy", BooleanType, false)
))
val schema2 = StructType(Array(
  StructField("name", StringType, true),
  StructField("grade", StringType, true),
  StructField("howold", IntegerType, true),
  StructField("hobby", StringType, true),
  StructField("birthday", DateType, false)
))

val df = sqlContext.createDataFrame(rdd, schema)
val df2 = sqlContext.createDataFrame(rdd2, schema2)
df.createOrReplaceTempView(tableName)
df2.createOrReplaceTempView(tableName2)

I'm trying to build a query that returns the rows from table1 that don't have a matching row in table2. I tried this query:

Select * from table1 LEFT JOIN table2 ON table1.name = table2.name AND table1.age = table2.howold AND table2.name IS NULL AND table2.howold IS NULL

but this just gives me all rows from table1:

List({"name":"john","age":33,"isBoy":true}, {"name":"susan","age":26,"isBoy":false}, {"name":"mike","age":26,"isBoy":true})

How can I do this type of join efficiently in Spark?

I'm looking for an SQL query because I need to be able to specify which columns to compare between the two tables, rather than comparing whole rows as the other recommended questions do (e.g. using subtract, except, etc.).

sergeda
  • Possible duplicate of [Spark: subtract two DataFrames](http://stackoverflow.com/questions/29537564/spark-subtract-two-dataframes) – James Tobin Apr 03 '17 at 15:59
  • based on your edit and comment on my answer, I think you're looking for: http://stackoverflow.com/questions/29537564/spark-subtract-two-dataframes notably the comment from @Interfector on the first answer – James Tobin Apr 03 '17 at 16:00
  • RDD#cogroup should work – Tom Jul 06 '18 at 08:19
  • Another example https://stackoverflow.com/questions/57579185/how-to-subtract-dataframes-using-subset-of-columns-in-apache-spark – GPopat Aug 21 '19 at 16:46
  • Please try below query. "Select * from table1 LEFT JOIN table2 ON table1.name = table2.name AND table1.age = table2.howold WHERE table2.name IS NULL AND table2.howold IS NULL" – Ankit Jindal Jan 17 '20 at 12:12

4 Answers

52

You can use the "left anti" join type, either with the DataFrame API or with SQL (the DataFrame API supports everything SQL supports, including any join condition you need):

DataFrame API:

df.as("table1").join(
  df2.as("table2"),
  $"table1.name" === $"table2.name" && $"table1.age" === $"table2.howold",
  "leftanti"
)

SQL:

sqlContext.sql(
  """SELECT table1.* FROM table1
    | LEFT ANTI JOIN table2
    | ON table1.name = table2.name AND table1.age = table2.howold
  """.stripMargin)

NOTE: there is also a shorter, more concise way of creating the sample data without specifying the schema separately, by using tuples and the implicit toDF method and then "fixing" the automatically inferred schema where needed:

import spark.implicits._
val df = List(
  ("mike", 26, true),
  ("susan", 26, false),
  ("john", 33, true)
).toDF("name", "age", "isBoy")

val df2 = List(
  ("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
  ("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
).toDF("name", "grade", "howold", "hobby", "birthday").withColumn("birthday", $"birthday".cast(DateType))
Tzach Zohar
8

You can do it with the built-in function except (I would have used the code you provided, but you didn't include the imports, so I couldn't just copy/paste it :( )

val a = sc.parallelize(Seq((1, "a", 123), (2, "b", 456))).toDF("col1", "col2", "col3")
val b = sc.parallelize(Seq((4, "a", 432), (2, "t", 431), (2, "b", 456))).toDF("col1", "col2", "col3")

scala> a.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a| 123|
|   2|   b| 456|
+----+----+----+


scala> b.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   4|   a| 432|
|   2|   t| 431|
|   2|   b| 456|
+----+----+----+

scala> a.except(b).show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a| 123|
+----+----+----+
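
Note that except compares whole rows. If only certain columns should drive the comparison, one option (a sketch, assuming (col1, col2) is the comparison key) is to run except on a projection of the key columns and then semi-join back to recover the full rows:

// keys of a that have no counterpart in b
val missingKeys = a.select("col1", "col2").except(b.select("col1", "col2"))
// keep only the rows of a whose key is in missingKeys
val result = a.join(missingKeys, Seq("col1", "col2"), "left_semi")
result.show()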
James Tobin
  • I'm looking for an SQL query because I need to be able to specify which columns to compare between the two tables, not just compare row by row – sergeda Apr 03 '17 at 15:44
1

You can use a left anti join:

dfRcc20.as("a").join(dfClientesDuplicados.as("b"),
  col("a.eteerccdiid") === col("b.eteerccdiid") &&
    col("a.eteerccdinr") === col("b.eteerccdinr"),
  "left_anti")
Andy Quiroz
-1

In SQL, you can simplify your query to the one below (not sure if it works in Spark):

Select * from table1 LEFT JOIN table2 ON table1.name = table2.name AND table1.age = table2.howold where table2.name IS NULL 

This will return all rows of table1 for which the join found no match.
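
For reference, running it through Spark SQL against the temp views registered in the question would look something like this (whether the optimizer rewrites it into an anti join is discussed in the comments below):

sqlContext.sql(
  """SELECT table1.* FROM table1
    | LEFT JOIN table2
    | ON table1.name = table2.name AND table1.age = table2.howold
    | WHERE table2.name IS NULL
  """.stripMargin).show()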

Maciej Kowalski
  • this will not work. the where clause is applied before the join operation so will not have the effect desired. – Hafthor Dec 19 '19 at 21:13
  • @Hafthor This will work and usually the optimizer generates the same left anti join plan. – valex Feb 27 '20 at 14:47