
The default `parallelStream()` in Java 8 uses the common ForkJoinPool, which may be a latency problem if the common pool's threads are exhausted when a task is submitted. However, in many cases enough CPU power is available and the tasks are short enough that this is not a problem. If we do have some long-running tasks this will of course need careful consideration, but for this question let's assume that this is not the problem.

However, filling the ForkJoinPool with I/O tasks that don't actually do any CPU-bound work is a way to introduce a bottleneck even though enough CPU power is available. I understood that. But that is what we have `ManagedBlocker` for: if we have an I/O task we should simply let the ForkJoinPool manage it within a `ManagedBlocker`. That sounds incredibly easy. Yet to my surprise, `ManagedBlocker` is a rather complicated API for the simple thing that it does. And after all, I think this is a common problem. So I built a simple utility method that makes `ManagedBlocker` easy to use for the common case:

import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

public class BlockingTasks {

    /**
     * Runs the given supplier inside a ManagedBlocker so that the
     * ForkJoinPool can compensate with extra threads while it blocks.
     */
    public static <T> T callInManagedBlock(final Supplier<T> supplier) {
        final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
        try {
            ForkJoinPool.managedBlock(managedBlock);
        } catch (InterruptedException e) {
            throw new Error(e);
        }
        return managedBlock.getResult();
    }

    private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
        private final Supplier<T> supplier;
        private T result;
        private boolean done = false;

        private SupplierManagedBlock(final Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean block() {
            // Performs the (potentially blocking) work exactly once.
            result = supplier.get();
            done = true;
            return true;
        }

        @Override
        public boolean isReleasable() {
            // Once done, the pool no longer needs to treat us as blocked.
            return done;
        }

        public T getResult() {
            return result;
        }
    }
}
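As a quick sanity check (my own minimal sketch, not part of the original utility), the same `ManagedBlocker` plumbing can be exercised with a trivial supplier; the local class below repeats the utility compactly so the snippet compiles on its own:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BlockingTasksDemo {

    // Compact copy of the utility above so this snippet is self-contained.
    static <T> T callInManagedBlock(Supplier<T> supplier) {
        class Block implements ForkJoinPool.ManagedBlocker {
            T result;
            boolean done;
            public boolean block() { result = supplier.get(); done = true; return true; }
            public boolean isReleasable() { return done; }
        }
        Block b = new Block();
        try {
            ForkJoinPool.managedBlock(b);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return b.result;
    }

    public static void main(String[] args) {
        // Each "task" just echoes its input; no real I/O is needed to check the plumbing.
        String joined = IntStream.range(0, 4).parallel()
            .mapToObj(i -> callInManagedBlock(() -> "v" + i))
            .sorted()
            .collect(Collectors.joining(","));
        System.out.println(joined); // v0,v1,v2,v3
    }
}
```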

Now if I want to download the HTML code of a couple of websites in parallel I could do it like this without the I/O causing any trouble:

public static void main(String[] args) {
    final List<String> pagesHtml = Stream
        .of("https://google.com", "https://stackoverflow.com", "...")
        .map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
        .collect(Collectors.toList());
}
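The `download(url)` helper is not shown in the question; a minimal sketch of what it might look like (my own assumption: UTF-8 content, error handling simplified to an unchecked exception) is:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class Download {

    // Hypothetical helper assumed by the stream example above:
    // reads the whole resource behind the URL into a String.
    static String download(String url) {
        try (InputStream in = new URL(url).openStream()) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```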

I am a little bit surprised that there is no class like the `BlockingTasks` above shipped with Java (or did I just not find it?), but it was not that hard to build.

When I google for "java 8 parallel stream", the first few results are articles claiming that, due to this I/O problem, Fork/Join sucks in Java.

I have altered my search terms somewhat, and while there are a lot of people complaining about how horrible life is, I found nobody talking about a solution like the above. Since I don't feel like Marvin (brain the size of a planet) and Java 8 has been available for quite a while, I suspect that there is something terribly wrong with what I am proposing up there.

I banged together a small test:

public static void main(String[] args) {
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
    IntStream.range(0, 10).parallel().forEach((x) -> sleep());
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
}

public static void sleep() {
    try {
        System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        throw new Error(e);
    }
}

I ran that and got the following result:

18:41:29.021: Start
18:41:29.033: Sleeping main
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
18:41:39.034: Sleeping main
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:49.035: End

So on my 8-CPU computer the ForkJoinPool naturally chose 8 threads, completed the first 8 tasks and then the last two, which means this took 20 seconds. If other tasks were queued, the pool still could not have used the clearly idle CPUs (except for 6 cores in the last 10 seconds).

Then I used...

IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));

...instead of...

IntStream.range(0, 10).parallel().forEach((x) -> sleep());

...and got the following result:

18:44:10.93: Start
18:44:10.945: Sleeping main
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
18:44:20.957: End

It looks to me like this works: extra threads were started to compensate for my mock "blocking I/O action" (sleep). The time was cut down to 10 seconds, and I suppose that if I queued more tasks those could still use the available CPU power.

Is there anything wrong with this solution, or with using I/O in streams in general, if the I/O operation is wrapped in a `ManagedBlocker`?

yankee

1 Answer


In short, yes, there are some problems with your solution. It definitely improves on using blocking code inside a parallel stream, and some third-party libraries provide a similar solution (see, for example, the Blocking class in the jOOλ library). However, this solution does not change the internal splitting strategy used in the Stream API. The number of subtasks created by the Stream API is controlled by a predefined constant in the AbstractTask class:

/**
 * Default target factor of leaf tasks for parallel decomposition.
 * To allow load balancing, we over-partition, currently to approximately
 * four tasks per processor, which enables others to help out
 * if leaf tasks are uneven or some processors are otherwise busy.
 */
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

As you can see, it's four times bigger than the common pool parallelism (which is by default the number of CPU cores). The real splitting algorithm is a little bit trickier, but roughly you cannot have more than 4x-8x tasks even if all of them are blocking.

For example, if you have 8 CPU cores, your Thread.sleep() test will work nicely up to IntStream.range(0, 32) (as 32 = 8*4). However, for IntStream.range(0, 64) you will have 32 parallel tasks, each processing two input numbers sequentially, so the whole processing will take 20 seconds, not 10.
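To make that arithmetic concrete, here is a back-of-envelope model (my own sketch, not from the answer; the real splitting is binary and somewhat more involved, as noted above). It computes how long a fully blocking workload takes when concurrency is capped by the number of leaf tasks:

```java
import java.util.concurrent.ForkJoinPool;

public class LeafMath {

    // Rough model: leaves run concurrently (thanks to ManagedBlocker
    // compensation threads), but elements within one leaf run sequentially.
    static long expectedSeconds(int elements, int parallelism, long sleepSeconds) {
        int leafTarget = parallelism << 2;                      // ~4 leaf tasks per core
        int perLeaf = (elements + leafTarget - 1) / leafTarget; // elements per leaf task (ceiling)
        return perLeaf * sleepSeconds;
    }

    public static void main(String[] args) {
        // With 8 cores and 10-second sleeps:
        System.out.println(expectedSeconds(32, 8, 10)); // 32 leaves, 1 element each -> 10
        System.out.println(expectedSeconds(64, 8, 10)); // 32 leaves, 2 elements each -> 20
        // The actual leaf target on this machine:
        System.out.println(ForkJoinPool.getCommonPoolParallelism() << 2);
    }
}
```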

Tagir Valeev
  • Good point with the decomposition. This will of course limit the time a single task may take, but it will not limit the total throughput if enough other computational tasks are in the queue. Conclusion: If only throughput is an issue the solution is fine. If response time of a single I/O task is an issue and the single I/O task in question can be decomposed into more steps, then another solution should be considered. – yankee May 30 '16 at 07:03
  • And not to forget: the Stream API's use of Fork/Join is an implementation detail. As long as streams aren't guaranteed to use that framework, there is no guarantee that using `ManagedBlocker` will improve concurrency at all… – Holger May 30 '16 at 13:33
  • @yankee why do you say it doesn't limit total throughput? I tried the experiment and the throughput definitely seemed limited. – Adam Bliss Jun 12 '19 at 14:10
  • @AdamBliss: My statement was academically derived from the answer above. As mentioned this of course only holds true if enough computational tasks are still present to fill the CPU... – yankee Jul 18 '19 at 15:09
  • 1
    It looks like this errant line in AbstractTask was adjusted in https://bugs.openjdk.java.net/browse/JDK-8190974 . So now, as long as your pool has at least 1/4 as many threads as you have sleeping tasks, you may be able to achieve good throughput. – Adam Bliss Sep 09 '19 at 17:45