I have two dataframes, shown below. I'm trying to find their intersection based on either of the two columns, not only on both of them together.

In this case, I want to return dataframe C, which contains row 1 of A (Col1 of A row 1 = Col1 of B row 1), row 2 of A (Col2 of A row 2 = Col2 of B row 1), row 4 of A (Col1 of A row 4 = Col1 of B row 2), and row 5 of A. If I intersect A and B, I only get row 5 of A, since that is the only row that matches on both columns. How do I do this? Many thanks. Let me know if I'm not explaining the question very well.

A:

     Col1    Col2 
     1         2    
     2         3
     3         7 
     5         4
     1         3   

B:

    Col1    Col2 
     1         3    
     5         1

C:

     Col1    Col2
     1         2
     2         3
     5         4
     1         3
user4046073

2 Answers

With the following data:

val df1 = sc.parallelize(Seq(1->2, 2->3, 3->7, 5->4, 1->3)).toDF("col1", "col2")
val df2 = sc.parallelize(Seq(1->3, 5->1)).toDF("col1", "col2")

Then you can join your datasets with an or condition:

val cols = df1.columns
df1.join(df2, cols.map(c => df1(c) === df2(c)).reduce(_ || _) )
   .select(cols.map(df1(_)) :_*)
   .distinct
   .show

+----+----+
|col1|col2|
+----+----+
|   2|   3|
|   1|   2|
|   1|   3|
|   5|   4|
+----+----+

The join condition is generic and works for any number of columns. The expression cols.map(c => df1(c) === df2(c)) maps each column to an equality between that column in df1 and the same column in df2. The reduce then takes the logical or of all these equalities, which is what you want. The select is there because the join would otherwise keep the columns of both dataframes; here I simply keep the ones from df1. I also added a distinct in case several rows of df2 match a row of df1, or vice versa. Indeed, you may get a cartesian product.
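As a possible variation on the join described above (not part of the original answer), a left semi join keeps each row of df1 that has at least one match in df2 and returns only df1's columns, so the select and distinct are not needed. The sketch below assumes a local SparkSession; the object name and the anyColumnMatches value are hypothetical names chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object so the sketch can be run on its own.
object OrSemiJoinSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough to try this out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("or-semi-join-sketch")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(1 -> 2, 2 -> 3, 3 -> 7, 5 -> 4, 1 -> 3).toDF("col1", "col2")
    val df2 = Seq(1 -> 3, 5 -> 1).toDF("col1", "col2")

    // Same generic condition as above: equality on any one of the columns.
    val cols = df1.columns
    val anyColumnMatches = cols.map(c => df1(c) === df2(c)).reduce(_ || _)

    // A left semi join emits a df1 row at most once per df1 row,
    // however many rows of df2 it matches, and keeps only df1's columns.
    df1.join(df2, anyColumnMatches, "left_semi").show()

    spark.stop()
  }
}
```

One difference to keep in mind: unlike distinct, the semi join does not collapse rows that are already duplicated within df1 itself.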

Note that this method does not need to collect anything to the driver, so it works regardless of the size of the datasets. However, if df2 is small enough to be collected to the driver and broadcast, you would get faster results with a method like this:

// to each column name, we map the set of values in df2.
val valueMap = df2.rdd
    .flatMap(row => cols.map(name => name -> row.getAs[Any](name)))
    .distinct
    .groupByKey
    .mapValues(_.toSet)
    .collectAsMap

//we create a udf that looks up in valueMap
val filter = udf((name : String, value : Any) => 
                     valueMap(name).contains(value))

//Finally we apply the filter.
df1.where( cols.map(c => filter(lit(c), df1(c))).reduce(_||_))
   .show

With this method, there is no shuffling of df1 and no cartesian product. If df2 is small, this is definitely the way to go.
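If you would rather avoid a udf, the same idea can probably be expressed with Column.isin: collect the distinct values of each df2 column to the driver and build one isin test per column. This is only a sketch under the same assumption that df2 is small; valuesByColumn and the object name are hypothetical, and it runs one small Spark job per column to collect the values.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object so the sketch can be run on its own.
object IsinFilterSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough to try this out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("isin-filter-sketch")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(1 -> 2, 2 -> 3, 3 -> 7, 5 -> 4, 1 -> 3).toDF("col1", "col2")
    val df2 = Seq(1 -> 3, 5 -> 1).toDF("col1", "col2")
    val cols = df1.columns

    // For each column name, collect the distinct values found in df2.
    // Assumption: df2 is small enough for these collects to be cheap.
    val valuesByColumn: Map[String, Seq[Any]] = cols.map { c =>
      c -> df2.select(c).distinct().collect().map(_.get(0)).toSeq
    }.toMap

    // Keep a df1 row if any of its columns holds a value seen in the
    // same column of df2.
    val condition = cols.map(c => df1(c).isin(valuesByColumn(c): _*)).reduce(_ || _)
    df1.where(condition).show()

    spark.stop()
  }
}
```

Because the values become literals in the filter expression, this only makes sense while the collected sets stay reasonably small.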

Oli
  • Thanks, but one problem is that if I want to find the intersection based on 3 column, would I need to join using many combinations of those 3 cols? – user4046073 Jan 12 '18 at 10:25
  • Do you need the rows such that col1 == col2 ? Is this why you are talking about many combinations? – Oli Jan 12 '18 at 13:50
  • I edited my answer so that it is more generic and works with any number of columns. – Oli Jan 12 '18 at 20:53
  • I mean say there are 3 columns, the join could be based on either of those 3 columns, A.col 1= B.col1, or A.col2 = B.col2 or A.col3= B.col3 etc. Could you explain a bit more about your map reduce and select? Thanks! – user4046073 Jan 12 '18 at 22:29
  • That's exactly what this code does. I added more detail in the answer. Don't hesitate if that's still not clear. – Oli Jan 13 '18 at 08:26
  • I also added a version that's going to be super fast in case df2 is much smaller than df1. – Oli Jan 13 '18 at 09:35

You should perform two join operations individually on each of the join columns, and then perform a union of the two resulting Dataframes:

val dfA = List((1,2),(2,3),(3,7),(5,4),(1,3)).toDF("Col1", "Col2")
val dfB = List((1,3),(5,1)).toDF("Col1", "Col2")
val res1 = dfA.join(dfB, dfA.col("Col1")===dfB.col("Col1"))
val res2 = dfA.join(dfB, dfA.col("Col2")===dfB.col("Col2"))
val res = res1.union(res2)
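
As written, each join keeps the columns of both dataframes, and a row of dfA that matches on both columns shows up in both res1 and res2. One way this could be tightened, sketched here as an assumption rather than a tested fix, is to select only dfA's columns after each join and deduplicate the union. The object name and aCols are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object so the sketch can be run on its own.
object UnionOfJoinsSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough to try this out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("union-of-joins-sketch")
      .getOrCreate()
    import spark.implicits._

    val dfA = List((1, 2), (2, 3), (3, 7), (5, 4), (1, 3)).toDF("Col1", "Col2")
    val dfB = List((1, 3), (5, 1)).toDF("Col1", "Col2")

    // Keep only dfA's columns after each join.
    val aCols = dfA.columns.map(dfA(_))
    val res1 = dfA.join(dfB, dfA("Col1") === dfB("Col1")).select(aCols: _*)
    val res2 = dfA.join(dfB, dfA("Col2") === dfB("Col2")).select(aCols: _*)

    // distinct removes rows that matched on both columns (or matched
    // several rows of dfB); it also merges rows duplicated in dfA itself.
    res1.union(res2).distinct().show()

    spark.stop()
  }
}
```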
suj1th
  • `(1,3)` would appear twice. Also, wouldn't this lead to a sort of cross join if the values of `col1` and `col2` are not unique? – philantrovert Jan 12 '18 at 08:44