How do Java 8 parallel streams behave when an exception is thrown in the consuming clause, for example inside the forEach consumer? Consider the following code:

final AtomicBoolean throwException = new AtomicBoolean(true);
IntStream.range(0, 1000)
    .parallel()
    .forEach(i -> {
        // Throw on only one of the threads.
        if (throwException.compareAndSet(true, false)) {
            throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
        }
    });

Does it stop the elements being handled immediately? Does it wait for the already started elements to finish? Does it wait for the whole stream to finish? Does it start handling new stream elements after the exception is thrown?

When does it return? Immediately after the exception? After all/part of the elements were handled by the consumer?

Do elements continue being handled after the parallel stream threw the exception? (Found a case where this happened).

Is there a general rule here?

EDIT (15-11-2016)

Trying to determine whether the parallel stream returns early, I found that it is not deterministic:

@Test
public void testParallelStreamWithException() {
    AtomicInteger overallCount = new AtomicInteger(0);
    AtomicInteger afterExceptionCount = new AtomicInteger(0);
    AtomicBoolean throwException = new AtomicBoolean(true);

    try {
        IntStream.range(0, 1000)
            .parallel()
            .forEach(i -> {
                overallCount.incrementAndGet();
                afterExceptionCount.incrementAndGet();
                try {
                    System.out.println(i + " Sleeping...");
                    Thread.sleep(1000);
                    System.out.println(i + " After Sleeping.");
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Throw only on one of the threads and not on main thread.
                if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
                    System.out.println("Throwing exception - " + i);
                    throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
                }
            });
        Assert.fail("Should not get here.");
    }
    catch (Exception e) {
        System.out.println("Caught Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }
    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());
}

Late return when the exception is not thrown from the main thread. This caused a lot of new elements to be handled well after the exception was thrown. On my machine, about 200 elements were handled after the exception was thrown, BUT not all 1000 elements were handled. So what's the rule here? Why were more elements handled even though the exception had been thrown?

Early return when removing the not (!) operator, which causes the exception to be thrown in the main thread. Only the already started elements finished processing and no new ones were handled. The stream returned early in this case, which is not consistent with the previous behavior.

What am I missing here?

MC Emperor
AlikElzin-kilaka

1 Answer

When an exception is thrown in one of the stages, the stream does not wait for the other operations to finish; the exception is re-thrown to the caller. That is how ForkJoinPool handles it.

By contrast, findFirst, for example, when run in parallel presents its result to the caller only after ALL operations have finished processing (even if the result is already known before all of them finish).

In other words: it returns early, but leaves all the running tasks to finish.
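To make this observable, here is a minimal sketch (not the original test). It assumes a smaller range of 200 elements and a 20 ms simulated workload; the class `EarlyReturnDemo`, its `Result` type, and the `pause` helper are hypothetical names introduced only for illustration. It records how many elements had started when `forEach` returned to the caller and how many had started after waiting a little longer. The two numbers may or may not differ from run to run; only the fact that the exception reaches the caller is reliable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class EarlyReturnDemo {

    /** What the caller observed (hypothetical helper type). */
    static final class Result {
        final boolean caught;       // did the exception reach the caller?
        final int startedAtCatch;   // elements started when forEach returned
        final int startedAfterWait; // elements started after an extra wait

        Result(boolean caught, int startedAtCatch, int startedAfterWait) {
            this.caught = caught;
            this.startedAtCatch = startedAtCatch;
            this.startedAfterWait = startedAfterWait;
        }
    }

    static Result run() {
        AtomicInteger started = new AtomicInteger();
        AtomicBoolean shouldThrow = new AtomicBoolean(true);
        boolean caught = false;
        try {
            IntStream.range(0, 200).parallel().forEach(i -> {
                started.incrementAndGet();
                // Exactly one element throws: whichever wins the compare-and-set.
                if (shouldThrow.compareAndSet(true, false)) {
                    throw new RuntimeException("Index: " + i);
                }
                pause(20); // simulated work
            });
        } catch (RuntimeException e) {
            caught = true;
        }
        int atCatch = started.get();
        pause(500); // give tasks that are still in flight some time to run
        return new Result(caught, atCatch, started.get());
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println("Exception reached caller: " + r.caught);
        System.out.println("Started when forEach returned: " + r.startedAtCatch);
        System.out.println("Started after extra wait: " + r.startedAfterWait);
    }
}
```

If the second count is larger than the first, elements were still being picked up after the caller had already seen the exception.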

EDIT to answer the last comment

This is very much explained by Holger's answer (link in comments), but here are some details.

1) When killing all BUT the main thread, you are also killing all the tasks that were supposed to be handled by these threads. So that number should actually be closer to 250, as there are 1000 tasks and 4 threads. I assume this returns 3 on your machine:

int result = ForkJoinPool.getCommonPoolParallelism();

Theoretically, there are 1000 tasks and 4 threads, each supposed to handle 250 tasks; you then kill 3 of them, meaning 750 tasks are lost. There are 250 tasks left to execute, and ForkJoinPool will spawn 3 new threads to execute these 250 remaining tasks.
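To check the thread count on a given machine, a small sketch along these lines may help. By default the common pool's parallelism is one less than the number of available processors (with a minimum of 1), and the calling thread also takes part in the work, which is where "3 plus main" comes from on a 4-core machine. The class `CommonPoolThreadsDemo` and the method `workerNames` are hypothetical names used only for this illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolThreadsDemo {

    /** Collects the names of all threads that processed at least one element. */
    static Set<String> workerNames() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 1000)
            .parallel()
            .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Available processors: "
            + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: "
            + ForkJoinPool.getCommonPoolParallelism());
        // Typically the calling thread plus some ForkJoinPool.commonPool-worker-N threads.
        System.out.println("Threads that ran elements: " + workerNames());
    }
}
```

The set of thread names can vary between runs, since with very short tasks not every worker necessarily gets a share of the elements.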

A few things you can try: change your stream like this (making the stream unsized):

IntStream.generate(random::nextInt).limit(1000).parallel().forEach

This time, many more operations would complete, because the initial split index is unknown and chosen by some other strategy. You could also try changing this:

 if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {

to this:

 if (!Thread.currentThread().getName().equals("main")) {

This time you would always kill all threads besides main, up to a certain point where ForkJoinPool creates no new threads because the task is too small to split, so no other threads are needed. In this case even fewer tasks would finish.
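The difference between the original stream and the generate/limit variant suggested above can be seen by inspecting the spliterators. This is only a sketch of that one property: the class `SizedVersusUnsizedDemo` and the method `isSized` are hypothetical names, and the sketch says nothing about how the work is subsequently split.

```java
import java.util.Random;
import java.util.Spliterator;
import java.util.stream.IntStream;

class SizedVersusUnsizedDemo {

    /** Reports whether the stream's spliterator knows its exact size up front. */
    static boolean isSized(IntStream stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    public static void main(String[] args) {
        Random random = new Random();

        // A range knows exactly how many elements it holds.
        System.out.println("range SIZED: "
            + isSized(IntStream.range(0, 1000).parallel()));

        // An infinite generator cut off by limit does not report an exact size.
        System.out.println("generate + limit SIZED: "
            + isSized(IntStream.generate(random::nextInt).limit(1000).parallel()));
    }
}
```

With a sized source the framework can split the range into even halves; without a known size it has to fall back on a different splitting strategy, which is the effect described above.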

2) In your second example, where you actually kill the main thread, the code as written does not show you the other threads that are still running. Change it:

    } catch (Exception e) {
        System.out.println("Caught Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }

    // Give the other threads some time to finish their work. Try commenting and uncommenting this line to see a big difference in the results.
    TimeUnit.SECONDS.sleep(60);

    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());
Eugene
  • Can you base it on some documentation? – AlikElzin-kilaka Nov 14 '16 at 14:18
  • @AlikElzin-kilaka not really, I don't think this is documented. I remember this by reading some other SO question that made a reference to this bug : https://bugs.openjdk.java.net/browse/JDK-8164690 – Eugene Nov 14 '16 at 14:20
  • @AlikElzin-kilaka There is also [this](http://mail.openjdk.java.net/pipermail/core-libs-dev/2016-August/042972.html) discussion thread on the core-libs-dev mailing list which led to the JBS bug mentioned by Eugene. – Stefan Zobel Nov 14 '16 at 14:47
  • [here](http://stackoverflow.com/questions/39261067/what-is-the-expected-behavior-when-a-java-8-stream-throw-a-runtimeexception/39275425#comment65883637_39261067) is an older reference. – Holger Nov 14 '16 at 15:47
  • I've added a code example of an exception causing the parallel stream to not return early. – AlikElzin-kilaka Nov 15 '16 at 11:23
  • @Eugene I don't get your edit. Are you trying to change my test to suit some notion? I'm trying to show something else entirely. I'm trying to show the non deterministic nature of throwing an exception in a parallel stream. Sometimes it fails late and sometimes early. Sometimes it continues to handle **new** elements after the exception is thrown and sometimes it doesn't. – AlikElzin-kilaka Nov 16 '16 at 13:49
  • @AlikElzin-kilaka I was trying to explain what actually happens by suggesting to change the code to understand *why this happens*. It seems that you are missing the vital point on how ForkJoinPool works. – Eugene Nov 16 '16 at 13:52
  • 1. You're saying I killed the main thread. I didn't. I just threw an exception in the parallel stream consumer. 2. You're saying that the code returns early. I showed a case where it doesn't. 3. You're suggesting to change the type of the stream by using limit. I don't want to. I'm trying to find the general rule. **Can we keep some high level discussion but with some explanation?** I found a case where it doesn't return early. I also found a case where the elements continued to be processed even after the parallel stream "finished/returned" with the exception. – AlikElzin-kilaka Nov 16 '16 at 14:09
  • My edit was to try to get a thorough answer. Not a link to "go read somewhere else" – AlikElzin-kilaka Nov 16 '16 at 14:10