
I have the following Java Spark code:

stream.foreachRDD(rdd -> {
    // do some operations
    List<String> jsonList = new ArrayList<String>();
    rdd.foreach(msg -> { // Kafka messages
        jsonList.add(msg.value());
    });

    writeJsons(jsonList); // jsonList size is 0
});

I want to iterate over each message, add it to my list, and then run some logic on the JSON list.

I'm very new to Spark and I'm trying to understand why, after the rdd.foreach loop, the size of jsonList is 0. How does Spark share the list between the nodes?

What should I change in my code if I want to add all the JSON messages to the list and then apply my logic to it?

Ya Ko
  • In my opinion, the first part is the same as https://stackoverflow.com/questions/23394286/modify-collection-inside-a-spark-rdd-foreach but here Ya Ko is also asking for a way around it, which was not provided in the other post. – Oli Jul 05 '18 at 08:45

1 Answer


The detailed reason why it does not work is explained in the post Modify collection inside a Spark RDD foreach. Essentially, when you reference a local object (jsonList) inside a foreach on an RDD, the object is serialized and shipped to each worker, but it is never serialized back to the driver. So when you call add from a worker, you modify that worker's copy of the object, not the original, which is why the original list remains empty.
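To see the mechanism in isolation, here is a minimal runnable sketch (the class and variable names are my own, not from the question) where a primitive counter captured by the closure shows the same behavior:

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ClosureCopyDemo {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "closure-copy-demo");
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b", "c"));

        // The lambda below is serialized and each task gets its own
        // deserialized copy of 'counter'; increments happen on those
        // copies and never travel back to the driver.
        long[] counter = {0};
        rdd.foreach(msg -> counter[0]++);

        System.out.println(counter[0]); // still prints 0 on the driver
        sc.stop();
    }
}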

An alternative would be to use Spark to compute the list and then collect it to the driver:

List<String> jsonList = rdd.map(msg -> msg.value()).collect();
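Applied to the streaming code from the question, that looks roughly like the following (writeJsons is the asker's own method, assumed unchanged):

stream.foreachRDD(rdd -> {
    // map runs on the executors; collect brings the results back
    // to the driver, so jsonList is a real driver-side list here.
    List<String> jsonList = rdd.map(msg -> msg.value()).collect();
    writeJsons(jsonList); // now contains every message value in the batch
});

Keep in mind that collect() materializes the whole batch in driver memory, so this is only appropriate when each micro-batch is small enough to fit on the driver.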
Oli