
I've implemented a functional unzip() operation as follows:

public static <T, U, V> Tuple2<Stream<U>, Stream<V>> unzip(
        Stream<T> stream, 
        Function<T, Tuple2<U, V>> unzipper) {

    return stream.map(unzipper)
        .reduce(new Tuple2<>(Stream.<U>empty(), Stream.<V>empty()),
            (unzipped, tuple) -> new Tuple2<>(
                Stream.concat(unzipped.$1(), Stream.of(tuple.$1())),
                Stream.concat(unzipped.$2(), Stream.of(tuple.$2()))),
            (unzipped1, unzipped2) -> new Tuple2<>(
                Stream.concat(unzipped1.$1(), unzipped2.$1()),
                Stream.concat(unzipped1.$2(), unzipped2.$2())));
}

This works fine as long as the input stream doesn't have many elements, because accessing an element of a deeply concatenated stream can cause a StackOverflowError. According to the docs of Stream.concat():

Implementation Note:

Use caution when constructing streams from repeated concatenation. Accessing an element of a deeply concatenated stream can result in deep call chains, or even StackOverflowException.

For a few elements, my unzip implementation works. Given a class Person:

class Person {

    public final String name;

    public final int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

If I have a stream of people:

Stream<Person> people = Stream.of(
    new Person("Joe", 52), 
    new Person("Alan", 34), 
    new Person("Peter", 42));

I can use my unzip() implementation this way:

Tuple2<Stream<String>, Stream<Integer>> result = StreamUtils.unzip(people, 
        person -> new Tuple2<>(person.name, person.age));

List<String> names = result.$1()
    .collect(Collectors.toList()); // ["Joe", "Alan", "Peter"]
List<Integer> ages = result.$2()
    .collect(Collectors.toList()); // [52, 34, 42]

Which is correct.

So my question is: is there a way for unzip() to work with many elements (potentially infinite)?

Note: for completeness, here's my immutable Tuple2 class:

public final class Tuple2<A, B> {

    private final A $1;

    private final B $2;

    public Tuple2(A $1, B $2) {
        this.$1 = $1;
        this.$2 = $2;
    }

    public A $1() {
        return $1;
    }

    public B $2() {
        return $2;
    }
}
fps
  • Very related: http://stackoverflow.com/questions/23860533/copy-a-stream-to-avoid-stream-has-already-been-operated-upon-or-closed-java-8 To do this effectively, we'd need to clone the given stream into 2 streams and iterate independently on each of them. The only way I see to do that (while supporting multithreading) would be to collect all elements in a list beforehand... – Tunaki Mar 10 '16 at 16:00
  • @Tunaki Thanks for that link, the point is to do it without collecting, because the input stream could be infinite, and unzipping it should still work. – fps Mar 10 '16 at 16:10
  • Also related: http://stackoverflow.com/questions/24474838/can-i-duplicate-a-stream-in-java-8 – Tunaki Mar 10 '16 at 16:17
  • No, I would expect this to be impossible. Java 8 streams are not like the sorts of streams in other languages where this might be possible. – Louis Wasserman Mar 10 '16 at 16:32
  • @LouisWasserman Yep, I believe this is not possible, at least the way I'm trying. If I use a collection to keep the input stream's elements and then create two streams from there, I might run out of memory. If I create new streams via concatenation while reducing the input stream (as in my question), I would get a stack overflow. I see no way out... – fps Mar 10 '16 at 17:29

1 Answer


Your solution is not only prone to potential StackOverflowErrors, it is also far from handling potentially infinite streams, even if the risk of a StackOverflowError didn’t exist. The point is, you are constructing a stream, but it’s a stream of concatenated single-element streams, one for each element of the source stream. In other words, you have a fully materialized data structure upon return of the unzip method, which will consume even more memory than the result of collecting into an ArrayList or a simple toArray() operation.
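
To make the materialization visible, here is a minimal sketch. The class name EagerUnzipDemo, the peek() counter and the sample values are illustrative assumptions; Tuple2 and unzip() mirror the code from your question. It counts how many source elements have been pulled by the time unzip() returns, without ever reading the two result streams:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical demo class; Tuple2 and unzip mirror the question's code.
class EagerUnzipDemo {

    static final class Tuple2<A, B> {
        private final A $1;
        private final B $2;
        Tuple2(A $1, B $2) { this.$1 = $1; this.$2 = $2; }
        A $1() { return $1; }
        B $2() { return $2; }
    }

    // The reduce/concat-based unzip from the question, same logic.
    static <T, U, V> Tuple2<Stream<U>, Stream<V>> unzip(
            Stream<T> stream, Function<T, Tuple2<U, V>> unzipper) {
        return stream.map(unzipper)
            .reduce(new Tuple2<>(Stream.<U>empty(), Stream.<V>empty()),
                (unzipped, tuple) -> new Tuple2<>(
                    Stream.concat(unzipped.$1(), Stream.of(tuple.$1())),
                    Stream.concat(unzipped.$2(), Stream.of(tuple.$2()))),
                (unzipped1, unzipped2) -> new Tuple2<>(
                    Stream.concat(unzipped1.$1(), unzipped2.$1()),
                    Stream.concat(unzipped1.$2(), unzipped2.$2())));
    }

    // Returns how many source elements were consumed when unzip() returned.
    static int consumedBeforeReturn() {
        AtomicInteger seen = new AtomicInteger();
        Stream<Integer> source =
            Stream.of(1, 2, 3, 4, 5).peek(i -> seen.incrementAndGet());
        unzip(source, i -> new Tuple2<>(i, "n" + i)); // result streams are never read
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(consumedBeforeReturn()); // prints 5
    }
}
```

Since reduce() is a terminal operation, the whole source has been traversed before the caller gets the tuple back, so an infinite source would never return.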

However, when you want to perform collect afterwards, the idea of supporting potentially infinite streams is moot anyway, as collecting implies processing of all elements without short-circuiting.
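
As a small illustration of that point (a sketch with a hypothetical class name and an arbitrary Stream.iterate source), an infinite stream can only be collected after a short-circuiting operation such as limit() has bounded it:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the question's code.
class BoundedCollectDemo {

    // Collects only the first n values of an infinite stream.
    static List<Integer> firstN(int n) {
        return Stream.iterate(0, i -> i + 1)   // infinite source
            .limit(n)                          // short-circuiting step that bounds it
            .collect(Collectors.toList());     // collect now sees a finite stream
    }

    public static void main(String[] args) {
        System.out.println(firstN(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Without the limit(n) step, the collect call would keep accumulating until memory runs out.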

Once you drop the idea of supporting infinite streams and focus on the collect operation, there’s a simpler solution. Taking the code from this solution, replacing Pair with Tuple2 and changing the accumulator logic from “conditional” to “both”, we get:

public static <T, A1, A2, R1, R2> Collector<T, ?, Tuple2<R1,R2>> both(
    Collector<T, A1, R1> first, Collector<T, A2, R2> second) {

    // Extract the four functions of each downstream collector once.
    Supplier<A1> s1=first.supplier();
    Supplier<A2> s2=second.supplier();
    BiConsumer<A1, T> a1=first.accumulator();
    BiConsumer<A2, T> a2=second.accumulator();
    BinaryOperator<A1> c1=first.combiner();
    BinaryOperator<A2> c2=second.combiner();
    Function<A1,R1> f1=first.finisher();
    Function<A2,R2> f2=second.finisher();
    return Collector.of(
        // supplier: a pair holding one container per downstream collector
        ()->new Tuple2<>(s1.get(), s2.get()),
        // accumulator: pass every element to both collectors
        (p,t)->{ a1.accept(p.$1(), t); a2.accept(p.$2(), t); },
        // combiner: merge partial results component-wise (parallel streams)
        (p1,p2)->new Tuple2<>(c1.apply(p1.$1(), p2.$1()), c2.apply(p1.$2(), p2.$2())),
        // finisher: apply each collector's finisher to its container
        p -> new Tuple2<>(f1.apply(p.$1()), f2.apply(p.$2())));
}

This can be used like

Tuple2<List<String>, List<Integer>> namesAndAges=
    Stream.of(new Person("Joe", 52), new Person("Alan", 34), new Person("Peter", 42))
        .collect(both(
            Collectors.mapping(p->p.name, Collectors.toList()),
            Collectors.mapping(p->p.age,  Collectors.toList())));
List<String> names = namesAndAges.$1(); // ["Joe", "Alan", "Peter"]
List<Integer> ages = namesAndAges.$2(); // [52, 34, 42]

The statement of the linked answer also holds here. You can do almost everything you can express as a stream operation within a collector.
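
For instance, the two downstream collectors do not have to be toList(). The following sketch repeats both() and minimal Tuple2 and Person classes so that it compiles on its own; the class name BothCollectorDemo and the method namesAndTotalAge are hypothetical. It joins the names into one string and sums the ages in a single pass:

```java
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; both() is the collector shown above.
class BothCollectorDemo {

    static final class Tuple2<A, B> {
        private final A $1;
        private final B $2;
        Tuple2(A $1, B $2) { this.$1 = $1; this.$2 = $2; }
        A $1() { return $1; }
        B $2() { return $2; }
    }

    static final class Person {
        final String name;
        final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
    }

    static <T, A1, A2, R1, R2> Collector<T, ?, Tuple2<R1, R2>> both(
            Collector<T, A1, R1> first, Collector<T, A2, R2> second) {
        Supplier<A1> s1 = first.supplier();
        Supplier<A2> s2 = second.supplier();
        BiConsumer<A1, T> a1 = first.accumulator();
        BiConsumer<A2, T> a2 = second.accumulator();
        BinaryOperator<A1> c1 = first.combiner();
        BinaryOperator<A2> c2 = second.combiner();
        Function<A1, R1> f1 = first.finisher();
        Function<A2, R2> f2 = second.finisher();
        return Collector.of(
            () -> new Tuple2<>(s1.get(), s2.get()),
            (p, t) -> { a1.accept(p.$1(), t); a2.accept(p.$2(), t); },
            (p1, p2) -> new Tuple2<>(c1.apply(p1.$1(), p2.$1()), c2.apply(p1.$2(), p2.$2())),
            p -> new Tuple2<>(f1.apply(p.$1()), f2.apply(p.$2())));
    }

    // One pass over the stream: joined names on the left, summed ages on the right.
    static Tuple2<String, Integer> namesAndTotalAge(Stream<Person> people) {
        return people.collect(both(
            Collectors.mapping(p -> p.name, Collectors.joining(", ")),
            Collectors.summingInt(p -> p.age)));
    }

    public static void main(String[] args) {
        Tuple2<String, Integer> result = namesAndTotalAge(Stream.of(
            new Person("Joe", 52), new Person("Alan", 34), new Person("Peter", 42)));
        System.out.println(result.$1()); // prints Joe, Alan, Peter
        System.out.println(result.$2()); // prints 128
    }
}
```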

If you want to stay closer to your original code, with a function mapping from the stream element to a Tuple2, you can wrap the above solution like

public static <T, T1, T2, A1, A2, R1, R2> Collector<T, ?, Tuple2<R1,R2>> both(
    Function<? super T, ? extends Tuple2<? extends T1, ? extends T2>> f,
    Collector<T1, A1, R1> first, Collector<T2, A2, R2> second) {

    return Collectors.mapping(f, both(
            Collectors.mapping(Tuple2::$1, first),
            Collectors.mapping(Tuple2::$2, second)));
}

and use it like

Tuple2<List<String>, List<Integer>> namesAndAges=
    Stream.of(new Person("Joe", 52), new Person("Alan", 34), new Person("Peter", 42))
        .collect(both(
            p -> new Tuple2<>(p.name, p.age), Collectors.toList(), Collectors.toList()));

You may recognize the function p -> new Tuple2<>(p.name, p.age), just like the one you passed to your unzip method. The above solutions are lazy but require the operations after “unzipping” to be expressed as collectors. If you want Streams instead and accept the non-lazy nature of the solution, much like your original unzip operation, but want it to be more efficient than concat, you may use:

public static <T, U, V> Tuple2<Stream<U>, Stream<V>> unzip(
    Stream<T> stream,  Function<T, Tuple2<U, V>> unzipper) {

    return stream.map(unzipper)
        .collect(Collector.of(()->new Tuple2<>(Stream.<U>builder(), Stream.<V>builder()),
            (unzipped, tuple) -> {
                unzipped.$1().accept(tuple.$1()); unzipped.$2().accept(tuple.$2());
            },
            (unzipped1, unzipped2) -> {
                unzipped2.$1().build().forEachOrdered(unzipped1.$1());
                unzipped2.$2().build().forEachOrdered(unzipped1.$2());
                return unzipped1;
            },
            tuple -> new Tuple2<>(tuple.$1().build(), tuple.$2().build())
        ));
}

This can work as a drop-in replacement for your concat-based solution. It will also fully store the stream elements, but it will use a Stream.Builder, which is optimized for the use case of being incrementally filled and consumed once (in a Stream operation). This is even more efficient than collecting into an ArrayList (at least with the reference implementation), as it uses a “spined buffer” which doesn’t require copying when increasing the capacity. For streams with a potentially unknown size, this is the most efficient solution (for streams with a known size, toArray() will perform even better).
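
The “filled incrementally, consumed once” contract of Stream.Builder can be seen in isolation in the following minimal sketch (the class and method names are hypothetical): elements are added one by one, build() hands out the stream, and any later attempt to add fails with an IllegalStateException.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class illustrating the Stream.Builder life cycle.
class StreamBuilderDemo {

    // Fill the builder incrementally, then consume the built stream once.
    static List<String> fillAndConsume() {
        Stream.Builder<String> builder = Stream.builder();
        builder.accept("Joe");             // a Builder is also a Consumer<T>
        builder.add("Alan").add("Peter");  // add() is the chaining variant of accept()
        return builder.build().collect(Collectors.toList());
    }

    // After build(), the builder is in the built state and rejects new elements.
    static boolean rejectsAddAfterBuild() {
        Stream.Builder<Integer> builder = Stream.builder();
        builder.add(1);
        builder.build();
        try {
            builder.add(2);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(fillAndConsume());        // prints [Joe, Alan, Peter]
        System.out.println(rejectsAddAfterBuild());  // prints true
    }
}
```

Because a Builder is a Consumer, the combiner in the unzip above can simply pass one builder to forEachOrdered on the other builder's stream.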

Holger
  • Thanks @Holger, for such a complete answer. Initially, I wrote unzip with `collect()`, but I switched to `reduce()` because I was getting an `IllegalStateException` (stream already consumed). All your implementations work great, though. You're right in that I didn't want to collect to an `ArrayList`, but instead, I was creating my own linked list by invoking `concat()` repeatedly :) My bad – fps Mar 10 '16 at 21:50