172

In JDK 8 with lambda b93 there was a method, java.util.stream.Streams.zip, which could be used to zip streams (this is illustrated in the tutorial Exploring Java8 Lambdas. Part 1 by Dhananjay Nene). This function:

Creates a lazy and sequential combined Stream whose elements are the result of combining the elements of two streams.

However, in b98 this has disappeared. In fact, the Streams class is not even accessible in java.util.stream in b98.

Has this functionality been moved, and if so how do I zip streams concisely using b98?

The application I have in mind is in this Java implementation of Shen, where I replaced the zip functionality in the

  • static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
  • static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)

functions with rather verbose code (which doesn't use functionality from b98).

Per Lundberg
artella
  • Ah, just found out that it seems to have been removed completely: http://mail.openjdk.java.net/pipermail/lambda-libs-spec-observers/2013-June/002029.html – artella Jul 15 '13 at 06:33
  • "Exploring Java8 Lambdas. Part 1" - new link for this article is http://blog.dhananjaynene.com/2013/02/exploring-java8-lambdas-part-1/ – Aleksei Egorov Dec 20 '17 at 11:09
  • Thanks @AlekseiEgorov, fixed the link in the post now as well – Per Lundberg Nov 11 '20 at 09:03
  • Reading the thread from @artella's link in 2023 gives me hope that with the new `Record` support now-ish, and value types a little further down the road, that maybe we'll see some good `zip` functionality re-introduced not too long from now! – Ti Strga Mar 30 '23 at 19:44

14 Answers

85

I needed this as well so I just took the source code from b93 and put it in a "util" class. I had to modify it slightly to work with the current API.

For reference here's the working code (take it at your own risk...):

public static<A, B, C> Stream<C> zip(Stream<? extends A> a,
                                     Stream<? extends B> b,
                                     BiFunction<? super A, ? super B, ? extends C> zipper) {
    Objects.requireNonNull(zipper);
    Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator();
    Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator();

    // Zipping loses DISTINCT and SORTED characteristics
    int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() &
            ~(Spliterator.DISTINCT | Spliterator.SORTED);

    long zipSize = ((characteristics & Spliterator.SIZED) != 0)
            ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
            : -1;

    Iterator<A> aIterator = Spliterators.iterator(aSpliterator);
    Iterator<B> bIterator = Spliterators.iterator(bSpliterator);
    Iterator<C> cIterator = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return aIterator.hasNext() && bIterator.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(aIterator.next(), bIterator.next());
        }
    };

    Spliterator<C> split = Spliterators.spliterator(cIterator, zipSize, characteristics);
    return (a.isParallel() || b.isParallel())
           ? StreamSupport.stream(split, true)
           : StreamSupport.stream(split, false);
}
jub0bs
siki
  • Shouldn't the resulting stream be `SIZED` if _either_ stream is `SIZED`, not both? – Didier L Jun 28 '15 at 10:49
  • I don't think so. Both streams have to be `SIZED` for this implementation to work. It actually depends on how you define zipping. Should you be able to zip two streams that are of different sizes, for example? What would the resulting stream look like then? I believe this is why this function was actually omitted from the API. There are many ways to do this and it's up to the user to decide what behavior should be the "correct" one. Would you discard the elements from the longer stream or pad the shorter one? If so, with what value(s)? – siki Jun 29 '15 at 14:47
  • Unless I'm missing something, there is no need for any cast (e.g. to `Spliterator`). – jub0bs Aug 11 '16 at 14:41
  • Is there a website where the Java 8 b93 source code is hosted? I'm having trouble finding it. – starwarswii Jul 07 '17 at 18:08
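As the comments above point out, truncating to the shorter input is only one possible policy. A padding policy could be sketched roughly as follows. This is an illustration only: `ZipPolicies`, `zipPadded` and the two pad parameters are hypothetical names, not part of the JDK or of the b93 source, and the resulting stream is sequential.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class ZipPolicies {
    // Hypothetical helper: keeps going until BOTH inputs are exhausted,
    // substituting padA / padB for whichever side has run out.
    static <A, B, C> Stream<C> zipPadded(Stream<? extends A> a, A padA,
                                         Stream<? extends B> b, B padB,
                                         BiFunction<? super A, ? super B, ? extends C> zipper) {
        Iterator<? extends A> itA = a.iterator();
        Iterator<? extends B> itB = b.iterator();
        Iterator<C> zipped = new Iterator<C>() {
            @Override
            public boolean hasNext() {
                return itA.hasNext() || itB.hasNext();
            }

            @Override
            public C next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                A left = itA.hasNext() ? itA.next() : padA;
                B right = itB.hasNext() ? itB.next() : padB;
                return zipper.apply(left, right);
            }
        };
        // Size is unknown up front; only encounter order is promised.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(zipped, Spliterator.ORDERED), false);
    }
}
```

With inputs ("a", "b", "c") and (1), pads "?" and 0, and the zipper (s, i) -> s + i, this yields a1, b0, c0. Note that with this policy at least one input must be finite, otherwise the result is infinite too.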
47

zip is one of the functions provided by the protonpack library.

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");

List<String> zipped = StreamUtils.zip(streamA,
                                      streamB,
                                      (a, b) -> a + " is for " + b)
                                 .collect(Collectors.toList());

assertThat(zipped,
           contains("A is for Apple", "B is for Banana", "C is for Carrot"));
Samir Talwar
Dominic Fox
  • Also found in StreamEx: http://amaembo.github.io/streamex/javadoc/one/util/streamex/StreamEx.html#zip-java.util.List-java.util.List-java.util.function.BiFunction- – tokland Apr 11 '16 at 10:31
45

If you have Guava in your project, you can use the Streams.zip method (added in Guava 21):

Returns a stream in which each element is the result of passing the corresponding element of each of streamA and streamB to function. The resulting stream will only be as long as the shorter of the two input streams; if one stream is longer, its extra elements will be ignored. The resulting stream is not efficiently splittable. This may harm parallel performance.

 public class Streams {
     ...

     public static <A, B, R> Stream<R> zip(Stream<A> streamA,
             Stream<B> streamB, BiFunction<? super A, ? super B, R> function) {
         ...
     }
 }
lbalazscs
ZhekaKozlov
29

Since I can't conceive of any use for zipping on collections other than indexed ones (Lists), and I am a big fan of simplicity, this would be my solution:

<A,B,C>  Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A,B,C> zipper){
     int shortestLength = Math.min(lista.size(),listb.size());
     return IntStream.range(0,shortestLength).mapToObj( i -> {
          return zipper.apply(lista.get(i), listb.get(i));
     });        
}
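A self-contained version of the index-based approach, with the imports it needs and a small usage method, might look like the sketch below. The class name `ListZip` and the `demo` method are made up for illustration, and the wildcards on the `BiFunction` are an optional loosening of the signature above. One caveat worth stating: `get(i)` is only cheap on random-access lists such as `ArrayList`; on a `LinkedList` each lookup is linear.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class ListZip {
    // Pairs elements by index, stopping at the end of the shorter list.
    static <A, B, C> Stream<C> zipped(List<A> lista, List<B> listb,
                                      BiFunction<? super A, ? super B, ? extends C> zipper) {
        int shortestLength = Math.min(lista.size(), listb.size());
        return IntStream.range(0, shortestLength)
                .mapToObj(i -> zipper.apply(lista.get(i), listb.get(i)));
    }

    // Hypothetical usage: the fourth element of the longer list is ignored.
    static List<String> demo() {
        List<String> letters = List.of("A", "B", "C");
        List<String> words = List.of("Apple", "Banana", "Carrot", "Doughnut");
        return zipped(letters, words, (a, b) -> a + " is for " + b)
                .collect(Collectors.toList());
    }
}
```

Because the source is an `IntStream.range`, this variant also splits cleanly if the returned stream is switched to parallel, provided the zipper itself is stateless.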
Rafael
28

Zipping two streams using JDK8 with lambda (gist).

public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) {
    final Iterator<A> iteratorA = streamA.iterator();
    final Iterator<B> iteratorB = streamB.iterator();
    final Iterator<C> iteratorC = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return iteratorA.hasNext() && iteratorB.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(iteratorA.next(), iteratorB.next());
        }
    };
    final boolean parallel = streamA.isParallel() || streamB.isParallel();
    return iteratorToFiniteStream(iteratorC, parallel);
}

public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) {
    final Iterable<T> iterable = () -> iterator;
    return StreamSupport.stream(iterable.spliterator(), parallel);
}
Karol Król
  • Nice solution and (relatively) compact! Requires that you put `import java.util.function.*;` and `import java.util.stream.*;` at the top of your file. – sffc Sep 13 '15 at 08:17
  • Note that this is a terminal operation on the stream. This means that for infinite streams, this method breaks down – smac89 Oct 26 '17 at 17:47
  • So many useless wrappers: here `() -> iterator` and here again: `iterable.spliterator()`. Why not implement a `Spliterator` directly rather than an `Iterator`? Check @Doradus' answer https://stackoverflow.com/a/46230233/1140754 – Miguel Gamboa Feb 04 '19 at 15:49
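On the two comments above: `Stream.iterator()` is formally a terminal operation, but the iterator it returns pulls elements on demand, so an iterator-based zip should still cope with an infinite input as long as the other input is finite (or the result is limited). The sketch below also drops the `Iterable` wrapper by calling `Spliterators.spliteratorUnknownSize` directly. The names `LazyIteratorZip` and `numbered` are hypothetical, and the result is deliberately sequential.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class LazyIteratorZip {
    static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB,
                                   BiFunction<? super A, ? super B, ? extends C> zipper) {
        Iterator<A> iteratorA = streamA.iterator();
        Iterator<B> iteratorB = streamB.iterator();
        Iterator<C> iteratorC = new Iterator<C>() {
            @Override
            public boolean hasNext() {
                return iteratorA.hasNext() && iteratorB.hasNext();
            }

            @Override
            public C next() {
                return zipper.apply(iteratorA.next(), iteratorB.next());
            }
        };
        // Wrap the iterator directly; no intermediate Iterable is needed.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iteratorC, Spliterator.ORDERED), false);
    }

    // Hypothetical usage: an infinite counter zipped with a finite list.
    static List<String> numbered(List<String> items) {
        return zip(Stream.iterate(1, i -> i + 1), items.stream(), (i, s) -> i + ". " + s)
                .collect(Collectors.toList());
    }
}
```

Calling `numbered(List.of("x", "y"))` terminates and returns the two entries "1. x" and "2. y", since `hasNext()` becomes false as soon as the finite side is exhausted.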
13

The methods of the class you mentioned have been moved to the Stream interface itself in favor of the default methods. But it seems that the zip method has been removed, maybe because it is not clear what the default behavior for streams of different sizes should be. Implementing the desired behavior is straightforward, though:

static <T> boolean every(
  Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().allMatch(x->!it.hasNext()||pred.test(x, it.next()));
}
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().filter(x->it.hasNext()&&pred.test(x, it.next()))
      .findFirst().orElse(null);
}
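To make the behavior of these two helpers concrete, here is the same code wrapped in a class with its imports; `PairwiseChecks` is a made-up name for illustration. As written, `every` treats elements of `c1` that have no partner in `c2` as passing, while `find` returns the element from `c1` of the first matching pair, or `null` if there is none. Both rely on a stateful lambda, so they are only meant for sequential streams.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.function.BiPredicate;

final class PairwiseChecks {
    // True if pred holds for every index-aligned pair; a longer c1 tail is ignored.
    static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
        Iterator<T> it = c2.iterator();
        return c1.stream().allMatch(x -> !it.hasNext() || pred.test(x, it.next()));
    }

    // Returns the c1 element of the first pair satisfying pred, or null.
    static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
        Iterator<T> it = c2.iterator();
        return c1.stream()
                .filter(x -> it.hasNext() && pred.test(x, it.next()))
                .findFirst()
                .orElse(null);
    }
}
```

For example, `every(List.of(1, 2, 3), List.of(1, 2, 3), (x, y) -> x.equals(y))` is true, and `find(List.of(1, 2, 3), List.of(1, 9, 3), (x, y) -> !x.equals(y))` returns 2, the first element whose partner differs.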
Holger
  • Isn't the `predicate` you passed to the filter *stateful*? That violates the method contract and especially won't work when processing the stream in parallel. – Andreas Aug 17 '15 at 09:25
  • @Andreas: none of the solutions here supports parallel processing. Since my methods don’t return a stream, they make sure that the streams don’t run in parallel. Similarly, the code of the accepted answer returns a stream which can be turned into parallel but won’t actually do anything in parallel. That said, stateful predicates are discouraged but do not violate the contract. They might even be used in a parallel context if you ensure that the state update is thread-safe. In some situations they are unavoidable, e.g. turning a stream into distinct is a stateful predicate *per se*. – Holger Aug 17 '15 at 09:47
  • @Andreas: you may guess why these operations have been removed from the Java API… – Holger Aug 17 '15 at 09:48
11

I humbly suggest this implementation. The resulting stream is truncated to the shorter of the two input streams.

public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) {
    Spliterator<L> lefts = leftStream.spliterator();
    Spliterator<R> rights = rightStream.spliterator();
    return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), lefts.characteristics() & rights.characteristics()) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return lefts.tryAdvance(left->rights.tryAdvance(right->action.accept(combiner.apply(left, right))));
        }
    }, leftStream.isParallel() || rightStream.isParallel());
}
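Two details of the nested `tryAdvance` call may be worth tightening. As far as I can tell, the outer call returns true whenever the left side still has an element, even if the right side is already exhausted, so an infinite left stream zipped with a finite right one would never signal its end; and the combined characteristics can retain `SORTED` or `DISTINCT`, which the zipped elements no longer guarantee. The variant below is a sketch under those assumptions (the class name `SpliteratorZip` is hypothetical); it records whether a pair was actually emitted and masks the two flags.

```java
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class SpliteratorZip {
    static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream,
                                   BiFunction<? super L, ? super R, ? extends T> combiner) {
        Spliterator<L> lefts = leftStream.spliterator();
        Spliterator<R> rights = rightStream.spliterator();
        // Keep only characteristics both inputs share, minus the ones zipping invalidates.
        int characteristics = lefts.characteristics() & rights.characteristics()
                & ~(Spliterator.DISTINCT | Spliterator.SORTED);
        long size = Math.min(lefts.estimateSize(), rights.estimateSize());
        return StreamSupport.stream(new AbstractSpliterator<T>(size, characteristics) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                // emitted[0] becomes true only if BOTH sides produced an element.
                boolean[] emitted = {false};
                lefts.tryAdvance(left -> emitted[0] = rights.tryAdvance(
                        right -> action.accept(combiner.apply(left, right))));
                return emitted[0];
            }
        }, leftStream.isParallel() || rightStream.isParallel());
    }
}
```

With this change, zipping `Stream.iterate(1, i -> i + 1)` with a two-element stream ends after two pairs instead of continuing to pull from the infinite side.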
Doradus
  • I like your proposal. But I don't totally agree with the last `.., leftStream.isParallel() || rightStream.isParallel()`. I think it has no effect because `AbstractSpliterator` offers limited parallelism by default. So I think that the final result will be the same as passing `false`. – Miguel Gamboa Feb 04 '19 at 15:46
  • @MiguelGamboa - thanks for your comment. I'm not sure what you mean by "limited parallelism by default" -- do you have a link to some docs? – Doradus Feb 08 '19 at 14:28
10

Using the latest Guava library (for the Streams class) you should be able to do

final Map<String, String> result = 
    Streams.zip(
        collection1.stream(), 
        collection2.stream(), 
        AbstractMap.SimpleEntry::new)
    .collect(Collectors.toMap(e -> e.getKey(), e  -> e.getValue()));
Dan Borza
7

The Lazy-Seq library provides zip functionality.

https://github.com/nurkiewicz/LazySeq

This library is heavily inspired by scala.collection.immutable.Stream and aims to provide immutable, thread-safe and easy to use lazy sequence implementation, possibly infinite.

Nick Siderakis
4

Would this work for you? It's a short function, which lazily evaluates over the streams it's zipping, so you can supply it with infinite streams (it doesn't need to take the size of the streams being zipped).

If the streams are finite it stops as soon as one of the streams runs out of elements.

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Stream;

class StreamUtils {
    static <ARG1, ARG2, RESULT> Stream<RESULT> zip(
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner) {
        final var i2 = s2.iterator();
        return s1.map(x1 -> i2.hasNext() ? combiner.apply(x1, i2.next()) : null)
                .takeWhile(Objects::nonNull);
    }
}

Here is some unit test code (much longer than the code itself!)

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class StreamUtilsTest {
    @ParameterizedTest
    @MethodSource("shouldZipTestCases")
    <ARG1, ARG2, RESULT>
    void shouldZip(
            String testName,
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner,
            Stream<RESULT> expected) {
        var actual = StreamUtils.zip(s1, s2, combiner);

        assertEquals(
                expected.collect(Collectors.toList()),
                actual.collect(Collectors.toList()),
                testName);
    }

    private static Stream<Arguments> shouldZipTestCases() {
        return Stream.of(
                Arguments.of(
                        "Two empty streams",
                        Stream.empty(),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One singleton and one empty stream",
                        Stream.of(1),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One empty and one singleton stream",
                        Stream.empty(),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "Two singleton streams",
                        Stream.of("blah"),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blah", 1))),
                Arguments.of(
                        "One singleton, one multiple stream",
                        Stream.of("blob"),
                        Stream.of(2, 3),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blob", 2))),
                Arguments.of(
                        "One multiple, one singleton stream",
                        Stream.of("foo", "bar"),
                        Stream.of(4),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("foo", 4))),
                Arguments.of(
                        "Two multiple streams",
                        Stream.of("nine", "eleven"),
                        Stream.of(10, 12),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("nine", 10), pair("eleven", 12)))
        );
    }

    private static List<Object> pair(Object o1, Object o2) {
        return List.of(o1, o2);
    }

    static private <T1, T2> List<Object> combine(T1 o1, T2 o2) {
        return List.of(o1, o2);
    }

    @Test
    void shouldLazilyEvaluateInZip() {
        final var a = new AtomicInteger();
        final var b = new AtomicInteger();
        final var zipped = StreamUtils.zip(
                Stream.generate(a::incrementAndGet),
                Stream.generate(b::decrementAndGet),
                (xa, xb) -> xb + 3 * xa);

        assertEquals(0, a.get(), "Should not have evaluated a at start");
        assertEquals(0, b.get(), "Should not have evaluated b at start");

        final var takeTwo = zipped.limit(2);

        assertEquals(0, a.get(), "Should not have evaluated a at take");
        assertEquals(0, b.get(), "Should not have evaluated b at take");

        final var list = takeTwo.collect(Collectors.toList());

        assertEquals(2, a.get(), "Should have evaluated a after collect");
        assertEquals(-2, b.get(), "Should have evaluated b after collect");
        assertEquals(List.of(2, 4), list);
    }
}
dominic
  • I had to drop the `takeWhile` at the end, as that doesn't seem to be in Java 8, but it isn't a problem as the caller can filter out any nulls which occur when the zipped streams are not the same size. I think that this answer should be the number 1 answer as it is concise and understandable. Great job, thanks again. – simbo1905 Oct 30 '19 at 15:12
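One caveat with the map-and-`takeWhile` zip in this answer: it uses `null` as its end marker, so a combiner that legitimately returns `null` would cut the result short. A private sentinel object avoids that ambiguity. The sketch below is only one way to do it; `SentinelZip` and `END` are hypothetical names, `takeWhile` still requires Java 9 or later, and the stateful lambda means the stream must stay sequential.

```java
import java.util.Iterator;
import java.util.function.BiFunction;
import java.util.stream.Stream;

final class SentinelZip {
    // Unique marker object; no combiner result can ever be identical to it.
    private static final Object END = new Object();

    @SuppressWarnings("unchecked")
    static <A, B, R> Stream<R> zip(Stream<A> s1, Stream<B> s2,
                                   BiFunction<? super A, ? super B, ? extends R> combiner) {
        Iterator<B> i2 = s2.iterator();
        return s1.<Object>map(x -> i2.hasNext() ? combiner.apply(x, i2.next()) : END)
                .takeWhile(r -> r != END)   // stop at the marker, not at null
                .map(r -> (R) r);           // safe: every remaining element came from combiner
    }
}
```

With this variant a `null` produced by the combiner is passed through as an ordinary element, and the stream still ends when the second input runs out.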
3
public class Tuple<S,T> {
    private final S object1;
    private final T object2;

    public Tuple(S object1, T object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    public S getObject1() {
        return object1;
    }

    public T getObject2() {
        return object2;
    }
}


public class StreamUtils {

    private StreamUtils() {
    }

    public static <T> Stream<Tuple<Integer,T>> zipWithIndex(Stream<T> stream) {
        Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed();
        Iterator<Integer> integerIterator = integerStream.iterator();
        return stream.map(x -> new Tuple<>(integerIterator.next(), x));
    }
}
2

AOL's cyclops-react, to which I contribute, also provides zipping functionality, both via ReactiveSeq, an extended Stream implementation that also implements the reactive-streams interface, and via StreamUtils, which offers much of the same functionality through static methods for standard Java Streams.

 List<Tuple2<Integer,Integer>> list =  ReactiveSeq.of(1,2,3,4,5,6)
                                                  .zip(Stream.of(100,200,300,400));


  List<Tuple2<Integer,Integer>> list = StreamUtils.zip(Stream.of(1,2,3,4,5,6),
                                                  Stream.of(100,200,300,400));

It also offers more generalized Applicative based zipping. E.g.

   ReactiveSeq.of("a","b","c")
              .ap3(this::concat)
              .ap(of("1","2","3"))
              .ap(of(".","?","!"))
              .toList();

   //List("a1.","b2?","c3!");

   private String concat(String a, String b, String c){
    return a+b+c;
   }

And even the ability to pair every item in one stream with every item in another

   ReactiveSeq.of("a","b","c")
              .forEach2(str->Stream.of(str+"!","2"), a->b->a+"_"+b);

   //ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c_2")
John McClean
2

If anyone still needs this, there is the StreamEx.zipWith function in the streamex library:

StreamEx<String> givenNames = StreamEx.of("Leo", "Fyodor");
StreamEx<String> familyNames = StreamEx.of("Tolstoy", "Dostoevsky");
StreamEx<String> fullNames = givenNames.zipWith(familyNames, (gn, fn) -> gn + " " + fn);

fullNames.forEach(System.out::println);  // prints: "Leo Tolstoy\nFyodor Dostoevsky\n"
const.grigoryev
-1

This is great. I had to zip two streams into a Map with one stream being the key and other being the value

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");    
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA,
                    streamB,
                    (a, b) -> {
                        final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b);
                        return entry;
                    });

System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));

Output: {A=Apple, B=Banana, C=Carrot}
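If the inputs are lists rather than arbitrary streams, the same key/value pairing can be sketched with the JDK alone. This is an assumption-laden alternative, not what the answer above uses: `ZipToMap` and `zipToMap` are hypothetical names, the shorter list determines the size, duplicate keys are rejected, and a `LinkedHashMap` is chosen so that the printed order follows the key list.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ZipToMap {
    // Pairs keys.get(i) with values.get(i) for every index both lists share.
    static <K, V> Map<K, V> zipToMap(List<K> keys, List<V> values) {
        int n = Math.min(keys.size(), values.size());
        return IntStream.range(0, n).boxed()
                .collect(Collectors.toMap(
                        keys::get,
                        values::get,
                        (first, second) -> {
                            throw new IllegalStateException("duplicate key");
                        },
                        LinkedHashMap::new));
    }
}
```

Called with the lists ("A", "B", "C") and ("Apple", "Banana", "Carrot", "Doughnut"), this produces a map that prints as {A=Apple, B=Banana, C=Carrot}. Note that `Collectors.toMap` does not accept `null` values.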

Gnana