
I am attempting to parallelise a for-loop using Java streams & ForkJoinPool in order to control the number of threads used. When run with a single thread, the parallelised code returns the same result as the sequential program. The sequential code is a set of standard for-loops:

for(String file : fileList){
    for(String item : xList){
        for(String x : aList) {
              // action code
        }
    }
}

And the following is my parallel implementation:

ForkJoinPool threadPool = new ForkJoinPool(NUM_THREADS);
int chunkSize = aList.size()/NUM_THREADS;

for(String file : fileList){
    for(String item : xList){
    IntStream.range(0,  NUM_THREADS)
        .parallel().forEach(i -> threadPool.submit(() -> {

          aList.subList(i*chunkSize, Math.min(i*chunkSize + chunkSize -1, aList.size()-1))
               .forEach(x -> {
                      // action code
                });
          }));

         threadPool.shutdown();
         threadPool.awaitTermination(5, TimeUnit.MINUTES);
    }
}

When using more than one thread, only a limited number of iterations are completed. I have attempted to use .shutdown() and .awaitTermination() to ensure that all threads complete, but this doesn't seem to work. The number of iterations that occur differs dramatically from run to run (between 0 and 1500).

Note: I'm using a Macbook Pro with 8 available cores (4 dual-cores), and my action code does not contain references that make parallelisation unsafe.

Any advice would be much appreciated, thank you!

Chynna
  • 1
    Why do you use IntStream.parallel *and* a fork join pool? – matt Oct 23 '18 at 13:19
  • I saw it mentioned a few times in various places (incl. StackOverflow answers [1](https://stackoverflow.com/questions/41051520/java-8-how-can-i-convert-a-for-loop-to-run-in-parallel), [2](https://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream)). Although I'm now thinking that `System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4")` may be the better way to go. Still interested if anyone can explain to me why the original code did not work! – Chynna Oct 23 '18 at 13:29
  • I don't know how your reply answers my question. IntStream.parallel will cause a stream to be processed in parallel. The threadPool.submit will submit tasks that will be run in parallel. You don't need to submit these tasks in parallel. Also, you don't need to call invoke; submit will start the tasks. – matt Oct 23 '18 at 13:41
  • Also, with the 'invoke()' in your IntStream processing you can be swallowing runtime exceptions, which could stop the processing of your IntRange. – matt Oct 23 '18 at 13:48
  • Because it is a for-loop & I need the `i` value, it seems that the only option is to use the `IntStream.parallel()` method. I then also used `ForkJoinPool` because I wasn't aware of the `System.setProperty()` option for allowing # of parallel threads (I was only aware of how to limit the number of threads available in a Fork Join Pool). I do see now that I should be able to do it without the `ForkJoinPool` method that I used. – Chynna Oct 23 '18 at 13:54
  • Ah yes, I actually did end up deleting the `.invoke()` because I realised it wasn't needed. Forgot to delete it in this before I posted though - sorry about that! – Chynna Oct 23 '18 at 13:55
  • Why are you using a fork join pool in the first place? This is not what those are designed for. You seem to use them as a normal `ExecutorService` too. – daniu Oct 23 '18 at 14:50
  • 1
    You use an `IntStream`, which gives you the required `i`. This still doesn’t explain, why you are using `parallel()` on it. – Holger Oct 23 '18 at 15:47
  • Besides that, did you have the problems *before* or *after* you added `shutdown`? When you shut down the pool in the first iteration of the innermost loop, it won’t accept jobs in the next iterations… – Holger Oct 23 '18 at 15:54
  • @daniu I'm super new to parallel programming and saw a lot of online tutorials suggesting Fork Join Pool to manage the number of threads used. I see now that that's not needed when using a stream. – Chynna Oct 23 '18 at 22:12
  • @Holger I am using `parallel()` because I need it to run in parallel. There are thousands of iterations occurring and the loop would benefit from parallelisation. I have the same issue without `shutdown()`; however, when I include it, it does look like all iterations are performed in the first instance (before `shutdown()` is called). I've also tried a `CountDownLatch` to make sure the next iteration of the outer for-loop is not processed until all threads have finished, but I don't believe the `CountDownLatch` value can be reset in each instance. – Chynna Oct 23 '18 at 22:18
  • @Chynna there seems to be a big confusion regarding the effect of `parallel()` on your side. You are already submitting the work to a thread pool. That’s already sufficient to perform the “thousands of iterations” in parallel. Calling `parallel()` on an `IntStream` only affects the evaluation of that single `IntStream` instance, which, in this case, is the iteration from zero to `NUM_THREADS`. You said, in your case it’s eight. Does counting from zero to eight sound like an operation that wins by being executed in parallel? It rather makes your own thread pool less efficient. – Holger Oct 24 '18 at 09:29
  • Besides the expendability of `parallel()`, why is completing all background tasks before the next iteration even a requirement? When “*my action code does not contain references that make parallelisation unsafe*”, where’s the problem? Just place the `shutdown(); awaitTermination(…)` after the outermost loop, and you’re done. Normally, letting the pool’s threads run out of work is the last thing someone wants to do. [But if you insist on it…](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ForkJoinPool.html#awaitQuiescence(long,java.util.concurrent.TimeUnit)) – Holger Oct 24 '18 at 09:44
  • @Holger Definitely a misunderstanding by me! This makes a lot of sense. Thanks so much for your help. – Chynna Oct 25 '18 at 01:58
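
The suggestion from the comments (drop `parallel()` and move `shutdown()` and `awaitTermination(…)` after the outermost loop) could look roughly like the sketch below. It is only an illustration under stated assumptions: the class name `ShutdownAfterLoops`, the method `run`, and the `AtomicInteger` counter standing in for the action code are hypothetical and not part of the question. The chunk bounds are computed differently from the question's code because the end index of `List.subList` is exclusive, so subtracting 1 from it skips the last element of each chunk.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class ShutdownAfterLoops {
    static final int NUM_THREADS = 4; // assumed value, for illustration only

    /** Runs the nested loops and returns how many times the placeholder action ran. */
    static int run(List<String> fileList, List<String> xList, List<String> aList) {
        ForkJoinPool threadPool = new ForkJoinPool(NUM_THREADS);
        AtomicInteger iterations = new AtomicInteger(); // stands in for "action code"
        int size = aList.size();

        for (String file : fileList) {
            for (String item : xList) {
                // Sequential stream: submitting the tasks is cheap, the pool runs them in parallel.
                IntStream.range(0, NUM_THREADS).forEach(i -> threadPool.submit(() -> {
                    int from = i * size / NUM_THREADS;     // inclusive start of chunk i
                    int to = (i + 1) * size / NUM_THREADS; // exclusive end of chunk i
                    aList.subList(from, to).forEach(x -> iterations.incrementAndGet());
                }));
            }
        }

        // Shut down once, after every task has been submitted.
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(5, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return iterations.get();
    }

    public static void main(String[] args) {
        List<String> a = List.of("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
        System.out.println(run(List.of("f1", "f2"), List.of("x1", "x2", "x3"), a));
    }
}
```

With 2 files, 3 items and 10 elements, every combination is visited once, so the counter should end at 2 × 3 × 10.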

1 Answer


I think the actual problem is caused by calling shutdown() on the ForkJoinPool inside the loop. According to the Javadoc, this results in "an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted", i.e. I'd expect only the tasks submitted before the first shutdown() call to actually finish.
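
To see the effect in isolation, here is a minimal sketch (the class name `ShutdownDemo` and its method are hypothetical) that submits one task, shuts the pool down, and then tries to submit another. The second `submit` call should fail with a `RejectedExecutionException`, which is what happens to every later loop iteration in the question's code.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {

    /** Returns true if a task submitted after shutdown() is rejected by the pool. */
    static boolean submitAfterShutdownIsRejected() {
        ForkJoinPool pool = new ForkJoinPool(2);
        pool.submit(() -> { /* accepted: the pool is still open */ });
        pool.shutdown(); // previously submitted tasks still run, new ones are refused
        try {
            pool.submit(() -> { /* never runs */ });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            try {
                pool.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + submitAfterShutdownIsRejected());
    }
}
```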

BTW, there's no real point in using a ForkJoinPool the way you use it. A ForkJoinPool is intended to split a workload recursively, not unlike what you do by creating sublists in the loop. However, a ForkJoinPool is supposed to be fed with RecursiveActions that split their work themselves, rather than having the work split up beforehand in a loop as you do. That's just a side note, though; your code should run fine, but it would be clearer if you just submitted your tasks to a normal ExecutorService, e.g. one you get from Executors.newFixedThreadPool(parallelism) rather than from new ForkJoinPool().
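
A rough sketch of the plain ExecutorService variant, assuming a hypothetical class `FixedPoolSketch` and a counter in place of the real action code. It uses `invokeAll`, which blocks until every task of the current batch has completed, so the outer loops only advance once a batch is done and no `CountDownLatch` is needed. The pool is shut down once, after all loops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolSketch {
    static final int NUM_THREADS = 4; // assumed value, for illustration only

    /** Runs the nested loops and returns how many times the placeholder action ran. */
    static int run(List<String> fileList, List<String> xList, List<String> aList) {
        ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
        AtomicInteger iterations = new AtomicInteger(); // stands in for "action code"
        // Round up so the remainder of the division is not dropped.
        int chunkSize = (aList.size() + NUM_THREADS - 1) / NUM_THREADS;
        try {
            for (String file : fileList) {
                for (String item : xList) {
                    List<Callable<Void>> tasks = new ArrayList<>();
                    for (int start = 0; start < aList.size(); start += chunkSize) {
                        // subList's end index is exclusive, so no "- 1" here.
                        int end = Math.min(start + chunkSize, aList.size());
                        List<String> chunk = aList.subList(start, end);
                        tasks.add(() -> {
                            for (String x : chunk) {
                                iterations.incrementAndGet();
                            }
                            return null;
                        });
                    }
                    pool.invokeAll(tasks); // returns only when all chunks are done
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown(); // once, after the outermost loop
        }
        return iterations.get();
    }

    public static void main(String[] args) {
        List<String> a = List.of("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
        System.out.println(run(List.of("f1", "f2"), List.of("x1", "x2", "x3"), a));
    }
}
```

If waiting after each batch is not actually required, the `invokeAll` call could be replaced by individual `submit` calls followed by a single `awaitTermination` after `shutdown()`.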

daniu