
I'm trying to understand why the following Java program gives an OutOfMemoryError, while the corresponding program without .parallel() doesn't.

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

I have two questions:

  1. What is the intended output of this program?

    Without .parallel() it seems that this simply outputs sum(1+2+3+...), which means that it just "gets stuck" at the first stream in the flatMap, which makes sense.

    With .parallel() I don't know if there is an expected behaviour, but my guess would be that it somehow interleaves the first n or so streams, where n is the number of parallel workers. It could also be slightly different based on the chunking/buffering behaviour.

  2. What causes it to run out of memory? I'm specifically trying to understand how these streams are implemented under the hood.

    I'm guessing that something blocks the stream, so it never finishes and never gets to discard the generated values, but I don't quite know in which order things are evaluated and where buffering occurs.

Edit: In case it is relevant, I'm using Java 11.

Edit 2: Apparently the same thing happens even for the simple program `IntStream.iterate(1, i -> i+1).limit(1000_000_000).parallel().sum()`, so it might have to do with the laziness of limit rather than flatMap.
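
For completeness, here is a self-contained version of that simplified program; the class name and the heap size are just example choices of mine:

import java.util.stream.IntStream;

// Run with a small heap (e.g. java -Xmx128m OomDemo) to reproduce the error quickly.
public class OomDemo {
    public static void main(String[] args) {
        // No flatMap involved: an unsized infinite source, limited and summed in parallel.
        System.out.println(IntStream
            .iterate(1, i -> i + 1)
            .limit(1000_000_000)
            .parallel()
            .sum());
    }
}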

– Thomas Ahle

3 Answers

11

You say “but I don't quite know in which order things are evaluated and where buffering occurs”, which is precisely what parallel streams are about. The order of evaluation is unspecified.

A critical aspect of your example is the .limit(100_000_000). This implies that the implementation can’t just sum up arbitrary values, but must sum up the first 100,000,000 numbers. Note that in the reference implementation, .unordered().limit(100_000_000) doesn’t change the outcome, which indicates that there’s no special implementation for the unordered case, but that’s an implementation detail.
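
For illustration, this is what the unordered variant mentioned above looks like; on the reference implementation it runs out of memory just like the original (a sketch, assuming the same setup as in the question):

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .unordered()          // declaring the stream unordered does not make limit() cheaper here
    .limit(100_000_000)
    .sum()
);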

Now, when worker threads process the elements, they can’t just sum them up, as they have to know which elements they are allowed to consume, which depends on how many elements precede their specific workload. Since this stream doesn’t know the sizes, this can only be known once the prefix elements have been processed, which never happens for infinite streams. So the worker threads keep buffering until this information becomes available.
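
To see the role of the unknown size, here is a contrast example of my own (not from the question): with a source whose spliterator reports its size, each split knows exactly how many elements precede it, so the workers can sum immediately and the pipeline terminates quickly.

System.out.println(IntStream
    .rangeClosed(1, 100_000_000)   // SIZED source: every split knows its position and length
    .parallel()
    .asLongStream()                // only to avoid int overflow of the sum; irrelevant to the memory question
    .sum()
);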

In principle, when a worker thread knows that it processes the leftmost¹ work-chunk, it could sum up the elements immediately, count them, and signal the end when reaching the limit. So the Stream could terminate, but this depends on a lot of factors.

In your case, a plausible scenario is that the other worker threads are faster in allocating buffers than the leftmost job is counting. In this scenario, subtle changes to the timing could make the stream occasionally return with a value.

When we slow down all worker threads except the one processing the leftmost chunk, we can make the stream terminate (at least in most runs):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    // slow down every outer element except the very first one, so that only the
    // leftmost chunk makes progress while the others wait
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ I’m following a suggestion by Stuart Marks to use left-to-right order when talking about the encounter order rather than the processing order.

– Holger
  • Very nice answer! I wonder if there is even a risk that all the threads start running the flatMap operations, and none get allocated to actually empty the buffers (summing)? In my actual use case the infinite streams are instead files too large to keep in memory. I wonder how I may rewrite the stream to keep memory usage down? – Thomas Ahle Jan 31 '20 at 13:59
  • 1
    Are you using `Files.lines(…)`? It has been improved significantly in Java 9. – Holger Jan 31 '20 at 14:15
  • it seems that it just calls into `BufferedReader.lines`, which is just a spliterator wrapper for a simple line iterator. So I expect it will keep pushing into my memory until I run out of space, just like `iterate`. But I will have to test it of course. – Thomas Ahle Jan 31 '20 at 14:34
  • 1
    This is what it does in Java 8. In newer JREs, it will still fall back to `BufferedReader.lines()` in certain circumstances (not the default filesystem, a special charset, or a size larger than `Integer.MAX_VALUE`). If one of these applies, a custom solution could help. This would be worth a new Q&A… – Holger Jan 31 '20 at 14:46
  • Ah, it has a nice FileChannelLinesSpliterator now. However, I'm not sure that applies to `flatMap(path -> Files.lines(path))` since flatMap transforms the inner stream to a sequential and just pushes everything through. (IIUC) – Thomas Ahle Jan 31 '20 at 15:21
  • 1
    What is the outer stream, a stream of files? Does it have a predictable size? – Holger Jan 31 '20 at 16:36
5

My best guess is that adding parallel() changes the internal behavior of flatMap(), which already had problems with being evaluated lazily before.

The OutOfMemoryError that you are getting was reported in [JDK-8202307] Getting a java.lang.OutOfMemoryError: Java heap space when calling Stream.iterator().next() on a stream which uses an infinite/very big Stream in flatMap. If you look at the ticket, it's more or less the same stack trace that you are getting. The ticket was closed as Won't Fix with the following reason:

The iterator() and spliterator() methods are "escape hatches" to be used when it's not possible to use other operations. They have some limitations because they turn what is a push model of the stream implementation into a pull model. Such a transition requires buffering in certain cases, such as when an element is (flat) mapped to two or more elements. It would significantly complicate the stream implementation, likely at the expense of common cases, to support a notion of back-pressure to communicate how many elements to pull through nested layers of element production.
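
To illustrate what the ticket describes (my own sketch, not code from the ticket): pulling even a single element through the iterator() escape hatch from such a flat-mapped infinite stream is enough to trigger the buffering.

// flatMap() pushes the entire inner stream into the pull-side buffer before the
// iterator sees its first element; with an infinite inner stream this exhausts the heap.
int first = Stream
    .iterate(1, i -> i+1)
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .iterator()
    .next();
System.out.println(first);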

– Karol Dowbecki
  • This is very interesting! It makes sense that the push/pull transition requires buffering which may use up the memory. However, in my case it seems that using just push should work fine, simply discarding the remaining elements as they appear? Or maybe you're saying that flatMap causes an iterator to be created? – Thomas Ahle Jan 31 '20 at 11:57
3

OOME is caused not by the stream being infinite, but by the fact that it isn't.

I.e., if you comment out the .limit(...), it will never run out of memory -- but of course, it will never end either.

Once it's split, the stream can only keep track of the number of elements if they're accumulated within each thread (looks like the actual accumulator is Spliterators$ArraySpliterator#array).

Looks like you can reproduce it without flatMap; just run the following with -Xmx128m:

    System.out.println(Stream
            .iterate(1, i -> i + 1)
            .parallel()
      //    .flatMap(n -> Stream.iterate(n, i -> i+n))
            .mapToInt(Integer::intValue)
            .limit(100_000_000)
            .sum()
    );

However, after commenting out the limit(), it should run fine until you decide to spare your laptop.

Besides the actual implementation details, here's what I think is happening:

With limit, the sum reducer wants the first X elements to sum up, so no thread can emit partial sums. Each "slice" (thread) will need to accumulate elements and pass them through. Without limit, there's no such constraint so each "slice" will just compute the partial sum out of the elements it gets (forever), assuming it will emit the result eventually.
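
A minimal sketch of that contrast (my own examples; run each statement separately, e.g. with -Xmx128m):

    // Without limit: never terminates, but each thread folds its elements into a
    // partial sum right away, so memory use stays bounded.
    IntStream.iterate(1, i -> i + 1).parallel().sum();

    // With limit: the threads must buffer what they produce until it is known which
    // elements fall within the first 100_000_000, so the heap fills up instead.
    IntStream.iterate(1, i -> i + 1).parallel().limit(100_000_000).sum();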

– Costi Ciudatu
  • What do you mean "once it's split"? Does limit split it somehow? – Thomas Ahle Jan 31 '20 at 12:19
  • @ThomasAhle `parallel()` will use `ForkJoinPool` internally to achieve parallelism. The `Spliterator` will be used to assign work to each `ForkJoin` task, I guess we can call the unit of work here as "split". – Karol Dowbecki Jan 31 '20 at 12:37
  • But why does that only happen with limit? – Thomas Ahle Jan 31 '20 at 12:38
  • Oddly, while `IntStream.iterate(1,i->i+1).parallel().sum()` runs fine (doesn't stop, but also doesn't crash), the version `IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()` runs out of memory. This suggests `limit` is doing something strange, more than just ordering the elements. – Thomas Ahle Jan 31 '20 at 14:14
  • 1
    @ThomasAhle set a breakpoint in `Integer.sum()`, used by the `IntStream.sum` reducer. You'll see that the no-limit version calls that function all the time, while the limited version never gets to call it before OOM. – Costi Ciudatu Jan 31 '20 at 14:27
  • @CostiCiudatu Why do you think that happens? – Thomas Ahle Jan 31 '20 at 17:19
  • I did try to explain this in the answer itself. – Costi Ciudatu Jan 31 '20 at 18:23
  • You should also check @Holger's answer, which is way more rigorous than mine. – Costi Ciudatu Jan 31 '20 at 18:30