How can I use "ssc.sparkContext()" inside foreachRDD in Spark Streaming?
If I use "ssc.sparkContext()" directly inside foreachRDD (Java), for example "ssc.sparkContext().broadcast(map)",
I get a "Task not serializable" error.
If I use "(new JavaSparkContext(rdd.context())).broadcast(map)" instead,
there is no problem.

So, is "ssc.sparkContext()" basically equivalent to "(new JavaSparkContext(rdd.context()))"?
And if I use "(new JavaSparkContext(rdd.context())).broadcast(map)",
will the broadcast variable, i.e. the associated "map", be distributed to all executors of the SparkContext?
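For reference, here is a stripped-down sketch of the two variants I am comparing; it assumes the same "words" DStream, "map", and "bcv" broadcast holder as in the full code below, and only the broadcast line differs:

words.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        // Variant 1: throws "Task not serializable"
        // bcv.broadcastVar = ssc.sparkContext().broadcast(map);

        // Variant 2: runs without error
        bcv.broadcastVar = (new JavaSparkContext(rdd.context())).broadcast(map);
        return null;
    }
});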
The full code is given below. Here, "bcv.broadcastVar = (new JavaSparkContext(rdd.context())).broadcast(map);" works, but "bcv.broadcastVar = ssc.sparkContext().broadcast(map);" does not:
words.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (rdd != null) {
            System.out.println("Hello World - words - SSC !!!"); // Gets printed on Driver
            if (stat.data_changed == 1) {
                stat.data_changed = 0;
                bcv.broadcastVar.unpersist(); // Unpersist BC variable
                bcv.broadcastVar = (new JavaSparkContext(rdd.context())).broadcast(map); // Re-broadcast same BC variable with NEW data
            }
        }
        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
            @Override
            public void call(Iterator<String> items) throws Exception {
                System.out.println("words.foreachRDD.foreachPartition: CALLED ..."); // Gets called on Worker/Executor
                Integer index = 1;
                String lastKey = "";
                Integer lastValue = 0;
                while (true) {
                    String key = "A" + Long.toString(index);
                    Integer value = bcv.broadcastVar.value().get(key); // Executor consumes the broadcast map
                    if (value == null) break;
                    lastKey = key;
                    lastValue = value;
                    index++;
                }
                System.out.println("Executor BC: key/value: " + lastKey + " = " + lastValue);
                return;
            }
        });
        return null;
    }
});