20

The following code fragment is part of a method that gets a directory listing, calls an extract method on each file and serializes the resulting drug object to xml.

try(Stream<Path> paths = Files.list(infoDir)) {
    paths
        .parallel()
        .map(this::extract)
        .forEachOrdered(drug -> {
            try {
                marshaller.write(drug);
            } catch (JAXBException ex) {
                ex.printStackTrace();
            }
        });
}

Here is the exact same code doing the exact same thing but using a plain .list() call to get the directory listing and calling .parallelStream() on the resulting list.

Arrays.asList(infoDir.toFile().list())
    .parallelStream()
    .map(f -> infoDir.resolve(f))
    .map(this::extract)
    .forEachOrdered(drug -> {
        try {
            marshaller.write(drug);
        } catch (JAXBException ex) {
            ex.printStackTrace();
    }
});

My machine is a quad core MacBook Pro, Java v 1.8.0_60 (build 1.8.0_60-b27).

I am processing ~ 7000 files. The averages of 3 runs:

First version: With .parallel(): 20 seconds. Without .parallel(): 41 seconds

Second version: With .parallelStream(): 12 seconds. With .stream(): 41 seconds.

Those 8 seconds in parallel mode seem like an enormous difference given that the extract method that reads from the stream and does all the heavy work and the write call doing the final writes are unchanged.

Tagir Valeev
  • 97,161
  • 19
  • 222
  • 334
kliron
  • 4,383
  • 4
  • 31
  • 47
  • how does your code perform without parallelStream? – sidgate Dec 17 '15 at 18:33
  • I doubt File.list() is “the exact same code doing the exact same thing” as traversing a DirectoryStream. – VGR Dec 17 '15 at 18:41
  • @sidgate I updated the question. Without parallelization they execute in almost exactly the same time. – kliron Dec 17 '15 at 18:45
  • @VGR Sure, I meant I didn't expect it to do so much work as to take a full 8 seconds out of a 20 second run. Think about it. More than 1/3 the execution time just to coordinate a parallel stream under the covers?? – kliron Dec 17 '15 at 18:57
  • 4
    http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html "Currently, JDK IO-based Stream sources (for example BufferedReader.lines()) are mainly geared for sequential use, processing elements one-by-one as they arrive." – Louis Wasserman Dec 17 '15 at 18:58
  • 5
    A directory stream doesn’t know its size beforehand and thus, can’t make a balanced workload split. As you said yourself, it’s `extract` that does the heavy work, not the reading of the directory, so reading the directory single threaded completely into the memory and perform a parallel operation on the resulting array, that has a known size, is much more efficient. – Holger Dec 17 '15 at 18:58
  • 3
    It's worth looking at the flags returned by calling `spliterator().characteristics()` on each Stream. For me (on Windows 7 64-bit), the first returns only `DISTINCT`, while the second returns `ORDERED|SIZED|IMMUTABLE|SUBSIZED`. – VGR Dec 17 '15 at 19:16
  • @Holger Does a stream need to know its size beforehand to balance the load? What size does `IntStream.iterate(0, i -> i + 2);` have? – kliron Dec 17 '15 at 19:16
  • Ah, infinite streams don't play well at all with parallel(). Oh well... – kliron Dec 17 '15 at 19:22
  • 9
    By "don't play well at all", what you really mean is "it is harder to effectively extract parallelism from an infinite, lazily generated source than from a materialized finite source like an array or collection", and that is reflected in the library performance. – Brian Goetz Dec 17 '15 at 19:27

3 Answers3

32

The problem is that current implementation of Stream API along with the current implementation of IteratorSpliterator for unknown size source badly splits such sources to parallel tasks. You were lucky having more than 1024 files, otherwise you would have no parallelization benefit at all. Current Stream API implementation takes into account the estimateSize() value returned from Spliterator. The IteratorSpliterator of unknown size returns Long.MAX_VALUE before split and its suffix always returns Long.MAX_VALUE as well. Its splitting strategy is the following:

  1. Define the current batch size. Current formula is to start with 1024 elements and increase arithmetically (2048, 3072, 4096, 5120 and so on) until MAX_BATCH size is reached (which is 33554432 elements).
  2. Consume input elements (in your case Paths) into array until the batch size is reached or input is exhausted.
  3. Return an ArraySpliterator iterating over the created array as prefix, leaving itself as suffix.

Suppose you have 7000 files. Stream API asks for estimated size, IteratorSpliterator returns Long.MAX_VALUE. Ok, Stream API asks the IteratorSpliterator to split, it collects 1024 elements from the underlying DirectoryStream to the array and splits to ArraySpliterator (with estimated size 1024) and itself (with estimated size which is still Long.MAX_VALUE). As Long.MAX_VALUE is much much more than 1024, Stream API decides to continue splitting the bigger part without even trying to split the smaller part. So the overall splitting tree goes like this:

                     IteratorSpliterator (est. MAX_VALUE elements)
                           |                    |
ArraySpliterator (est. 1024 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 2048 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 3072 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 856 elements)    IteratorSpliterator (est. MAX_VALUE elements)
                                                    |
                                        (split returns null: refuses to split anymore)

So after that you have five parallel tasks to be executed: actually containing 1024, 2048, 3072, 856 and 0 elements. Note that even though the last chunk has 0 elements, it still reports that it has estimatedly Long.MAX_VALUE elements, so Stream API will send it to the ForkJoinPool as well. The bad thing is that Stream API thinks that further splitting of first four tasks is useless as their estimated size is much less. So what you get is very uneven splitting of the input which utilizes four CPU cores max (even if you have much more). If your per-element processing takes roughly the same time for any element, then the whole process would wait for the biggest part (3072 elements) to complete. So maximum speedup you may have is 7000/3072=2.28x. Thus if sequential processing takes 41 seconds, then the parallel stream will take around 41/2.28 = 18 seconds (which is close to your actual numbers).

Your work-around solution is completely fine. Note that using Files.list().parallel() you also have all the input Path elements stored in the memory (in ArraySpliterator objects). Thus you will not waste more memory if you manually dump them into the List. Array-backed list implementations like ArrayList (which is currently created by Collectors.toList()) can split evenly without any problems, which results in additional speed-up.

Why such case is not optimized? Of course it's not impossible problem (though implementation could be quite tricky). It seems that it's not high-priority problem for JDK developers. There were several discussions on this topic in mailing lists. You may read Paul Sandoz message here where he comments on my optimization effort.

Tagir Valeev
  • 97,161
  • 19
  • 222
  • 334
  • 4
    This is a great explanation !! +1 – Alexis C. Dec 18 '15 at 11:02
  • Thanks for the explanation. Out of interest, how does this part work: "The bad thing is that Stream API thinks that further splitting of first four tasks is useless as their estimated size is much less." – Jackie Mar 10 '21 at 02:37
  • @lwpro2 it sees that the estimated size of the right part is much much bigger than the estimated size of the left part, so it doesn't even try splitting the left part anymore. And the right part (that actually contains 0 elements) refuses to split (which is quite expected). Note that all the splitting is done before actual element processing, so the pipeline doesn't know that the rightmost part is actually empty when it decides where to split. – Tagir Valeev Mar 12 '21 at 10:08
  • Thanks @TagirValeev. Seems like in the code, it was the `sizeThreshold` wrongly calculated based on `Long.MAX_VALUE`. It stops further splitting any part smaller than that threshold. – Jackie Mar 13 '21 at 14:13
5

As an alternative, you may use this custom spliterator specially tailored for DirectoryStream:

public class DirectorySpliterator implements Spliterator<Path> {
    Iterator<Path> iterator;
    long est;

    private DirectorySpliterator(Iterator<Path> iterator, long est) {
        this.iterator = iterator;
        this.est = est;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Path> action) {
        if (iterator == null) {
            return false;
        }
        Path path;
        try {
            synchronized (iterator) {
                if (!iterator.hasNext()) {
                    iterator = null;
                    return false;
                }
                path = iterator.next();
            }
        } catch (DirectoryIteratorException e) {
            throw new UncheckedIOException(e.getCause());
        }
        action.accept(path);
        return true;
    }

    @Override
    public Spliterator<Path> trySplit() {
        if (iterator == null || est == 1)
            return null;
        long e = this.est >>> 1;
        this.est -= e;
        return new DirectorySpliterator(iterator, e);
    }

    @Override
    public long estimateSize() {
        return est;
    }

    @Override
    public int characteristics() {
        return DISTINCT | NONNULL;
    }

    public static Stream<Path> list(Path parent) throws IOException {
        DirectoryStream<Path> ds = Files.newDirectoryStream(parent);
        int splitSize = Runtime.getRuntime().availableProcessors() * 8;
        DirectorySpliterator spltr = new DirectorySpliterator(ds.iterator(), splitSize);
        return StreamSupport.stream(spltr, false).onClose(() -> {
            try {
                ds.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}

Just replace Files.list with DirectorySpliterator.list and it will parallelize evenly without any intermediate buffering. Here we use the fact that DirectoryStream produces a directory list without any specific order, so every parallel thread will just take a subsequent entry from it (in synchronized manner, as we already have synchronous IO operations, additional synchronization has next-to-nothing overhead). The parallel order will differ every time (even if forEachOrdered is used), but Files.list() does not guarantee the order as well.

The only non-trivial part here is how many parallel tasks to create. As we don't know how many files in the folder until we traverse it, it's good to use availableProcessors() as a base. I create about 8 x availableProcessors() individual tasks, which seems to be a good fine-grained/coarse-grained compromise: if per-element processing is uneven, having more tasks than processors would help to balance the load.

Tagir Valeev
  • 97,161
  • 19
  • 222
  • 334
  • 1
    Keep in mind that the number of CPUs doesn’t need to be a power of two, it doesn’t even need to be an even number. iirc, AMD made once CPUs with three cores and well, you might get the idea why they didn’t take off, too many software failed to balance their work properly… – Holger Dec 18 '15 at 09:41
  • @Holger, yes I thought about it. I even worked on 3-core AMD CPU. It's just 4-core machine with one core disabled (either it failed some factory tests or for marketing reasons, to sell some CPUs cheaper). It's not very hard to fix this in my code and number of sub-tasks is a rough estimate anyways. – Tagir Valeev Dec 18 '15 at 09:49
  • Can you explain why `long e = this.est >>> 1; this.est -= e;` are needed? – Alex R Oct 24 '19 at 03:14
2

Another alternative to your workaround is to use .collect(Collectors.toList()).parallelStream() on your stream like

try(Stream<Path> paths = Files.list(infoDir)) {
    paths
        .collect(Collectors.toList())
        .parallelStream()
        .map(this::extract)
        .forEachOrdered(drug -> {
            try {
                marshaller.write(drug);
            } catch (JAXBException ex) {
                ex.printStackTrace();
            }
        });
}

With this you don't need to call .map(f -> infoDir.resolve(f)) and the performance should be similar to your second solution.

asmaier
  • 11,132
  • 11
  • 76
  • 103