I want to join two RDDs using a broadcast variable. I am trying to implement the first suggestion from Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?

val emp_newBC = sc.broadcast(emp_new.collectAsMap())
val joined = emp.mapPartitions({ iter =>
      val m = emp_newBC.value
      for {
        ((t, w)) <- iter
        if m.contains(t)
      } yield ((w + '-' + m.get(t).get),1)
    }, preservesPartitioning = true)

However, as mentioned here: broadcast variable fails to take all data, I need to use collect() rather than collectAsMap(). I tried to adjust my code as below:

val emp_newBC = sc.broadcast(emp_new.collect())
val joined = emp.mapPartitions({ iter =>
      val m = emp_newBC.value
      for {
        ((t, w)) <- iter
        if m.contains(t)
        amk = m.indexOf(t)
      } yield ((w + '-' + emp_newBC.value(amk)),1) //yield ((t, w), (m.get(t).get))   //((w + '-' + m.get(t).get),1)
    }, preservesPartitioning = true)

But it seems m.contains(t) never matches anything. How can I remedy this?

Thanks in advance.


1 Answer

How about something like this?

// groupByKey keeps all values per key; a plain collectAsMap on the raw
// (key, value) pairs would keep only one value per duplicate key
val emp_newBC = sc.broadcast(emp_new.groupByKey.collectAsMap)

val joined = emp.mapPartitions(iter => for {
  (k, v1) <- iter
  // look up k in the broadcast map; keys without a match are skipped
  v2 <- emp_newBC.value.getOrElse(k, Iterable())
} yield (s"$v1-$v2", 1))

Regarding your code... As far as I understand, emp_new is an RDD[(String, String)]. When it is collected you get an Array[(String, String)]. When you use

((t, w)) <- iter

t is a String, so m.contains(t) compares a String against (String, String) tuples and will always return false.
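
For reference, a minimal sketch of that mismatch in plain Scala (no Spark needed):

val m: Array[(String, String)] = Array(("k1", "v1"), ("k2", "v2"))

// The element type widens to Any, so this compiles but is always false:
m.contains("k1")          // false

// Checking the first tuple element is probably what was intended:
m.exists(_._1 == "k1")    // true

// ...as is converting to a Map first (note that toMap, like
// collectAsMap, keeps only one value per duplicate key):
m.toMap.contains("k1")    // true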

Another problem I see is preservesPartitioning = true inside mapPartitions. There are a few possible scenarios:

  1. emp is partitioned and you want joined to be partitioned as well. Since you change the key from t to some new value, the partitioning cannot be preserved and the resulting RDD has to be repartitioned. If you use preservesPartitioning = true, the output RDD will end up with the wrong partitions (see the sketch after this list).
  2. emp is partitioned but you don't need partitioning for joined. There is no reason to use preservesPartitioning.
  3. emp is not partitioned. Setting preservesPartitioning has no effect.
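
To make scenario 1 concrete, a quick sketch you could run in the shell (the data here is made up):

import org.apache.spark.HashPartitioner

val partitioned = sc.parallelize(Seq(("a", 1), ("b", 2)))
  .partitionBy(new HashPartitioner(4))

partitioned.partitioner  // Some(HashPartitioner)

// The key changes from "a" to "a-1", so records no longer sit in the
// partition the HashPartitioner would assign them to. With
// preservesPartitioning = true Spark still reports the old partitioner,
// and downstream key-based operations can silently go wrong:
val renamed = partitioned.mapPartitions(
  _.map { case (k, v) => (s"$k-$v", 1) },
  preservesPartitioning = true)

renamed.partitioner  // still Some(HashPartitioner), now incorrect
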
  • The `val emp_newBC = sc.broadcast(emp_new.groupByKey.collectAsMap)` part still returns missing data. I tried with a small dataset and I expect 354 rows in my broadcast variable, but it returns 312 rows. I think the reason is still the one stated in the second link I cited in my post. Isn't it possible to work with only collect()? And yes, emp has to be partitioned. Indeed, emp has 500 partitions. I don't get how to repartition the new RDD. Shall I repartition joined? – mlee_jordan Sep 22 '15 at 08:10
  • It should cover all values. What do you get when you run `emp_newBC.value.values.map(_.size).sum`? Regarding partitioning, do you have a partitioner? What is the output of `emp.partitioner`? – zero323 Sep 22 '15 at 12:14
  • To check the size I had inspected it through `val oo = emp_newBC.value`, and it still showed a smaller number. However, when I run your code it works perfectly. Now I wonder if this would be a good approach: `joined.repartition(sc.defaultParallelism * 500)`. Shall I again partition joined into 500 partitions, as it was at the beginning? – mlee_jordan Sep 22 '15 at 13:14
  • If you don't use a partitioner there is nothing else to do here. Regarding the broadcast map, it should work just fine, and it is the only efficient solution here. – zero323 Sep 22 '15 at 13:39
  • I get emp as `val emp = sc.textFile("text.txt",500).map(line => (line.split("\t")(5),line.split("\t")(0))).distinct()`. But emp.partitioner returns None. And when I monitor the UI it does show 500 partitions. I am a bit confused. If you don't mind, I have another small question: now Spark spills and cannot finish the job since emp is too big. Would increasing the partition number (e.g. from 500 to 5000) have any effect? – mlee_jordan Sep 22 '15 at 13:52
  • Yes, increasing the number of partitions should help. – zero323 Sep 22 '15 at 13:55