
I've written a streams implementation that performs four simple reductions (+ and <) on the lines of a file.

At first I ran four separate streams, but then I wrote my own accumulator and combiner so that I could perform all four reductions in a single stream. On small data sets (10,000,000 lines) this cuts the runtime to about 1/4, as expected, and runs in 14 seconds on my hardware.

final BufferedReader fileIn = new BufferedReader(new InputStreamReader(
        new URL(args[0].trim()).openStream()));

final Results results = fileIn.lines()
        .parallel()
        .skip(1)
        .map(User::parse)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Results::new, Results::accumulate, Results::combine);

Results::accumulate and Results::combine correctly fold Users into a Results and merge two Results together, respectively, and this implementation works great for small data sets.
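
For reference, a minimal sketch of the shape of that class (the fields and the exact four reductions here are illustrative placeholders rather than my real code, and User.value() is assumed):

// Mutable result holder usable with collect(Results::new, Results::accumulate, Results::combine).
final class Results {
    long count;
    long sum;
    long max = Long.MIN_VALUE;
    long min = Long.MAX_VALUE;

    // Accumulator: fold one parsed User into this partial result.
    void accumulate(User user) {
        long v = user.value();   // User.value() is assumed for illustration
        count++;
        sum += v;
        max = Math.max(max, v);
        min = Math.min(min, v);
    }

    // Combiner: merge another thread's partial result into this one.
    void combine(Results other) {
        count += other.count;
        sum += other.sum;
        max = Math.max(max, other.max);
        min = Math.min(min, other.min);
    }
}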

I tried using .reduce() as well, with similar results, but I settled on .collect() to reduce the creation of short-lived objects.
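
For comparison, a rough sketch of the reduce() form I tried, assuming hypothetical immutable-style counterparts plus(User) and merge(Results) that each return a new Results rather than mutating one; that per-element allocation is what I was trying to avoid with collect():

final Results reduced = fileIn.lines()
        .parallel()
        .skip(1)
        .map(User::parse)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .reduce(new Results(),
                Results::plus,     // assumed: (Results, User) -> new Results
                Results::merge);   // assumed: (Results, Results) -> new Results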

The problem is that when I use real-world-sized data with 1 billion lines, I hit an issue that suggests Java 8 streams are incapable of the task: in JConsole the heap climbs to the full 12 GB allocation in a roughly linear fashion, and then the JVM throws an OutOfMemoryError.

I was under the impression that a collector or reducer would give performance comparable to an iterative solution, bounded by CPU and I/O but not by memory, because the reduction step produces a Results that doesn't grow: it's a reduction!

When I take a heap dump and load it into jhat, I see that about 7 GB is taken up by Strings, and these strings are clearly the lines of the input file. I feel they should not be in memory at all, yet jhat shows a very large ForkJoin-related structure accumulating them:

Static reference from java.util.concurrent.ForkJoinPool.common (from class java.util.concurrent.ForkJoinPool) :

--> java.util.concurrent.ForkJoinPool@0x786d41db0 (76 bytes) (field workQueues:)
--> [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598 (144 bytes) (Element 3 of [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598:)
--> java.util.concurrent.ForkJoinPool$WorkQueue@0x786d41ee8 (96 bytes) (field currentSteal:)
--> java.util.stream.SliceOps$SliceTask@0x7b4ac6cb0 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b379ad18 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b25bdb68 (130 bytes) (field leftChild:)
--> java.util.stream.SliceOps$SliceTask@0x7b379acb8 (130 bytes) (field localResult:)
--> java.util.stream.Nodes$SpinedNodeBuilder@0x7b25fdda0 (53 bytes) (field spine:)
--> [[Ljava.lang.Object;@0x7b25ffe48 (144 bytes) (Element 12 of [[Ljava.lang.Object;@0x7b25ffe48:)
--> [Ljava.lang.Object;@0x7b37c4f20 (262160 bytes) (Element 19598 of [Ljava.lang.Object;@0x7b37c4f20:)
--> 31ea87ba876505645342b31928394b3c,2013-11-24T23:02:17+00:00,898,22200,1314,700 (28 bytes) (field value:)
--> [C@0x7b2ffff88 (170 bytes) // <<<< There are thousands of these

There are other references in ApplicationShutdownHooks, local references, and system classes, but the one shown above is the crux of the problem, and it causes the memory to grow O(n) with the number of input lines.

Does the streams implementation turn this O(1)-memory problem into an O(n)-memory one by holding all the Strings in the ForkJoin classes? I love streams and I don't want this to be so :(

  • https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#skip-long- `While skip() is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines, especially for large values of n, since skip(n) is constrained to skip not just any n elements, but the first n elements in the encounter order.` – Marko Topolnik Nov 09 '16 at 11:58
  • Removal of the skip(1) shows no heap growth in JConsole. Running some tests, but this seems to be the answer :) – spl Nov 09 '16 at 12:41
  • Note that the fix is easier than you might currently think: calling `BufferedReader.readLine()` once before calling `.lines()` has the effect of skipping the first line (you can even keep that header line for further processing). – Holger Nov 09 '16 at 12:48
  • @Holger That's precisely what I was wondering if it would work. Glad to hear it does. – Marko Topolnik Nov 09 '16 at 12:54
  • See also [here](http://stackoverflow.com/a/38122274/2711488). @Marko Topolnik: The [documentation of `lines()`](https://docs.oracle.com/javase/8/docs/api/java/io/BufferedReader.html#lines--) clearly says “*Returns a Stream, the elements of which are lines read from this BufferedReader*”, which implies the dependency on the reader’s state. It also clearly states that the reader’s state *after* the stream operation is unspecified. – Holger Nov 09 '16 at 12:57
  • Removing the skip makes runtime scale roughly linearly with the number of lines. The memory bounces around as GC collects (I expect) all those User objects I'm parsing into existence. And yes, doing a .readLine() removes the first line, which is a header line, from the stream. Thanks all around. – spl Nov 09 '16 at 15:45
  • You may also swap skip(1) with parallel(). – Edo user1419293 Nov 14 '16 at 06:06
  • You may want to look at what your `User.parse()` method is doing. If it is parsing some data out of each line using something like `string.substring()` then you might be accidentally holding a reference to the full String from each line even if what you want is only a part of that line. In this case you'd want `new String(string.substring())`. (Of course, I'm just making a guess about what you are trying to parse from your file.) – Enwired Nov 15 '16 at 20:56

1 Answer

Thanks to Marko Topolnik and Holger for arriving at the correct answer. Neither posted it as an answer for me to accept, so I'll try to tie this up for posterity :)

The .skip(1) is very expensive on a parallel stream because the stream must honour encounter order to skip exactly the first element, as per the Javadoc for Stream.skip().

Reading the first line from the BufferedReader with readLine() before calling .lines() on it successfully skips the first (header) line in my implementation.

Removing the .skip() then solves the memory problem: in JConsole the heap bounces around nicely and drops back below 1 GB after every garbage collection, even when the program processes 1 billion lines. This is the desired behaviour, and it is close enough to O(1) memory for my purposes.
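
Putting the pieces together, the fixed pipeline looks roughly like this (same names as the snippet in the question; the readLine() call consumes the header before the stream is created, so no .skip(1) is needed):

final BufferedReader fileIn = new BufferedReader(new InputStreamReader(
        new URL(args[0].trim()).openStream()));
final String header = fileIn.readLine();   // consumes (and optionally keeps) the header line

final Results results = fileIn.lines()
        .parallel()                        // no .skip(1) any more
        .map(User::parse)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Results::new, Results::accumulate, Results::combine);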

Contrary to a suggestion above, the relative positions of .parallel() and .skip(1) do not matter: you cannot re-order them to make the .skip(1) happen "before" the .parallel(). The builder pattern suggests that ordering is important, and it is for other intermediate operations, but not for this one. I remember this subtlety from my OCP certification materials, though it doesn't appear to be spelled out in the Javadoc, hence no reference. I have, however, confirmed it experimentally by making that isolated change and observing the same heap growth in JConsole, followed by the OOM.
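
To illustrate the point, both of the following build the same parallel, ordered pipeline and pay the same cost for skip(1); the source() call here is just a placeholder for something like reader.lines():

// .parallel() sets a mode on the whole pipeline rather than taking effect at
// the point where it appears, so these two pipelines behave identically:
// both are parallel AND ordered, and both pay the full price of skip(1).
Stream<String> a = source().parallel().skip(1);
Stream<String> b = source().skip(1).parallel();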
