When encountering a RuntimeException during stream processing, should the stream processing abort? Should it first finish? Should the exception be rethrown on Stream.close()? Is the exception rethrown as-is, or is it wrapped? The JavaDoc of Stream and of the package java.util.stream says nothing about it.
All the questions on Stack Overflow that I have found seem to focus on how to wrap a checked exception thrown from within a functional interface in order to make the code compile. Indeed, blog posts and similar articles on the Internet all focus on the same caveat. This is not my concern.
I know from my own experience that the processing of a sequential stream aborts as soon as a RuntimeException is thrown, and that this exception is rethrown as-is. The same holds for a parallel stream, but only if the exception was thrown by the client's thread.
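For reference, here is a minimal sketch of the sequential case; the triggering element and the exception message are arbitrary choices of mine:

    import java.util.stream.IntStream;

    public class SequentialAbort {
        public static void main(String[] args) {
            // Sequential stream: processing aborts at the throwing element
            // and the RuntimeException propagates to the caller as-is.
            IntStream.range(0, 1_000_000_000).forEach(i -> {
                if (i == 100) {
                    throw new RuntimeException("deliberate crash");
                }
            });
        }
    }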
However, the example code put here demonstrates that if the exception is thrown by a "worker thread" (= not the same thread as the one invoking the terminal operation) during parallel stream processing, then this exception is forever lost and the stream processing completes.
The example code will first run an IntStream in parallel, then a "normal" Stream in parallel.
The example will show that:

1) IntStream has no problem aborting the parallel processing if a RuntimeException is encountered. The exception is rethrown, wrapped in another RuntimeException.

2) Stream does not play as nicely. In fact, the client thread will never see a trace of the thrown RuntimeException. Not only does the stream finish processing; more elements than limit() specified will be processed!
In the example code, the IntStream is generated using IntStream.range(). The "normal" Stream has no notion of a "range" and is instead made up of 1s, with Stream.limit() called to limit the stream to one billion elements.
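A minimal sketch along the lines just described could look as follows. The throwing elements, the messages and the trick of capturing the calling thread (to make sure a worker thread is the one that throws) are my own assumptions, not necessarily what the original example code does:

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;

    public class LostExceptionDemo {
        public static void main(String[] args) {
            // Part 1: parallel IntStream. The RuntimeException aborts the
            // processing and is rethrown, wrapped in another RuntimeException.
            try {
                IntStream.range(0, 1_000_000_000)
                         .parallel()
                         .forEach(i -> {
                             if (i == 500_000_000) {
                                 throw new RuntimeException("IntStream crash");
                             }
                         });
            } catch (RuntimeException e) {
                System.out.println("IntStream: caught " + e);
            }

            // Part 2: parallel Stream of 1s, limited to one billion elements.
            // The exception thrown by a worker thread never reaches the
            // caller, and the element count can exceed the limit.
            Thread caller = Thread.currentThread();
            AtomicBoolean thrown = new AtomicBoolean();
            AtomicLong count = new AtomicLong();
            try {
                Stream.generate(() -> 1)
                      .limit(1_000_000_000)
                      .parallel()
                      .forEach(one -> {
                          count.incrementAndGet();
                          // Throw once, from a worker thread only.
                          if (Thread.currentThread() != caller
                                  && thrown.compareAndSet(false, true)) {
                              throw new RuntimeException("Stream crash");
                          }
                      });
            } catch (RuntimeException e) {
                // Never printed, per the behavior described above.
                System.out.println("Stream: caught " + e);
            }
            System.out.println("Stream: elements processed = " + count.get());
        }
    }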
Here's another twist. The example code that produces the IntStream does something like this:
IntStream.range(0, 1_000_000_000).parallel().forEach(..)
Change that to a generated stream, just like the second example in the code:
IntStream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(..)
The outcome for this IntStream is the same: the exception is wrapped and rethrown, and the processing aborts. But the second stream will now also wrap and rethrow the exception, and it will not process more elements than the limit! Thus: changing how the first stream is produced has a side effect on how the second stream behaves. To me, this is very odd.
The JavaDoc of ForkJoinPool.invoke() and ForkJoinTask says that exceptions are rethrown, and this is what I would have expected from a parallel stream.
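For comparison, here is a minimal sketch of that ForkJoinPool behavior; the task body is my own:

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;

    public class ForkJoinRethrowDemo {
        public static void main(String[] args) {
            // An exception thrown by a task on a worker thread is rethrown
            // to the calling thread by ForkJoinPool.invoke().
            try {
                ForkJoinPool.commonPool().invoke(ForkJoinTask.adapt(() -> {
                    throw new RuntimeException("thrown on a worker thread");
                }));
            } catch (RuntimeException e) {
                System.out.println("Caught on the calling thread: " + e);
            }
        }
    }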
Background
I encountered this "problem" when processing elements in a parallel stream taken from Collection.stream().parallel() (I haven't verified the behavior of Collection.parallelStream(), but it should be the same). What happened was that a "worker thread" crashed and then went silently away while all the other threads completed the stream successfully. My app uses a default exception handler that writes the exception to a log file, but not even this log file was created. The thread and its exception simply disappeared.

Since I need to abort as soon as a runtime exception is caught, one alternative is to write code that leaks this exception to the other workers, making them unwilling to proceed if an exception has been thrown by any other thread (a sketch follows below). Of course, this does not guarantee that the stream implementation won't simply keep on spawning new threads trying to complete the stream. So I will probably end up not using parallel streams and instead do "normal" concurrent programming using a thread pool/executor.
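For illustration, here is a sketch of that workaround; the helper forEachAborting is hypothetical, not an existing API:

    import java.util.Collection;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Consumer;

    public class AbortingForEach {
        // Hypothetical helper: the first RuntimeException is "leaked" to all
        // workers through an AtomicReference, making them skip any further
        // work, and is then rethrown on the calling thread.
        static <T> void forEachAborting(Collection<T> elements, Consumer<T> action) {
            AtomicReference<RuntimeException> failure = new AtomicReference<>();
            elements.stream().parallel().forEach(element -> {
                if (failure.get() != null) {
                    return; // another worker has failed; refuse to proceed
                }
                try {
                    action.accept(element);
                } catch (RuntimeException e) {
                    failure.compareAndSet(null, e);
                }
            });
            RuntimeException e = failure.get();
            if (e != null) {
                throw e; // make the failure visible to the caller
            }
        }
    }

Note that this still visits the remaining elements (each worker merely bails out early), which is exactly the weakness mentioned above.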
This shows that the problem of lost runtime exceptions is not isolated to streams generated by Stream.generate() or streams using Stream.limit(). And the bottom line is that I would love to know: what is the expected behavior?