2

How can I convert multiple Streams into one Stream? For example, I have 3 IntStreams and I want to combine them into one Stream of int arrays.

In the Javadoc, most Stream operations take one stream as input, and the concat doesn't answer my use case.

Here's what I had in mind

Stream 1: 1, 2, 3
Stream 2: 4, 5, 6
Combined Stream ex1: [1,4],[2,5],[3,6]
Combined Stream ex2: 1+4,2+5,3+6
Combined Stream ex3: new MyObject(1,4), new MyObject(2,5), new MyObject(3,6)
Tunaki
  • 132,869
  • 46
  • 340
  • 423
H-H
  • 374
  • 7
  • 14
  • 2
    This is similar to this question in spirit http://stackoverflow.com/questions/37702703/is-it-possible-to-flatmap-or-collect-evenly-multiple-collections – Tunaki Sep 11 '16 at 18:14
  • mm, I read your answer, and you're talking about interleaving which is kinda similar to concat. I'll change my example to clarify – H-H Sep 11 '16 at 18:22
  • 1
    Ah, I see, you want to [zip the streams](http://stackoverflow.com/questions/17640754/zipping-streams-using-jdk8-with-lambda-java-util-stream-streams-zip) with a custom zipper function. – Tunaki Sep 11 '16 at 18:45

2 Answers2

2

Sadly, there is nothing native to the Stream that does this for you. An unfortunate shortcoming to the API.

That said, you could do this by taking out an Iterator on each of the streams, similar to:

public static <T,U,R> Stream<R> zipStreams (Stream<T> a, Stream<U> b, BiFunction<T,U,R> zipFunc) {
    Iterator<T> itA = a.iterator();
    Iterator<U> itB = b.iterator();
    Iterator<R> itRet = new Iterator<R>() {

        @Override
        public boolean hasNext() {
            return itA.hasNext() && itB.hasNext();
        }

        @Override
        public R next() {
            return zipFunc.apply(itA.next(), itB.next());
        }

    };
    Iterable<R> ret = () -> itRet;
    return StreamSupport.stream(ret.spliterator(), a.isParallel() || b.isParallel());
}
Joe C
  • 15,324
  • 8
  • 38
  • 50
2

In functional terms, the problem comes down to zipping a list of streams, and applying a custom zipper for each elements.

There is no facility to do that directly with the Stream API. We can use 3rd party libraries, like the protonpack library, that provides a zip method to do that. Considering the data:

List<Stream<Integer>> streams = Arrays.asList(Stream.of(1,2,3), Stream.of(4,5,6));

you can have

Stream<Integer> stream = StreamUtils.zip(streams, l -> l.stream().mapToInt(i -> i).sum());
// the Stream is now "1+4,2+5,3+6"

or

Stream<Integer[]> stream = StreamUtils.zip(streams, l -> l.toArray(new Integer[l.size()]));
// the Stream is now "[1,4][2,5][3,6]"

The mapper takes the list of elements to zip and returns the zipped value. In the first example, it sums the value together, while it returns an array in the second.

Community
  • 1
  • 1
Tunaki
  • 132,869
  • 46
  • 340
  • 423