I have a list of heavy tasks that should run in parallel. I like the Java Stream API and don't want to touch Executors directly, so I've written the following code:

Set<Runnable> tasks = Set.of(heavyTask(),…);
tasks.parallelStream().forEach(Runnable::run);

Quite often (not always!) my concurrency tests fail during “heavy task” execution. OK, it's probably a race condition. I've rewritten the code using Executors directly:

try {
    Set<Callable<Object>> tasks = Set.of(heavyTask(),…);
    Executors.newFixedThreadPool(4)
        .invokeAll(tasks).forEach(future -> {
            try {
                future.get();
            } catch (InterruptedException | ExecutionException ignore) {
            }
        });
} catch (InterruptedException ignore) {
}

The problem with the heavy tasks is gone, and I'm really confused. I thought that parallelStream() uses Executors under the hood, so the two should be almost the same. Is there any difference between .parallelStream() and ExecutorService? Or maybe forEach isn't the correct terminal operation in the first code example?

Mark Rotteveel
Volodya Lombrozo
  • We might have more insight if you were more specific about the nature of your concurrency tests and the failures observed in them. – John Bollinger Dec 06 '22 at 21:07
  • `invokeAll(tasks)` does already invoke all callables and return when all of them have been completed. There is no point in chaining a `forEach` with an action that does nothing but call `get` without using the result. Besides that, try to use `List.of(…)` instead of `Set.of(…)`. That might give you better parallel processing. – Holger Dec 09 '22 at 13:31

2 Answers


parallelStream uses the fork/join common pool which is an Executor, so you're right, it's almost the same.

The fork/join pool is used for all kinds of things so maybe some other, unrelated task was interfering. By declaring the Executor yourself, you are guaranteeing 4 dedicated threads.
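One way to see the difference described above is to record which threads actually run the work. The sketch below is illustrative only: the names `ThreadNamesDemo`, `parallelStreamThreads` and `fixedPoolThreads` are made up for this example, and the exact thread names printed depend on the JVM and the machine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

class ThreadNamesDemo {

    // Runs `count` trivial actions on a parallel stream and collects the
    // names of the threads that executed them.
    static Set<String> parallelStreamThreads(int count) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, count).parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    // Runs `count` trivial tasks on a dedicated fixed pool and collects the
    // names of the threads that executed them.
    static Set<String> fixedPoolThreads(int count, int poolSize) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        List<Callable<Object>> tasks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            tasks.add(() -> {
                names.add(Thread.currentThread().getName());
                return null;
            });
        }
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            pool.invokeAll(tasks); // returns once every task has completed
        } finally {
            pool.shutdown();       // otherwise the pool threads keep the JVM alive
        }
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("parallelStream: " + parallelStreamThreads(100));
        System.out.println("fixed pool:     " + fixedPoolThreads(100, 4));
    }
}
```

On a typical JVM the first set contains shared common-pool workers and often the calling thread as well, while the second contains only threads that belong to the pool created here.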

forEach is a fine terminal operation for the first example. The one to avoid would be forEachOrdered, which performs the action one element at a time in encounter order and so gives up most of the parallelism.
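The difference between the two terminal operations can be observed with a small experiment. This is a sketch under assumptions: `ForEachOrderDemo` and its methods are hypothetical names, and a `List` source is used because it has a defined encounter order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ForEachOrderDemo {

    // An ordered source: the integers 0..n-1 in a List.
    static List<Integer> source(int n) {
        return IntStream.range(0, n).boxed().collect(Collectors.toList());
    }

    // forEach on a parallel stream: every element is visited,
    // but the order in which the action runs is unspecified.
    static List<Integer> viaForEach(List<Integer> input) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        input.parallelStream().forEach(seen::add);
        return seen;
    }

    // forEachOrdered on a parallel stream: the action runs in encounter order.
    static List<Integer> viaForEachOrdered(List<Integer> input) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        input.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> input = source(1000);
        System.out.println("forEach kept order:        " + input.equals(viaForEach(input)));
        System.out.println("forEachOrdered kept order: " + input.equals(viaForEachOrdered(input)));
    }
}
```

The second line always reports `true`. The first may report either value, since forEach makes no ordering promise.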

Michael

ExecutorService vs parallelStream()

At first sight the two approaches look interchangeable: parallelStream() runs on the common ForkJoinPool, which is itself an ExecutorService, so the stream looks like syntactic sugar over an executor. That isn't always true, though. Since Java 9, the worker threads of the common ForkJoinPool (and therefore parallelStream()) carry a context ClassLoader that can differ from the one on the thread that started the stream. This can lead to strange problems: I had a library that the ForkJoinPool threads' ClassLoader could not load, and I got a ClassNotFoundException. One possible solution is:

final ClassLoader cl = Thread.currentThread().getContextClassLoader();
tasks.parallelStream().forEach(task -> {
    Thread.currentThread().setContextClassLoader(cl);
    task.get();
});

To me that doesn't look good, so for my particular case I decided to use plain Executors, which have no such side effects. You can find more solutions to the problem in that SO question. I hope this helps someone.
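For illustration, the plain-Executors route could look roughly like the sketch below. It is not the code I actually used: `ContextLoaderPool`, `newPool` and `runAll` are hypothetical names. A newly created thread normally inherits the context ClassLoader of the thread that creates it, so the explicit `setContextClassLoader` call mostly documents the intent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

class ContextLoaderPool {

    // Hypothetical helper: a fixed pool whose threads all carry `loader`
    // as their context class loader.
    static ExecutorService newPool(int threads, ClassLoader loader) {
        ThreadFactory defaults = Executors.defaultThreadFactory();
        ThreadFactory factory = runnable -> {
            Thread t = defaults.newThread(runnable);
            t.setContextClassLoader(loader);
            return t;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }

    // Runs all tasks on a dedicated pool that uses the caller's context
    // class loader, waits for them, and returns their results in task order.
    static <T> List<T> runAll(List<Callable<T>> tasks, int threads)
            throws InterruptedException, ExecutionException {
        ClassLoader callerLoader = Thread.currentThread().getContextClassLoader();
        ExecutorService pool = newPool(threads, callerLoader);
        try {
            List<T> results = new ArrayList<>();
            for (Future<T> future : pool.invokeAll(tasks)) {
                results.add(future.get()); // rethrows a task failure as ExecutionException
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Each probe reports the context class loader of the thread it ran on.
        Callable<ClassLoader> probe = () -> Thread.currentThread().getContextClassLoader();
        ClassLoader expected = Thread.currentThread().getContextClassLoader();
        List<ClassLoader> seen = runAll(List.of(probe, probe, probe), 2);
        System.out.println(seen.stream().allMatch(l -> l == expected));
    }
}
```

Unlike the parallelStream() workaround, the loader is set once per thread here instead of once per task, and task failures surface through `Future.get()` instead of being swallowed.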

Volodya Lombrozo
  • What exactly is `task`? In your question, you had a stream of `Runnable` instances, which do not have a `get()` method. Besides that, depending on the thread’s context loader is a self-made problem. Java doesn’t use that loader, so it’s some code explicitly using `currentThread().getContextClassLoader()` instead of using the intended loader in the first place… – Holger Dec 09 '22 at 13:26
  • @Holger thank you for your valuable reply. Do you mean that instead of `currentThread().getContextClassLoader()` it's better to use a particular class loader? And I don't clearly understand why using the thread’s context loader is a self-made problem. Do you have any sources where I can read more about it? – Volodya Lombrozo Dec 11 '22 at 19:31
  • Ordinary code dependencies are [resolved by using the defining class loader of the class containing the reference](https://docs.oracle.com/javase/specs/jvms/se17/html/jvms-5.html#jvms-5.4.3.1). The same applies to `Class.forName(String)`, without a class loader argument; it will use the caller’s defining class loader. The only way to have the code using the thread’s context loader, is by explicitly querying and using it, which is mostly a convention among 3rd party libraries. If you do not want a dependency to the thread executing the code, do not use the thread’s context loader. – Holger Dec 12 '22 at 08:09