I've written a streams implementation that performs four simple reductions (+ and <) on the lines of a file.
At first I ran four separate streams, but then I wrote my own accumulator and combiner so that I could perform all four reductions in a single stream. On a small data set (10,000,000 lines) this reduces runtime to about a quarter, as expected, and completes in 14 seconds on my hardware.
BufferedReader fileIn = new BufferedReader(new InputStreamReader(
        new URL(args[0].trim()).openStream()));
final Results results = fileIn.lines()
        .parallel()
        .skip(1)                      // skip the first line
        .map(User::parse)             // String -> Optional<User>
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Results::new, Results::accumulate, Results::combine);
Results::accumulate and Results::combine correctly fold Users into a Results and merge one Results into another, respectively, and this implementation works great for small data sets.
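For context, the shape of Results is roughly as follows. This is a minimal sketch: the field names, the User accessors a() and b(), and the exact split of the four reductions are illustrative, not my actual code.

    // Sketch of the mutable container collected into. Field names and the
    // exact four reductions are illustrative only.
    static final class Results {
        long sumA, sumB;                 // the two + reductions
        long maxA = Long.MIN_VALUE;      // the two < reductions
        long maxB = Long.MIN_VALUE;

        // Folds one parsed User into this container (mutates in place).
        void accumulate(User u) {
            sumA += u.a();
            sumB += u.b();
            maxA = Math.max(maxA, u.a());
            maxB = Math.max(maxB, u.b());
        }

        // Merges a partial Results from another worker thread into this one.
        void combine(Results other) {
            sumA += other.sumA;
            sumB += other.sumB;
            maxA = Math.max(maxA, other.maxA);
            maxB = Math.max(maxB, other.maxB);
        }
    }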
I tried using .reduce() as well, and the results are similar, but I chose .collect() to reduce the creation of short-lived objects.
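The .reduce() variant looked roughly like this (a sketch: it assumes an immutable-style Results with hypothetical plus() and merge() methods that return new instances, which is exactly what generates the extra garbage):

    // Hypothetical three-arg reduce() equivalent: each step allocates a
    // fresh Results, hence the preference for the mutable collect() form.
    final Results results = fileIn.lines()
            .parallel()
            .skip(1)
            .map(User::parse)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .reduce(new Results(),
                    (acc, user) -> acc.plus(user),   // returns a new Results
                    (a, b) -> a.merge(b));           // returns a new Results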
The problem is that when I use real-world-sized data with 1 billion lines I hit an issue that suggests Java 8 streams are incapable of the task: JConsole shows the heap climbing roughly linearly to the full allocated 12 GB, and then the JVM throws an OutOfMemoryError.
I was under the impression that a collector or reducer would provide performance comparable to an iterative solution, bounded by CPU and IO but not by memory, because the reduction step produces a Results object that does not grow; it's a reduction, after all!
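For comparison, the iterative version I have in mind, which does run in constant memory, is something like this (a sketch, using the same hypothetical Results API as above):

    // Single-threaded baseline: one live String at a time, O(1) heap.
    Results results = new Results();
    try (BufferedReader in = new BufferedReader(new InputStreamReader(
            new URL(args[0].trim()).openStream()))) {
        in.readLine();                              // skip the first line
        String line;
        while ((line = in.readLine()) != null) {
            User.parse(line).ifPresent(results::accumulate);
        }
    }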
When I take a heap dump and load it into jhat, I see that about 7 GB is taken up by Strings, and these can clearly be seen to be the lines of the input file. I feel they should not be in memory at all, but jhat shows a very large ForkJoin-related structure accumulating them:
Static reference from java.util.concurrent.ForkJoinPool.common (from class java.util.concurrent.ForkJoinPool) :
--> java.util.concurrent.ForkJoinPool@0x786d41db0 (76 bytes) (field workQueues:)
--> [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598 (144 bytes) (Element 3 of [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598:)
--> java.util.concurrent.ForkJoinPool$WorkQueue@0x786d41ee8 (96 bytes) (field currentSteal:)
--> java.util.stream.SliceOps$SliceTask@0x7b4ac6cb0 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b379ad18 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b25bdb68 (130 bytes) (field leftChild:)
--> java.util.stream.SliceOps$SliceTask@0x7b379acb8 (130 bytes) (field localResult:)
--> java.util.stream.Nodes$SpinedNodeBuilder@0x7b25fdda0 (53 bytes) (field spine:)
--> [[Ljava.lang.Object;@0x7b25ffe48 (144 bytes) (Element 12 of [[Ljava.lang.Object;@0x7b25ffe48:)
--> [Ljava.lang.Object;@0x7b37c4f20 (262160 bytes) (Element 19598 of [Ljava.lang.Object;@0x7b37c4f20:)
--> 31ea87ba876505645342b31928394b3c,2013-11-24T23:02:17+00:00,898,22200,1314,700 (28 bytes) (field value:)
--> [C@0x7b2ffff88 (170 bytes) // <<<< There are thousands of these
There are other references in ApplicationShutdownHooks, Local references, and System Classes, but the one I show is the crux of the problem, and it causes the memory use to grow O(n) when it should be O(1).
Does the streams implementation turn this O(1)-memory problem into an O(n)-memory one by holding all the Strings in the ForkJoin classes?? I love streams and I don't want this to be so :(