0

I am new to spark.

Here is my code:

val Data = sc.parallelize(List(
      ("I", "India"), 
      ("U", "USA"), 
      ("W", "West"))) 

val DataArray = sc.broadcast(Data.collect)

val FinalData = DataArray.value

Here FinalData is of Array[(String, String)] type. But I want data to be in the form of RDD[(String, String)] type.

Can I convert FinalData to RDD[(String, String)] type.

More Detail:

I want to join Two RDD So to optimize join condition(For performance point of view) I am broadcasting small RDD to all cluster so that data shuffling will be less.(Indirectly performance will get improved) So for all this I am writting something like this:

//Big Data
val FirstRDD = sc.parallelize(List(****Data of first table****))

//Small Data
val SecondRDD = sc.parallelize(List(****Data of Second table****)) 

So defintely I will broadcast Small Data set(means SecondRDD)

val DataArray = sc.broadcast(SecondRDD.collect)

val FinalData = DataArray.value

//Here it will give error that

val Join = FirstRDD.leftOuterJoin(FinalData)

Found Array required RDD

That's why I am looking for Array to RDD conversion.

Darshan
  • 81
  • 2
  • 4
  • 8

2 Answers2

3

The real is problem is that you are creating a Broadcast variable, by collecting the RDD (notice that this action converts the RDD into an Array). So, what I'm saying is that you already have an RDD, which is Data, and this variable has exactly the same values as FinalData, but in the form you want RDD[(String, String)].

You can check this in the following output.

Data: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[2] at parallelize at <console>:32
DataArray: org.apache.spark.broadcast.Broadcast[Array[(String, String)]] = Broadcast(1)
FinalData: Array[(String, String)] = Array((I,India), (U,USA), (W,West))

Although, I don't understand your approach You just need to parallelize the Broadcast's value.

// You already have this data stored in `Data`, so it's useless repeat this process.
val DataCopy = sc.parallelize(DataArray.value)

EDIT

After reading your question again, I believe the problem is almost the same. You are trying to join an RDD with a Broadcast and that's not allowed. However, if you read the documentation you may notice that it's possible to join both RDDs (see code below).

val joinRDD = FirstRDD.keyBy(_._1).join(SecondRDD.keyBy(_._1))
Alberto Bonsanto
  • 17,556
  • 10
  • 64
  • 93
  • But After broadcast I have to again convert FinalData to RDD. As FinalData is of Array type. – Darshan Sep 21 '16 at 12:05
  • 1
    @Darshan perhaps you're not noticing the fact that `Data` has type `RDD[(String, String)]` - exactly what you seem to be looking for... – Tzach Zohar Sep 21 '16 at 12:11
  • There is one function that i want to use. But that function is in RDD not in Array.That's why – Darshan Sep 21 '16 at 12:11
  • @Tzach Zohar Yes `Data` has type `RDD[(String, String)]` – Darshan Sep 21 '16 at 12:12
  • And what's the difference between `Data` and the RDD you're trying to create? – Tzach Zohar Sep 21 '16 at 12:13
  • @Tzach Zohar But I want to use broadcast also. So after broadcast I will get variable as broadcasted type. I will get value of Broadcasted variable using `DataArray.value` And this will return as `Array` But i want it in `RDD`.So that future operation I can perform on `RDD`. – Darshan Sep 21 '16 at 12:16
  • 1
    So you _intentionally_ want to shift the data back and forth 4 times between driver application and cluster nodes, just to end up with an _identical_ value? – Tzach Zohar Sep 21 '16 at 12:26
  • Please check Newly add code. It will give brief idea about what I am looking for – Darshan Sep 21 '16 at 12:43
2

Broadcasts are indeed useful to improve performance of a JOIN between a large RDD and a smaller one. When you do that, broadcast (along with map or mapPartitions) replaces the join, it's not used in a join, and therefore in no way you'll need to "transform a broadcast into an RDD".

Here's how it would look:

val largeRDD = sc.parallelize(List(
  ("I", "India"),
  ("U", "USA"),
  ("W", "West")))

val smallRDD = sc.parallelize(List(
  ("I", 34),
  ("U", 45)))

val smaller = sc.broadcast(smallRDD.collectAsMap())

// using "smaller.value" inside the function passed to RDD.map ->
// on executor side. Broadcast made sure it's copied to each executor (once!)
val joinResult = largeRDD.map { case (k, v) => (k, v, smaller.value.get(k)) }

joinResult.foreach(println)
// prints:
// (I,India,Some(34))
// (W,West,None)
// (U,USA,Some(45))

See a similar solution (using mapPartitions) which might be more efficient here.

Community
  • 1
  • 1
Tzach Zohar
  • 37,442
  • 3
  • 79
  • 85