73

In Spark version 1.2.0 one could use subtract with two SchemaRDDs to end up with only the differing content from the first one:

val onlyNewData = todaySchemaRDD.subtract(yesterdaySchemaRDD)

onlyNewData contains the rows in todaySchemaRDD that do not exist in yesterdaySchemaRDD.

How can this be achieved with DataFrames in Spark version 1.3.0?

– Interfector

7 Answers

106

According to the Scala API docs, doing:

dataFrame1.except(dataFrame2)

will return a new DataFrame containing rows in dataFrame1 but not in dataFrame2.

– Eric Eijkelenboom
  • What if I need to perform the exception not on the whole row, but only on a column? E.g. keep the `dataFrame1` rows whose `primary_key` does not appear in `dataFrame2`? (Looks like `subtractByKey` for RDDs, but preferably for a dataframe.) – Katya Willard Aug 24 '16 at 21:12
  • @KatyaHandler You can use a `LEFT JOIN` coupled with an `IS NULL` filter on the joined dataframe's key. The SQL for this is something like: `SELECT * FROM df1 LEFT JOIN df2 ON df1.id = df2.id WHERE df2.id IS NULL` – Interfector Feb 17 '17 at 07:19
  • @KatyaHandler: I'm also looking for the same solution. Could you please tell me how you achieved this with DataFrames, and update the answer? – Shankar Sep 25 '17 at 12:19
  • @KatyaHandler you can do that with a `LEFT ANTI` join: `dataFrame1.join(dataFrame2, "key_col", "left_anti")`; both this and the `LEFT JOIN` approach are sketched below. – harthur Oct 17 '17 at 20:32
  • The except() method works fine when run locally on a small set of input data, but not when I run it over data in an S3 bucket: https://stackoverflow.com/questions/49188415/spark-dataframe-except-method-issue – RaAm Mar 15 '18 at 05:23
  • Is there an implementation in Pyspark? – Jan33 Jul 24 '20 at 10:47
  • @Jan33 [below](https://stackoverflow.com/a/67700696/7465462) you can see my answer in Pyspark – Ric S May 31 '21 at 09:24
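
To make the two suggestions from these comments concrete, here is a minimal PySpark sketch; the key column id and the toy rows are invented for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy data; "id" is the hypothetical key column
df1 = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "value"])
df2 = spark.createDataFrame([(2, "x")], ["id", "other"])

# Variant 1: LEFT JOIN plus an IS NULL filter on the right-hand key
df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
spark.sql("""
    SELECT t1.*
    FROM t1 LEFT JOIN t2 ON t1.id = t2.id
    WHERE t2.id IS NULL
""").show()

# Variant 2: the same result expressed as a left anti join
df1.join(df2, on="id", how="left_anti").show()

# Both return the rows of df1 whose id does not appear in df2 (ids 1 and 3)
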
75

In PySpark it would be subtract:

df1.subtract(df2)

or exceptAll if duplicates need to be preserved:

df1.exceptAll(df2)
– Tej
17

From Spark 1.3.0, you can use join with 'left_anti' option:

df1.join(df2, on='key_column', how='left_anti')

This is the PySpark API, but I guess there is a corresponding function in Scala too.

– Ric S
13

I tried subtract, but the result was not consistent. If I run df1.subtract(df2), not all rows of df1 appear in the result dataframe, probably because of the distinct mentioned in the docs.

exceptAll solved my problem: df1.exceptAll(df2)
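
Below is a minimal PySpark sketch of that difference; the toy rows and column names are invented, and exceptAll assumes Spark 2.4+:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# df1 deliberately contains the row (1, "a") twice
df1 = spark.createDataFrame([(1, "a"), (1, "a"), (2, "b")], ["id", "value"])
df2 = spark.createDataFrame([(2, "b")], ["id", "value"])

# subtract behaves like EXCEPT DISTINCT: the duplicate (1, "a") collapses to one row
df1.subtract(df2).show()

# exceptAll behaves like EXCEPT ALL: both copies of (1, "a") are kept
df1.exceptAll(df2).show()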

– Arthur Julião
  • New in Spark version 2.4. True, but I don't think it works for prior versions! Thanks for sharing. – Rudr Nov 30 '18 at 19:38
  • You can replicate that in versions 2+ by doing `DataFrame(df1._jdf.exceptAll(df2._jdf), sqlContext)` – Arthur Julião Dec 01 '18 at 01:57
  • This is just a principle of SQL: set operations like subtract, intersect, etc. are just that, set operations. Hence, they convert the tables to sets first (which have no duplicate entries). – information_interchange Jul 29 '19 at 14:45
  • @BdEngineer I could not find the except function in the docs; is it PySpark or Spark Scala? (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.subtract) – Arthur Julião May 11 '20 at 14:15
  • @ArthurJulião For pyspark use `subtract`. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.subtract – sourabhxiii May 29 '20 at 09:30
3

For me, df1.subtract(df2) was inconsistent: it worked correctly on one dataframe but not on another, because of duplicates. df1.exceptAll(df2) returns a new dataframe with the records from df1 that do not exist in df2, keeping any duplicates.

– Vidya
1

From Spark 2.4.0 you can use exceptAll:

data_cl = reg_data.exceptAll(data_fr)
– s510
1

Ric S's answer is the best solution in some situations, like the one below.

From Spark 1.3.0, you can use join with 'left_anti' option:

df1.join(df2, on='key_column', how='left_anti')

This is the PySpark API, but I guess there is a corresponding function in Scala too.

This is very useful in certain situations. Suppose I have two dataframes:

dataframe1
-----------------------------------------
|id | category                           |
-----------------------------------------
|1  | [{"type":"sport","name","soccer"}] |
-----------------------------------------
    
dataframe2
-----------------------------------------------------------------------------
|id | category                                                               |
-----------------------------------------------------------------------------
|1  | [{"type":"sport","name","soccer"}, {"type":"player","name":"ronaldo"}] |
-----------------------------------------------------------------------------

Here it is not possible to get what I want with exceptAll() or subtract(), because they compare entire rows and the two rows differ in the nested category column; the left anti join on id works instead.
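
As a rough PySpark sketch of this case (the exact schema is an assumption; category is modelled here as an array of structs):

from pyspark.sql import SparkSession
from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType)

spark = SparkSession.builder.getOrCreate()

# Assumed schema: category as an array of structs
schema = StructType([
    StructField("id", IntegerType()),
    StructField("category", ArrayType(StructType([
        StructField("type", StringType()),
        StructField("name", StringType()),
    ]))),
])

dataframe1 = spark.createDataFrame(
    [(1, [("sport", "soccer")])], schema)
dataframe2 = spark.createDataFrame(
    [(1, [("sport", "soccer"), ("player", "ronaldo")])], schema)

# Row-wise set operations would compare the whole row, including the
# differing category arrays. Matching on the key alone is done with a
# left anti join: id=1 exists in dataframe2, so the result is empty.
dataframe1.join(dataframe2, on="id", how="left_anti").show()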

– NickyPatel