
By default, parallel Java streams are processed by the common fork-join pool, which is constructed with default parameters. As has been answered in another question, one can adjust these defaults by specifying a custom pool or by setting the java.util.concurrent.ForkJoinPool.common.parallelism system property.
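Concretely, this is what I mean by the system-property method (the parallelism value of 100 and the file name are just illustrative); the custom-pool method is what the program further below does:

// Either pass the property on the command line:
//   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=100 Resolve100 addresses.txt
// or set it programmatically at the very start of main(), before the
// common pool is first used:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");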

However, I've been unable to increase the number of threads allocated to stream processing with either of these two methods. As an example, consider the program below, which processes a list of IP addresses contained in the file specified as its first argument and outputs the resolved host names. Running this on a file with about 13000 unique IP addresses, I see in Oracle Java Mission Control as few as 16 threads, of which only five are ForkJoinPool workers. Yet this particular task would benefit greatly from many more threads, because they spend most of their time waiting for DNS responses. So my question is: how can I actually increase the number of threads used?

I've tried the program in three environments; these are the OS-reported thread counts:

  • Java SE Runtime Environment build 1.8.0_73-b02 on an 8-core machine running Windows 7: 17 threads
  • Java SE Runtime Environment build 1.8.0_66-b17 on a 2-core machine running OS X Darwin 15.2.0: 23 threads
  • openjdk version 1.8.0_72 on a 24-core machine running FreeBSD 11.0: 44 threads

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;

/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {
    /** Resolve the passed IP address into a name */
    static String addressName(String ipAddress) {
        try {
            return InetAddress.getByName(ipAddress).getHostName();
        } catch (UnknownHostException e) {
            return ipAddress;
        }
    }

    public static void main(String[] args) {
        Path path = Paths.get(args[0]);
        ForkJoinPool fjp = new ForkJoinPool(100);
        try {
            fjp.submit(() -> {
                try {
                    Files.lines(path)
                    .parallel()
                    .map(line -> addressName(line))
                    .forEach(System.out::println);
                } catch (IOException e) {
                    System.err.println("Failed: " + e);
                }
            }).get();
        } catch (Exception e) {
            System.err.println("Failed: " + e);
        }
    }
}
Diomidis Spinellis
  • You should enclose `Files.lines()` in a try-with-resources statement! – fge Feb 23 '16 at 15:42
  • I suggest you add the lines to a List before attempting to parallel() it. It does a much better job when it knows how many entries there are in advance. – Peter Lawrey Feb 23 '16 at 15:48

1 Answer


There are two problems with your approach. The first is that using a custom FJP will not change the maximum number of individual tasks created by the Stream API, because that number is defined as follows:

static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

So even if you use a custom pool, the number of parallel tasks will be limited to commonPoolParallelism * 4. (It's not a hard limit but a target; nevertheless, in many cases the number of tasks equals this number.)
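For illustration, you can see the target that applies to your environment with a small sketch (LEAF_TARGET lives in the internal java.util.stream.AbstractTask class, so this mirrors the current OpenJDK implementation, not anything specified):

import java.util.concurrent.ForkJoinPool;

public class LeafTargetDemo {
    public static void main(String[] args) {
        // The target depends on the *common* pool's parallelism,
        // not on the pool that actually executes the stream.
        int leafTarget = ForkJoinPool.getCommonPoolParallelism() << 2;
        System.out.println("Leaf task target: " + leafTarget);
    }
}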

The above problem can be fixed by using the java.util.concurrent.ForkJoinPool.common.parallelism system property, but here you hit another problem: Files.lines parallelizes really badly. See this question for details. In particular, for 13000 input lines the maximal possible speedup is 3.17x (assuming every line takes roughly the same time to process), even if you have 100 CPUs. My StreamEx library provides a workaround for this (create the stream using StreamEx.ofLines(path).parallel()). Another possible solution is to read the file's lines sequentially into a List and then create a parallel stream from it:

Files.readAllLines(path).parallelStream()...
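For example, the question's program could be reworked along these lines (a sketch: it reuses the addressName helper from the question's Resolve100 class and assumes the whole file fits comfortably in memory). Run it with -Djava.util.concurrent.ForkJoinPool.common.parallelism=100 to raise the common pool's size:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ResolveReadAll {
    public static void main(String[] args) throws IOException {
        // Reading the whole file up front yields a List that knows its size,
        // so the parallel stream can split the work evenly.
        Files.readAllLines(Paths.get(args[0]))
             .parallelStream()
             .map(Resolve100::addressName)   // resolver from the question
             .forEach(System.out::println);
    }
}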

This approach works together with the system property. However, in general the Stream API is not well suited to parallel processing when the tasks involve I/O. A more flexible solution is to create a CompletableFuture for each line:

ForkJoinPool fjp = new ForkJoinPool(100);
// Schedule one asynchronous lookup per line on the custom pool
List<CompletableFuture<String>> list = Files.lines(path)
    .map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp))
    .collect(Collectors.toList());
// Wait for each future and print the results in input order
list.stream().map(CompletableFuture::join)
    .forEach(System.out::println);

This way you don't need to tweak the system property, and you can use separate pools for separate tasks.
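Put together, a complete version of this approach might look like the following (a sketch: the class name is arbitrary, the helper is copied from the question, and Files.lines is wrapped in try-with-resources as suggested in the comments under the question):

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Resolve IP addresses in file args[0] using a 100-thread custom pool */
public class Resolve100Futures {
    static String addressName(String ipAddress) {
        try {
            return InetAddress.getByName(ipAddress).getHostName();
        } catch (UnknownHostException e) {
            return ipAddress;
        }
    }

    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool(100);
        try (Stream<String> lines = Files.lines(Paths.get(args[0]))) {
            // One asynchronous lookup per line, all scheduled on the custom pool
            List<CompletableFuture<String>> futures = lines
                .map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp))
                .collect(Collectors.toList());
            // Wait for each result and print them in input order
            futures.stream()
                   .map(CompletableFuture::join)
                   .forEach(System.out::println);
        } finally {
            fjp.shutdown();
        }
    }
}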

Tagir Valeev
  • and it shouldn't go unmentioned that this technique for changing the number of threads is completely implementation-dependent, unspecified behavior, and nothing developers should rely on – Holger Feb 23 '16 at 17:34
  • @Holger, I assume you mean the .submit method, right? – Diomidis Spinellis Feb 23 '16 at 17:48
  • Thank you! The CompletableFuture approach does indeed spawn 100 threads and offers an order-of-magnitude speedup. Here are the numbers. Original: 48m40.036s; CompletableFuture: 0m37.465s. (Note that the original version also ran on a warm DNS cache.) – Diomidis Spinellis Feb 23 '16 at 17:51
  • @Diomidis Spinellis: right, the `submit` method, which alters the stream behavior because streams use Fork/Join, which is an implementation detail. – Holger Feb 23 '16 at 17:52
  • @Tagir Valeev: One limitation of your approach is that it requires memory proportional to the number of items. Would it be possible to run the processing with the specified number of threads on a single stream? I tried the naive way (eliminating collect() and list.stream()), but the number of threads shrank again, down to 16. I'm not sure what's going on. – Diomidis Spinellis Feb 23 '16 at 19:04
  • @DiomidisSpinellis, in your case memory usage should not be a very big problem (13000 completable futures is not that much). Also note that `Files.lines().parallel()` actually buffers lines in intermediate arrays as well, so you have all the lines loaded in memory anyway. – Tagir Valeev Feb 24 '16 at 03:04
  • Thank you @TagirValeev. I know that 13000 entries consume insignificant memory. However, I like how streams can process arbitrary amounts of data, and it's a pity that the new solution doesn't allow this for an algorithm that doesn't require in-memory processing. Does `.parallel()` buffer all entries, or a window? (I'd assume the latter, which is the correct approach.) – Diomidis Spinellis Feb 24 '16 at 06:22
  • @DiomidisSpinellis, it depends on the number of elements and the pool size. For 13000 elements it creates 5 buffers, of 1024, 2048, 3072, 4096 and 2760 elements, and creates 5 subtasks on these buffers (btw, that's how I calculated the 3.17x factor in my answer: it's 13000/4096, with 4096 being the biggest buffer). If the first subtask is fully processed before the splitting finishes, then you will have fewer than 13000 lines in memory at any time. However, in your case I think file reading is much faster than DNS resolution, so the whole file will be in memory. You would save memory if you had many more lines (like 1M). – Tagir Valeev Feb 24 '16 at 06:34
  • Thanks! Maybe I'm asking for too much, and a new execution mechanism is required. I was hoping for something that would act like GNU parallel: dynamically schedule arbitrary jobs as long as the machine load is below a given level, without reading all of them into memory. – Diomidis Spinellis Feb 24 '16 at 09:21