
rdd1:

(m1,p1)
(m1,p2)
(m1,p3)
(m2,p1)
(m2,p2)
(m2,p3)
(m2,p4)

rdd2:

(m1,p1)
(m1,p2)
(m1,p3)
(m2,p1)
(m2,p2)
(m2,p3)

I have two RDDs, rdd1 and rdd2. I want to compare them and print the differences, i.e. (m2,p4), which is not present in rdd2.

I tried rdd1.subtractByKey(rdd2) and rdd1.subtract(rdd2), but I am not getting any data. Please assist.
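For reference, `subtractByKey` compares keys only, so on this data it returns nothing: both `m1` and `m2` occur in rdd2. `subtract` compares whole `(key, value)` pairs, so on this data it would be expected to return `(m2,p4)`. The sketch below models the two semantics with plain Python lists standing in for the RDDs; the helper names `subtract_by_key` and `subtract` are hypothetical and are not the Spark API.

```python
# Plain-Python model of the semantics (NOT Spark): lists of (key, value)
# tuples stand in for the two pair RDDs from the question.
rdd1 = [("m1", "p1"), ("m1", "p2"), ("m1", "p3"),
        ("m2", "p1"), ("m2", "p2"), ("m2", "p3"), ("m2", "p4")]
rdd2 = [("m1", "p1"), ("m1", "p2"), ("m1", "p3"),
        ("m2", "p1"), ("m2", "p2"), ("m2", "p3")]


def subtract_by_key(left, right):
    """Hypothetical helper: keep pairs of `left` whose KEY is absent from `right`."""
    right_keys = {k for k, _ in right}
    return [(k, v) for k, v in left if k not in right_keys]


def subtract(left, right):
    """Hypothetical helper: keep elements of `left` whose whole tuple is absent from `right`."""
    right_elems = set(right)
    return [x for x in left if x not in right_elems]


by_key = subtract_by_key(rdd1, rdd2)  # every key (m1, m2) exists in rdd2
whole = subtract(rdd1, rdd2)          # only the full pair (m2, p4) is missing
print(by_key)  # []
print(whole)   # [('m2', 'p4')]
```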

Anish Nair
bigdata techie
  • You can use `df1.except(df2)`. More is explained here: https://stackoverflow.com/questions/29537564/spark-subtract-two-dataframes – user238607 Dec 05 '18 at 17:51
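The comment's `df1.except(df2)` is the Scala Dataset method, which Spark documents as equivalent to SQL `EXCEPT DISTINCT`, so duplicate rows collapse to one. PySpark exposes the same operation as `DataFrame.subtract`, and Spark 2.4 and later add `exceptAll` for SQL `EXCEPT ALL`. A rough plain-Python model of the two flavours (not Spark itself), using a small hypothetical dataset with one duplicated row:

```python
from collections import Counter

# Plain-Python model (NOT Spark); hypothetical rows, with one duplicate in df1.
df1 = [("m1", "p1"), ("m2", "p4"), ("m2", "p4")]
df2 = [("m1", "p1")]


def except_distinct(left, right):
    """Model of SQL EXCEPT DISTINCT: distinct rows of `left` absent from `right`."""
    right_rows = set(right)
    return sorted({row for row in left if row not in right_rows})


def except_all(left, right):
    """Model of SQL EXCEPT ALL: multiset difference, so duplicates survive."""
    remaining = Counter(left) - Counter(right)
    return sorted(remaining.elements())


print(except_distinct(df1, df2))  # [('m2', 'p4')]
print(except_all(df1, df2))       # [('m2', 'p4'), ('m2', 'p4')]
```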

3 Answers


Try this:

rdd1:

(m1,p1) (m1,p2) (m1,p3) (m2,p1) (m2,p2) (m2,p3) (m2,p4)

rdd2:

(m1,p1) (m1,p2) (m1,p3) (m2,p1) (m2,p2) (m2,p3)
Shree
Jsaw

You can use a full outer join on DataFrames:

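from pyspark.sql.functions import col

def find_not_null(row):
    # Return whichever side of the full outer join is populated
    if row['col1'] is None:
        return (row['col3'], row['col4'])
    else:
        return (row['col1'], row['col2'])

# Requires an active SparkSession so that RDD.toDF is available.
# Join rdd1 against rdd2 (not against itself) on BOTH columns, and combine
# Column conditions with & / | rather than Python's `and` / `or`.
diff_rdd = rdd1.toDF(['col1', 'col2']). \
    join(rdd2.toDF(['col3', 'col4']),
         (col('col1') == col('col3')) & (col('col2') == col('col4')),
         'full_outer'). \
    filter(col('col1').isNull() | col('col3').isNull()).rdd. \
    map(find_not_null)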
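To see why the filter keeps only unmatched rows, here is a plain-Python model of the same idea (not Spark): every distinct row from either side is joined on both columns, a missing side is filled with `None` the way a full outer join fills NULLs, and `find_not_null` recovers the populated side. It assumes the rows within each input are distinct; `full_outer_join_rows` is a hypothetical helper.

```python
# Plain-Python model of the full-outer-join approach (NOT Spark).
rdd1 = [("m1", "p1"), ("m1", "p2"), ("m1", "p3"),
        ("m2", "p1"), ("m2", "p2"), ("m2", "p3"), ("m2", "p4")]
rdd2 = [("m1", "p1"), ("m1", "p2"), ("m1", "p3"),
        ("m2", "p1"), ("m2", "p2"), ("m2", "p3")]


def full_outer_join_rows(left, right):
    """Hypothetical helper: join on both columns, padding a missing side with None."""
    left_set, right_set = set(left), set(right)
    rows = []
    for pair in sorted(left_set | right_set):
        l = pair if pair in left_set else (None, None)
        r = pair if pair in right_set else (None, None)
        rows.append({"col1": l[0], "col2": l[1], "col3": r[0], "col4": r[1]})
    return rows


def find_not_null(row):
    # Same logic as the answer: return whichever side is populated.
    if row["col1"] is None:
        return (row["col3"], row["col4"])
    return (row["col1"], row["col2"])


# Keep only rows that matched on one side, then recover the populated pair.
diff = [find_not_null(row) for row in full_outer_join_rows(rdd1, rdd2)
        if row["col1"] is None or row["col3"] is None]
print(diff)  # [('m2', 'p4')]
```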
OmG

If you really need RDDs, then you can get your result using subtract and union.

Assuming that you're interested in differences from both sides, this will work:

val left = sc.makeRDD(Seq(("m1","p1"), ("m1","p2"), ("m1","p3"), ("m2","p1"), ("m2","p2"), ("m2","p3"), ("m2","p4")))
val right = sc.makeRDD(Seq(("m1","p1"), ("m1","p2"), ("m1","p3"), ("m2","p1"), ("m2","p2"), ("m2","p3"), ("m3","p1")))

val output = left.subtract(right).union(right.subtract(left))
output.collect() // Array[(String, String)] = Array((m2,p4), (m3,p1))
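A plain-Python model of this pipeline (not Spark; `subtract` below is a hypothetical stand-in for `RDD.subtract`). It reflects two properties worth knowing: `RDD.union` concatenates without removing duplicates, and `subtract` keeps every left-hand element that has no match, repeats included. Spark does not guarantee the order of `collect()` output, whereas this list model preserves input order.

```python
# Plain-Python model of left.subtract(right).union(right.subtract(left)) (NOT Spark).
left = [("m1", "p1"), ("m1", "p2"), ("m1", "p3"),
        ("m2", "p1"), ("m2", "p2"), ("m2", "p3"), ("m2", "p4")]
right = [("m1", "p1"), ("m1", "p2"), ("m1", "p3"),
         ("m2", "p1"), ("m2", "p2"), ("m2", "p3"), ("m3", "p1")]


def subtract(a, b):
    """Hypothetical stand-in for RDD.subtract: every element of `a`
    (repeats included) that does not occur in `b`."""
    b_elems = set(b)
    return [x for x in a if x not in b_elems]


# RDD.union concatenates; it never removes duplicates.
output = subtract(left, right) + subtract(right, left)
print(output)  # [('m2', 'p4'), ('m3', 'p1')]

# Repeated unmatched elements are all kept (hypothetical extra data).
print(subtract([("m4", "p1"), ("m4", "p1")], right))  # [('m4', 'p1'), ('m4', 'p1')]
```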

On the other hand, if you don't mind keeping the "full outer join" in memory, you can achieve the same using cogroup:

val output = left.cogroup(right).flatMap { case (k, (i1, i2)) => 
  val s1 = i1.toSet
  val s2 = i2.toSet
  val diff = (s1 diff s2) ++ (s2 diff s1)
  diff.toList.map(k -> _)
}
output.collect() // Array[(String, String)] = Array((m2,p4), (m3,p1))
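The cogroup variant can be modelled in plain Python as well (not Spark; `cogroup` and `symmetric_diff` are hypothetical helpers). Grouping both inputs by key and taking `(s1 - s2) | (s2 - s1)` per key mirrors `(s1 diff s2) ++ (s2 diff s1)` in the Scala code. Because of the set conversion, a value repeated under one key is reported once, unlike the subtract/union version.

```python
from collections import defaultdict

# Plain-Python model of the cogroup approach (NOT Spark).
left = [("m1", "p1"), ("m1", "p2"), ("m1", "p3"),
        ("m2", "p1"), ("m2", "p2"), ("m2", "p3"), ("m2", "p4")]
right = [("m1", "p1"), ("m1", "p2"), ("m1", "p3"),
         ("m2", "p1"), ("m2", "p2"), ("m2", "p3"), ("m3", "p1")]


def cogroup(a, b):
    """Hypothetical helper: key -> (values from a, values from b)."""
    groups = defaultdict(lambda: ([], []))
    for k, v in a:
        groups[k][0].append(v)
    for k, v in b:
        groups[k][1].append(v)
    return groups


def symmetric_diff(a, b):
    out = []
    for k, (vs1, vs2) in sorted(cogroup(a, b).items()):
        s1, s2 = set(vs1), set(vs2)
        # Values present on exactly one side, paired back with their key.
        for v in sorted((s1 - s2) | (s2 - s1)):
            out.append((k, v))
    return out


output = symmetric_diff(left, right)
print(output)  # [('m2', 'p4'), ('m3', 'p1')]
```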
ChernikovP