
I was playing around with Spark and got stuck on something that seems foolish.

Let's say we have two RDDs:

rdd1 = {(1, 2), (3, 4), (3, 6)}

rdd2 = {(3, 9)}

If I do rdd1.subtractByKey(rdd2), I will get {(1, 2)}, which is perfectly fine. But I also want to save the rejected values {(3, 4), (3, 6)} to another RDD. Is there a prebuilt function in Spark or an elegant way to do this?
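
For reference, here's a minimal spark-shell version of what I mean:

val rdd1 = sc.parallelize(Seq((1, 2), (3, 4), (3, 6)))
val rdd2 = sc.parallelize(Seq((3, 9)))

val kept = rdd1.subtractByKey(rdd2)
kept.collect()   // Array((1,2)) -- what subtractByKey gives me
// I'd also like the rejected pairs (3,4) and (3,6) as a separate RDD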

Please keep in mind that I am new to Spark; any help will be appreciated, thanks.

Will

5 Answers


As Rohan suggests, there is no (to the best of my knowledge) standard API call to do this. What you want to do can be expressed as Union - Intersection.

Here is how you can do this in Spark:

val r1 = sc.parallelize(Seq((1,2), (3,4), (3,6)))
val r2 = sc.parallelize(Seq((3,9)))

val intersection = r1.map(_._1).intersection(r2.map(_._1))
val union = r1.map(_._1).union(r2.map(_._1))

val diff = union.subtract(intersection)

diff.collect()
> Array[Int] = Array(1) 

To get the actual pairs:

val d = diff.collect() 
r1.union(r2).filter(x => d.contains(x._1)).collect
marios
  • I am not sure I love this in terms of performance, but it should work well enough as long as `diff` is not huge. Otherwise the `d.contains` closure will get pretty large. – marios Nov 06 '15 at 17:23
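
If diff does grow large, one common workaround (just a sketch of the broadcast-variable approach, not something from the answer above) is to broadcast the collected keys once per executor rather than capturing a plain array in every task's closure:

val diffKeys = sc.broadcast(diff.collect().toSet)   // small key set, shipped once per executor
r1.union(r2).filter(x => diffKeys.value.contains(x._1)).collect()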

I'd claim this is slightly more elegant:

val r1 = sc.parallelize(Seq((1,2), (3,4), (3,6)))
val r2 = sc.parallelize(Seq((3,9)))

val r3 = r1.leftOuterJoin(r2)
val subtracted = r3.filter(_._2._2.isEmpty).map(x=>(x._1, x._2._1))
val discarded = r3.filter(_._2._2.nonEmpty).map(x=>(x._1, x._2._1))

//subtracted: (1,2)
//discarded: (3,4)(3,6)

The insight is noticing that leftOuterJoin produces both the discarded (== records with a matching key in r2) and remaining (no matching key) in one go.
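
To make that concrete, r3 on this data looks like the following (element order may vary):

r3.collect()
// Array((1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9))))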

It's a pity Spark doesn't have RDD.partition (in the Scala-collections sense of splitting a collection into two depending on a predicate), or we could calculate subtracted and discarded in one pass.
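
For comparison, this is the Scala-collections partition meant above; a hypothetical RDD equivalent would return both halves from a single pass:

// one traversal, two results (plain Scala collections, not an RDD)
val (kept, rejected) = Seq((1, 2), (3, 4), (3, 6)).partition(_._1 != 3)
// kept: Seq((1,2)), rejected: Seq((3,4), (3,6))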

The Archetypal Paul

You can try

val rdd3 = rdd1.subtractByKey(rdd2)
val rdd4 = rdd1.subtractByKey(rdd3)

But you won't be keeping the values, just running another subtraction.
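
On the RDDs from the question this gives:

rdd3.collect()   // Array((1,2))
rdd4.collect()   // Array((3,4), (3,6)) -- the rejected pairs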

gjin

Unfortunately, I don't think there's an easy way to keep the rejected values using subtractByKey(). I think one way to get your desired result is through cogrouping and filtering. Something like:

val cogrouped = rdd1.cogroup(rdd2)
def flatFunc[A, B](key: A, values: Iterable[B]) : Iterable[(A, B)] = for {value <- values} yield (key, value)
val res1 = cogrouped.filter(_._2._2.isEmpty).flatMap { case (key, values) => flatFunc(key, values._1) }
val res2 = cogrouped.filter(_._2._2.nonEmpty).flatMap { case (key, values) => flatFunc(key, values._1) }

You might be able to borrow the work done here to make the last two lines look more elegant.
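
For example, one possibly tidier variant of those two lines (my own sketch, not taken from the linked work) uses flatMapValues, since cogroup already carries the key:

val res1 = cogrouped.filter(_._2._2.isEmpty).flatMapValues(_._1)
val res2 = cogrouped.filter(_._2._2.nonEmpty).flatMapValues(_._1)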

When I run this on your example, I see:

scala> val rdd1 = sc.parallelize(Array((1, 2), (3, 4), (3, 6)))
scala> val rdd2 = sc.parallelize(Array((3, 9)))
scala> val cogrouped = rdd1.cogroup(rdd2)
scala> def flatFunc[A, B](key: A, values: Iterable[B]) : Iterable[(A, B)] = for {value <- values} yield (key, value)
scala> val res1 = cogrouped.filter(_._2._2.isEmpty).flatMap { case (key, values) => flatFunc(key, values._1) }
scala> val res2 = cogrouped.filter(_._2._2.nonEmpty).flatMap { case (key, values) => flatFunc(key, values._1) }
scala> res1.collect()
...
res7: Array[(Int, Int)] = Array((1,2))
scala> res2.collect()
...
res8: Array[(Int, Int)] = Array((3,4), (3,6))
Rohan Aletty

First use subtractByKey() and then subtract:

val rdd1 = spark.sparkContext.parallelize(Seq((1,2), (3,4), (3,5)))
val rdd2 = spark.sparkContext.parallelize(Seq((3,10)))

val result = rdd1.subtractByKey(rdd2)
result.foreach(print) // (1,2)

val rejected = rdd1.subtract(result)
rejected.foreach(print) // (3,5)(3,4)
thedevd