I have a list that I have broadcast. I want the combination of the two lists to produce a resultant list of the same size for every element of the RDD.
`diffList` checks whether each key of `lct` has a corresponding value. If it does, the value of that key is taken; if it doesn't, its elements are compared against `givenList`, and all the keys that are missing are given the value 0.0.
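As a minimal sketch of the `diffList` step on plain Scala lists (no Spark; `missingWithZeros` is a hypothetical helper standing in for the per-element map over the RDD):

```scala
object DiffListSketch {
  // The same givenList that is broadcast in the code below.
  val givenList = List(("a","z"), ("c","g"), ("c","t"), ("d","h"), ("y","e"), ("f","l"), ("g","r"))

  // For the pairs present in one RDD element, return the pairs of
  // givenList that are missing, together with a matching list of zeros.
  def missingWithZeros(present: List[(String, String)]): (List[(String, String)], List[Double]) = {
    val missing = givenList.diff(present)
    (missing, List.fill(missing.size)(0.0))
  }
}
```

For example, an element that already contains `("a","z")` and `("d","h")` gets back the five remaining pairs of `givenList` and five zeros.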
`appendList` zips `lctl` with `diffList` and applies the relevant transformations. `appendList` should end up with lists of the same size for every element.
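On plain lists, the `appendList` combination can be sketched like this (`combine` is a hypothetical helper; the real code zips two RDDs instead, and the id and values here are made up for illustration):

```scala
object AppendListSketch {
  // Append the missing pairs and their zeros to the observed pairs and
  // values, then zip pairs with values, so every element ends up with
  // the same number of (pair, value) entries.
  def combine(id: String,
              pairs: List[(String, String)], values: List[Double],
              missing: List[(String, String)], zeros: List[Double])
      : (String, List[((String, String), Double)]) =
    (id, (pairs ++ missing) zip (values ++ zeros))
}
```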
This code runs fine locally: each list (in each element of the RDD) has the same size. When I run the same operation on a cluster, the lists come out with different sizes. I just wanted to know whether I have broadcast the list correctly, or whether there is some other problem with the logic.
val w = sqlContext.read.format("org.elasticsearch.spark.sql").load(weeks)
val agg = w.groupBy("id", "type").agg(sum($"d").as("dur"))
// Strip the Row's brackets, split on commas, and build ((x4, (x2, x1)), x3).
val idc = agg.rdd.map(_.toString).map(_.replace("[", "")).map(_.replace("]", ""))
  .map(_.split(",")).map(x => ((x(4), (x(2), x(1))), x(3)))
// Sum the values per composite key, then re-key by the outer id.
val grp = idc.map(x => (x._1, x._2.toDouble)).reduceByKey(_ + _)
val ust = grp.map(x => (x._1._1, (x._1._2, x._2)))
// Collect all (pair, sum) entries per id, then split them into parallel lists.
val lct = ust.groupByKey.map(x => (x._1, x._2.toList))
val lctl = lct.map(x => (x._1, x._2.map(_._1), x._2.map(_._2)))
val givenList = List(("a","z"), ("c","g"), ("c","t"), ("d","h"), ("y","e"), ("f","l"), ("g","r"))
val listBroadcast = sc.broadcast(givenList)
// Pair the entries of givenList missing from each element with a list of zeros.
val diffList = lct.map(x => x._2.map(_._1))
  .map(x => (listBroadcast.value.diff(x), List.fill(listBroadcast.value.diff(x).size)(0.0)))
// Zip the two RDDs, append the missing pairs and zeros, then zip pairs with values.
val appendList = lctl.zip(diffList)
  .map(x => (x._1._1, x._1._2 ++ x._2._1, x._1._3 ++ x._2._2))
  .map(x => (x._1, x._2 zip x._3))