Suppose I have two Spark SQL dataframes A and B. I want to subtract the items in B from the items in A while preserving duplicates from A.

I followed the instructions to use DataFrame.except() that I found in another StackOverflow question ("Spark: subtract two DataFrames"), but that function removes all duplicates from the original dataframe A.

As a conceptual example, if I have two dataframes:

words     = [the, quick, fox, a, brown, fox]
stopWords = [the, a]

then I want the output to be, in any order:

words - stopWords = [quick, brown, fox, fox]

I observed that the RDD method subtract() preserves duplicates, but the Spark SQL method except() removes them from the resulting DataFrame. I don't understand why except() returns only unique values.

Here is a complete demonstration:

// ---------------------------------------------------------------
// EXAMPLE USING RDDs
// ---------------------------------------------------------------
var wordsRdd = sc.parallelize(List("the", "quick", "fox", "a", "brown", "fox"))
var stopWordsRdd = sc.parallelize(List("a", "the"))

var wordsWithoutStopWordsRdd = wordsRdd.subtract(stopWordsRdd)
wordsWithoutStopWordsRdd.take(10)
// res11: Array[String] = Array(quick, brown, fox, fox)

// ---------------------------------------------------------------
// EXAMPLE USING DATAFRAMES
// ---------------------------------------------------------------
var wordsDf = wordsRdd.toDF()
var stopWordsDf = stopWordsRdd.toDF()
var wordsWithoutStopWordsDf = wordsDf.except(stopWordsDf)

wordsWithoutStopWordsDf.show(10)
// +-----+
// |value|
// +-----+
// |  fox|
// |brown|
// |quick|
// +-----+

I want to preserve duplicates because I am generating frequency tables.

Any help would be appreciated.


2 Answers

val words = sc.parallelize(List("the", "quick", "fox", "a", "brown", "fox")).toDF("id")
val stopwords = sc.parallelize(List("a", "the")).toDF("id")

// The left outer join keeps every row of words, duplicates included;
// rows with no match in stopwords get a null on the stopwords side,
// so filtering on isNull drops exactly the stop words.
words.join(stopwords, words("id") === stopwords("id"), "left_outer")
     .where(stopwords("id").isNull)
     .select(words("id")).show()

The output is:

+-----+
|   id|
+-----+
|  fox|
|  fox|
|brown|
|quick|
+-----+
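
The same result can also be had with an anti join, which returns only the left-side rows that have no match on the right. A minimal sketch, assuming Spark 2.0 or later, where the "left_anti" join type is available:

// Anti join: keep the rows of words with no match in stopwords,
// duplicates preserved, with no explicit null filter needed.
words.join(stopwords, Seq("id"), "left_anti").show()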

You can try exceptAll, which keeps duplicates: except follows SQL's EXCEPT DISTINCT semantics and deduplicates the result, while exceptAll (available since Spark 2.4) is its duplicate-preserving counterpart:

https://hyukjin-spark.readthedocs.io/en/latest/reference/api/pyspark.sql.DataFrame.exceptAll.html

For example:

val words = sc.parallelize(List("the", "quick", "fox", "a", "brown", "fox")).toDF("id")
val stopwords = sc.parallelize(List("a", "the")).toDF("id")

words.exceptAll(stopwords).show()

Output should be:

+-----+
|   id|
+-----+
|quick|
|  fox|
|  fox|
|brown|
+-----+
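
Since the question mentions building frequency tables, the duplicate-preserving result can be aggregated directly. A short sketch reusing the words and stopwords frames above:

// Build a frequency table from the remaining words; the duplicate
// "fox" rows survive exceptAll, so they contribute a count of 2.
words.exceptAll(stopwords)
     .groupBy("id")
     .count()
     .show()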