4

Consider this code:

Thread thread = new Thread(() -> tasks.parallelStream().forEach(Runnable::run));

tasks is a list of Runnables that should be executed in parallel. Once this thread has started and is executing, some calculations may require us to interrupt (cancel) all of those tasks.

Interrupting the Thread only stops one of the executions. How do we handle the others? Or maybe streams should not be used this way? Or do you know a better solution?

vach
  • 4
    I don’t get it. There is `ExecutorService.invokeAll` with a clear way of canceling the created tasks. Why do you insist on using something not meant to provide such a feature? There is nothing more readable in `tasks.parallelStream().forEach(Runnable::run)` than in `executor.invokeAll(tasks)` – Holger Jun 16 '14 at 12:46
  • Because the code that runs this sometimes needs to execute the tasks synchronously (sequentially), using tasks.stream() instead of a parallel stream. The only reason I'm using streams right now is that they are more readable and easier to understand for someone reviewing the code... But you're probably right that they are not intended to be used the way I'm using them. – vach Jun 17 '14 at 06:00
  • You can use a single threaded `ExecutorService` or even implement a run-in-place `ExecutorService` which is very easy when subclassing [`AbstractExecutorService`](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/AbstractExecutorService.html). With that you get sequential execution with cancellation support for free… – Holger Jun 17 '14 at 07:55
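
For readers who want to see what the `invokeAll` suggestion from the comments could look like, here is a minimal sketch, not code from the question. The class name `InvokeAllCancelDemo`, the method `runAndCancel`, and the blocking `Thread.sleep` tasks are made up for illustration. It assumes a standard `ThreadPoolExecutor`-based pool, whose `invokeAll` cancels the unfinished tasks when the calling thread is interrupted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: runs blocking tasks via invokeAll and cancels them
// by interrupting the thread that called invokeAll.
final class InvokeAllCancelDemo {

    /** Returns how many of the blocking tasks observed an interrupt. */
    static int runAndCancel(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
        CountDownLatch started = new CountDownLatch(taskCount);
        AtomicInteger interrupted = new AtomicInteger();

        List<Callable<Object>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            Runnable blocking = () -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for long-running work
                } catch (InterruptedException e) {
                    interrupted.incrementAndGet();
                }
            };
            // invokeAll takes Callables, so adapt each Runnable
            tasks.add(Executors.callable(blocking));
        }

        Thread caller = new Thread(() -> {
            try {
                executor.invokeAll(tasks); // blocks until all tasks are done
            } catch (InterruptedException e) {
                // invokeAll cancels the unfinished tasks before rethrowing
            }
        });
        caller.start();

        try {
            started.await();    // every task is running now
            caller.interrupt(); // cancels all tasks through invokeAll
            caller.join();
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }
}
```

The interrupt only helps if the tasks themselves respond to interruption, as the blocking `Thread.sleep` does here. A task that never checks its interrupt status keeps running after being cancelled.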

3 Answers

4

You can use a ForkJoinPool to interrupt the threads:

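import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
// @Test and assertTrue come from JUnit (import them from the JUnit version you use)

@Test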
public void testInterruptParallelStream() throws Exception {
    final AtomicReference<InterruptedException> exc = new AtomicReference<>();

    final ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    // use the pool with a parallel stream to execute some tasks
    forkJoinPool.submit(() -> {
        Stream.generate(Object::new).parallel().forEach(obj -> {
            synchronized (obj) {
                try {
                    // task that is blocking
                    obj.wait();
                } catch (final InterruptedException e) {
                    exc.set(e);
                }
            }
        });
    });

    // wait until the stream has started
    Thread.sleep(500);
    // now we want to interrupt the task execution
    forkJoinPool.shutdownNow();
    // wait for the interrupt to occur
    Thread.sleep(500);
    // check that we really got an interruption in the parallel stream threads
    assertTrue(exc.get() instanceof InterruptedException);
}

The worker threads really do get interrupted, which terminates the blocking operation. You can also call shutdown() from within the Consumer.

Note that the sleeps are not tuned for a proper unit test; you may have a better way to wait only as long as necessary. But they are enough to show that it works.
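
One way to avoid the fixed sleeps is to wait on latches instead. The following is only a sketch of that idea. The class `LatchedInterruptDemo` and its method are hypothetical, `Thread.sleep` stands in for the blocking task, and it relies on the same observed (not formally guaranteed) behavior as the test above: a parallel stream started inside a `ForkJoinPool` task runs on that pool, and `shutdownNow()` interrupts its workers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

// Hypothetical demo class: same idea as the test above, but with latches
// instead of fixed sleeps.
final class LatchedInterruptDemo {

    /** Returns true if a blocked stream task was interrupted by shutdownNow(). */
    static boolean interruptsBlockedWorker() {
        ForkJoinPool pool = new ForkJoinPool(4);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);

        pool.submit(() -> {
            IntStream.range(0, 8).parallel().forEach(i -> {
                started.countDown(); // signal that at least one task is running
                try {
                    Thread.sleep(60_000); // blocking task
                } catch (InterruptedException e) {
                    interrupted.countDown(); // signal that an interrupt arrived
                }
            });
        });

        try {
            // wait until the stream has really started (bounded wait)
            boolean running = started.await(10, TimeUnit.SECONDS);
            // interrupt the task execution
            pool.shutdownNow();
            // wait until an interrupt was observed (bounded wait)
            return running && interrupted.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The timeouts only bound the wait in case something goes wrong. In the normal case both latches release as soon as the corresponding event happens.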

benez
  • 1
    I checked that this solution works, but it seems like a bad idea to create and stop a new thread pool every time, right? Is there a better solution? I tried to use Future#cancel like this: `Future<?> task = forkJoinPool.submit(() -> { /* parallel stream here */ }); task.cancel(true);` but this doesn't work for some reason – Vitaly Tsvetkoff Jan 31 '20 at 13:18
  • @VitalyTsvetkoff If you intend to interrupt a specific execution of parallel tasks, then you don't want this process to be coupled with any other threads or pools, so creating your own `ForkJoinPool` is the best way to ensure that there are no side effects. And if the execution doesn't run long enough to justify creating your own pool, it may not be a good use case for a parallel stream, which is meant to improve performance – benez Mar 04 '21 at 17:46
3

You aren't actually running the Runnables on the Thread you are creating. You are running a thread which will submit to a pool, so:

Thread thread = new Thread(() -> tasks.parallelStream().forEach(Runnable::run));

In this example you are, in simpler terms, doing roughly this:

List<Runnable> tasks = ...;
Thread thread = new Thread(new Runnable(){
    public void run(){
       for(Runnable r : tasks){
          ForkJoinPool.commonPool().submit(r);
       }
    }
});

This is because you are using a parallelStream, which delegates to the common ForkJoinPool when handling parallel executions.

As far as I know, you cannot get a handle on the Threads that are executing your tasks with a parallelStream, so you may be out of luck. You can always do tricky stuff to get the threads, but it probably isn't the best idea to do so.
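
To see which threads actually execute the stream's tasks, you can record the thread names. This is a small illustrative sketch. The class `WhoRunsTheTasks`, the method name, and the thread name `my-thread` are invented for the example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical demo class: records the names of all threads that execute
// elements of a parallel stream started from a dedicated thread.
final class WhoRunsTheTasks {

    static Set<String> threadNamesUsed(int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        Thread thread = new Thread(
            () -> IntStream.range(0, taskCount)
                    .parallel()
                    .forEach(i -> names.add(Thread.currentThread().getName())),
            "my-thread");
        thread.start();
        try {
            thread.join(); // wait for the whole stream to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        // Output varies between runs and machines
        System.out.println(threadNamesUsed(1_000));
    }
}
```

On a multi-core machine the set typically contains several common-pool worker names, often alongside `my-thread` itself, because the thread that starts the stream usually takes part in the work as well. That would be consistent with the observation in the question that interrupting the Thread stops only one of the executions.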

John Vint
2

Something like the following should work for you:

AtomicBoolean shouldCancel = new AtomicBoolean();
...
tasks.parallelStream().allMatch(task -> {
    task.run();
    return !shouldCancel.get();
});

The documentation for the method allMatch specifically says that it "may not evaluate the predicate on all elements if not necessary for determining the result." So once the predicate returns false after you request cancellation, the stream does not need to evaluate any further elements. Additionally, you can check the return value to see whether the run was cancelled or not.
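
A self-contained version of this idea might look like the sketch below. The class `AllMatchCancelDemo`, the helper `countingTasks`, and the counter are assumptions added for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class wrapping the allMatch-based cancellation.
final class AllMatchCancelDemo {

    /** Returns true if every task ran without a cancel request being seen. */
    static boolean runUntilCancelled(List<Runnable> tasks, AtomicBoolean shouldCancel) {
        return tasks.parallelStream().allMatch(task -> {
            task.run();
            // returning false lets the stream skip the remaining elements
            return !shouldCancel.get();
        });
    }

    /** Builds n trivial tasks that each increment the given counter. */
    static List<Runnable> countingTasks(int n, AtomicInteger counter) {
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            tasks.add(counter::incrementAndGet);
        }
        return tasks;
    }
}
```

Keep in mind that this is cooperative and best effort. Tasks that are already running are not interrupted, and with a parallel stream several more tasks may still start before the `false` result takes effect.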

BrainStorm.exe