168

I'd like to duplicate a Java 8 stream so that I can deal with it twice. I can collect it as a list and get new streams from that:

// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff

But I kind of think there should be a more efficient/elegant way.

Is there a way to copy the stream without turning it into a collection?

I'm actually working with a stream of Eithers, so I want to process the left projection one way before moving on to the right projection and dealing with that another way. Kind of like this (for which, so far, I'm forced to use the toList trick):

List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());

Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );

Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
Lii
Toby
  • Could you elaborate more on "process one way"... are you consuming the objects? Mapping them? partitionBy() and groupingBy() can get you directly to 2+ lists, but you might benefit from mapping first or just having a decision fork in your forEach(). – charles-allen Aug 02 '17 at 12:31
  • In some cases, turning it into a Collection could not be an option if we are dealing with infinite stream. You may find an alternative for memoization here: https://dzone.com/articles/how-to-replay-java-streams – Miguel Gamboa May 14 '20 at 09:44

10 Answers

111

I think your assumption about efficiency is kind of backwards. You get this huge efficiency payback if you're only going to use the data once, because you don't have to store it, and streams give you powerful "loop fusion" optimizations that let you flow the whole data efficiently through the pipeline.

If you want to re-use the same data, then by definition you either have to generate it twice (deterministically) or store it. If it already happens to be in a collection, great; then iterating it twice is cheap.

We did experiment in the design with "forked streams". What we found was that supporting this had real costs; it burdened the common case (use once) at the expense of the uncommon case. The big problem was dealing with "what happens when the two pipelines don't consume data at the same rate." Now you're back to buffering anyway. This was a feature that clearly didn't carry its weight.

If you want to operate on the same data repeatedly, either store it, or structure your operations as Consumers and do the following:

stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });
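As a rough sketch of the single-pass approach above (the class name `SinglePassFanOut`, the helper `fanOut`, and the two logging consumers are made up for illustration), both consumers see each element during one traversal, so nothing has to be stored or regenerated:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

final class SinglePassFanOut {

    // Hands every element of a single stream to both consumers in one traversal.
    static <T> void fanOut(Stream<T> source,
                           Consumer<? super T> first,
                           Consumer<? super T> second) {
        source.forEach(e -> {
            first.accept(e);
            second.accept(e);
        });
    }

    // Hypothetical demo: two consumers that just record what they were given.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Consumer<String> consumerA = s -> log.add("A:" + s);
        Consumer<String> consumerB = s -> log.add("B:" + s);
        fanOut(Stream.of("x", "y"), consumerA, consumerB);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The trade-off is that the two branches are interleaved per element, so this only fits when neither branch needs to see the whole data set before the other starts.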

You might also look into the RxJava library, as its processing model lends itself better to this kind of "stream forking".

Brian Goetz
  • 2
    Perhaps I shouldn't have used "efficiency", I'm kind of getting at why would I bother with streams (and not store anything) if all I do is immediately store the data (`toList`) to be able to process it (the `Either` case being the example)? – Toby Jun 24 '15 at 18:48
  • 11
    Streams are both _expressive_ and _efficient_. They are expressive in that they let you set up complex aggregate operations without a lot of accidental detail (e.g., intermediate results) in the way of reading the code. They are also efficient, in that they (generally) make a single pass on the data and do not populate intermediate result containers. These two properties together make them an attractive programming model for many situations. Of course, not all programming models fit all problems; you still need to decide whether you're using an appropriate tool for the job. – Brian Goetz Jun 24 '15 at 18:55
  • 1
    But the inability to reuse a stream causes situations where the developer is forced to store intermediate results (collecting) in order to process a stream in two different ways. The implication that the stream is generated more than once (unless you collect it) seems clear - because otherwise you wouldn't need a collect method. – Niall Connaughton Jun 20 '17 at 00:29
  • 2
    @NiallConnaughton I'm not sure what your point is. If you want to traverse it twice, someone has to store it, or you have to regenerate it. Are you suggesting the library should buffer it just in case someone needs it twice? That would be silly. – Brian Goetz Jun 20 '17 at 07:06
  • 1
    Not suggesting that the library should buffer it, but saying that by having streams as one-offs, it forces people who want to reuse a seed stream (ie: sharing the declarative logic used to define it) to build multiple derived streams to either collect the seed stream, or have access to a provider factory that will create a duplicate of the seed stream. Both options have their pain points. This answer has much more detail on the topic: https://stackoverflow.com/a/28513908/114200. – Niall Connaughton Jun 20 '17 at 16:20
  • Scala manages to do this with Eithers... but perhaps that's another question :) – Toby Nov 15 '18 at 14:08
98

You can use a local variable with a Supplier to set up common parts of the stream pipeline.

From http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/:

Reusing Streams

Java 8 streams cannot be reused. As soon as you call any terminal operation the stream is closed:

Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

Calling `noneMatch` after `anyMatch` on the same stream results in the following exception:
java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)

To overcome this limitation we have to create a new stream chain for every terminal operation we want to execute, e.g. we could create a stream supplier to construct a new stream with all intermediate operations already set up:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

Each call to get() constructs a new stream on which we are safe to call the desired terminal operation.
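One consequence worth keeping in mind is that every get() rebuilds and re-runs the whole pipeline from its source. The sketch below tries to make that visible; the class name `SupplierCostDemo` and the counter standing in for an expensive source (a query, a file read) are assumptions added for illustration only:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;

final class SupplierCostDemo {

    // Returns how often the source was evaluated for two terminal operations.
    static int sourceEvaluations() {
        AtomicInteger evaluations = new AtomicInteger();

        Supplier<Stream<String>> streamSupplier = () -> {
            // Stand-in for an expensive step; it runs on every get().
            evaluations.incrementAndGet();
            return Stream.of("d2", "a2", "b1", "b3", "c")
                    .filter(s -> s.startsWith("a"));
        };

        streamSupplier.get().anyMatch(s -> true);
        streamSupplier.get().noneMatch(s -> true);

        return evaluations.get();
    }

    public static void main(String[] args) {
        System.out.println("source evaluated " + sourceEvaluations() + " times");
    }
}
```

If regenerating the data is cheap and deterministic this is fine; if it is not, collecting once is probably the safer choice.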

Community
user4975679
  • 2
    nice and elegant solution. much more java8-ish than the most upvoted solution. – dylaniato Feb 02 '18 at 14:23
  • 4
    Just a note on using `Supplier`: if the `Stream` is built in a "costly" manner, **you pay that cost for each call to `Supplier.get()`**. For example, if it is backed by a database query, that query is run each time. – Julien May 04 '20 at 10:13
  • 1
    You can't seem to follow this pattern after a mapTo though using an IntStream. I found I had to convert it back to a `Set` using `collect(Collectors.toSet())` ... and do a couple of operations on that. I wanted `max()` and if a specific value was in set as two operations... `filter(d -> d == -1).count() == 1;` – JGFMK May 06 '20 at 10:14
  • @Julien Is there a way to avoid this? – sander Jun 28 '23 at 12:00
52

Use a Supplier to produce the stream for each terminal operation.

Supplier<Stream<Integer>> streamSupplier = () -> list.stream();

Whenever you need a stream of that collection, use streamSupplier.get() to get a new stream.

Examples:

  1. streamSupplier.get().anyMatch(predicate);
  2. streamSupplier.get().allMatch(predicate2);
Lii
Rams
  • 1
    Upvote you as you are the first to have pointed Suppliers out here. – bonnal-enzo Feb 03 '20 at 11:12
  • To avoid a NullPointerException when the list could be null, it is better to fall back to Stream.empty(), e.g. Supplier<Stream<Integer>> streamSupplier = () -> list == null ? Stream.empty() : list.stream(); – 01000001 Mar 29 '22 at 16:13
  • A Supplier makes sense if you only have a stream. If you have a list, just call list.stream() directly: both create a new stream, so why add an extra supplier? – Vijay Jun 22 '22 at 08:44
  • 1
    Using a `Supplier` has no benefit here. You can just as well just call `list.stream()` twice directly. – Jesper Jul 29 '22 at 13:31
9

We've implemented a duplicate() method for streams in jOOλ, an Open Source library that we created to improve integration testing for jOOQ. Essentially, you can just write:

Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();

Internally, there is a buffer storing all values that have been consumed from one stream but not from the other. That's probably as efficient as it gets if your two streams are consumed at about the same rate, and if you can live with the lack of thread-safety.

Here's how the algorithm works:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    // Declared as LinkedList (not List) because offer() and poll() are used below
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}

More source code here

Tuple2 is probably like your Pair type, whereas Seq is Stream with some enhancements.

Lukas Eder
7

You could create a stream of runnables (for example):

results.stream()
    .flatMap(either -> Stream.<Runnable> of(
            () -> failure(either.left()),
            () -> success(either.right())))
    .forEach(Runnable::run);

Where failure and success are the operations to apply. This will however create quite a few temporary objects and may not be more efficient than starting from a collection and streaming/iterating it twice.

assylias
5

Another way to handle the elements multiple times is to use Stream.peek(Consumer):

// doSomething() already returns a stream, so no .stream() call is needed
doSomething()
    .peek(either -> handleFailure(either.left()))
    .forEach(either -> handleSuccess(either.right()));

peek(Consumer) can be chained as many times as needed.

doSomething()
    .peek(element -> handleFoo(element.foo()))
    .peek(element -> handleBar(element.bar()))
    .peek(element -> handleBaz(element.baz()))
    .forEach(element -> handleQux(element.qux()));
Martin
  • 1
    It seems peek is not supposed to be used for this (see https://softwareengineering.stackexchange.com/a/308979/195787) – HectorJ Oct 29 '19 at 14:51
  • 2
    @HectorJ The other thread is about modifying elements. I assumed that is not done here. – Martin Oct 29 '19 at 19:44
2

cyclops-react, a library I contribute to, has a static method that will allow you to duplicate a Stream (and returns a jOOλ Tuple of Streams).

    Stream<Integer> stream = Stream.of(1,2,3);
    Tuple2<Stream<Integer>,Stream<Integer>> streams =  StreamUtils.duplicate(stream);

As noted in the comments, using duplicate on an existing Stream incurs a performance penalty. A more performant alternative would be to use Streamable:

There is also a (lazy) Streamable class that can be constructed from a Stream, Iterable or Array and replayed multiple times.

    Streamable<Integer> streamable = Streamable.of(1,2,3);
    streamable.stream().forEach(System.out::println);
    streamable.stream().forEach(System.out::println);

AsStreamable.synchronizedFromStream(stream) can be used to create a Streamable that lazily populates its backing collection in a way that can be shared across threads. Streamable.fromStream(stream) will not incur any synchronization overhead.

John McClean
  • 2
    And, of course it should be noted that the resulting streams have significant CPU/memory overhead and very poor parallel performance. Also this solution is not thread-safe (you cannot pass one of the resulting streams to another thread and process it safely in parallel). It would be much more performant and safe to `List list = stream.collect(Collectors.toList()); streams = new Tuple2<>(list.stream(), list.stream())` (as OP suggests). Also please disclose explicitly in the answer that you're the author of cyclop-streams. Read [this](http://stackoverflow.com/help/promotion). – Tagir Valeev Sep 17 '15 at 04:26
  • Updated to reflect I'm the author. Also a good point to discuss the performance characteristics of each. Your assessment above is pretty much spot on for StreamUtils.duplicate. StreamUtils.duplicate works by buffering data from one Stream to the other, incurring both a CPU and memory overhead (use case depending). For Streamable.of(1,2,3), however, a new Stream is created directly from the array each time, and the performance characteristics, including parallel performance, will be the same as for a normally created Stream. – John McClean Sep 17 '15 at 09:12
  • Also, there is an AsStreamable class which allows the creation of a Streamable instance from a Stream but synchronizes access to the collection backing the Streamable as it is created (AsStreamable.synchronizedFromStream). Making it more suitable for use across threads (if that is what you need - I would imagine 99% of the time Streams are created and reused on the same thread). – John McClean Sep 17 '15 at 09:19
  • Hi Tagir - shouldn't you also disclose in your comment that you are author of a competing library? – John McClean Sep 17 '15 at 09:23
  • 2
    Comments are not answers and I don't advertise my library here as my library has no feature to duplicate the stream (just because I think it's useless), so we don't compete here. Of course when I propose a solution involving my library I always say explicitly that I'm the author. – Tagir Valeev Sep 17 '15 at 09:41
  • Compete was the wrong word on my part, cyclops-streams and cyclops generally will work just fine with Streamex. It (Streamex) definitely will help inform your perspectives on Streams (and be informed by it - for example I'm guessing from your response the parallel CPU bound use case is important to you?). Duplication /or replayability mightn't seem useful at all to you, but possibly some of your users might like to have it. – John McClean Sep 17 '15 at 10:40
1

For this particular problem you can also use partitioning. Something like:

     // Partition Eithers into left (failures) and right (successes)
     Map<Boolean, List<Either<Pair<A, Throwable>, A>>> partitioned = doSomething()
             .collect(Collectors.partitioningBy(either -> either.isLeft()));
     partitioned.get(true);  // all left values (the failures)
     partitioned.get(false); // all right values (the successes)
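For anyone who wants to try the partitioning idea in isolation, here is a minimal self-contained sketch. The nested `Either` class is a hypothetical stand-in written only for this example (real Either types from libraries have different APIs), and `PartitionDemo`, `partition` and `summary` are illustrative names:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PartitionDemo {

    // Hypothetical minimal Either, only for this sketch; not a library type.
    static final class Either<L, R> {
        final L leftValue;
        final R rightValue;

        private Either(L leftValue, R rightValue) {
            this.leftValue = leftValue;
            this.rightValue = rightValue;
        }

        static <L, R> Either<L, R> left(L value) { return new Either<>(value, null); }
        static <L, R> Either<L, R> right(R value) { return new Either<>(null, value); }

        boolean isLeft() { return leftValue != null; }
    }

    // One pass over the stream; true -> lefts (failures), false -> rights (successes).
    static Map<Boolean, List<Either<String, Integer>>> partition(
            Stream<Either<String, Integer>> results) {
        return results.collect(Collectors.partitioningBy(Either::isLeft));
    }

    static String summary() {
        Map<Boolean, List<Either<String, Integer>>> parts = partition(Stream.of(
                Either.<String, Integer>left("boom"),
                Either.<String, Integer>right(1),
                Either.<String, Integer>right(2)));
        return "failures=" + parts.get(true).size()
                + ", successes=" + parts.get(false).size();
    }

    public static void main(String[] args) {
        System.out.println(summary());
    }
}
```

Note that this still stores every element (in two lists instead of one), so it tidies up the code rather than avoiding the buffering.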
Lubomir Varga
0

We can make use of Stream.Builder while reading or iterating over a stream. Here's the documentation for Stream.Builder:

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html

Use case

Let's say we have an employee stream, and we need to use it to write employee data to an Excel file and then update the employee collection/table (this is just a use case to show the use of Stream.Builder):

Stream.Builder<Employee> builder = Stream.builder();

employee.forEach( emp -> {
   //store employee data to excel file 
   // and use the same object to build the stream.
   builder.add(emp);
});

//Now this stream can be used to update the employee collection
Stream<Employee> newStream = builder.build();
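A runnable version of the same idea might look like the sketch below. Plain strings stand in for the Employee type, and the "export"/"update" log entries are placeholders for the Excel and database steps, so all of those names are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class BuilderReplayDemo {

    // Consumes the stream once, remembering each element in a Stream.Builder,
    // then replays the remembered elements as a second stream.
    static List<String> processTwice(Stream<String> employees) {
        List<String> log = new ArrayList<>();
        Stream.Builder<String> builder = Stream.builder();

        employees.forEach(emp -> {
            log.add("export:" + emp); // first use of the element
            builder.add(emp);         // keep it for the second pass
        });

        // build() may be called once; the result is again a one-shot stream.
        builder.build().forEach(emp -> log.add("update:" + emp));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(processTwice(Stream.of("ann", "bob")));
    }
}
```

The builder is effectively a buffer, so memory use is comparable to collecting into a list.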
Lokesh Singal
0

I had a similar problem, and could think of three different intermediate structures from which to create a copy of the stream: a List, an array and a Stream.Builder. I wrote a little benchmark program, which suggested that from a performance point of view the List was about 30% slower than the other two which were fairly similar.

The only drawback of converting to an array is that it is tricky if your element type is a generic type (which in my case it was); therefore I prefer to use a Stream.Builder.

I ended up writing a little function that creates a Collector:

private static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector()
{
    return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
        b2.build().forEach(b1);
        return b1;
    }, Stream.Builder::build);
}

I can then make a copy of any stream str by doing str.collect(copyCollector()) which feels quite in keeping with the idiomatic usage of streams.
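To show how that call site might look end to end, here is a small sketch that repeats the collector from above inside a hypothetical `CopyCollectorDemo` class; the `demo` method and its sample numbers are made up for the example:

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class CopyCollectorDemo {

    // Same collector as in the answer: accumulate into a Stream.Builder,
    // merge partial builders, and finish by building a new stream.
    static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector() {
        return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
            b2.build().forEach(b1);
            return b1;
        }, Stream.Builder::build);
    }

    static List<Integer> demo() {
        // The original pipeline is consumed here; 'copy' is a fresh stream
        // backed by the builder's buffer.
        Stream<Integer> copy = Stream.of(1, 2, 3)
                .map(i -> i * 10)
                .collect(copyCollector());
        return copy.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keep in mind that the copy is itself a single-use stream, so processing the data twice means collecting into a copy once per extra pass.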

Jeremy Hicks