2

I have never used a ForkJoinPool and I came accross this code snippet.

I have a Set<Document> docs. Document has a write method. If I do the following, do I need to have a get or join to ensure that all the docs in the set have correctly finished their write method?

ForkJoinPool pool = new ForkJoinPool(concurrencyLevel);
pool.submit(() -> docs.parallelStream().forEach(
    doc -> {
        doc.write();
    })
);

What happens if one of the docs is unable to complete it's write? Say it throws an exception. Does the code given wait for all the docs to complete their write operation?

Mark
  • 2,058
  • 2
  • 35
  • 64
  • @Kayaman. Thank you. According to https://www.baeldung.com/java-8-parallel-streams-custom-threadpool, This approach should work. – Mark Aug 20 '18 at 20:46
  • What does `doc.write()` do? Write the content of a document to a file? In that case, doing it in parallel is most likely not going to make it faster; it might even make it slower, since a harddisk can only write one thing at a time. If this does what it looks like it does, this is not a good case of something you'd want to parallellize. – Jesper Aug 20 '18 at 20:52
  • @Jesper, it writes to an external datastore. – Mark Aug 20 '18 at 20:54
  • 1
    According to the link where you got example you still need to call `get()` on a `Future` object returned by `pool.submit()` to wait for all tasks to finish – Ivan Aug 21 '18 at 06:13
  • @Mark you're right. I didn't realize such a bad looking construct [can be used](https://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream). I have to say I don't like how it looks, since it's not at all intuitive. Something like `parallelStream(pool)` would be understandable immediately (although it seems like this is more of a "trick" than some well thought out functionality). – Kayaman Aug 21 '18 at 06:28
  • @Kayaman still, this interaction of the custom `ForkJoinPool` with the `parallelStream()` is an undocumented side effect of the implementation. Also, there are some hard-coded dependencies to the default pool’s parallelism in the Stream implementation code, which makes this trick less smooth (and also indicate that this is not an intended use case). – Holger Aug 21 '18 at 08:57
  • @Holger right, so it seems. Definitely not a "cool trick" I would use in production grade code. – Kayaman Aug 21 '18 at 09:39
  • Would either of you mid turning this into an answer? I am a little bit new to Java, so if you could please elucidate on why you think this is either not an intended use case and/or why you would not use it in production code? Thanks! – Mark Aug 21 '18 at 20:39

1 Answers1

1

ForkJoinPool.submit(Runnable) returns a ForkJoinTask representing the pending completion of the task. If you want to wait for all documents to be processed, you need some form of synchronization with that task, like calling its get() method (from the Future interface).

Concerning the exception handling, as usual any exception during the stream processing will stop it. However you have to refer to the documentation of Stream.forEach(Consumer):

The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever thread the library chooses. […]

This means that you have no guarantee of which document will be written if an exception occurs. The processing will stop but you cannot control which document will still be processed.

If you want to make sure that the remaining documents are processed, I would suggest 2 solutions:

  • surround the document.write() with a try/catch to make sure no exception propagates, but this makes it difficult to check which document succeeded or if there was any failure at all; or
  • use another solution to manage your parallel processing, like the CompletableFuture API. As noted in the comments, your current solution is a hack that works thanks to implementation details, so it would be preferable to do something cleaner.

Using CompletableFuture, you could do it as follows:

List<CompletableFuture<Void>> futures = docs.stream()
                    .map(doc -> CompletableFuture.runAsync(doc::write, pool))
                    .collect(Collectors.toList());

This will make sure that all documents are processed, and inspect each future in the returned list for success or failure.

Didier L
  • 18,905
  • 10
  • 61
  • 103