Assume a lambda expression consumes a certain amount of a limited resource (such as memory), so that the number of concurrent executions has to be limited. For example, if the lambda temporarily consumes 100 MB of local memory and we want to cap the total at 1 GB, we must not allow more than 10 concurrent evaluations.

What is the best way to limit the number of concurrent executions, for example in

IntStream.range(0, numberOfJobs).parallel().forEach( i -> { /*...*/ });

?

Note: An obvious option is to nest two streams, like this:

    double jobsPerThread = (double)numberOfJobs / numberOfThreads;
    IntStream.range(0, numberOfThreads).parallel().forEach( threadIndex ->
        IntStream.range((int)(threadIndex * jobsPerThread), (int)((threadIndex+1) * jobsPerThread)).sequential().forEach( i -> { /*...*/ }));

Is this the only way? It is not that elegant. What I would actually like to have is something like

IntStream.range(0, numberOfJobs).parallel(numberOfThreads).forEach( i -> { /*...*/ });
Christian Fries
  • Why can't you use a shared fixed thread pool? What about the parallel streams complicates things? – Gray Mar 02 '14 at 15:28
  • I did use a shared fixed thread pool in Java 6, but the Java 8 code is much more concise. With the thread pool I have to define an ArrayList of Futures, submit the workers to the executor, and collect the results from the Futures. While this is the main motivation, I also had the impression that the streams are more efficient (by about 10%). – Christian Fries Mar 02 '14 at 15:33
  • http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream – Alexei Kaigorodov Mar 02 '14 at 16:42
  • @AlexeiKaigorodov Thanks for the hint about creating a ForkJoinPool with a given target parallelism level or changing the current target parallelism level. This is a good solution too. – Christian Fries Mar 02 '14 at 18:03

2 Answers

Parallel streams use a ForkJoinPool for their operations. By default they use ForkJoinPool.commonPool(), which does not allow changing its parallelism afterwards. However, you can use your own ForkJoinPool instance: when the stream code is executed from within a task of your own ForkJoinPool, that pool is used for the stream operations. The following example illustrates this by executing the same operation twice, once with the default behavior and once in a custom pool with a fixed parallelism of 2:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class InterfaceStaticMethod {
    public static void main(String[] arg) throws Exception {
      Runnable parallelCode=() -> {
        // thread-safe set: it is filled concurrently by the stream's worker threads
        Set<String> allThreads=ConcurrentHashMap.newKeySet();
        IntStream.range(0, 1_000_000).parallel().filter(i->{
          allThreads.add(Thread.currentThread().getName()); return false;}
        ).min();
        System.out.println("executed by "+allThreads);
      };
      System.out.println("default behavior: ");
      parallelCode.run();
      System.out.println("specialized pool:");
      ForkJoinPool pool=new ForkJoinPool(2);
      pool.submit(parallelCode).get(); // run inside the custom pool and wait for completion
      pool.shutdown(); // release the pool's worker threads
    }
}
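
Applied to the loop from the question, the same idea might look like the sketch below. The class name `LimitedParallelStream`, the helper `peakConcurrency` and the `Thread.sleep` placeholder job are assumptions made for illustration only. The helper runs the stream inside a dedicated pool and records how many jobs were observed running at the same time. Note that this relies on how the current implementation picks its pool, not on a documented guarantee.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class LimitedParallelStream {

    /**
     * Runs numberOfJobs placeholder jobs on a parallel stream inside a dedicated
     * ForkJoinPool and returns the highest number of jobs seen running at once.
     * (Hypothetical helper, not part of any library.)
     */
    public static int peakConcurrency(int numberOfThreads, int numberOfJobs) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(numberOfThreads);
        try {
            pool.submit(() ->
                IntStream.range(0, numberOfJobs).parallel().forEach(i -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(5); // placeholder for the resource-hungry work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                })
            ).join(); // block until the whole stream has been processed
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(2, 40));
    }
}
```

Because `join()` blocks until the stream has finished, the call keeps the "returns after the last job" behaviour of a plain parallel `forEach`.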
Holger
  • Interesting - I would have thought that `parallel()` would always use its own pool (ForkJoin common pool) but it does not. – assylias Mar 03 '14 at 11:55
  • The pool that's used by `parallel()` is unspecified. The current behavior is as illustrated here, but it may change in the future. – Stuart Marks Mar 03 '14 at 18:18

Depending on your use case, using the CompletableFuture utility methods may be easier:

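import static java.util.concurrent.CompletableFuture.runAsync;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

ExecutorService executor = Executors.newFixedThreadPool(10); // at most 10 concurrent jobs
for (int i = 0; i < numberOfJobs; i++) {
    final int job = i; // a lambda can only capture an effectively final variable
    runAsync(() -> { /* do something with job */ }, executor);
}

// or with a stream:
IntStream.range(0, numberOfJobs)
         .forEach(i -> runAsync(() -> { /* do something with i */ }, executor));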

The main difference from your code is that the parallel forEach only returns after the last job has finished, whereas each runAsync call returns immediately, so the loop completes as soon as all the jobs have been submitted. There are various ways to change that behaviour if required.
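
One way to wait for all jobs, shown here only as a sketch, is to keep the futures returned by `runAsync` and combine them with `CompletableFuture.allOf`. The class name `RunAsyncAndWait`, the helper `runAll` and the counter standing in for the real job are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class RunAsyncAndWait {

    /**
     * Submits numberOfJobs placeholder jobs to a fixed thread pool, waits until
     * all of them are done and returns how many jobs ran.
     * (Hypothetical helper, not part of any library.)
     */
    public static int runAll(int numberOfThreads, int numberOfJobs) {
        ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
        AtomicInteger completed = new AtomicInteger();
        try {
            CompletableFuture<?>[] jobs = IntStream.range(0, numberOfJobs)
                    .mapToObj(i -> CompletableFuture.runAsync(() -> {
                        completed.incrementAndGet(); // placeholder for the real job
                    }, executor))
                    .toArray(CompletableFuture[]::new);
            // allOf completes once every job has completed; join() blocks until then
            CompletableFuture.allOf(jobs).join();
        } finally {
            executor.shutdown(); // no further jobs; lets the pool threads exit
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("jobs completed: " + runAll(10, 25));
    }
}
```

The fixed pool size bounds the number of jobs running at once, while the extra lines for collecting and joining the futures are the price for getting the blocking behaviour back.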

assylias
  • This one is nice too, but to make it complete (i.e. to have a fair comparison with the other solutions) one has to add the lines of code required to join all the threads. It is actually this additional code which makes working with an ExecutorService look a bit more lengthy. – Christian Fries Mar 02 '14 at 18:01
  • Yes indeed - depending on your use case, you may be able to use one of the `thenXXX` methods, or you could shut down the executor and await its termination (which waits for all submitted jobs). If that does not work then you will probably have to retrieve the futures, and it will become very close to the code that does not use streams. – assylias Mar 02 '14 at 18:18