How can I use "ssc.sparkContext()" inside foreachRDD in Spark Streaming?

If I use "ssc.sparkContext()" directly inside foreachRDD (Java), for example ssc.sparkContext().broadcast(map), I get a "Task not serializable" error.

If I use "(new JavaSparkContext(rdd.context())).broadcast(map)" instead, there is no problem.

So, is "ssc.sparkContext()" equivalent to "(new JavaSparkContext(rdd.context()))"?

And if I use "(new JavaSparkContext(rdd.context())).broadcast(map)", will the broadcast variable (i.e. the associated "map") be distributed to all executors of the SparkContext?

The code is given below. Here, "bcv.broadcastVar = (new JavaSparkContext(rdd.context())).broadcast(map);" works, but "bcv.broadcastVar = ssc.sparkContext().broadcast(map);" does not.

            words.foreachRDD(new Function<JavaRDD<String>, Void>() {
                    @Override
                    public Void call(JavaRDD<String> rdd) throws Exception {
                            if (rdd != null) {
                                    System.out.println("Hello World - words - SSC !!!"); // Gets printed on Driver
                                    if (stat.data_changed == 1) {
                                            stat.data_changed = 0;
                                            bcv.broadcastVar.unpersist(); // Unpersist BC variable
                                            bcv.broadcastVar = (new JavaSparkContext(rdd.context())).broadcast(map); // Re-broadcast same BC variable with NEW data
                                    }
                            }

                            rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
                                    @Override
                                    public void call(Iterator<String> items) throws Exception {
                                            System.out.println("words.foreachRDD.foreachPartition: CALLED ..."); // Gets called on Worker/Executor
                                            Integer index = 1;
                                            String lastKey = "";
                                            Integer lastValue = 0;
                                            while (true) {
                                                    String key = "A" + index; // Keys are assumed to be "A1", "A2", ...
                                                    Integer value = bcv.broadcastVar.value().get(key); // Executor Consumes map
                                                    if (value == null) break;
                                                    lastKey = key;
                                                    lastValue = value;
                                                    index++;
                                            }
                                            System.out.println("Executor BC: key/value: " + lastKey + " = " + lastValue);
                                            return;
                                    }
                            });

                            return null;
                    }
            });
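
For reference, here is a minimal plain-Java sketch (no Spark involved) of what I suspect is going on, assuming the "Task not serializable" error comes from the anonymous function capturing the non-serializable "ssc" object. `DriverOnlyContext` and `Task` are hypothetical stand-ins for the streaming context and for a Spark function interface; they are not Spark classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {

    // Hypothetical stand-in for a driver-only object such as the streaming context.
    // It deliberately does NOT implement Serializable.
    public static class DriverOnlyContext {
        String name() { return "driver-context"; }
    }

    // Hypothetical stand-in for a Spark function interface; it is Serializable.
    public interface Task extends Serializable {
        String run(String input);
    }

    // Returns true if plain Java serialization of the object succeeds.
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // The anonymous class uses ctx, so ctx is stored as a hidden field of the task
    // and has to be serialized together with it.
    public static Task capturingTask(final DriverOnlyContext ctx) {
        return new Task() {
            @Override
            public String run(String input) {
                return ctx.name() + ":" + input;
            }
        };
    }

    // The anonymous class uses only its own argument, so nothing extra is captured.
    public static Task selfContainedTask() {
        return new Task() {
            @Override
            public String run(String input) {
                return "local:" + input;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("capturing task serializable: "
                + canSerialize(capturingTask(new DriverOnlyContext())));
        System.out.println("self-contained task serializable: "
                + canSerialize(selfContainedTask()));
    }
}
```

If this is indeed the cause, it would explain why the rdd.context() variant works: that call only uses the function's own argument and does not pull "ssc" into the serialized function.
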
sunillp
