1

I have a two pair RDDs in spark like this

rdd1 = (1 -> [4,5,6,7])
   (2 -> [4,5])
   (3 -> [6,7])


rdd2 = (4 -> [1001,1000,1002,1003])
   (5 -> [1004,1001,1006,1007])
   (6 -> [1007,1009,1005,1008])
   (7 -> [1011,1012,1013,1010])

I would like to combine them to look like this.

joinedRdd = (1 -> [1000,1001,1002,1003,1004,1005,1006,1007,1008,1009,1010,1011,1012,1013])
        (2 -> [1000,1001,1002,1003,1004,1006,1007])
        (3 -> [1005,1007,1008,1009,1010,1011,1012,1013])

Can someone suggest me how to do this.

Thanks Dilip

dcmovva
  • 155
  • 1
  • 3
  • 10

1 Answers1

0

With Scala Spark API, this would be

import org.apache.spark.SparkContext._ // enable PairRDDFunctions 
val rdd1Flat = rdd1.flatMapValues(identity).map(_.swap)
val rdd2Flat = rdd2.flatMapValues(identity)

rdd1Flat.join(rdd2Flat).values.distinct.groupByKey.collect

Result of this operation is

Array[(Int, Iterable[Int])] = Array(
  (1,CompactBuffer(1001, 1011, 1006, 1002, 1003, 1013, 1005, 1007, 1009, 1000, 1012, 1008, 1010, 1004)), 
  (2,CompactBuffer(1003, 1004, 1007, 1000, 1002, 1001, 1006)), 
  (3,CompactBuffer(1008, 1009, 1007, 1011, 1005, 1010, 1013, 1012)))

The approach proposed by Gabor will not work, since Spark doesn't support RDD operations performed within other RDD operation. You'll get a Java NPE thrown by a worker when trying to access the SparkContext available on the driver only.

Community
  • 1
  • 1
Marcel Krcah
  • 602
  • 8
  • 13