
When I broadcast the result of collectAsMap(), not all of the values end up in the broadcast variable. For example:

    val emp = sc.textFile("...text1.txt").map(line => (line.split("\t")(3),line.split("\t")(1))).distinct()
    val emp_new = sc.textFile("...text2.txt").map(line => (line.split("\t")(3),line.split("\t")(1))).distinct()
    emp_new.foreach(println)

    val emp_newBC = sc.broadcast(emp_new.collectAsMap())
    println(emp_newBC.value)

When I checked the values in emp_newBC, I saw that not all of the data from emp_new appears. What am I missing?

Thanks in advance.

mlee_jordan

1 Answer


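The problem is that emp_new is an RDD of (key, value) tuples, while emp_newBC holds a broadcast Map. A Map stores only one value per key, so collectAsMap() keeps just one value for every key that occurs more than once, and you end up with less data. distinct() does not prevent this, because it removes only identical (key, value) pairs; the same key can still appear with different values. If you want to get back an array of all the tuples, use: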

    val emp_newBC = sc.broadcast(emp_new.collect())
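The key collapse can be reproduced locally with plain Scala collections standing in for RDDs. This is a sketch: the sample data is hypothetical, and `toMap`/`toArray` only play the roles of `collectAsMap()`/`collect()`. On the real RDD, comparing `emp_new.count()` with `emp_new.keys.distinct().count()` should show whether duplicate keys are the cause.

```scala
// Hypothetical sample data standing in for emp_new. After distinct() all
// pairs differ, but the key "100" still occurs with two different values.
val pairs: Seq[(String, String)] = Seq(
  ("100", "Alice"),
  ("100", "Bob"),
  ("200", "Carol"),
  ("200", "Carol") // exact duplicate pair, removed by distinct
).distinct

// Like collectAsMap(), building a Map keeps a single value per key.
val asMap: Map[String, String] = pairs.toMap

// Like collect(), an array keeps every pair.
val asArray: Array[(String, String)] = pairs.toArray

// Keys that occur more than once are the ones that lose values in the map.
val duplicatedKeys: Set[String] =
  pairs.groupBy(_._1).collect { case (k, vs) if vs.size > 1 => k }.toSet

println(s"pairs=${pairs.size} map=${asMap.size} array=${asArray.length} dups=$duplicatedKeys")
```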

TheMP
Thanks for your answer, Niemand. collect() gets all the data. However, I cannot use it as easily as collectAsMap(). With collectAsMap() I could use it like this:

    val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
    val joined = rdd2.mapPartitions({ iter =>
      val m = rdd1Broadcast.value
      for {
        ((t, w), u) <- iter
        if m.contains(t)
      } yield ((t, w), (u, m.get(t).get))
    }, preservesPartitioning = true)

But with collect() I somehow cannot use the contains function properly. Do you have any suggestion for adjusting the code above? – mlee_jordan Sep 21 '15 at 14:20
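On an `Array[(K, V)]`, `contains` compares whole (key, value) tuples rather than keys, so it cannot find a key on its own. One way to keep every value and still look up by key is to group the collected pairs into a multimap (each key mapped to all of its values) before broadcasting. The sketch below is plain Scala so it runs without a cluster; `toMultiMap`, `joinPartition`, and the sample data are hypothetical names, not Spark API. With Spark, `rdd1.groupByKey().collectAsMap()` should give a comparable key-to-`Iterable` map directly.

```scala
// Hypothetical stand-in for rdd1.collect(): (t, value) pairs where t repeats.
val rdd1Pairs: Array[(String, String)] = Array(
  ("100", "Alice"), ("100", "Bob"), ("200", "Carol")
)

// Group the collected pairs into key -> all values, so nothing is dropped.
// toSeq first avoids Array.map, which needs a ClassTag for generic V.
def toMultiMap[K, V](pairs: Array[(K, V)]): Map[K, Seq[V]] =
  pairs.toSeq.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

// The comment's per-partition join rewritten against the multimap:
// rows of the form ((t, w), u) with no matching key are dropped, and a
// matching row yields one output row per value of its key.
def joinPartition[T, W, U, V](
    iter: Iterator[((T, W), U)],
    m: Map[T, Seq[V]]
): Iterator[((T, W), (U, V))] =
  for {
    ((t, w), u) <- iter
    v <- m.getOrElse(t, Seq.empty).iterator
  } yield ((t, w), (u, v))
```

In the Spark job this would be used roughly as `val bc = sc.broadcast(toMultiMap(rdd1.collect()))` followed by `rdd2.mapPartitions(iter => joinPartition(iter, bc.value), preservesPartitioning = true)`. Emitting one row per value matches inner-join semantics, whereas the collectAsMap() version silently kept only one value per key.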