I have two RDDs, each in the format

{string, HashMap[long, object]}

I want to join them in Scala so that the hashmaps stored under the same key get merged.

RDD1 -> {string1, HashMap[{long a, object}, {long b, object}]}
RDD2 -> {string1, HashMap[{long c, object}]}

After joining the two RDDs, the result should look like

RDD -> {string1, HashMap[{long a, object}, {long b, object}, {long c, object}]}

Any help will be appreciated. I am also fairly new to Scala and Spark.

Y0gesh Gupta
  • How are you planning to deal with keys that are in both hashmaps? – cybye Mar 26 '15 at 06:25
  • If a key appears in both hashmaps, its value will be the same in both, so overlaps shouldn't be an issue (there should be no duplicate entries). – Y0gesh Gupta Mar 26 '15 at 06:30

2 Answers

Update: a simpler way is just to take the union and then reduce by key:

(rdd1 union rdd2).reduceByKey(_++_)
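To see what this one-liner does per key, here is a minimal sketch on ordinary Scala collections rather than RDDs, so it runs without Spark. The object name, the `unionReduce` helper and the sample data are made up for illustration. The sketch only assumes that `reduceByKey(_ ++ _)` folds all maps sharing a key with `++`, which is what the line above asks for.

```scala
object UnionReduceSketch {
  type Row = (String, Map[Long, String])

  // Hypothetical helper: a local analogue of (rdd1 union rdd2).reduceByKey(_ ++ _).
  // Concatenate both inputs, group the rows by key, then fold each group's maps with ++.
  def unionReduce(left: Seq[Row], right: Seq[Row]): Map[String, Map[Long, String]] =
    (left ++ right)
      .groupBy(_._1)
      .map { case (key, rows) => key -> rows.map(_._2).reduce(_ ++ _) }

  def main(args: Array[String]): Unit = {
    // Illustrative data: key "a" is on both sides, key "b" only on the left.
    val left  = Seq("a" -> Map(1L -> "one", 2L -> "two"), "b" -> Map(9L -> "nine"))
    val right = Seq("a" -> Map(2L -> "two", 3L -> "three"))
    unionReduce(left, right).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Key "b" survives even though it appears on one side only, and the entry 2 -> "two", present in both maps for "a", ends up in the result once.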

Older solution, kept for reference: this can also be done with cogroup, which collects the values for every key that appears in one or both RDDs (whereas join omits any key that appears in only one of the original RDDs). See the ScalaDoc.

For each key, cogroup yields a pair of collections of values, one per RDD. We concatenate the two collections with ++ to form a single collection of Maps, and then reduce those Maps to a single Map, again with ++.

The last two steps can be combined into a single mapValues operation, as follows.

Using this data...

val rdd1 = sc.parallelize(List("a"->Map(1->"one", 2->"two")))
val rdd2 = sc.parallelize(List("a"->Map(3->"three")))

...in the Spark shell:

val x = (rdd1 cogroup rdd2).mapValues{ case (a,b) => (a ++ b).reduce(_++_)}

x foreach println

> (a,Map(1 -> one, 2 -> two, 3 -> three))
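The shape of the cogroup result can be easier to follow on local collections. The following is only a sketch under that assumption: `cogroupLocal` and `mergeGroups` are hypothetical helpers standing in for `cogroup` and the `mapValues` step, and the extra key "c" is made-up data added to show a key that exists in one input only.

```scala
object CogroupSketch {
  type M = Map[Int, String]

  // Hypothetical local stand-in for cogroup: for every key seen on either side,
  // gather that key's values from the left and from the right.
  def cogroupLocal(left: Seq[(String, M)], right: Seq[(String, M)]): Map[String, (Seq[M], Seq[M])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      (k, (left.filter(_._1 == k).map(_._2), right.filter(_._1 == k).map(_._2)))
    }.toMap
  }

  // Same shape as the mapValues step: concatenate both sides, then fold with ++.
  // reduce is safe here because every key has at least one value on some side.
  def mergeGroups(groups: Map[String, (Seq[M], Seq[M])]): Map[String, M] =
    groups.map { case (k, (a, b)) => (k, (a ++ b).reduce(_ ++ _)) }

  def main(args: Array[String]): Unit = {
    val left  = Seq("a" -> Map(1 -> "one", 2 -> "two"))
    val right = Seq("a" -> Map(3 -> "three"), "c" -> Map(4 -> "four"))
    mergeGroups(cogroupLocal(left, right)).toSeq.sortBy(_._1).foreach(println)
  }
}
```

For key "c" the left-hand collection is empty, so the merged value is simply the right-hand map.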
DNA

You can do this by joining the two RDDs and applying a merge function to the resulting tuples of maps:

def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))]
Return an RDD containing all pairs of elements with matching keys in this and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in this and (k, v2) is in other. Performs a hash join across the cluster.

def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]
Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.

Assume there is a merge function like the one discussed in "Best way to merge two maps and sum the values of same key?":

def merge[K](a: K, b: K): K = ???

For maps, it could look like

def merge[K, V](a: Map[K, V], b: Map[K, V]): Map[K, V] = a ++ b

Given that, the RDDs can be joined first

val joined = RDD1.join(RDD2)

and then mapped

val mapped = joined.mapValues(v => merge(v._1, v._2))

The result is an RDD of (key, merged map) pairs. Note that join keeps only the keys present in both RDDs.
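A merge based on ++ is right-biased: when both maps contain the same key, the value from the second map wins. That is fine when overlapping keys always carry the same value, as stated in the question. The sketch below shows that behaviour next to a hypothetical `mergeWith` variant (not part of any library) that resolves collisions with a caller-supplied function, in the spirit of the linked question about summing values.

```scala
object MergeSketch {
  // Right-biased merge: on a key collision the value from b replaces the one from a.
  def merge[K, V](a: Map[K, V], b: Map[K, V]): Map[K, V] = a ++ b

  // Hypothetical variant: combine the two values whenever a key is in both maps.
  def mergeWith[K, V](a: Map[K, V], b: Map[K, V])(combine: (V, V) => V): Map[K, V] =
    b.foldLeft(a) { case (acc, (k, v)) =>
      // If acc already holds k, combine the old and new values; otherwise insert v.
      acc.updated(k, acc.get(k).map(combine(_, v)).getOrElse(v))
    }

  def main(args: Array[String]): Unit = {
    val a = Map("x" -> 1, "y" -> 2)
    val b = Map("y" -> 3, "z" -> 4)
    println(merge(a, b))            // "y" takes the value from b
    println(mergeWith(a, b)(_ + _)) // "y" takes the sum of both values
  }
}
```

Either function can be passed to mapValues in the same way as merge above.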

cybye
  • This code merges the maps for keys that are common to both RDDs but filters out the rest. I worked around that by taking the union of this result and the original RDD. Still, it helped me get to a solution, thanks a lot. :) – Y0gesh Gupta Mar 26 '15 at 07:49