-2

Copying example from this question: As a conceptual example, if I have two dataframes:

words     = [the, quick, fox, a, brown, fox]
stopWords = [the, a]

then I want the output to be, in any order:

words - stopWords = [quick, brown, fox, fox]

ExceptAll can do this in 2.4 but I cannot upgrade. The answer in the linked question is specific to a dataframe:

words.join(stopwords, words("id") === stopwords("id"), "left_outer")
     .where(stopwords("id").isNull)
     .select(words("id")).show()

as in you need to know the pkey and the other columns.

Can anyone come up with an answer that will work on any dataframe?

user
  • 352
  • 3
  • 13

2 Answers2

0

Here is an implementation for you all. I have tested in Spark 2.4.2, it should work for 2.3 too (not 100% sure)

    val df1 = spark.createDataset(Seq("the","quick","fox","a","brown","fox")).toDF("c1")
    val df2 = spark.createDataset(Seq("the","a")).toDF("c1")

    exceptAllCustom(df1, df2, Seq("c1")).show()


  def exceptAllCustom(df1 : DataFrame, df2 : DataFrame, pks : Seq[String]): DataFrame = {
    val notNullCondition = pks.foldLeft(lit(0==0))((column,cName) => column && df2(cName).isNull)
    val joinCondition = pks.foldLeft(lit(0==0))((column,cName) => column && df2(cName)=== df1(cName))
    val result = df1.join(df2, joinCondition, "left_outer")
       .where(notNullCondition)

    pks.foldLeft(result)((df,cName) => df.drop(df2(cName)))
  }

Result -

+-----+
|   c1|
+-----+
|quick|
|  fox|
|brown|
|  fox|
+-----+
Salim
  • 2,046
  • 12
  • 13
  • Is there a solution for when pkeys are not known? This method should be able to be applied on any 2 dfs, like the ExceptAll in 2.4 – user Jan 24 '20 at 00:13
  • that is possible, read all columns from dataframe's schema and use it. – Salim Jan 24 '20 at 00:15
  • This is not working for me. Try `df1 = [1,2,2,3,4,5]` and `df2 = [1,3,4,5]`. Result should be `[2,2]` but this solution returns `[ ]`. – user Jan 24 '20 at 18:57
  • Ok, it works in 2.4 but in 2.3 you get the error: `Resolved attribute(s) c1#8 missing from c1#3 in operator !Filter` Due to https://issues.apache.org/jira/browse/SPARK-14948 – user Jan 24 '20 at 19:56
  • It does not work if there is more than 1 column. df1: ```+---+------+ |id |name | +---+------+ |1 |Beth | |2 |Alan | |3 |Rachel| |1 |Beth | |2 |Alan | +---+------+``` df2: ```+---+------+ |id |name | +---+------+ |3 |Rachel| +---+------+``` Expected: ```+---+----+ |id |name| +---+----+ |2 |Alan| |2 |Alan| |1 |Beth| |1 |Beth| +---+----+``` Actual: ```+---+----+----+ |id |name|id | +---+----+----+ |1 |Beth|null| |2 |Alan|null| |1 |Beth|null| |2 |Alan|null| +---+----+----+``` – user Jan 24 '20 at 21:57
  • Yes that's the problem, there shouldn't be the extra column. Why is it not being removed when you do `pks.foldLeft(result)((df,cName) => df.drop(df2(cName)))`? – user Jan 24 '20 at 22:13
  • This also does not account for null values. e.g. `df1 = [(null, cat), (1, mouse)]` and `df2 = [(null, cat)]` should return `res = [(1,mouse)]` but it returns `(null,cat)` as well – user Jan 24 '20 at 22:56
0

Turns out it's easier to do df1.except(df2) and then join the results with df1 to get all the duplicates.

Full code:

def exceptAllCustom(df1: DataFrame, df2: DataFrame): DataFrame = {
    val except = df1.except(df2)

    val columns = df1.columns
    val colExpr: Column = df1(columns.head) <=> except(columns.head)
    val joinExpression = columns.tail.foldLeft(colExpr) { (colExpr, p) =>
        colExpr && df1(p) <=> except(p)
    }

    val join = df1.join(except, joinExpression, "inner")

    join.select(df1("*"))
}
user
  • 352
  • 3
  • 13