
I want to join one big RDD against a small RDD with some additional filter conditions. The code below works, but the processing runs only on the driver and is not spread across the nodes. Can you suggest another approach?

val cross = titlesRDD.cartesian(brRDD).cache()
val matching = cross.filter { case (x, br) =>
  (br._1 == "0") &&
  (br._2 == x._4) &&
  (br._3.contains(x._5) || br._3.head == "")
}

Thanks, madhu

MapReddy Usthili
  • How big is the "small" RDD? Could you make it a broadcast variable instead? – mattinbits Aug 04 '15 at 09:34
  • Yes, I have tried with a broadcast variable -- same performance :( I also thought of doing it with iteration and the mapPartitions method, like this: http://stackoverflow.com/questions/17621596/spark-whats-the-best-strategy-for-joining-a-2-tuple-key-rdd-with-single-key-rd – MapReddy Usthili Aug 04 '15 at 11:51
  • For starters could you provide information about types of input data and/or example input? How big are both datasets? Also, why do you think that _process is running only with driver_? – zero323 Aug 04 '15 at 18:05
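The broadcast approach mentioned in the comments can be sketched at the collection level like this. The small side is reduced to a local collection (in Spark this is what `sc.broadcast(brRDD.collect())` would ship to each executor) and the big side is flat-mapped against it, so no shuffle or cartesian product is needed. The tuple shapes and values below are hypothetical stand-ins inferred from the question's code:

```scala
// Stand-ins for the two datasets; titles ~ titlesRDD rows, br ~ brRDD rows.
val titles = Seq(("t1", "a", "b", "k1", "v1"), ("t2", "a", "b", "k2", "v9"))
val br     = Seq(("0", "k1", Seq("v1")), ("1", "k2", Seq("")), ("0", "k2", Seq("")))

// Pre-filter the small side; in Spark this collection would be broadcast:
// val brSmall = sc.broadcast(brRDD.filter(_._1 == "0").collect())
val brSmall = br.filter(_._1 == "0")

// Map each "big" row against the local small collection instead of
// building a cartesian product.
val matching = titles.flatMap { x =>
  brSmall
    .filter(b => b._2 == x._4 && (b._3.contains(x._5) || b._3.head == ""))
    .map(b => (x, b))
}
```

With RDDs the `flatMap` runs on every partition in parallel, so the work is distributed across the nodes rather than staying on the driver.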

1 Answer


You probably don't want to cache cross. Not caching it will, I believe, let the cartesian product happen "on the fly" as needed for the filter, instead of instantiating the potentially large number of combinations resulting from the cartesian product in memory.

Also, you can do brRDD.filter(_._1 == "0") before doing the cartesian product with titlesRDD, e.g.

val cross = titlesRDD.cartesian(brRDD.filter(_._1 == "0"))

and then modify the filter used to create matching appropriately.
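A collection-level sketch of that rewrite, with the cartesian product modeled by a `for` comprehension (in Spark it would be `titlesRDD.cartesian(...)`) and with hypothetical tuple shapes taken from the question:

```scala
// Stand-ins for the two datasets.
val titles = Seq(("t1", "a", "b", "k1", "v1"))
val br     = Seq(("0", "k1", Seq("v1")), ("1", "k1", Seq("v1")))

// Filter the small side *before* the cartesian product; in Spark:
// val cross = titlesRDD.cartesian(brRDD.filter(_._1 == "0"))
val cross = for (x <- titles; b <- br.filter(_._1 == "0")) yield (x, b)

// The br._1 == "0" test has already been applied, so the match filter
// only needs the remaining conditions.
val matching = cross.filter { case (x, b) =>
  b._2 == x._4 && (b._3.contains(x._5) || b._3.head == "")
}
```

Filtering first shrinks the product from `|titles| * |br|` pairs to `|titles| * |br where _1 == "0"|`, which is also less data to hold if the result is ever cached.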

Jason Scott Lenderman