
I get the SPARK-5063 error message on the line with the println:

d.foreach { x =>
  for (i <- 0 until x.length)
    println(m.lookup(x(i)))
}

d is an RDD[Array[String]] and m is an RDD[(String, String)]. Is there any way to print the values the way I want? Or how can I convert d from RDD[Array[String]] to Array[String]?

– G_cy

2 Answers


SPARK-5063 relates to better error messages when trying to nest RDD operations, which is not supported.

It's a usability issue, not a functional one. The root cause is the nesting of RDD operations and the solution is to break that up.

Here we are effectively trying to join dRDD and mRDD. If mRDD is large, rdd.join would be the recommended way; otherwise, if mRDD is small, i.e. fits in the memory of each executor, we can collect it, broadcast it, and do a 'map-side' join.

JOIN

A simple join would go like this:

val rdd = sc.parallelize(Seq(Array("one", "two", "three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six" -> 6))
val flat = rdd.flatMap(_.toSeq).keyBy(x => x) // RDD[(String, String)]: each word keyed by itself
val res = flat.join(map).map { case (k, v) => v } // join on the word, keep the (word, number) pair
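A quick way to verify the result in the shell (RDD ordering is not guaranteed, so the pairs may print in any order):

res.collect().foreach(println)
// (one,1)
// (two,2)
// ... one (word, number) pair per input element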

If we would like to use a broadcast instead, we first need to collect the contents of the resolution table locally in order to broadcast it to all executors. NOTE: the RDD to be broadcast MUST fit in the memory of the driver as well as of each executor.

Map-side JOIN with Broadcast variable

val rdd = sc.parallelize(Seq(Array("one", "two", "three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six" -> 6))
val bcTable = sc.broadcast(map.collectAsMap) // collect the table on the driver, ship it once per executor
val res2 = rdd.flatMap { arr => arr.map(elem => (elem, bcTable.value(elem))) } // map-side lookup, no shuffle
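As before, collecting verifies the result; once the table is no longer needed, the broadcast can be released (a minimal sketch, continuing the shell session above):

res2.collect().foreach(println) // one (word, number) pair per input element
bcTable.unpersist() // drops the cached copies from the executors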
– maasg
  • Both strategies give error messages. The first gives `not found: value keyBy` and `missing arguments for method identity in object Predef; follow this method with '_' if you want to treat it as a partially applied function`. The second gives a type mismatch: elem should be String but it is Array[String]. – G_cy Apr 23 '15 at 18:44
  • Here is the problem: the first one, the join, works, but the second does not. It gives the error `java.util.NoSuchElementException: key not found: null` when I try to do foreach(println). But I can get results with the first method, so it should be able to find the keys. – G_cy Apr 23 '15 at 21:04
  • I tested it on the spark shell - what's the issue? At least, do you get the idea? – maasg Apr 23 '15 at 21:07
  • I get the idea, but I'm a little confused by the broadcast concept. – G_cy Apr 23 '15 at 21:30
  • @G_cy the broadcast is an optimization of serialization. With plain serialization, Spark would need to serialize the map with each task dispatched to the executors. With broadcast, the same can be achieved by serializing the map only once per executor; in general it's "data that you ship with the job to the executors" (see the sketch after these comments). – maasg Apr 23 '15 at 21:44
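To make that last point concrete, here is a small sketch contrasting the two approaches (the lookup table below is hypothetical; both versions work, they differ only in how the table is shipped to the executors):

val table = Map("one" -> 1, "two" -> 2) // small lookup table living on the driver
val words = sc.parallelize(Seq("one", "two", "one"))
// Closure capture: table is serialized into every task that is dispatched
val viaClosure = words.map(w => table(w))
// Broadcast: table is serialized once per executor and cached there
val bc = sc.broadcast(table)
val viaBroadcast = words.map(w => bc.value(w))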

This RDD lacks a SparkContext. It could happen in the following cases:

RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation.
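A minimal sketch of the fix for that example (rdd1 and rdd2 are defined here only for illustration): run the inner action on the driver first, then close over the plain result:

val rdd1 = sc.parallelize(Seq(1L, 2L, 3L))
val rdd2 = sc.parallelize(Seq("a" -> 10L, "b" -> 20L))
// Invalid: nests an RDD action inside a transformation and triggers SPARK-5063
// rdd1.map(x => rdd2.values.count() * x)
// Valid: count() runs on the driver, so the closure captures only a plain Long
val n = rdd2.values.count()
val scaled = rdd1.map(x => n * x)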

– Ali Akbarpour