
Banging my head a little with this one, and I suspect the answer is very simple. Given two dataframes, I want to filter the first where values in one of its columns are not present in a column of the other dataframe.

I would like to do this without resorting to full-blown Spark SQL, so just using DataFrame.filter, Column.contains, the isin method, or one of the join methods.

val df1 = Seq(("Hampstead", "London"), 
              ("Spui", "Amsterdam"), 
              ("Chittagong", "Chennai")).toDF("location", "city")
val df2 = Seq("London", "Amsterdam", "New York").toDF("cities")

val res = df1.filter(df2("cities").contains("city") === false)
// doesn't work, nor do the 20 other variants I have tried

Anyone got any ideas?

– Chondrops
Try the code below; it will work:

val df3 = Seq(("Hampstead", "London"), ("Spui", "Amsterdam"), ("Chittagong", "Chennai")).toDF("location", "city")
val df4 = Seq("London", "Amsterdam", "New York").toDF("cities")
df3.where(df3("city").isInCollection(df4.map(_.getString(0)).collect())).show()

– Ankit Tripathi Dec 30 '20 at 10:39

2 Answers


I've discovered that I can solve this using a simpler method: it turns out that an anti-join is available as a join-type parameter to the join method, although the Spark Scaladoc does not describe it:

import spark.implicits._ // for toDF (assumes a SparkSession named spark, as in spark-shell)

val df1 = Seq(("Hampstead", "London"), 
              ("Spui", "Amsterdam"), 
              ("Chittagong", "Chennai")).toDF("location", "city")
val df2 = Seq("London", "Amsterdam", "New York").toDF("cities")

// "leftanti" keeps only the rows of df1 with no matching city in df2
df1.join(df2, df1("city") === df2("cities"), "leftanti").show

Results in:

+----------+-------+
|  location|   city|
+----------+-------+
|Chittagong|Chennai|
+----------+-------+
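
For completeness, the filter/isin route the question originally asked about also works when df2 is small enough to collect to the driver. A sketch (not part of the original answer; it assumes the same df1/df2 and spark.implicits._ in scope):

val cities = df2.select("cities").as[String].collect()
df1.filter(!$"city".isin(cities: _*)).show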

P.S. Thanks for the pointer to the duplicate; duly marked as such.

– Chondrops

If you are trying to filter a DataFrame using another, you should use a join (or any of its variants). If what you need is to filter it using a List, or any data structure that fits in memory on your driver and workers, you can broadcast that structure and then reference it inside the filter or where method (see the sketch at the end of this answer).

For instance, I would do something like:

import org.apache.spark.sql.functions._ // for isnull (and udf in the sketch below)
import spark.implicits._ // for toDF and $ (assumes a SparkSession named spark)

val df1 = Seq(("Hampstead", "London"), 
              ("Spui", "Amsterdam"), 
              ("Chittagong", "Chennai")).toDF("location", "city")
val df2 = Seq("London", "Amsterdam", "New York").toDF("cities")

// full outer join, then keep the df1 rows that found no match in df2 (cities is null);
// note that the select drops the location column, leaving only the unmatched city values
df2.join(df1, joinExprs = df1("city") === df2("cities"), joinType = "full_outer")
   .select("city", "cities")
   .where(isnull($"cities"))
   .drop("cities").show()
– Alberto Bonsanto
Ok, but which join would achieve the result I want? I believe I should be looking at an anti-join, but this is not an option according to the Scaladoc. Also, I agree that I can use filter with a List, something like df.filter(($"col" isin ) === false), but then how can I turn the Column I want to reference into a List? – Chondrops Oct 25 '16 at 18:07
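
The broadcast approach mentioned at the top of this answer might look like the following sketch, which also shows one way to turn a one-column DataFrame into a local collection. It is not from the original thread; it assumes the city list fits in memory on the driver and executors, with spark as the active SparkSession and its implicits imported:

// collect the small column to the driver, then broadcast it to the executors
val cities = df2.map(_.getString(0)).collect().toSet
val citiesBc = spark.sparkContext.broadcast(cities)

// reference the broadcast value inside filter via a UDF (udf comes from functions._)
val notInCities = udf((c: String) => !citiesBc.value.contains(c))
df1.filter(notInCities($"city")).show()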