
I was working with parallel streams in Java. My code is pretty simple: a map contains two keys, and I want to call a function for each key in parallel. For one of the keys the function threw an exception.

I am not able to understand why the function didn't execute for the second key.

map.put(1, 1);
map.put(2, 2);
map.entrySet().parallelStream()
              .map(batch -> function(batch.getKey()))
              .collect(Collectors.toList());


int function(int key) {
    if (key == 1) throw new RuntimeException();
    System.out.print("printing: " + key);
    return key;
}
Rohit
  • that's how streams work... if there is any exception, your stream processing basically stops/breaks/dies... as you use a parallel stream there might be some chance that sometimes you could also process the other entries... but that's nothing you can control... – Roland Nov 22 '18 at 10:03
  • if you want all entries to be processed, you need to write your code so that every exception within the stream processing is caught... but error handling might not always be so trivial then... – Roland Nov 22 '18 at 10:05
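
A minimal sketch of the per-element error handling Roland describes, catching the exception inside the map lambda so one failure cannot stop the pipeline (returning null for a failed element and filtering it out afterwards is just one possible convention, not part of the original code):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class CatchPerElement {
    static int function(int key) {
        if (key == 1) throw new RuntimeException("failure for key " + key);
        System.out.println("printing: " + key);
        return key;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> map = new HashMap<>();
        map.put(1, 1);
        map.put(2, 2);

        // catch the exception per element so the rest of the stream keeps going
        List<Integer> results = map.entrySet().parallelStream()
                .map(batch -> {
                    try {
                        return function(batch.getKey());
                    } catch (RuntimeException e) {
                        return null; // mark the failed element
                    }
                })
                .filter(Objects::nonNull) // drop failed elements
                .collect(Collectors.toList());

        System.out.println("results: " + results);
    }
}

With this, the function is called for both keys and only the result for key 1 is missing from the list.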

2 Answers

2

If your Stream has only two elements, it's likely the stream framework will not create two threads to process those two elements. Therefore, once an exception is thrown for the first element, the second won't be processed.

Actually, when I tried code similar to yours, but without the exception, I found that both elements are processed on the main thread:

public static int function(int key) {
    System.out.print("printing: " + key + " " + Thread.currentThread().getName() + " ");
    return key;
}

public static void main(String[] args) {
    HashMap<Integer, Integer> map = new HashMap<>();
    for (int i = 1; i <= 2; i++) {
        map.put(i, i);
    }

    map.entrySet().parallelStream()
                  .map(batch -> function(batch.getKey()))
                  .collect(Collectors.toList());
}

The output is:

printing: 1 main printing: 2 main

Only when the Stream is larger is the work split into multiple threads, so an exception thrown for one element won't affect the processing of the rest.

If I increase the size of the Stream to 5, I get:

printing: 1 ForkJoinPool.commonPool-worker-1 printing: 2 ForkJoinPool.commonPool-worker-1 printing: 3 ForkJoinPool.commonPool-worker-1 printing: 4 main printing: 5 main

So now the work is divided into two threads.

And if I throw a RuntimeException for the first element, I get:

printing: 4 main printing: 5 main Exception in thread "main" java.lang.RuntimeException

As you can see, the elements that were assigned to the thread where the exception was thrown were not processed, but the elements assigned to the other thread were processed.
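
For reference, this is roughly the code for the variant just described, with 5 entries and a RuntimeException thrown for key 1 (the exact split of elements between threads depends on the runtime and may differ from run to run):

public static int function(int key) {
    if (key == 1) throw new RuntimeException();
    System.out.print("printing: " + key + " " + Thread.currentThread().getName() + " ");
    return key;
}

public static void main(String[] args) {
    HashMap<Integer, Integer> map = new HashMap<>();
    for (int i = 1; i <= 5; i++) {
        map.put(i, i);
    }

    map.entrySet().parallelStream()
                  .map(batch -> function(batch.getKey()))
                  .collect(Collectors.toList());
}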

Eran
  • Makes sense now. I actually tested by creating 5 threads around 10 times and every time the allocation of tasks remained the same. Do you have any documentation link for this behavior of Java? – Rohit Nov 22 '18 at 11:48
  • @Rohit no I don't. I suggest you google it. – Eran Nov 22 '18 at 11:53
0

When you use parallelStream, it

  • starts a background thread pool of size processors - 1, if one is not already running; this takes some time, and from the computer's point of view, a long time.
  • adds tasks to a work queue.
  • the current thread joins the pool of threads working on these tasks.

Note: the main thread can do all this long before the background pool starts.

In short, passing work to background threads is not instantaneous, especially if they are not already running.
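
A quick way to check the size of the pool that parallel streams use on your machine (a minimal sketch; the class name is arbitrary):

import java.util.concurrent.ForkJoinPool;

public class CommonPoolSize {
    public static void main(String[] args) {
        // parallel streams run on the common ForkJoinPool plus the calling thread
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
    }
}

On a typical machine the common pool parallelism is one less than the processor count, which is why the calling thread itself also takes part in the work.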

Here is an example of how much work a running thread can do before a newly started thread begins running.

static volatile int counter = 0;

static void countTo(int n) {
    for (int i = 0; i < n; i++)
        counter = i;
}

public static void main(String[] args) {
    countTo(10_000_000);   // warm up the loop
    counter = 0;
    // the new thread reports how far the main thread has already counted
    new Thread(() -> System.out.println("counter=" + counter)).start();
    countTo(10_000_000);
}

On my machine this prints something like:

counter=21017
Peter Lawrey