
I have a list of tasks. The tasks are independent of one another (they do not use each other's results).

With 1000 tasks, when I use a sequential stream to process them..

tasks.forEach(task -> {
    // long running task
    task.run();
    System.out.println("Thread: " + Thread.currentThread().getName());
});

..then the second task runs AFTER the first task, and so forth. The loop runs in a blocking, sequential mode (the second task starts only after the first task has finished).

What is the best way to process each task in parallel?

Is this the best way?

tasks.parallelStream().forEach(task -> {
    // long running task
    task.run();
    System.out.println("Thread: " + Thread.currentThread().getName());
});

According to Should I always use a parallel stream when possible?, parallel streams should generally be avoided. Since my tasks are independent of each other, I do not need the synchronization overhead that comes with using parallelStream(). However, there seems to be no option to disable that overhead when using parallelStream(). Or is there?

Is there a better way for my use case than parallelStream()?

  • Why not submit your tasks to an `ExecutorService`? Then you can also specify how many threads should be used for parallel processing. – Glains Nov 21 '19 at 21:47
  • To clarify: "synchronization overhead" just means *it's expensive to do the things necessary to run multiple things in parallel*. It's not about "need", it's just *the cost you must pay to parallelize*. Parallelizing is only worthwhile if the *savings* from parallel execution outweigh the *costs* of setting up parallel execution – MyStackRunnethOver Nov 21 '19 at 23:05
  • Why should I use fork-join-techniques when my tasks are independent of each other? I guess, the "synchronization overhead" is the **join**-part which I do not need for my case. I only want to use async and non-blocking processing of each task without synchronization.. – nimo23 Nov 21 '19 at 23:09
  • It's not just the synchronization of the tasks themselves, but the synchronization required to schedule and execute the tasks. Blocking tasks need special coordination with the thread pool, and very fast tasks don't benefit from parallelism. – erickson Nov 21 '19 at 23:47

2 Answers


In Java 8, parallelStream() uses the common ForkJoinPool, which is initialised at JVM startup and contains a fixed number of threads. That pool is best suited to work that follows the "divide and conquer" paradigm. In your case, since the tasks are all independent, the use of an ExecutorService may be more fitting.
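A minimal sketch of that approach, assuming tasks are `Runnable`s as in the question (the pool size of 4, the sample tasks, and the class name `IndependentTasks` are made up for illustration):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class IndependentTasks {

    // Runs all tasks on a fixed-size pool and waits for them to finish.
    // Returns how many tasks completed.
    static int runAll(List<Runnable> tasks, int threads) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        AtomicInteger done = new AtomicInteger();
        for (Runnable task : tasks) {
            executor.submit(() -> {
                task.run();
                done.incrementAndGet();
            });
        }
        executor.shutdown();                            // accept no new tasks
        executor.awaitTermination(1, TimeUnit.MINUTES); // wait for submitted tasks
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<Runnable> tasks = List.of(
                () -> System.out.println("task 1 on " + Thread.currentThread().getName()),
                () -> System.out.println("task 2 on " + Thread.currentThread().getName()));
        System.out.println(runAll(tasks, 4) + " tasks finished");
    }
}
```

Unlike parallelStream(), the pool size here is explicit and independent of the shared ForkJoinPool.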

  • Can I **nest** executorServices? For example: Actually, the `tasks`-list is produced by a `scheduledExecutorService` and after the task-list is produced, I delegate each task to `executorService`. After all tasks are finished, it waits for the next schedule. And repeat these steps. Are there any caveats when nesting an `executorService` within a `scheduledExecutorService`? Or are there better techniques for such cases? – nimo23 Nov 21 '19 at 22:22
  • You mean having a single thread that generates tasks every X minutes/seconds etc and then passes these to an ExecutorService that has a predefined number of threads? It's not exactly **nesting** because they operate independently; it just so happens that your scheduled service utilises the fixed service. Without knowing too much about the task you're trying to accomplish, I'd say it seems ok. – tomgeraghty3 Nov 21 '19 at 23:32
  • I already tried it and it works..however, I tend to use CompletableFuture with ExecutorService as it is more like "fire and forget" (non-blocking and async) which is what I need. – nimo23 Nov 21 '19 at 23:34
  • Yeah, and you can check the state of those Futures to see if they're finished. If they're not, you could block the Scheduler Service until they've finished (which I think you said you wanted when you said "after all tasks are finished"), because if they've not finished and your Scheduler Service goes over to the next schedule, then it will just queue the new tasks. – tomgeraghty3 Nov 21 '19 at 23:36
  • I guess I do not need to see if it's finished, because I use `executor.scheduleWithFixedDelay`..so it must process all tasks, and only after all previous tasks have finished will it run the next schedule. – nimo23 Nov 21 '19 at 23:39
  • 1
    But I thought you said you had 2 executors? 1 that's the scheduled one (The line you just gave: `executor.scheduleWithFixedDelay`) and another (fixed one) that receives the tasks to perform. Your schedule one will be giving the fixed one tasks asynchronously and **won't** wait for them to finish before scheduling again... That's why I suggested that the logic in your schedule one needs to check to see if the last tasks are finished before it schedules more. – tomgeraghty3 Nov 21 '19 at 23:52
  • yes, good point. So the first executor (the scheduler) must check whether all its tasks are finished before starting the next schedule, because each task is also processed by another executor. If I did not use the second executor, I would not need to check whether they are finished. – nimo23 Nov 21 '19 at 23:57
  • 1
    Correct. But you have 2 so you need to do the checking. Most of the methods of the Fixed executor will return `Future`s so just add those to a list and at the start of the logic for the Scheduled executor, check they are all complete and block until they are. Then overwrite the list with the newly scheduled task. You may want to add metrics (or logs at least) to say how long the Schedule executor is blocked for (if at all) as if it's happening regularly then that tells you that you either need more threads in the Fixed executor or you need to leave a longer gap between schedules – tomgeraghty3 Nov 22 '19 at 00:07
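The two-executor pattern discussed in this thread might look roughly like the sketch below (the class name, pool sizes, the 10-second delay, and the sample task are illustrative assumptions, not code from the question):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledDispatch {

    // Submit one round of tasks to the worker pool and block until all finish.
    // Returns the number of tasks that were run.
    static int runRound(ExecutorService workers, List<Runnable> tasks) throws Exception {
        List<Future<?>> futures = new ArrayList<>();
        for (Runnable task : tasks) {
            futures.add(workers.submit(task));
        }
        for (Future<?> f : futures) {
            f.get(); // block until this task has finished
        }
        return futures.size();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Because runRound blocks the single scheduler thread, the next round
        // can only start after every task of the previous round has finished.
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                runRound(workers, List.of(() ->
                        System.out.println("task on " + Thread.currentThread().getName())));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, 10, TimeUnit.SECONDS);

        // Demo only: let the first round run, then shut everything down.
        Thread.sleep(1000);
        scheduler.shutdown();
        workers.shutdown();
    }
}
```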

A good solution for you can be to use CompletableFuture.allOf. Use it like this:

ExecutorService ex = /* whatever executor you want */;

CompletableFuture.allOf(tasks.stream()
        .map(task -> CompletableFuture.runAsync(() -> { /* Do task */ }, ex))
        .toArray(CompletableFuture[]::new));

In doing so, you can perform the tasks asynchronously, without blocking the calling thread. Note the use of toArray(CompletableFuture[]::new) rather than casting the result of a plain toArray(): toArray() returns an Object[], so casting it to CompletableFuture<Void>[] would fail at runtime with a ClassCastException.

ExecutorService.submit will fire off the task, but when you use get to obtain the result, it blocks until the result is available. CompletableFuture instead lets you attach non-blocking callbacks (thenApply, thenAccept, etc.) that run when the result is ready. This matters when you want to act on results after the parallel tasks finish.
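For example, a small sketch of attaching a callback instead of calling a blocking get() (the computation and the class name are invented for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackDemo {

    // Runs a hypothetical computation on the given pool; the thenApply
    // callback fires when the value is ready, without a blocking get().
    static CompletableFuture<Integer> compute(ExecutorService ex) {
        return CompletableFuture.supplyAsync(() -> 21, ex)
                .thenApply(n -> n * 2);
    }

    public static void main(String[] args) {
        ExecutorService ex = Executors.newFixedThreadPool(2);
        // fire and forget: the callback prints when the result is ready
        compute(ex).thenAccept(n -> System.out.println("result: " + n));
        ex.shutdown(); // previously submitted work still runs to completion
    }
}
```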
Some more explanation can be found here.

Also, in your original question you asked whether it is a good idea to use parallelStream, and my answer is that it isn't, because if a task blocks its thread, you can starve the common pool that is shared by every parallel stream in your code (assuming you have used parallelStream all over the place).

Also, CompletableFuture can accept its own thread pool (which you can customise yourself) and run there. Notice the second argument to runAsync in the above code.

If you simply want to have a fire-and-forget mechanism and don't care about the result, then ExecutorService.invokeAll is one way to do it (note that invokeAll blocks the calling thread until all tasks have completed; for true fire-and-forget, submit each task individually). You can use it like this:

executorService.invokeAll(tasks.stream()
        .map(task -> (Callable<Void>) () -> {
            // run task;
            return null;
        })
        .collect(Collectors.toList()));

But why do you want to use a CompletableFuture with your own ExecutorService in such a case?
One good reason is the fluent error handling. You can see some examples here and here
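As an illustration of that fluent error handling, here is a sketch using exceptionally (the failing computation and the class name are hypothetical):

```java
import java.util.concurrent.CompletableFuture;

public class ErrorHandlingDemo {

    // A hypothetical task that may fail; exceptionally supplies a fallback
    // value instead of propagating the exception to the caller.
    static CompletableFuture<String> risky(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
                    if (fail) throw new IllegalStateException("task failed");
                    return "ok";
                })
                // the throwable arrives wrapped in a CompletionException,
                // so getCause() retrieves the original exception
                .exceptionally(ex -> "recovered: " + ex.getCause().getMessage());
    }

    public static void main(String[] args) {
        System.out.println(risky(false).join()); // normal result
        System.out.println(risky(true).join());  // recovered fallback value
    }
}
```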

  • Can you tell me the difference between yours and the following version: `tasks.forEach(task -> executorService.submit(task::run));` Why should I use CompletableFuture additionally? – nimo23 Nov 21 '19 at 22:34
  • According to https://dzone.com/articles/concurrency-in-action-using-javas-completable-futu: `CompletableFuture` uses a fork-join thread pool, which is a thread pool **shared between all CompletableFutures and all parallel streams**. I did not get the point: why not simply use my above solution with `tasks.parallelStream()`, as it also uses the fork-join thread pool..it seems there is no benefit in using `CompletableFuture` over `parallelStream`..both are async and non-blocking..or? – nimo23 Nov 21 '19 at 22:48
  • And what if I use `ExecutorService.submit` **without** using `get` and without `CompletableFuture`? I do not need a result; it should only process the task (like fire and forget). Does this behave non-blocking and async, the same as if I used `CompletableFuture` in combination with `ExecutorService`? – nimo23 Nov 21 '19 at 23:14
  • @nimo23 is it compulsory for you to collect all your tasks in a `List`? What I mean is that is it possible for you to use the result of the computation directly in the other executor and not collect it in a `List`? – Shankha057 Nov 22 '19 at 00:13