16

I cannot achieve good parallelization of stream processing when the stream source is a Reader. Running the code below on a quad-core CPU I observe 3 cores being used at first, then a sudden drop to just two, then one core. Overall CPU utilization hovers around 50%.

Note the following characteristics of the example:

  • there are just 6,000 lines;
  • each line takes about 20 ms to process;
  • the whole procedure takes about a minute.

That means that all the pressure is on the CPU and I/O is minimal. The example is a sitting duck for automatic parallelization.

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

... class imports elided ...    

public class Main
{
  static final AtomicLong totalTime = new AtomicLong();

  public static void main(String[] args) throws IOException {
    final long start = System.nanoTime();
    final Path inputPath = createInput();
    System.out.println("Start processing");

    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
      Files.lines(inputPath).parallel().map(Main::processLine)
        .forEach(w::println);
    }

    final double cpuTime = totalTime.get(),
                 realTime = System.nanoTime()-start;
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("          Cores: " + cores);
    System.out.format("       CPU time: %.2f s\n", cpuTime/SECONDS.toNanos(1));
    System.out.format("      Real time: %.2f s\n", realTime/SECONDS.toNanos(1));
    System.out.format("CPU utilization: %.2f%%", 100.0*cpuTime/realTime/cores);
  }

  private static String processLine(String line) {
    final long localStart = System.nanoTime();
    double ret = 0;
    for (int i = 0; i < line.length(); i++)
      for (int j = 0; j < line.length(); j++)
        ret += Math.pow(line.charAt(i), line.charAt(j)/32.0);
    final long took = System.nanoTime()-localStart;
    totalTime.getAndAdd(took);
    return NANOSECONDS.toMillis(took) + " " + ret;
  }

  private static Path createInput() throws IOException {
    final Path inputPath = Paths.get("input.txt");
    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
      for (int i = 0; i < 6_000; i++) {
        final String text = String.valueOf(System.nanoTime());
        for (int j = 0; j < 25; j++) w.print(text);
        w.println();
      }
    }
    return inputPath;
  }
}

My typical output:

          Cores: 4
       CPU time: 110.23 s
      Real time: 53.60 s
CPU utilization: 51.41%

For comparison, if I use a slightly modified variant where I first collect into a list and then process:

Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
  .forEach(w::println);

I get this typical output:

          Cores: 4
       CPU time: 138.43 s
      Real time: 35.00 s
CPU utilization: 98.87%

What could explain that effect, and how can I work around it to get full utilization?

Note that I originally observed this with a Reader over a servlet input stream, so it is not specific to FileReader.

Marko Topolnik
  • 1
    Could you perhaps consider writing the lambda that you use in the `map` resp `mapToObj` as a method? I think it would improve readability a lot. – skiwi Mar 21 '14 at 20:38
  • 1
    It is just my opinion, but I prefer concise chaining in the lambda, and then have some helper methods around. You know what exactly happens in your chain (obtain stream, map, parallel, forEach), but for outsiders it is not that obvious at first glance. – skiwi Mar 21 '14 at 20:43
  • 1
    I've improved the code according to your guidelines. – Marko Topolnik Mar 21 '14 at 20:51

4 Answers

7

Here is the answer, spelled out in the source code of Spliterators.IteratorSpliterator, the one used by BufferedReader#lines():

    @Override
    public Spliterator<T> trySplit() {
        /*
         * Split into arrays of arithmetically increasing batch
         * sizes.  This will only improve parallel performance if
         * per-element Consumer actions are more costly than
         * transferring them into an array.  The use of an
         * arithmetic progression in split sizes provides overhead
         * vs parallelism bounds that do not particularly favor or
         * penalize cases of lightweight vs heavyweight element
         * operations, across combinations of #elements vs #cores,
         * whether or not either are known.  We generate
         * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
         * potential speedup.
         */
        Iterator<? extends T> i;
        long s;
        if ((i = it) == null) {
            i = it = collection.iterator();
            s = est = (long) collection.size();
        }
        else
            s = est;
        if (s > 1 && i.hasNext()) {
            int n = batch + BATCH_UNIT;
            if (n > s)
                n = (int) s;
            if (n > MAX_BATCH)
                n = MAX_BATCH;
            Object[] a = new Object[n];
            int j = 0;
            do { a[j] = i.next(); } while (++j < n && i.hasNext());
            batch = j;
            if (est != Long.MAX_VALUE)
                est -= j;
            return new ArraySpliterator<>(a, 0, j, characteristics);
        }
        return null;
    }

Also noteworthy are the constants:

static final int BATCH_UNIT = 1 << 10;  // batch array size increment
static final int MAX_BATCH = 1 << 25;  // max batch array size;

So in my example, where I use 6,000 elements, I get just three batches (1,024, 2,048 and the remaining 2,928 elements) because the batch size grows in steps of 1,024. That precisely explains my observation that initially three cores are used, dropping to two and then one as the smaller batches complete. In the meantime I tried a modified example with 60,000 elements, and with it I get almost 100% CPU utilization.
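The arithmetic can be checked with a small simulation. This is only a sketch that mirrors the batch-size logic of the quoted trySplit for a source of unknown size; the class and method names (BatchSizes, batchSizes) are made up for illustration and are not part of the JDK.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSizes {
  // Same values as the constants quoted from Spliterators.IteratorSpliterator.
  static final int BATCH_UNIT = 1 << 10;
  static final int MAX_BATCH = 1 << 25;

  /**
   * Simulates the sizes of the batches that the quoted trySplit logic
   * hands out when the source holds 'elements' items and its size is unknown.
   */
  static List<Integer> batchSizes(int elements) {
    List<Integer> sizes = new ArrayList<>();
    int batch = 0;              // size of the previous batch
    int remaining = elements;   // items still left in the iterator
    while (remaining > 0) {
      int n = Math.min(batch + BATCH_UNIT, MAX_BATCH); // target size grows by 1024
      int taken = Math.min(n, remaining);              // the iterator may run dry first
      sizes.add(taken);
      batch = taken;
      remaining -= taken;
    }
    return sizes;
  }

  public static void main(String[] args) {
    System.out.println(batchSizes(6_000));          // [1024, 2048, 2928]
    System.out.println(batchSizes(60_000).size());  // 11
  }
}
```

With 6,000 elements there are only three units of work for four cores, while 60,000 elements yield eleven, which is consistent with the utilization I observed.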

To solve my problem I have developed the code below which allows me to turn any existing stream into one whose Spliterator#trySplit will partition it into batches of specified size. The simplest way to use it for the use case from my question is like this:

toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)

On a lower level, the class below is a spliterator wrapper which changes the wrapped spliterator's trySplit behavior and leaves other aspects unchanged.


import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
  private final Spliterator<T> spliterator;
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
    final int c = toWrap.characteristics();
    this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
    this.spliterator = toWrap;
    this.est = est;
    this.batchSize = batchSize;
  }
  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
    this(toWrap, toWrap.estimateSize(), batchSize);
  }

  public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
    return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
  }

  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!spliterator.tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a, 0, j, characteristics());
  }
  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
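  @Override public Comparator<? super T> getComparator() {
    // Delegate so that a source sorted by a custom Comparator keeps reporting it;
    // per the Spliterator contract this throws IllegalStateException if not SORTED.
    return spliterator.getComparator();
  }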
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}
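To see what fixed-size splitting looks like in isolation, here is a stripped-down, self-contained sketch. It is not the wrapper above: FixedBatchDemo, FixedBatch and splitSizes are hypothetical names, the spliterator reads straight from an Iterator, and it reports only ORDERED. It just calls trySplit by hand and counts the elements in each batch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.IntStream;

public class FixedBatchDemo {
  /** Hypothetical minimal spliterator that splits off array batches of a fixed size. */
  static final class FixedBatch<T> implements Spliterator<T> {
    private final Iterator<T> it;
    private final int batchSize;

    FixedBatch(Iterator<T> it, int batchSize) {
      this.it = it;
      this.batchSize = batchSize;
    }

    @Override public Spliterator<T> trySplit() {
      if (!it.hasNext()) return null;
      Object[] a = new Object[batchSize];
      int j = 0;
      do a[j] = it.next(); while (++j < batchSize && it.hasNext());
      // The array-backed spliterator covers exactly the j elements read above.
      return Spliterators.spliterator(a, 0, j, ORDERED);
    }
    @Override public boolean tryAdvance(Consumer<? super T> action) {
      if (!it.hasNext()) return false;
      action.accept(it.next());
      return true;
    }
    @Override public long estimateSize() { return Long.MAX_VALUE; } // size unknown
    @Override public int characteristics() { return ORDERED; }
  }

  /** Splits a source of 'elements' integers by hand and returns the size of each batch. */
  static List<Integer> splitSizes(int elements, int batchSize) {
    Spliterator<Integer> source =
        new FixedBatch<>(IntStream.range(0, elements).boxed().iterator(), batchSize);
    List<Integer> sizes = new ArrayList<>();
    for (Spliterator<Integer> part; (part = source.trySplit()) != null; ) {
      int[] count = {0};
      part.forEachRemaining(x -> count[0]++);
      sizes.add(count[0]);
    }
    return sizes;
  }

  public static void main(String[] args) {
    System.out.println(splitSizes(50, 20)); // [20, 20, 10]
  }
}
```

Every batch has the same small size, so the number of independent work units scales with the input rather than with its square root.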
Marko Topolnik
  • I'd suggest submitting this somewhere to the Java team. Be it via a bug report or on the mailing list. Maybe they can even provide a reason of why it is not implemented at the moment. – skiwi Mar 22 '14 at 17:44
  • 1
    @skiwi I have posted to `lambda-dev`: http://mail.openjdk.java.net/pipermail/lambda-dev/2014-March/011968.html I'll report back here with information I obtain. – Marko Topolnik Mar 22 '14 at 21:43
  • 1
    Well done. The difficulty is finding the right split size given an unknown (to the library) per-element cost, in the presence of an unknown stream size. There ought to be an API for tuning this but we haven't figured out what it should be yet. I hope the lambda-dev thread proves useful. – Stuart Marks Mar 23 '14 at 01:45
  • @StuartMarks Are there any conditions under which the originally large `ArraySpliterator` is again split into smaller pieces? I'm still struggling to understand the full picture; I've tried going through the code but it's very involved. – Marko Topolnik Mar 23 '14 at 07:33
  • @StuartMarks BTW I do realize what you're up against: an onslaught of angry Java devs convinced that parallel Streams destroyed their performance. The largest real-life threat is parallelization at a too-fine granularity---hence the conservative batch step of 1024. Compare Rich Hickey's approach: "here's `pmap`, it submits one concurrent task for each element. Making it perform is your responsibility." Clojure does make it simple to batch together elements with `partition`, though. I've been looking for an equivalent of that in Streams, too. – Marko Topolnik Mar 23 '14 at 07:50
  • 2
    @Marko, Looks like some conversation is occurring on lambda-dev now. Great. Re the "onslaught of angry Java devs" point, I think there are only two states a system can be in: **not good enough** or **irrelevant**. It's good that people try stuff and push the envelope, because that's how things get better. – Stuart Marks Mar 25 '14 at 02:20
  • @StuartMarks The problem is that Streams have been in alpha/beta for years, yet the "Java devs" didn't bother trying it. They should have exclaimed that the emperor has no clothes long ago, before this travesty went RTM. The right approach would have been to profile the per-element cost at runtime, and adjust the block size dynamically. It's nothing that the JVM isn't doing already, but it could have just as easily been done at the library level. – Aleksandr Dubinsky Mar 25 '14 at 23:29
  • @MarkoTopolnik: I noticed that the `est` argument is not used in the constructor. I take it that this is a mistake? – Lii Apr 12 '14 at 14:06
  • @lii Thanks for noticing, fixed. The earlier version didn't sport setting an explicit `est`, this was a leftover from that version. – Marko Topolnik Apr 12 '14 at 15:40
  • @MarkoTopolnik Did you learn why the `ArraySpliterator` does not get split further? – Aleksandr Dubinsky May 30 '15 at 16:00
  • 1
    @AleksandrDubinsky Yes, but now I have to remember :) I think it's because the splitting policy is not guided by the overall number of splits (which is a global property), but rather by the target *size* of the split, which is something which can be evaluated and applied locally, in the method which only has the one array in hand. For a sequence of an indefinite size the target split size cannot be calculated in advance such that it produces the desired number of splits. – Marko Topolnik May 31 '15 at 07:46
5

This problem is to some extent fixed in Java 9 early-access builds. Files.lines was rewritten, and upon splitting it now actually jumps into the middle of the memory-mapped file. Here are the results on my machine (which has 4 cores with Hyper-Threading = 8 hardware threads):

Java 8u60:

Start processing
          Cores: 8
       CPU time: 73,50 s
      Real time: 36,54 s
CPU utilization: 25,15%

Java 9b82:

Start processing
          Cores: 8
       CPU time: 79,64 s
      Real time: 10,48 s
CPU utilization: 94,95%

As you can see, both real time and CPU utilization are greatly improved.

This optimization has some limitations, though. Currently it works only for a few encodings (namely UTF-8, ISO_8859_1 and US_ASCII), because for an arbitrary encoding you cannot know exactly how a line break is encoded. It is limited to files no larger than 2 GB (due to the limitations of MappedByteBuffer in Java) and, of course, does not work for some non-regular files (such as character devices or named pipes, which cannot be memory-mapped). In such cases the old implementation is used as a fallback.

Tagir Valeev
  • I used `Files.lines` just as an MCVE of my actual issue, which is about a non-reducibly sequential stream (SQL result set). But---good to know :) – Marko Topolnik Oct 26 '15 at 11:17
  • 2
    @MarkoTopolnik, yes, for `BufferedReader.lines` the problem is still there as well as for other things which have `Spliterators.spliteratorUnknownSize(..)` under the hood. I [proposed](http://mail.openjdk.java.net/pipermail/core-libs-dev/2015-July/034528.html) some improvement which would make the result better in general (though not reaching top possible speed), but it was rejected by Paul Sandoz. – Tagir Valeev Oct 26 '15 at 11:23
  • I take issue with the general attitude in the core Java team that the only correct way to parallelize is by loading everything into RAM. Arguments to the contrary are too obvious to even bother stating. – Marko Topolnik Oct 26 '15 at 17:59
2

The parallel execution of streams is based on a fork-join model. For ordered streams, parallel execution only works if the stream can be split into parts that strictly follow one another. In general, that is not possible with streams generated by BufferedReader. In theory, however, parallel execution should be possible for unordered streams:

BufferedReader reader = ...;
reader.lines().unordered().map(...);

I am not sure if the stream returned by BufferedReader supports this kind of parallel execution. A very simple alternative is to create an intermediate list:

BufferedReader reader = ...;
reader.lines().collect(toList()).parallelStream().map(...);

In this case, parallel execution starts only after all lines have been read. This might be a problem if reading the lines takes a long time. If so, I recommend using an ExecutorService for parallel execution instead of parallel streams:

ExecutorService executor = ...;
BufferedReader reader = ...;
reader.lines()
   .map(line -> executor.submit(() -> ... line ...))
   .collect(toList())
   .stream()
   .map(future -> future.get())
   .map(...);
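A runnable version of that outline might look as follows. It is a minimal sketch under stated assumptions: ExecutorLines, process and processAll are hypothetical names, process merely upper-cases the line as a stand-in for the real per-line work, and a fixed thread pool is assumed.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ExecutorLines {
  /** Stand-in for the expensive per-line work (assumption for this sketch). */
  static String process(String line) {
    return line.toUpperCase();
  }

  /** Submits one task per line, then collects the results in input order. */
  static List<String> processAll(BufferedReader reader, int threads) {
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    try {
      // Reading stays sequential; each line is handed to the pool as soon as it is read.
      List<Future<String>> futures = reader.lines()
          .map(line -> {
            Callable<String> task = () -> process(line);
            return executor.submit(task);
          })
          .collect(Collectors.toList());

      List<String> results = new ArrayList<>();
      for (Future<String> future : futures) {
        try {
          results.add(future.get()); // blocks until that line has been processed
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException(e);
        } catch (ExecutionException e) {
          throw new IllegalStateException(e.getCause());
        }
      }
      return results;
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc"));
    System.out.println(processAll(reader, 2)); // [A, B, C]
  }
}
```

Processing overlaps with reading here, but the list of futures still holds a reference to every line until the end, so this does not help when the input is too large to keep in memory.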
nosid
  • 1
    I don't see a fundamental obstacle in reading a chunk of input for one thread, handing off to another thread which reads its chunk, and so on. That's in fact the basic premise of how a `Spliterator` must be used. – Marko Topolnik Mar 21 '14 at 21:35
  • As for your suggestion to use an ExecutorService, that sounds good, except that it wouldn't be acceptable for me to `collect(toList())` at any point. Couldn't that be avoided? – Marko Topolnik Mar 21 '14 at 21:36
  • `forEach()` always implies `unordered()`, and `forEachOrdered()` typically prevents parallelism. – Aleksandr Dubinsky Mar 22 '14 at 01:39
  • @MarkoTopolnik What about collecting the results in a `BlockingQueue` with a set size constraint? I am not sure if that would be allowed, but it would turn it quite into *streaming* (and not using huge amounts of memory) **if** it works. – skiwi Mar 22 '14 at 11:18
  • @skiwi Since I have identified the precise cause of the problem, the proper fix would be to employ a spliterator with user-specified batch size. Now I find it quite a prominent missing feature that there is no `lines(batchSize)` method and similar for other cases. – Marko Topolnik Mar 22 '14 at 11:55
1

To find the real cause of this, you need to dig into the source of Files.lines(), which calls BufferedReader.lines(), shown here:

public Stream<String> lines() {
    Iterator<String> iter = new Iterator<String>() {
        String nextLine = null;

        @Override
        public boolean hasNext() {
            if (nextLine != null) {
                return true;
            } else {
                try {
                    nextLine = readLine();
                    return (nextLine != null);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }

        @Override
        public String next() {
            if (nextLine != null || hasNext()) {
                String line = nextLine;
                nextLine = null;
                return line;
            } else {
                throw new NoSuchElementException();
            }
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            iter, Spliterator.ORDERED | Spliterator.NONNULL), false);
}

Here it returns a Stream<String> that is:

  • Of unknown size
  • Ordered
  • Not null
  • Not parallel (the false argument at the end of StreamSupport.stream())

Hence I am really unsure whether it can even be parallelized; this could be determined by digging even further into the source.

What I do know is that parallel streams are explicitly provided in the Java APIs. Take List, for example: it has both a List.stream() and a List.parallelStream() method.
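One way to check what these calls actually return is to ask each stream via BaseStream.isParallel(). The snippet below is a small sketch (ParallelFlagDemo and flags are hypothetical names); it only inspects the parallel flag and says nothing about how well the work is actually distributed.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;

public class ParallelFlagDemo {
  /** Returns the parallel flag of four differently obtained streams. */
  static boolean[] flags() {
    List<String> list = Arrays.asList("a", "b", "c");
    BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc"));
    return new boolean[] {
      list.parallelStream().isParallel(),      // created as parallel
      list.stream().parallel().isParallel(),   // switched to parallel afterwards
      reader.lines().isParallel(),             // sequential by default
      reader.lines().parallel().isParallel()   // can be switched as well
    };
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(flags())); // [true, true, false, true]
  }
}
```

So the stream from BufferedReader.lines() merely starts out sequential; calling parallel() flips the same flag that parallelStream() sets up front.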

skiwi
  • I thought `parallelStream` was just a convenience for `stream().parallel()`. – Marko Topolnik Mar 21 '14 at 21:07
  • @MarkoTopolnik I cannot confirm that as of now, but I do know that JDK developers are heavily against method explosion, so wherever they can save a method, they will seriously consider removing it. So that leads me to think that if `parallelStream()` were the same as `stream().parallel()`, they would not have included `parallelStream()`... – skiwi Mar 21 '14 at 21:09
  • OK, how about the flipside: why would there even be a `parallel` method if a stream cannot be parallelized? Anyway, why doesn't the `map` step get parallelized? Since that's what I'm really looking for here. – Marko Topolnik Mar 21 '14 at 21:10
  • @MarkoTopolnik Forget about what I just said. The documentation here (http://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html#executing_streams_in_parallel) implies that every single stream can get parallelized. Left to investigate is what happens when lazily populated and parallel combine. – skiwi Mar 21 '14 at 21:14
  • @MarkoTopolnik You could still try to add `.unordered()` to your stream, and also check if it matters if you put `.parallel()` directly after your `.lines()`. – skiwi Mar 21 '14 at 21:16
  • I had `parallel()` there first, I put it below just to see if *that* helps :) I tried `unordered()`, no change. – Marko Topolnik Mar 21 '14 at 21:16
  • @MarkoTopolnik If it turns out to help, you can then again do a `forEachOrdered()` instead of `forEach()` to ensure order. The only question that remains then is whether you still get any performance benefits... – skiwi Mar 21 '14 at 21:19
  • I actually edited my comment with the negative result which I have obtained :) – Marko Topolnik Mar 21 '14 at 21:21
  • 1
    `unordered()` does nothing since `forEach()` always implies `unordered()`. Likewise, `forEachOrdered()` will make the entire pipeline ordered (and probably not parallelized). – Aleksandr Dubinsky Mar 22 '14 at 01:37