
I discovered the hard way that the JVM uses only one thread pool (the common `ForkJoinPool`) for processing streams in parallel. We had an I/O-blocking function applied to a large parallel stream, and it caused liveness problems for unrelated, otherwise fast functions used in other parallel streams.

There are no methods on `Stream` that allow an alternate thread pool to be used.
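For illustration, a minimal sketch (the class name `CommonPoolDemo` is hypothetical) that records which pool processes the elements of a parallel stream. Apart from the calling thread, which also takes part in the work, every element is handled by a worker of `ForkJoinPool.commonPool()`, the single pool shared by all parallel streams started outside a `ForkJoinPool`:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

class CommonPoolDemo {
    // Runs a parallel stream and records which kind of thread processed each element:
    // "common" = a worker of ForkJoinPool.commonPool(),
    // "other"  = a worker of some other ForkJoinPool,
    // "caller" = a non-pool thread (the thread that started the stream also helps).
    static Set<String> poolsUsed() {
        Set<String> pools = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000).parallel().forEach(i -> {
            Thread t = Thread.currentThread();
            if (t instanceof ForkJoinWorkerThread) {
                ForkJoinPool p = ((ForkJoinWorkerThread) t).getPool();
                pools.add(p == ForkJoinPool.commonPool() ? "common" : "other");
            } else {
                pools.add("caller");
            }
        });
        return pools;
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        // Contains only "common" and/or "caller", never "other"
        System.out.println("pools used: " + poolsUsed());
    }
}
```

Any blocking call made inside such a stream therefore ties up one of the few common-pool workers that every other parallel stream in the JVM also depends on.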

Is there a simple way to avoid this problem, perhaps somehow specifying which thread pool to use?

Stuart Marks
Bohemian
  • possible duplicate of [Custom thread pool in Java 8 parallel stream](http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream) – mkobit Dec 20 '14 at 21:33
  • There is a trick -- suggested in the above-linked question -- but you should bear in mind that streams are primarily a mechanism for parallelizing *computation*, not *IO*, and so you're operating in the margins. We're looking into ways to bring some of these use cases a bit more into the mainstream. – Brian Goetz Dec 21 '14 at 18:39
  • @BrianGoetz while I have your ear, please consider an extension to the API, perhaps a parameter to the `parallel()` method, that allows a separate thread pool to be used for parallel stream processing - some computations are heavy too and could cause liveness problems. Although I see that "very long" times (such as IO) are out of scope for the design, it is natural enough to use streams to drive such usage and IMHO there should be an easy way to mitigate the impact of this kind of usage. For me, it broke Josh's rule of "least astonishment". Merry Xmas :) – Bohemian Dec 21 '14 at 21:24
  • @Bohemian You probably won't be surprised to hear that you're not the first to suggest this. However, we had good reasons for not making this "obvious" choice, and I am still convinced we made the right call. – Brian Goetz Dec 21 '14 at 23:08

3 Answers


I wrote a small library called StreamEx which can submit a task to a custom `ForkJoinPool`. So you can write:

```java
import java.util.concurrent.ForkJoinPool;
import one.util.streamex.IntStreamEx;

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
int[] primes = IntStreamEx.range(1, 1_000_000)
    .parallel(forkJoinPool)   // the terminal operation will run inside forkJoinPool
    .filter(PrimesPrint::isPrime).toArray();
```

It simply remembers your pool, launches the terminal operation inside it and joins the result. It is just syntactic sugar for the aforementioned solution.
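The same idea can be sketched with the JDK alone. This is a hypothetical helper (`inPool` is not StreamEx's API), and it relies on an implementation detail rather than a documented guarantee of the Stream API: a parallel stream whose terminal operation is started from a worker of some `ForkJoinPool` forks its subtasks into that pool instead of the common pool. The `filter` below only checks that every element really is processed by the custom pool's workers:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.function.Supplier;
import java.util.stream.IntStream;

class CustomPoolDemo {
    // Hypothetical helper: runs the whole stream pipeline (including its terminal
    // operation) as a task inside the given pool and waits for the result.
    static <T> T inPool(ForkJoinPool pool, Supplier<T> pipeline) {
        return pool.submit((Callable<T>) pipeline::get).join();
    }

    // True if the current thread is a worker of the given pool.
    static boolean onPool(ForkJoinPool pool) {
        Thread t = Thread.currentThread();
        return t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == pool;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // Keeps only elements processed by our pool's workers
            Long count = inPool(pool, () -> IntStream.range(0, 100_000)
                    .parallel()
                    .filter(i -> onPool(pool))
                    .count());
            System.out.println(count);
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that this changes only which pool runs the subtasks, not how many subtasks the stream splits the work into.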

Tagir Valeev

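You can wrap the blocking operation in a `ForkJoinPool.ManagedBlocker`, along these lines:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ManagedBlocker;
import java.util.function.Supplier;

static <T> Supplier<T> blocking(Supplier<T> supplier) {
    return new Supplier<T>() {
        @Override
        public T get() {
            // A fresh blocker per call: the result must not be shared between
            // calls (or between the threads of a parallel stream).
            class Blocker implements ManagedBlocker {
                T result;
                boolean done;

                @Override
                public boolean block() {
                    result = supplier.get();   // the potentially blocking call
                    done = true;
                    return true;               // no further blocking needed
                }

                @Override
                public boolean isReleasable() {
                    return done;
                }
            }

            Blocker blocker = new Blocker();
            try {
                // Lets the current ForkJoinPool (if any) activate a spare thread while we block
                ForkJoinPool.managedBlock(blocker);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }

            return blocker.result;
        }
    };
}
```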

Then use it, for example, like this:

```java
Stream.generate(blocking(() -> ...))
      .parallel()
      ...
      .collect(...);
```

More information can be found in this blog post: http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/

jOOλ provides wrappers like the above for all Java 8 functional interface types via `org.jooq.lambda.Blocking`, so you can write:

```java
Stream.generate(Blocking.supplier(() -> ...))
      .parallel()
      ...
      .collect(...);
```

Or, e.g., when a filter is blocking:

```java
Stream....
      .parallel()
      .filter(Blocking.predicate(t -> blockingTest(t)))
      .collect(...);
```
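If you would rather not add a dependency, the same idea can be sketched with the JDK alone. `blockingPredicate` and `slowIsEven` are hypothetical names used for illustration; this is not jOOλ's implementation. Each `test` call runs inside `ForkJoinPool.managedBlock` with a fresh blocker, and `slowIsEven` merely stands in for a slow I/O check:

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BlockingPredicateDemo {
    // Hypothetical helper: wraps a Predicate so each call runs inside
    // ForkJoinPool.managedBlock. Outside a ForkJoinPool it just calls the predicate.
    static <T> Predicate<T> blockingPredicate(Predicate<T> predicate) {
        return t -> {
            boolean[] result = new boolean[1];
            try {
                ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
                    private boolean done;

                    @Override
                    public boolean block() {
                        result[0] = predicate.test(t);  // the potentially blocking call
                        done = true;
                        return true;
                    }

                    @Override
                    public boolean isReleasable() {
                        return done;
                    }
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return result[0];
        };
    }

    // Hypothetical stand-in for a slow I/O check: sleeps briefly, keeps even numbers.
    static boolean slowIsEven(Integer i) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i % 2 == 0;
    }

    public static void main(String[] args) {
        List<Integer> evens = IntStream.range(0, 20).boxed()
                .parallel()
                .filter(blockingPredicate(BlockingPredicateDemo::slowIsEven))
                .collect(Collectors.toList());
        System.out.println(evens); // [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    }
}
```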

(Disclaimer: I work for the company behind jOOλ.)

Lukas Eder
  • IIRC you claimed that jOOλ does not care about parallel processing :-) Actually, your solution (as well as mine) has another problem. The number of subtasks created by the Stream API is usually 4 * the number of processors (it could be more for infinite streams). This number will not change if we create a custom pool with 100 threads or mark tasks as blocking with your approach. For blocking IO it's actually better to use something else (e.g. `CompletableFuture`s). – Tagir Valeev Feb 10 '16 at 02:11
  • @TagirValeev: The purpose of jOOλ is to "fix" the JDK as a whole (e.g. also provide functional interfaces that throw checked exceptions). The purpose of jOOλ's `Seq` is to "fix" `Stream` for sequential (and ordered) use. `CompletableFuture` also sends tasks to the `ForkJoinPool`, so you might exhaust the same threads. I agree that this usage here is probably not what people should do. But then again, what *is* blocking (in this context)? Is it I/O? Is it a loop until `1_000_000_000_000`? Is it a silly exponential algorithm? – Lukas Eder Feb 10 '16 at 08:04
  • OP explicitly said it's I/O. Using `CompletableFuture` you can create as many tasks as you wish, but with a parallel stream you cannot control this. On a 4-core machine up to 16 tasks are usually created, and if 15 of them are blocked in I/O operations, then even with your trick only one task will continue working. The Stream API will just not divide the input any further and will not create more tasks for you. – Tagir Valeev Feb 10 '16 at 08:21
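A minimal sketch of the `CompletableFuture` alternative, using only the JDK: every element gets its own task on a dedicated `ExecutorService`. The two-argument `CompletableFuture.supplyAsync(Supplier, Executor)` runs the task on the given executor; only the one-argument overload falls back to a default executor, normally `ForkJoinPool.commonPool()`. `fetch` is a hypothetical stand-in for the blocking I/O call, and the pool size of 16 is an arbitrary assumption:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IoExecutorDemo {
    // Hypothetical stand-in for a blocking I/O call: sleeps, then returns a value.
    static String fetch(int id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return "item-" + id;
    }

    // One task per element on a dedicated pool, so blocked threads never
    // occupy the common ForkJoinPool used by parallel streams.
    static List<String> fetchAll(List<Integer> ids, ExecutorService ioPool) {
        // Collect the futures first so all tasks are submitted before any join()
        List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fetch(id), ioPool))
                .collect(Collectors.toList());
        // join() waits for each result; the list keeps the order of ids
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(16);
        try {
            List<Integer> ids = IntStream.range(0, 32).boxed().collect(Collectors.toList());
            System.out.println(fetchAll(ids, ioPool));
        } finally {
            ioPool.shutdown();
        }
    }
}
```

Here the number of concurrent blocking calls is bounded by the executor's size rather than by how the stream chooses to split its input.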

This is possibly similar to [Custom thread pool in Java 8 parallel stream](http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream).

The problem is further discussed in this blog.

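```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
List<Integer> primes = forkJoinPool.submit(() ->
    // parallel task here, for example (get() throws InterruptedException/ExecutionException)
    IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).boxed().collect(Collectors.toList())
).get();
```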
Community
wassgren