
Let's say I have an array of Java 8 streams, Stream<T>[] streams. I'd like to make a Stream where each element of the new stream is an array composed of one element picked from each of the base streams (let's assume they're all sequential).

For instance if I have:

  streams[0] returning ("A", "B", "C"),
  streams[1] returning ("X", "Y", "Z"),
  and streams[2] returning ("0", "1", "2")

I'd like a stream that returns

  ( { "A", "X", "0" }, { "B", "Y", "1" }, { "C", "Z", "2" } )

Is there some code that already implements this? I have an idea of how to do it, it would be a generalisation of the pair case, but I'd like to know if something reusable is already around.

EDIT: sorry, I realised I need some clarification:

  • I don't want to create the whole matrix; I want a stream that dynamically returns one row at a time (first A/X/0, then B/Y/1, etc.), without having to occupy memory with all the rows in advance. I'm fine with reasonable assumptions about the sizes of the base streams (e.g., taking the minimum, i.e. stopping as soon as one stream has no more elements to return).

  • I know this can be implemented by first turning the base streams into iterators, then creating a new iterator whose next() picks one element from each of the underlying iterators and returns a new row. That is what the pair example I've linked above does, and I could implement it that way myself; here I'm trying to find out whether it has already been done in some library (I know the JDK has no such function).
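For reference, the iterator-based approach described in the second point can be sketched with only the JDK. This is a minimal sketch, not a library API: the class name ZipStreams and the method zipStreams are hypothetical, rows are List<T> rather than arrays (to avoid generic array creation), and the result stops as soon as any base stream runs out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ZipStreams {

    // Hypothetical helper: lazily zips the base streams into a stream of rows.
    // A row is built only when the downstream pipeline requests it, and the
    // result ends as soon as any base stream runs out of elements.
    @SafeVarargs
    public static <T> Stream<List<T>> zipStreams(Stream<T>... streams) {
        List<Iterator<T>> iterators = new ArrayList<>();
        for (Stream<T> s : streams) {
            iterators.add(s.iterator());
        }

        Iterator<List<T>> rows = new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                // A row exists only if there is at least one base stream
                // and every base iterator still has an element.
                return !iterators.isEmpty()
                    && iterators.stream().allMatch(Iterator::hasNext);
            }

            @Override
            public List<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> row = new ArrayList<>(iterators.size());
                for (Iterator<T> it : iterators) {
                    row.add(it.next());
                }
                return row;
            }
        };

        // Turn the row iterator back into a sequential, ordered stream.
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(rows, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        List<List<String>> rows = zipStreams(
                Stream.of("A", "B", "C"),
                Stream.of("X", "Y", "Z"),
                Stream.of("0", "1", "2"))
            .collect(Collectors.toList());
        System.out.println(rows); // [[A, X, 0], [B, Y, 1], [C, Z, 2]]
    }
}
```

Because the base streams are only pulled through their iterators, this also works when some inputs are infinite, as long as at least one of them is finite.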

zakmck

4 Answers


First things first, it's a very bad idea to keep an array of streams, because they can't be reused and it complicates already complicated possible solutions.

No, it's not possible in the plain JDK. There is no zip functionality, nor do we have tuples, so I'm afraid this is the best you can come up with:

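import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Java cannot create generic arrays, so a raw Stream[] is used here
Stream[] streams = Stream.of(
  Stream.of("A", "B", "C"),
  Stream.of("X", "Y", "Z"),
  Stream.of("0", "1", "2"))
    .toArray(Stream[]::new);

// Drain each stream into an array so that it can be indexed repeatedly
String[][] arrays = Arrays.stream(streams)
  .map(s -> s.toArray(String[]::new))
  .toArray(String[][]::new);

// Zipping stops at the shortest input
int minSize = Arrays.stream(arrays)
  .mapToInt(s -> s.length)
  .min().orElse(0);

// Row i holds the i-th element of every array
String[][] zipped = IntStream.range(0, minSize)
  .mapToObj(i -> Arrays.stream(arrays)
    .map(s -> s[i])
    .toArray(String[]::new))
  .toArray(String[][]::new);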

First, we need to convert an array of streams into an array of arrays or anything else that we can traverse more than once.

Second, you did not specify what to do if the streams have different lengths, so I assumed standard zip behaviour, which joins elements only as long as every collection still has one to give.

Third, I am creating a stream of all valid indexes for zipping (IntStream.range(0, minSize)) and extracting the element at each index from every nested array.

Indexing with s[i] is safe here because i is always below minSize, the length of the shortest array; orElse(0) makes minSize 0 when there are no streams at all.

Here is a more reasonable approach assuming that we are dealing with lists of lists:

List<List<String>> lists = Arrays.asList(
  Arrays.asList("A", "B", "C"),
  Arrays.asList("X", "Y", "Z"),
  Arrays.asList("0", "1", "2"));

final int minSize = lists.stream()
  .mapToInt(List::size)
  .min().orElse(0);

List<List<String>> result = IntStream.range(0, minSize)
  .mapToObj(i -> lists.stream()
    .map(s -> s.get(i))
    .collect(Collectors.toList()))
  .collect(Collectors.toList());

Java 9's Stream API additions will probably allow us to drop the calculation of minSize.
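One way Java 9 could drop minSize, assuming the same lists-of-lists input: generate indexes with IntStream.iterate and stop with IntStream.takeWhile once any list runs out. This is a sketch of one possible reading of that remark; the class ZipTakeWhile and the method zip are hypothetical names, and Java 9 or later is required.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ZipTakeWhile {

    // Zips a list of lists without precomputing the minimum size (Java 9+).
    public static <T> List<List<T>> zip(List<List<T>> lists) {
        return IntStream.iterate(0, i -> i + 1)
            // Continue only while every list has an element at index i.
            // allMatch on an empty outer list is true, so guard against
            // that case to avoid an infinite stream.
            .takeWhile(i -> !lists.isEmpty() && lists.stream().allMatch(l -> i < l.size()))
            .mapToObj(i -> lists.stream()
                .map(l -> l.get(i))
                .collect(Collectors.toList()))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> lists = Arrays.asList(
            Arrays.asList("A", "B", "C"),
            Arrays.asList("X", "Y", "Z"),
            Arrays.asList("0", "1", "2", "3"));
        System.out.println(zip(lists)); // [[A, X, 0], [B, Y, 1], [C, Z, 2]]
    }
}
```

The trade-off is that the size check is repeated for every index instead of being computed once up front.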

If you want the generation of sequences to remain lazy, you can simply not collect the results:

IntStream.range(0, minSize)
  .mapToObj(i -> lists.stream()
    .map(s -> s.get(i))
    .collect(Collectors.toList()));
Grzegorz Piwowarek
  • Interesting, but I don't want to create the matrix, I want to create a stream that dynamically returns a new array ( { "A", "X", "0" } the first time, { "B", "Y", "1" } the second time, etc), without the need to create the matrix, I only expect to create every array item. I know this can be done by turning the streams into iterators, use them to define a new iterator (which returns one of those arrays at every next() ) and finally turning this iterator back to a new stream. What I'm trying to understand is if some library has already implemented this, or if I have to write it on my own. – zakmck Jul 25 '17 at 08:48
  • @zakmck I do not think any lib can do that. Have a look at the last example, I added the implementation for the lazy creation of sequences. Is this what you had in mind? – Grzegorz Piwowarek Jul 25 '17 at 08:53

If you really mean an arbitrary number of Streams as input, there's no TupleX that I can think of; but if you know that the incoming streams are all the same size (no infinite Streams), then maybe this will fit your needs:

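import java.util.Iterator;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;

// Eagerly builds every row; assumes all input streams have the same, finite size
@SafeVarargs
static <T> Stream<Stream<T>> streamOfStreams(Stream<T>... streams) {
    if (streams.length == 0) {
        return Stream.empty();
    }

    @SuppressWarnings("unchecked")
    Iterator<T>[] iterators = new Iterator[streams.length];
    for (int i = 0; i < streams.length; ++i) {
        iterators[i] = streams[i].iterator();
    }

    // Only the first iterator is checked; if another stream is shorter,
    // next() will throw NoSuchElementException
    Iterator<T> first = iterators[0];

    Builder<Stream<T>> outer = Stream.builder();
    Builder<T> inner = Stream.builder();
    while (first.hasNext()) {
        // One row: one element from each input stream
        for (int i = 0; i < streams.length; ++i) {
            inner.add(iterators[i].next());
        }
        outer.add(inner.build());
        inner = Stream.builder();
    }

    return outer.build();
}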
Eugene
  • something like that, but this creates the whole matrix (see my comment to privarit above) and I don't want that, I'd rather put your while in an iterator and use that to build a more dynamic stream. However, for me the point is not much how to do so (though it's useful to other readers and I can learn by comparing my solutions to others), but if anything like that has already been implemented in some library. – zakmck Jul 25 '17 at 08:52
    @zakmck I also doubt this already exists... at least I've looked into `StreamEx` - probably the most famous one there is, and I have not seen such a thing... – Eugene Jul 25 '17 at 09:09

Since Guava version 21, you can use the Streams.zip utility method, which does what you want, except that it only works for two streams.

Now, if you turn your array of streams into a stream of streams, you could use this Streams.zip method to perform a reduction:

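import com.google.common.collect.Streams; // Guava 21+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// streams is the Stream<String>[] from the question
Stream<List<String>> zipped = Arrays.stream(streams)
    .map(s -> s.map(e -> {
        List<String> l = new ArrayList<>();
        l.add(e);
        return l;
    }))
    .reduce((s1, s2) -> Streams.zip(s1, s2, (l1, l2) -> {
        l1.addAll(l2);
        return l1;
    }))
    .orElse(Stream.empty());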

List<List<String>> tuples = zipped.collect(Collectors.toList());

System.out.println(tuples); // [[A, X, 0], [B, Y, 1], [C, Z, 2]]

Note that before reducing, you need to map each Stream<T> to Stream<List<T>>, so that you can use List.addAll to zip the streams.


Edit: The code above works, but I have serious concerns regarding its performance and memory footprint, mainly due to the creation of multiple lists of one single element.

Maybe using the version of Stream.reduce that accepts an identity, an accumulator and a combiner works better:

Stream<List<String>> zipped = Arrays.stream(streams)
    .reduce(
        Stream.<List<String>>generate(ArrayList::new),
        (z, s) -> Streams.zip(z, s, (l, e) -> {
            l.add(e);
            return l;
        }),
        (s1, s2) -> Streams.zip(s1, s2, (l1, l2) -> {
            l1.addAll(l2);
            return l1;
        }));

List<List<String>> tuples = zipped.collect(Collectors.toList());

System.out.println(tuples); // [[A, X, 0], [B, Y, 1], [C, Z, 2]]

The identity is an unbounded stream of fresh empty lists; since Streams.zip stops at the end of the shorter stream, the number of tuples is determined by the shortest input stream. The accumulator uses Streams.zip to zip a stream of lists with a stream of elements. The combiner remains the same as before: it uses Streams.zip to zip two streams of lists. The combiner only matters for a parallel reduction, which this approach cannot support anyway, because the identity stream can be consumed only once.
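Readers without Guava can try the same reduction idea with a hand-written two-stream zip. In this sketch, zip2, zipAll and the class ZipReduce are hypothetical names; zip2 is an iterator-based stand-in, not Guava's Streams.zip, and the identity is an unbounded supply of empty rows so that the shortest input decides the row count. It is meant for sequential use only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ZipReduce {

    // Hypothetical stand-in for a two-stream zip; stops at the shorter stream.
    static <A, B, R> Stream<R> zip2(Stream<A> a, Stream<B> b,
                                    BiFunction<? super A, ? super B, R> f) {
        Iterator<A> ia = a.iterator();
        Iterator<B> ib = b.iterator();
        Iterator<R> it = new Iterator<R>() {
            @Override public boolean hasNext() { return ia.hasNext() && ib.hasNext(); }
            @Override public R next() {
                if (!hasNext()) throw new NoSuchElementException();
                return f.apply(ia.next(), ib.next());
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    @SafeVarargs
    public static <T> List<List<T>> zipAll(Stream<T>... streams) {
        if (streams.length == 0) {
            return new ArrayList<>(); // the unbounded identity alone would never end
        }
        // Identity: an unbounded supply of fresh empty rows; each zip2 call
        // truncates it to the length of the shorter input.
        Stream<List<T>> identity = Stream.<List<T>>generate(ArrayList::new);
        return Arrays.stream(streams)
            .reduce(identity,
                // Accumulator: append one element of the next stream to every row.
                (rows, s) -> zip2(rows, s, (row, e) -> { row.add(e); return row; }),
                // Combiner: concatenates partial rows; not invoked by this
                // sequential reduction.
                (r1, r2) -> zip2(r1, r2, (l1, l2) -> { l1.addAll(l2); return l1; }))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(zipAll(
            Stream.of("A", "B", "C"),
            Stream.of("X", "Y", "Z"),
            Stream.of("0", "1", "2", "3"))); // [[A, X, 0], [B, Y, 1], [C, Z, 2]]
    }
}
```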

fps

OK, it seems there isn't anything like that around, so I've written it myself:

  • TupleSpliterator, to build a tuple spliterator starting from an array of spliterators;
  • Tuple Stream Builder, which builds a tuple stream, starting from an array of streams and exploiting a tuple iterator.
  • The Spliterator-based version allows for parallelism (under certain conditions); if you want something simpler but sequential, a TupleIterator is available as well.

Usage examples available in unit tests (here and here), the classes are part of this utility package.

EDIT: I've added the Spliterator implementation after Federico's comment pointing out that the Iterator-based version can't be parallel.
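The classes linked above are not reproduced here. As an illustration of the spliterator idea only, the following sketch builds a zipped stream directly from the base streams' spliterators (Stream.spliterator()) instead of their iterators. ZipSpliterator, ZipSpliteratorDemo and zip are hypothetical names, not the author's TupleSpliterator, and trySplit() returns null, so this version is still sequential; parallel support would require splitting every base spliterator at matching positions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ZipSpliteratorDemo {

    // Hypothetical zipping spliterator: each tryAdvance pulls one element
    // from every base spliterator and emits them together as a row.
    private static final class ZipSpliterator<T> implements Spliterator<List<T>> {
        private final List<Spliterator<T>> bases;
        private boolean done;

        ZipSpliterator(List<Spliterator<T>> bases) {
            this.bases = bases;
        }

        @Override
        public boolean tryAdvance(Consumer<? super List<T>> action) {
            if (done || bases.isEmpty()) {
                return false;
            }
            List<T> row = new ArrayList<>(bases.size());
            for (Spliterator<T> base : bases) {
                // Stop as soon as one base is exhausted; elements already taken
                // from earlier bases for this incomplete row are discarded.
                if (!base.tryAdvance(row::add)) {
                    done = true;
                    return false;
                }
            }
            action.accept(row);
            return true;
        }

        @Override
        public Spliterator<List<T>> trySplit() {
            return null; // no splitting, so the stream stays sequential
        }

        @Override
        public long estimateSize() {
            // Each row needs one element from every base: use the smallest estimate.
            return done ? 0 : bases.stream().mapToLong(Spliterator::estimateSize).min().orElse(0L);
        }

        @Override
        public int characteristics() {
            return Spliterator.ORDERED;
        }
    }

    @SafeVarargs
    public static <T> Stream<List<T>> zip(Stream<T>... streams) {
        List<Spliterator<T>> bases = new ArrayList<>();
        for (Stream<T> s : streams) {
            bases.add(s.spliterator());
        }
        return StreamSupport.stream(new ZipSpliterator<>(bases), false);
    }

    public static void main(String[] args) {
        List<List<String>> rows = zip(
                Stream.of("A", "B", "C"),
                Stream.of("X", "Y", "Z"),
                Stream.of("0", "1"))
            .collect(Collectors.toList());
        System.out.println(rows); // [[A, X, 0], [B, Y, 1]]
    }
}
```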

zakmck
  • The problem with iterators is that they turn your streams sequential. If you're ok with that, then you've found your answer. – fps Jul 26 '17 at 14:17
    Hi @FedericoPeraltaSchaffner, gosh! You're right, but the simple solution should be to implement a TupleSpliterator as well (Stream.spliterator() exists). I'll do it later, thanks for your comment. – zakmck Jul 26 '17 at 17:02