
I have two DataFrames, a and b. This is what they look like:

a
-------
v1 string
v2 string

roughly hundreds of millions of rows


b
-------
v2 string

roughly tens of millions of rows

I would like to keep rows from DataFrame a where v2 is not in b("v2").

I know I could use a left join and filter where the right side is null, or SparkSQL with a "not in" construction. I bet there is a better approach, though.
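For concreteness, a sketch of those two baselines (assuming a SparkSession named spark and the schemas above):

// Left join, then keep the rows where the right side is null:
val viaJoin = a.join(b, a("v2") === b("v2"), "left_outer")
  .where(b("v2").isNull)
  .select(a("v1"), a("v2"))

// SparkSQL with "not in":
a.createOrReplaceTempView("a")
b.createOrReplaceTempView("b")
val viaSql = spark.sql("SELECT * FROM a WHERE v2 NOT IN (SELECT v2 FROM b)")

(Beware that NOT IN matches nothing if b.v2 contains nulls; NOT EXISTS or an anti join avoids that pitfall.)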

devopslife
    I've posted an answer, but join+filter should work quite well too! I think most of the work from join+filter is unavoidable in any solution. – Daniel Darabos Feb 15 '16 at 00:01
  • Yeah, actually SparkSQL worked very fast. Also - it's not duplicate - I needed negative filter. – devopslife Feb 15 '16 at 22:31
  • see http://stackoverflow.com/questions/29537564/spark-subtract-two-dataframes – cuz Oct 14 '16 at 10:13

3 Answers


You can achieve that using the except method of Dataset, which "Returns a new Dataset containing rows in this Dataset but not in another Dataset".
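A minimal sketch (assuming the schemas from the question): since except compares entire rows and a has an extra column, project a down to v2 first, then join the surviving keys back to recover v1. Note that except, being a set operation, also deduplicates.

// Distinct v2 values present in a but not in b:
val keptKeys = a.select("v2").except(b)
// Join back to a to recover v1:
val result = a.join(keptKeys, Seq("v2"), "left_semi")
result.show()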

Javier Alba

Use PairRDDFunctions.subtractByKey:

def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]

Return an RDD with the pairs from this whose keys are not in other.

(There are variants that offer control over the partitioning. See the docs.)

So you would do a.rdd.map { case (v1, v2) => (v2, v1) }.subtractByKey(b.rdd).toDF. Note that in practice a.rdd yields Rows rather than tuples, and the argument to subtractByKey must be a pair RDD as well.
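A runnable sketch (assuming the schemas from the question and a SparkSession named spark):

import spark.implicits._  // for toDF

// Key a by v2, keeping v1 as the value; key b by v2 with a dummy
// value, since subtractByKey only compares keys.
val aPairs = a.rdd.map(row => (row.getString(1), row.getString(0)))  // (v2, v1)
val bKeys  = b.rdd.map(row => (row.getString(0), ()))                // (v2, ())

val result = aPairs.subtractByKey(bKeys)
  .map { case (v2, v1) => (v1, v2) }  // restore the original column order
  .toDF("v1", "v2")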

Daniel Darabos
  • Perhaps a pure DataFrame-based solution exists as well? I don't use DataFrames much, sorry. But it shouldn't be too painful to jump back to RDDs, use `subtractByKey` and go back to DataFrames. – Daniel Darabos Feb 15 '16 at 00:00
  • 1
    You could use [except](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@except(other:org.apache.spark.sql.Dataset[T]):org.apache.spark.sql.Dataset[T]) – Javier Alba Jun 15 '17 at 14:21
  • Ah, `except` is the perfect answer! Want to post it as a separate answer? – Daniel Darabos Jun 18 '17 at 06:46
  • sure, I've posted it as an answer – Javier Alba Jun 19 '17 at 08:49

Suppose your DataFrame a looks like this:

+----+
|col1|
+----+
|  v1|
|  v2|
+----+

And suppose your DataFrame b looks like this:

+----+
|col1|
+----+
|  v2|
+----+



APPROACH 1:
-------------------

You can use the DataFrame join method with the join type left_anti to find the values that are in DataFrame a but not in DataFrame b:

a.as('a).join(b.as('b), $"a.col1" === $"b.col1", "left_anti").show()

The result:

+----+
|col1|
+----+
|  v1|
+----+



APPROACH 2:
-------------------

You can use SQL to do this, much as you would in SQL Server or Oracle. First register each DataFrame as a temporary view (this only registers a name for the DataFrame in the current session; it does not copy or cache the data), then write SQL against those views.

a.createOrReplaceTempView("table_a")
b.createOrReplaceTempView("table_b")
spark.sql("select * from table_a a where not exists (select 1 from table_b b where a.col1 = b.col1)").show()

The result:

+----+
|col1|
+----+
|  v1|
+----+

Sarath Subramanian