0

I have a list that I have broadcast. I want to do this process so that the combination of two lists would result in the same size of the resultant list for all the elements in the RDD.

diffList sees if lct key has a corresponding value. If it does, then the value of the key would be taken and if it doesn't then its elements would be compared against givenList and all those keys which are not there would be given the value 0.0.

appendList zips the lctl and diffList and applies relevant transformations.

appendList should have all the same size of the elements in the list. It runs fine when I run this code locally, then I have the same size of each list (in each element of the RDD). When I do the same operation on a cluster, it gives me the different size of the list.

I just wanted to know if I have broadcast the list correctly or there is any other problem with the logic?

val w = sqlContext.read.format("org.elasticsearch.spark.sql").load(weeks)
val agg = w.groupBy("id", "type").agg(sum($"d").as("dur"))
val idc = agg.rdd.map(_.toString).map(x => x.replace("[","")).map(x => x.replace("]","")).map(_.split(",")).map(x => (((x(4),(x(2),x(1))),x(3))))
val grp = idc.map(x => (x._1,x._2.toDouble)).reduceByKey(_+_)
val ust = grp.map(x => (x._1._1,(x._1._2,x._2)))
val lct = ust.groupByKey.map(x => (x._1,x._2.toList))
val lctl = lct.map(x => (x._1,x._2.map(y => y._1),x._2.map(y => y._2)))

val givenList = List(("a","z"), ("c","g"), ("c","t"), ("d","h"), ("y","e"), ("f","l"),("g","r"))    
val listBroadcast = sc.broadcast(givenList)

val diffList = lct.map(x => x._2.map( y => y._1)).map(x => ((listBroadcast.value.diff(x)),List.fill(listBroadcast.value.diff(x).size)(0.0)))

val appendList = lctl.zip(diffList).map(x => (x._1._1,x._1._2 ++ x._2._1,x._1._3 ++ x._2._2)).map(x => (x._1,x._2 zip x._3))
stefanobaghino
  • 11,253
  • 4
  • 35
  • 63
grammeler
  • 13
  • 4
  • 2
    I've not analyzed logic of application, but broadcasting logic is correct - broadcasat on driver, use on nodes – T. Gawęda Feb 12 '18 at 13:19
  • Please start with [Reproducible example](https://stackoverflow.com/q/48427185/6910411) / [mcve]. Majority of the code doesn't seem to be relevant (not minimal) and yet it is not reproducible (unless we try to backtrack the types just to get something runnable). – zero323 Feb 12 '18 at 13:26

0 Answers0