1

Java Streams base the amount of parallelism on your hardware. But what if i want to always have the maximum amount of parallelism?

Consider the code below. I want each of the 10 tasks to concurrently run for 100 milliseconds.

long runUntil = System.currentTimeMillis() + 100;
IntStream.range(0, 10).parallel().forEach(i ->
{
    int cnt = 0;
    while(System.currentTimeMillis() < runUntil)
        cnt++;
    System.out.println(i + ": " + cnt);
});

However, the result I get is:

2: 56443
1: 67506
4: 74693
6: 70549
0: 0
3: 0
5: 0
7: 0
8: 0
9: 0

So only 4 tasks are executed in parallel, and the fifth task only starts when one of the first 4 is finished. I want all the tasks to start at approximately the same time, and not wait for eachother.

I don't agree that it's a duplicate of Custom thread pool in Java 8 parallel stream, because that question is about slow running tasks blocking other tasks, while in my case, I just want to know how I can (if I can) maximize the parallelism when using the Stream API.

Community
  • 1
  • 1
wvdz
  • 16,251
  • 4
  • 53
  • 90
  • 1
    What is the expected result? –  Dec 12 '16 at 14:15
  • 6
    It looks like you've got 4 cores. – 4castle Dec 12 '16 at 14:17
  • 'concurrently run for 100 millisecond': Do you want all tasks to a) start at the same time, or b) each not to finish until it has run for 100ms? –  Dec 12 '16 at 14:22
  • 3
    @wvdz If the solution you're going to go with is *java.util.concurrent.ForkJoinPool.common.parallelism = 10* it might be *enormously wrong.* What you are doing is enabling the parallelism for the ENTIRE application, this can't be good. Every time you are going to use a parallel stream *or* the forkjoinpool will be used (in places that you have no control of) there will be 10 threads used. You might want to share the exact thing that you want to achieve. – Eugene Dec 12 '16 at 14:32
  • @StoyanDekov I don't agree that it's an exact duplicate. The usecase is different. I really want to execute everything concurrently. The most highly voted answer to the linked question is not a solution for my problem. – wvdz Dec 12 '16 at 14:43

2 Answers2

6

When you execute a parallel stream, you are under the hood invoking a ForkJoinPool, that pool has the number of working Threads that are equal to the result of :

 Runtime.getRuntime().availableProcessors(); // 4 in your case

so the parallel task is executed concurrently by 4 threads.

By the time you start the 5th task (100 miliseconds have passed), so this condition:

  while(System.currentTimeMillis() < runUntil)

reports false, thus zeroes only.

To solve this problem you can create a ForkJoinPool yourself, as explained in this answer as well (https://stackoverflow.com/a/22269778/2947592)

long runUntil = System.currentTimeMillis() + 1000;
ForkJoinPool forkJoinPool = new ForkJoinPool(10); // 10 Threads
forkJoinPool.submit(() ->
IntStream.range(0, 10).parallel().forEach(i -> {
    int cnt = 0;
    while (System.currentTimeMillis() < runUntil)
        cnt++;
    System.out.println(i + ": " + cnt);
})).get();
Community
  • 1
  • 1
Eugene
  • 117,005
  • 15
  • 201
  • 306
  • 1
    @wvdz I've seen the answer that you have provided and yes that *fixes* your problem, but it's not a good idea to map more threads then cores you have. Would it be OK for you to explain your use case? I'm really intrigued. If all you want to do is measure how fast each core does the adding, then allocating 10 threads for 4 cores will only make your results worse. – Eugene Dec 12 '16 at 14:26
  • That may not be a good idea, but it's a requirement for what I'm working on. The other solution would be to manually fire up threads, but I'd rather use the stream API. – wvdz Dec 12 '16 at 14:30
  • 2
    @wvdz In a production environment, you'll want to manually fire off threads so that people know what's going on. You could `mapToObject` and create threads there. – 4castle Dec 12 '16 at 14:39
  • If that's true then that should be the answer. – wvdz Dec 12 '16 at 14:42
  • @wvdz if that is the answer you wanted, then this should be closed as a duplicate. – Eugene Dec 12 '16 at 14:44
  • Duplicate of what? – wvdz Dec 12 '16 at 14:46
  • @wvdz edit to add how to create the ForkJoinPool with 10 threads as you wanted 10 Threads. – Eugene Dec 12 '16 at 15:49
1

So I already found the answer to my own question. The problem is that it really feels like a hack and not a proper solution. I would not be comfortable using this in a production environment.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

Now I will get a result like this, when I let it run for 1000 milliseconds:

9: 40158551
8: 41835052
0: 39087202
4: 37993773
6: 37993442
7: 36503041
2: 40076207
1: 37894657
5: 35785211
3: 40086037

I think my requirement is reasonable and am surprised it is apparantly not supported by the stream API.

wvdz
  • 16,251
  • 4
  • 53
  • 90
  • 2
    Again ... both question and answer are a duplicate of [Custom thread pool in Java 8 parallel stream](http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream) – SDekov Dec 12 '16 at 14:28