1

I have a collection of streams which i am trying to ZIP together into 1 stream. I am using guava Streams.zip method to zip the stream. It works fine when number of streams in collection is below 8000, above 8000 it starts throwing stack overflow exception. On locally debugging i have found that the stack overflow is happening inside zip method. It successfully zip until 8000 streams and starts to throw exception after that. I am not able to find a workaround this or to why is it happening. Need some help around this to find. The guava zip code is here https://github.com/google/guava/blame/6d7e326b2cbfba5f19fc67859c0b3d4c45fab63f/guava/src/com/google/common/collect/Streams.java#L318

I tried local debugging. Converted all my lambda calls to vanlla for loop , so to confirm we are not calling anything recursively.Finally pin pointed that is is being caused by zip function.

Source code:

merge method which uses zip.

private static <T> Stream<T> merge(Stream<T> firstList, Stream<T> secondList) {
            return Streams.zip(firstList, secondList, (first, second) -> {
                if (first == null) {
                    return second;
                }
    
                return first.merge(second);
            });
        }

I am calling the merge method as this

Collections.singletonList(inlineList.stream()
                    .reduce(merge)

where inline list is list of streams.

Exception:

java.lang.StackOverflowError at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322) at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322) at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322) at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322) at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322) at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322) at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322) at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322) at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322) at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)

user2650973
  • 113
  • 5
  • StackOverflow doesn't imply recursion per se. It just means you have too many nested calls and might need to simplify your code. – shmosel Nov 10 '22 at 01:53
  • yeah i just refactored the code to avoid that possibility. But i am pretty sure i am getting stack overflow from inside the zip method. – user2650973 Nov 10 '22 at 01:55
  • 1
    It could happen anywhere. The call stack is called a stack because it's a stack of calls. – shmosel Nov 10 '22 at 02:08
  • 2
    I'm confused by what you say. Streams.zip() accepts _two_ streams. How are you using it to zip 8000+ streams? As always, you should read the help on how to ask a question. Providing a _runnable_ minimal example is always best. – Gene Nov 10 '22 at 02:10
  • i am looping over the collection of stream and reducing it to single stream. – user2650973 Nov 10 '22 at 02:15
  • 1
    If you're reducing them in a way that stacks calls, that would explain the exception. But like Gene said, share your code please. – shmosel Nov 10 '22 at 02:33
  • Added the source code – user2650973 Nov 10 '22 at 03:01

1 Answers1

2

It's important to remember that streams are data pipelines, not containers. That means when you call zip(s1, s2, f), it's not consuming the input streams; it's just returning a wrapper stream that reads from each input on demand and merges the results. Another key to keep in mind is that reduce() only processes two elements at a time. Say you have 4 streams and want to reduce them by hand; it would look something like this:

Stream<T> m1 = merge(s1, s2);
Stream<T> m2 = merge(m1, s3);
Stream<T> m3 = merge(m2, s4);

Now, consider what happens when you want to read a single element from the final merged stream. m3 has to request an element from s4 and one from m2, which in turn requires elements from s3 and m1, which in turn requires elements from s2 and s1, all in a single call stack. Stretch that out too far and you'll hit the maximum number of nested calls, resulting in a stack overflow.

I suspect this is an XY Problem and there's a cleaner way to achieve what you want that doesn't involve lazy merging of streams. But within the parameters of the question, one solution is to write a zipper that can handle n streams side by side instead of trying to stack them:

static <T> Stream<T> merge(List<Stream<T>> streams, BinaryOperator<T> mergeFunction) {
    List<Iterator<T>> iters = streams.stream()
            .map(Stream::iterator)
            .collect(Collectors.toList());

    return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, 0) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            Optional<T> next = iters.stream()
                    .filter(Iterator::hasNext)
                    .map(Iterator::next)
                    .reduce(mergeFunction);

            next.ifPresent(action);
            return next.isPresent();
        }
    }, false);
}

Note that this is a little different from zip() in that the resulting stream is as long as the longest input stream instead of the shortest.

shmosel
  • 49,289
  • 6
  • 73
  • 138
  • Thanks for the answer, i did not follow the part where you mentioned, "returns a stream that delegates to firstList and secondList". What does this mean? Also , i intentionally truncated the details of the bifunction passed to zip method.Now reading your solution it looks so silly of me. I have added additional details in the question. Basically i need to merge the two items from the stream. – user2650973 Nov 10 '22 at 07:25
  • I mean that it returns a wrapper stream that has to forward its calls to your streams. I updated the code to accept a custom merge function. – shmosel Nov 10 '22 at 08:13
  • Thanks for all the help here. The above suggestion works fine. For my learning, i would want to understand how to debug this on my own, essentially the problem you mentioned about zip(). Reading the code did not help me to realize what the actual problem with zip is. Is there any other way to visualize this? – user2650973 Nov 10 '22 at 18:58
  • 1
    Consider the fact that each call to `zip()` returns a new stream (you can verify by calling `hashCode()` or `toString()`). What's happening to the streams you passed in? What's the relationship between all the output streams? If you graph it out, you'll see that the final stream must be a wrapper for another stream, which in turn wraps another stream, and so on. Think about how many layers you have to penetrate to read the innermost stream from the outermost wrapper. – shmosel Nov 10 '22 at 21:21
  • 1
    Edited the answer to elaborate further. – shmosel Nov 10 '22 at 21:22
  • Another difference i noticed between guava zip and this implementation is that we are not closing the streams. Will the streams be closed once i close the final zipped stream? – user2650973 Nov 14 '22 at 07:19
  • 1
    Doesn't look like `zip()` does that. And I'm not sure it would make sense, considering it doesn't promise to consume both input streams. But if you want to do that here, just tack on a `.onClose(() -> streams.forEach(Stream::close))`. – shmosel Nov 14 '22 at 07:26