81

Sometimes I want to perform a set of operations on a stream, and then process the resulting stream two different ways with other operations.

Can I do this without having to specify the common initial operations twice?

For example, I am hoping a dup() method such as the following exists:

IntStream[] desired_streams = IntStream.range(1, 100).filter(n -> n % 2 == 0).dup();
IntStream stream14 = desired_streams[0].filter(n -> n % 7 == 0); // multiples of 14
IntStream stream10 = desired_streams[1].filter(n -> n % 5 == 0); // multiples of 10
Stuart Marks
necromancer
  • 2
    I do realize that there will be no performance gain because streams are evaluated lazily; I am just hoping to avoid duplicating code. – necromancer Jun 29 '14 at 09:39
  • Why not turn the streams into lists? – Elazar Jun 29 '14 at 09:39
  • 1
    Locate what varies in your code and extract this into variables. Then create a method to extract reusable piece of code and apply variables to it. – d1e Jun 29 '14 at 09:42
  • @Elazar doing so would not be memory efficient, and would not work for infinite streams! – necromancer Jun 29 '14 at 09:43
  • 2
    Nothing can duplicate general infinite streams, without further information. – Elazar Jun 29 '14 at 19:07
  • @Elazar I realize that's right. Feel free to add an answer that elaborates. I am not sure if it would be worth switching the currently accepted answer because it does say so in its first sentence: "It is not possible ...", but I am sure readers will appreciate knowing that this is fundamentally impossible in general. – necromancer Jun 29 '14 at 23:59
  • @necromancer I gave it a try. – Elazar Jun 30 '14 at 00:55
  • @Elazar good job, esp the phrase: 'copy the state of the whole "outside world"'. That is the key point. The example I thought of after your earlier comment was that of a stream that reads a temperature sensor on demand. Your mention of the `|n1 - n2|` solution is great because it is a very nice solution to the fundamental problem. I almost feel like editing your answer to suggest using a `Queue`, what do you think? – necromancer Jun 30 '14 at 02:06

11 Answers

63

It is not possible to duplicate a stream in this way. However, you can avoid the code duplication by moving the common part into a method or lambda expression.

Supplier<IntStream> supplier = () ->
    IntStream.range(1, 100).filter(n -> n % 2 == 0);
supplier.get().filter(...);
supplier.get().filter(...);
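
As a minimal runnable sketch of this idea, the following fills in the two elided filters with the ones from the question (multiples of 7 and of 5). The class name `SupplierDemo` and its method names are placeholders of mine, not anything from the JDK. Note that every `get()` call builds a fresh pipeline, so the common operations are written once but still executed once per consumer.

```java
import java.util.Arrays;
import java.util.function.Supplier;
import java.util.stream.IntStream;

class SupplierDemo {
    // The common part of the pipeline, written once.
    // Every get() call creates a brand-new IntStream.
    static final Supplier<IntStream> EVENS =
        () -> IntStream.range(1, 100).filter(n -> n % 2 == 0);

    static int[] multiplesOf14() {
        return EVENS.get().filter(n -> n % 7 == 0).toArray();
    }

    static int[] multiplesOf10() {
        return EVENS.get().filter(n -> n % 5 == 0).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(multiplesOf14()));
        System.out.println(Arrays.toString(multiplesOf10()));
    }
}
```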
nosid
  • 3
    i am considering switching the accepted answer to the one by Elazar and linking to yours as a great example of the second solution and to the solution of the specific example I used in my question. hope that's ok. thanks! – necromancer Jun 30 '14 at 02:12
  • 12
    @necromancer: Thanks for asking. Feel free to change the accepted answer. – nosid Jun 30 '14 at 06:57
46

It is not possible in general.

If you want to duplicate an input stream, or input iterator, you have two options:

A. Keep everything in a collection, say a List<>

Suppose you duplicate a stream into two streams s1 and s2. If you have advanced n1 elements in s1 and n2 elements with s2, you must keep |n2 - n1| elements in memory, just to keep pace. If your stream is infinite, there may be no upper bound for the storage required.

Take a look at Python's tee() to see what it takes:

This itertool may require significant auxiliary storage (depending on how much temporary data needs to be stored). In general, if one iterator uses most or all of the data before another iterator starts, it is faster to use list() instead of tee().
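
To make the |n2 - n1| bookkeeping concrete, here is a rough Java sketch of a two-way tee over an `Iterator`. The class `Tee` and its members are hypothetical names, it assumes non-null elements and single-threaded use, and it is meant to illustrate the buffering cost rather than to serve as a production utility.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical two-way tee: each branch has a queue of elements that the
// other branch has already pulled from the shared source.
class Tee<T> {
    private final Iterator<T> source;
    private final Deque<T> pendingForFirst = new ArrayDeque<>();
    private final Deque<T> pendingForSecond = new ArrayDeque<>();

    Tee(Iterator<T> source) { this.source = source; }

    Iterator<T> first()  { return branch(pendingForFirst, pendingForSecond); }
    Iterator<T> second() { return branch(pendingForSecond, pendingForFirst); }

    // Number of elements currently held in memory, i.e. |n2 - n1|.
    int buffered() { return pendingForFirst.size() + pendingForSecond.size(); }

    private Iterator<T> branch(Deque<T> mine, Deque<T> others) {
        return new Iterator<T>() {
            @Override public boolean hasNext() {
                return !mine.isEmpty() || source.hasNext();
            }
            @Override public T next() {
                if (!mine.isEmpty()) return mine.poll(); // replay what the other branch saw
                T value = source.next();                 // otherwise advance the source
                others.add(value);                       // and remember it for the other branch
                return value;
            }
        };
    }

    public static void main(String[] args) {
        Tee<Integer> tee = new Tee<>(List.of(1, 2, 3, 4, 5).iterator());
        Iterator<Integer> a = tee.first();
        Iterator<Integer> b = tee.second();
        a.next(); a.next(); a.next();          // a runs three elements ahead
        System.out.println(tee.buffered());
        b.next();                              // b catches up by one
        System.out.println(tee.buffered());
    }
}
```

With an infinite source and one branch that is never read, `buffered()` grows without bound, which is exactly the storage problem described above.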

B. When possible: Copy the state of the generator that creates the elements

For this option to work, you'll probably need access to the inner workings of the stream. In other words, the generator - the part that creates the elements - should support copying in the first place. [OP: See this great answer, as an example of how this can be done for the example in the question]

It will not work on input from the user, since you would have to copy the state of the entire "outside world". Java's Streams do not support copying, since they are designed to be as general as possible; for example, to work with files, network, keyboard, sensors, randomness etc. [OP: Another example is a stream that reads a temperature sensor on demand. It cannot be duplicated without storing a copy of the readings]

This is not only the case in Java; this is a general rule. You can see that std::istream in C++ only supports move semantics, not copy semantics ("copy constructor (deleted)"), for this reason (and others).
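
As a sketch of option B, suppose the generator is something you wrote yourself and its whole state is a couple of numbers. The class `Counter` below is a made-up example, not part of any library: copying it is cheap because there is no outside world to copy, only two `int` fields.

```java
import java.util.PrimitiveIterator;

// Hypothetical generator whose complete state is (current, step),
// which is what makes it copyable.
class Counter implements PrimitiveIterator.OfInt {
    private int current;
    private final int step;

    Counter(int start, int step) {
        this.current = start;
        this.step = step;
    }

    // Option B: duplicate the generator by duplicating its state.
    Counter copy() {
        return new Counter(current, step);
    }

    @Override
    public boolean hasNext() {
        return true; // infinite sequence
    }

    @Override
    public int nextInt() {
        int value = current;
        current += step;
        return value;
    }

    public static void main(String[] args) {
        Counter evens = new Counter(2, 2);
        evens.nextInt();               // advance the original past 2
        Counter twin = evens.copy();   // the copy starts at the same position
        System.out.println(evens.nextInt() + " " + twin.nextInt());
        evens.nextInt();               // advancing one does not move the other
        System.out.println(evens.nextInt() + " " + twin.nextInt());
    }
}
```

No buffer is needed here even though the sequence is infinite, because each copy can recompute its own elements. A sensor or keyboard has no such `copy()`.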

Elazar
  • 2
    +1 awesome answer; will likely accept and link to the currently accepted answer as a specific example of point "B." – necromancer Jun 30 '14 at 02:07
  • 3
    a blocking queue would be one solution that allows bounded storage, where a reader of the first stream would be blocked until the second stream is consumed. naturally, not always applicable but might work for some use cases esp. with a large buffer. – necromancer Jun 30 '14 at 02:10
  • 1
    Note that you might be able to compress the `n2 - n1` elements, although I don't believe it to be practical too often. – Elazar Sep 07 '17 at 13:56
  • 1
    When I have encountered this type of problem (not so often as I am relatively new to java.streams!) my initial instinct was to go for Option A. However I always felt uneasy about this, it seems like we are moving out of the streams world, then back in again ... just for the sake of duplication. I *think* I'm much in favour of option B. I'm using it in my project and it looks to be working as desired. On the face of it maybe processing has to be duplicated, I guess this is more a question for the authors of the Streams API?? – Bill Naylor Jul 13 '20 at 17:50
9

It's possible if you're buffering elements that you've consumed in one duplicate, but not in the other yet.

We've implemented a duplicate() method for streams in jOOλ, an Open Source library that we created to improve integration testing for jOOQ. Essentially, you can just write:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate();

(note: we currently need to box the stream, as we haven't implemented an IntSeq yet)

Internally, there is a LinkedList buffer storing all values that have been consumed from one stream but not from the other. That's probably as efficient as it gets if your two streams are consumed at about the same rate.

Here's how the algorithm works:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

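    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            // The leading iterator (or either one, if neither leads) reads from the source.
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            // The trailing iterator reads from the buffer.
            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            T value = gap.poll();

            // Once the trailing iterator has caught up, nobody leads any more,
            // so either iterator may pull from the source next.
            if (gap.isEmpty())
                ahead[0] = null;

            return value;
        }
    }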

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}

More source code here

In fact, using jOOλ, you'll be able to write a complete one-liner like so:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate()
 .map1(s -> s.filter(n -> n % 7 == 0))
 .map2(s -> s.filter(n -> n % 5 == 0));

// This will yield 14, 28, 42, 56...
desired_streams.v1.forEach(System.out::println);

// This will yield 10, 20, 30, 40...
desired_streams.v2.forEach(System.out::println);
Lukas Eder
  • 4
    Thanks but the currently accepted answer does state: "If you have advanced n1 elements in s1 and n2 elements with s2, you have to keep |n2 - n1| elements in memory, just to keep pace. If your stream is infinite, there will be no upper bound for the storage required." – necromancer Sep 23 '14 at 10:55
7

Starting with Java 12 we have Collectors::teeing, which passes each element of the main stream pipeline to two downstream collectors and merges their results (teeing calls can be nested if you need more than two).

Based on your example we can do the following:

@Test
void shouldProcessStreamElementsInTwoSeparateDownstreams() {
    class Result {
        List<Integer> multiplesOf7;
        List<Integer> multiplesOf5;

        Result(List<Integer> multiplesOf7, List<Integer> multiplesOf5) {
            this.multiplesOf7 = multiplesOf7;
            this.multiplesOf5 = multiplesOf5;
        }
    }

    var result = IntStream.range(1, 100)
            .filter(n -> n % 2 == 0)
            .boxed()
            .collect(Collectors.teeing(
                    Collectors.filtering(n -> n % 7 == 0, Collectors.toList()),
                    Collectors.filtering(n -> n % 5 == 0, Collectors.toList()),
                    Result::new
            ));

    assertTrue(result.multiplesOf7.stream().allMatch(n -> n % 7 == 0));
    assertTrue(result.multiplesOf5.stream().allMatch( n -> n % 5 == 0));
}

There are many other collectors that let you do other things. For example, by using Collectors::mapping downstream you can obtain two different objects/types from the same source, as shown in this article.
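
For instance, here is a rough sketch that derives two results of different types, a count and a sum, from a single pass over the even numbers. The class and method names are mine, and it assumes Java 12 or later for `Collectors.teeing`.

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class TeeingDemo {
    // One pass over the even numbers in [1, 100), two downstream results.
    static Map.Entry<Long, Integer> countAndSumOfEvens() {
        return IntStream.range(1, 100)
                .filter(n -> n % 2 == 0)
                .boxed()
                .collect(Collectors.teeing(
                        Collectors.counting(),                      // first result: Long
                        Collectors.summingInt(Integer::intValue),   // second result: Integer
                        (count, sum) -> Map.entry(count, sum)));    // merge both results
    }

    public static void main(String[] args) {
        Map.Entry<Long, Integer> result = countAndSumOfEvens();
        System.out.println("count=" + result.getKey() + ", sum=" + result.getValue());
    }
}
```

Unlike duplicating the stream, this only works when both consumers can be expressed as collectors, and both results become available together at the end.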

Adrian
5

You can also move the stream generation into a separate method/function that returns the stream, and call it twice.
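
A small sketch of what that could look like, this time with an infinite source to show that nothing has to be buffered. `EvensFactory` and `evens()` are hypothetical names chosen for the illustration.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class EvensFactory {
    // Hypothetical factory method: builds the infinite stream 2, 4, 6, ... on demand.
    static IntStream evens() {
        return IntStream.iterate(2, n -> n + 2);
    }

    public static void main(String[] args) {
        // Each call returns an independent stream, so both pipelines can run.
        int[] multiplesOf14 = evens().filter(n -> n % 7 == 0).limit(3).toArray();
        int[] multiplesOf10 = evens().filter(n -> n % 5 == 0).limit(3).toArray();
        System.out.println(Arrays.toString(multiplesOf14));
        System.out.println(Arrays.toString(multiplesOf10));
    }
}
```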

Tomasz GORKA
4

Either,

  • Move the initialisation into a method, and simply call the method again

This has the advantage of being explicit about what you are doing, and also works for infinite streams.

  • Collect the stream and then re-stream it

In your example:

final int[] arr = IntStream.range(1, 100).filter(n -> n % 2 == 0).toArray();

Then

final IntStream s = IntStream.of(arr);
Boris the Spider
  • 1
    Thanks, I realized there is a simpler answer (see my own answer); Collecting the stream is not quite memory efficient and simply wouldn't work for infinite streams. – necromancer Jun 29 '14 at 09:45
  • 1
    Your answer doesn't involve _processing_ the streams. From your question I understood that you wanted to take a single stream and, say, collect it to a `Map` and also `sum()` it. You are just talking about setting up pipelines. – Boris the Spider Jun 29 '14 at 09:47
3

Update: This doesn't work. See explanation below, after the text of the original answer.

How silly of me. All that I need to do is:

IntStream desired_stream = IntStream.range(1, 100).filter(n -> n % 2 == 0);
IntStream stream14 = desired_stream.filter(n -> n % 7 == 0); // multiples of 14
IntStream stream10 = desired_stream.filter(n -> n % 5 == 0); // multiples of 10

Explanation why this does not work:

If you code it up, the second call to filter on the same stream object throws the exception: java.lang.IllegalStateException: stream has already been operated upon or closed. In the test program below, the first derived stream is consumed without trouble, and the exception is raised as soon as the original stream is reused.

To elaborate, streams are stateful objects (which by the way cannot be reset or rewound). You can think of them as iterators, which in turn are like pointers. So stream14 and stream10 can be thought of as references to the same pointer. Consuming the first stream all the way will cause the pointer to go "past the end." Trying to consume the second stream is like trying to access a pointer that is already "past the end," which naturally is an illegal operation.

As the accepted answer shows, the code to create the stream must be executed twice but it can be compartmentalized into a Supplier lambda or a similar construct.

Full test code: save into Foo.java, then javac Foo.java, then java Foo

import java.util.stream.IntStream;

public class Foo {
  public static void main (String [] args) {
    IntStream s = IntStream.range(0, 100).filter(n -> n % 2 == 0);
    IntStream s1 = s.filter(n -> n % 5 == 0);
    s1.forEach(n -> System.out.println(n));
    IntStream s2 = s.filter(n -> n % 7 == 0);
    s2.forEach(n -> System.out.println(n));
  }
}

Output:

$ javac Foo.java
$ java Foo
0
10
20
30
40
50
60
70
80
90
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.<init>(AbstractPipeline.java:203)
    at java.util.stream.IntPipeline.<init>(IntPipeline.java:91)
    at java.util.stream.IntPipeline$StatelessOp.<init>(IntPipeline.java:592)
    at java.util.stream.IntPipeline$9.<init>(IntPipeline.java:332)
    at java.util.stream.IntPipeline.filter(IntPipeline.java:331)
    at Foo.main(Foo.java:8)
necromancer
1

For non-infinite streams, if you have access to the source, it's straightforward:

@Test
public void testName() throws Exception {
    List<Integer> integers = Arrays.asList(1, 2, 4, 5, 6, 7, 8, 9, 10);
    Stream<Integer> stream1 = integers.stream();
    Stream<Integer> stream2 = integers.stream();

    stream1.forEach(System.out::println);
    stream2.forEach(System.out::println);
}

prints

1 2 4 5 6 7 8 9 10

1 2 4 5 6 7 8 9 10

For your case:

Stream<Integer> originalStream = IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed();

List<Integer> listOf = originalStream.collect(Collectors.toList());

Stream<Integer> stream14 = listOf.stream().filter(n -> n % 7 == 0);
Stream<Integer> stream10 = listOf.stream().filter(n -> n % 5 == 0);

For performance etc, read someone else's answer ;)

Blundell
  • 1
    Thanks, but the spirit of the question was to `tee` a stream for which the source is inaccessible, or indeterminate such as stdin. It is adequately clear to me now that the cost of such teeing stems primarily from differences in the rate of effective consumption. e. g. if one consumer runs far ahead of the other, the difference needs to be buffered separately and additionally. – necromancer Dec 07 '18 at 05:33
1

I used this great answer to write following class:

public class SplitStream<T> implements Stream<T> {
    private final Supplier<Stream<T>> streamSupplier;

    public SplitStream(Supplier<Stream<T>> t) {
        this.streamSupplier = t;
    }

    @Override
    public Stream<T> filter(Predicate<? super T> predicate) {
        return streamSupplier.get().filter(predicate);
    }

    @Override
    public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
        return streamSupplier.get().map(mapper);
    }

    @Override
    public IntStream mapToInt(ToIntFunction<? super T> mapper) {
        return streamSupplier.get().mapToInt(mapper);
    }

    @Override
    public LongStream mapToLong(ToLongFunction<? super T> mapper) {
        return streamSupplier.get().mapToLong(mapper);
    }

    @Override
    public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
        return streamSupplier.get().mapToDouble(mapper);
    }

    @Override
    public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
        return streamSupplier.get().flatMap(mapper);
    }

    @Override
    public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
        return streamSupplier.get().flatMapToInt(mapper);
    }

    @Override
    public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
        return streamSupplier.get().flatMapToLong(mapper);
    }

    @Override
    public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
        return streamSupplier.get().flatMapToDouble(mapper);
    }

    @Override
    public Stream<T> distinct() {
        return streamSupplier.get().distinct();
    }

    @Override
    public Stream<T> sorted() {
        return streamSupplier.get().sorted();
    }

    @Override
    public Stream<T> sorted(Comparator<? super T> comparator) {
        return streamSupplier.get().sorted(comparator);
    }

    @Override
    public Stream<T> peek(Consumer<? super T> action) {
        return streamSupplier.get().peek(action);
    }

    @Override
    public Stream<T> limit(long maxSize) {
        return streamSupplier.get().limit(maxSize);
    }

    @Override
    public Stream<T> skip(long n) {
        return streamSupplier.get().skip(n);
    }

    @Override
    public void forEach(Consumer<? super T> action) {
        streamSupplier.get().forEach(action);
    }

    @Override
    public void forEachOrdered(Consumer<? super T> action) {
        streamSupplier.get().forEachOrdered(action);
    }

    @Override
    public Object[] toArray() {
        return streamSupplier.get().toArray();
    }

    @Override
    public <A> A[] toArray(IntFunction<A[]> generator) {
        return streamSupplier.get().toArray(generator);
    }

    @Override
    public T reduce(T identity, BinaryOperator<T> accumulator) {
        return streamSupplier.get().reduce(identity, accumulator);
    }

    @Override
    public Optional<T> reduce(BinaryOperator<T> accumulator) {
        return streamSupplier.get().reduce(accumulator);
    }

    @Override
    public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
        return streamSupplier.get().reduce(identity, accumulator, combiner);
    }

    @Override
    public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
        return streamSupplier.get().collect(supplier, accumulator, combiner);
    }

    @Override
    public <R, A> R collect(Collector<? super T, A, R> collector) {
        return streamSupplier.get().collect(collector);
    }

    @Override
    public Optional<T> min(Comparator<? super T> comparator) {
        return streamSupplier.get().min(comparator);
    }

    @Override
    public Optional<T> max(Comparator<? super T> comparator) {
        return streamSupplier.get().max(comparator);
    }

    @Override
    public long count() {
        return streamSupplier.get().count();
    }

    @Override
    public boolean anyMatch(Predicate<? super T> predicate) {
        return streamSupplier.get().anyMatch(predicate);
    }

    @Override
    public boolean allMatch(Predicate<? super T> predicate) {
        return streamSupplier.get().allMatch(predicate);
    }

    @Override
    public boolean noneMatch(Predicate<? super T> predicate) {
        return streamSupplier.get().noneMatch(predicate);
    }

    @Override
    public Optional<T> findFirst() {
        return streamSupplier.get().findFirst();
    }

    @Override
    public Optional<T> findAny() {
        return streamSupplier.get().findAny();
    }

    @Override
    public Iterator<T> iterator() {
        return streamSupplier.get().iterator();
    }

    @Override
    public Spliterator<T> spliterator() {
        return streamSupplier.get().spliterator();
    }

    @Override
    public boolean isParallel() {
        return streamSupplier.get().isParallel();
    }

    @Override
    public Stream<T> sequential() {
        return streamSupplier.get().sequential();
    }

    @Override
    public Stream<T> parallel() {
        return streamSupplier.get().parallel();
    }

    @Override
    public Stream<T> unordered() {
        return streamSupplier.get().unordered();
    }

    @Override
    public Stream<T> onClose(Runnable closeHandler) {
        return streamSupplier.get().onClose(closeHandler);
    }

    @Override
    public void close() {
        streamSupplier.get().close();
    }
}

When you call any method of this class, it delegates the call to

streamSupplier.get()

So, instead of:

Supplier<IntStream> supplier = () ->
    IntStream.range(1, 100).filter(n -> n % 2 == 0);
supplier.get().filter(...);
supplier.get().filter(...);

You can do:

SplitStream<Integer> stream = 
    new SplitStream<>(() -> IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed());
stream.filter(...);
stream.filter(...);

You can expand it to work with IntStream, DoubleStream, etc...

Azazell
1

I think that using concat with an empty stream could meet your need. Try something like this:

Stream<Integer> concat = Stream.concat(Stream.of(1, 2), Stream.empty());
1

Straight answer is: yes

There's no specific support for this but one can implement it. The possible approaches that I see are these:
a. copy the entire stream data and then create the stream copies based on it -> the RAM consumption might be an impediment
b. read the stream and relay each of its elements to the copies -> I'll detail this approach below

The Concept

Let's imagine b. solution:
<T> List<Stream<T>> copyStream(int copiesCount, Stream<T> originalStream)
allows one to create copiesCount copies of the originalStream.

To understand the solution one has to understand the difference between a stream and the data-elements that might flow through it: for example, an apple, a carrot and a potato would be data-elements, while a pipe through which they move to reach some destination would be the stream. Copying a Stream is like creating more pipes: one then has to connect the original pipe (i.e. originalStream) to the additional ones (i.e. streamCopies). In the real world one can't pass the same apple-object from one pipe into several pipes, but in programming this is possible: just pass the variable containing the apple-object reference to each of the stream copies.

Implementation Details

The Java implementation of Stream has a great impact on the solution's shape. The first impact is related to what happens when data-elements flow through a stream (aka pipe): to actually read (and process) the elements in a Stream, a terminal method has to be used, e.g. forEach. In our case originalStream.forEach must be called so that each element is read and passed to the streamCopies (aka downstream pipes); this would have to happen before the copyStream() method returns, which is bad because forEach would block until all originalStream elements are consumed. To solve this, the copyStream() implementation spawns a thread in which to call originalStream.forEach.

Consuming originalStream elements means passing them to the downstream pipes (i.e. streamCopies); because there's no cache, one has to ensure that each originalStream element is transferred to every stream copy before moving on to the next one. This means that all streamCopies must be consumed at the same pace: if one stream copy is not consuming, it will block all the others, because originalStream will stop transferring to the downstream pipes until everyone has consumed the current element (i.e. it will cache nothing for the late consumers).

But consuming a Stream in Java implies calling a terminal operation on it (e.g. forEach), which blocks the execution until the entire stream is consumed; because we need all streamCopies to be consumed in parallel, this must happen on a distinct thread for each! As a side note, one of the streamCopies could in fact be consumed on the current (main) thread. Summarizing, the solution usage would look like below:

List<Stream<?>> streamCopies = copyStream(copiesCount, originalStream);
// start a thread for each `streamCopies` into which consume the corresponding 
// stream copy (one of them could be consumed on the current thread though)
// optionally join the consuming threads
// continue your whatever business logic you have

Final Considerations

Some of the limitations apparent above can be circumvented:

  • the copying process is destructive, i.e. originalStream will be unusable after calling copyStream() because it will be pending consumption. If one really wants to consume it, one can create an additional copy and consume that on the current (main) thread (but only after starting the consumption of all other copies)
  • streamCopies must consume all received originalStream elements; otherwise, if one stops, the others block too (read the "Implementation Details" part again to understand why). This means the consumption of each stream copy must occur in a try...catch to ensure it does not fail (i.e. stop processing). A production implementation would in fact circumvent this by wrapping each Stream copy with something overriding the close() method so as to remove the failed stream copy from the originalStream-to-streamCopies transfer logic (i.e. discard the underlying blockingQueue used for the communication between the originalStream thread and that copy's consuming thread -> see the implementation below). This implies that the clients would be forced to close the Stream copies, but that’s not so uncommon, e.g. see Spring’s JdbcTemplate.queryForStream(), whose result has the same requirement.
  • as pointed out before, each stream copy's terminal operation must be executed in a distinct thread; there's no workaround for this

The Code

Below is the code implementing the b. solution and a test checking its correctness.

@Test
void streamCopyTest() throws ExecutionException, InterruptedException {
    // streamCopies are valid/normal Stream 
    // instances (e.g. it is allowed to be infinite)
    List<Stream<String>> streamCopies = copyStream(3, Stream.of("a", "b", "c", "d"));
    // The 3 copies rely on the original stream which can’t be
    // consumed more than once! Consuming the copies one by one
    // in the same thread isn’t possible because 1st consumed 
    // copy would leave nothing to consume for the others, 
    // so they must be consumed in parallel.
    ExecutorService executorService = Executors.newCachedThreadPool();
    CompletableFuture<?>[] futures =
            streamCopies.stream().map(stream -> CompletableFuture.runAsync(() -> {
                // the same consumption logic for all streamCopies is 
                // used here because this is just an example; the 
                // actual consumption logic could be distinct (and anything)
                String outcome = stream.collect(Collectors.joining(", "));
                // check the thread name in the message to differentiate the outcome
                log.info("\n{}", outcome);
            }, executorService)).toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(futures).get();
    executorService.shutdown();
}

@RequiredArgsConstructor
@Slf4j
public class StreamCopiesFactory {

    /**
     * The amount of elements to be stored in the blockingQueue used 
     * to transfer elements from the original stream to its copies. 
     * This is very different to the cache use for the a. solution:
     * here is about the transfer between original stream and its 
     * copies instead of the entire original stream data-copy.
     * Change or make this configurable.
     */
    private static final int cacheSize = 1;

    /**
     * Each of these stream copies must execute (their terminal operation)
     * on a distinct thread! One of them could actually execute on the 
     * main thread, but only after all the others were started on their 
     * distinct thread.
     */
    public static <T> List<Stream<T>> copyStream(int copies, Stream<T> stream) {
        List<BlockingQueue<Object>> blockingQueues = new ArrayList<>(copies);
        // creating the queues used to relay the stream's elements to the stream's copies
        for (int i = 0; i < copies; i++) {
            blockingQueues.add(new LinkedBlockingQueue<>(cacheSize));
        }
        // consume the original stream in a distinct thread, otherwise
        // bq.put (transferring to the next stream copy) would block
        // because the 2nd stream copy isn't yet consuming
        Executors.newSingleThreadExecutor().execute(() -> {
            stream.forEach(streamElement -> blockingQueues.forEach(bq -> {
                try {
                    bq.put(streamElement);
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                    // nothing to do here other than maybe simple optimization related to the
                    // failed bq.put (e.g. sending END_SIGNAL into bq then skipping its next put calls)
                }
            }));
            blockingQueues.forEach(bq -> {
                try {
                    bq.put(SpliteratorCopy.END_SIGNAL);
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                    // nothing to do here
                }
            });
        });
        // creating the copies
        // A production implementation would wrap each Stream copy with 
        // something overwriting close() which to remove from blockingQueues
        // the blockingQueue corresponding to the closed Stream.
        return blockingQueues.stream().map(bq -> new SpliteratorCopy<T>(bq))
                .map(spliterator -> StreamSupport.stream(spliterator, false))
                .collect(Collectors.toList());
    }
}

@RequiredArgsConstructor
@Slf4j
public class SpliteratorCopy<T> implements Spliterator<T> {

    public static final Object END_SIGNAL = new Object();

    private final BlockingQueue<?> blockingQueue;

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        Object nextElement;
        try {
            nextElement = blockingQueue.take();
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        if (nextElement == END_SIGNAL) {
            return false;
        }
        action.accept((T) nextElement);
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
        return Spliterator.ORDERED;
    }
}
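
The code above depends on Lombok and SLF4J. As a rough, JDK-only sketch of the same relay idea (this is not the author's implementation; `QueueRelay`, `copy` and `demo` are names I made up, and it assumes non-null elements), the following reads the source on a daemon thread and hands every element to one bounded queue per copy.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class QueueRelay {
    private static final Object END = new Object();   // marks the end of the source

    // Every returned copy must be consumed on its own thread.
    static <T> List<Stream<T>> copy(int copies, Stream<T> source) {
        List<BlockingQueue<Object>> queues = new ArrayList<>();
        for (int i = 0; i < copies; i++) {
            queues.add(new ArrayBlockingQueue<>(1));   // one element in flight per copy
        }

        // Producer: reads the source once and puts each element into every queue.
        Thread producer = new Thread(() -> {
            try {
                Iterator<T> it = source.iterator();
                while (it.hasNext()) {
                    T element = it.next();
                    for (BlockingQueue<Object> q : queues) q.put(element);
                }
                for (BlockingQueue<Object> q : queues) q.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();

        List<Stream<T>> result = new ArrayList<>();
        for (BlockingQueue<Object> q : queues) {
            Spliterator<T> spliterator =
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    @Override
                    public boolean tryAdvance(Consumer<? super T> action) {
                        try {
                            Object next = q.take();        // blocks until the producer delivers
                            if (next == END) return false;
                            @SuppressWarnings("unchecked")
                            T value = (T) next;
                            action.accept(value);
                            return true;
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return false;
                        }
                    }
                };
            result.add(StreamSupport.stream(spliterator, false));
        }
        return result;
    }

    // Consumes two copies in parallel, each with different downstream operations.
    static List<String> demo() {
        List<Stream<String>> copies = copy(2, Stream.of("a", "b", "c", "d"));
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> plain = CompletableFuture.supplyAsync(
                () -> copies.get(0).collect(Collectors.joining(",")), pool);
            CompletableFuture<String> upper = CompletableFuture.supplyAsync(
                () -> copies.get(1).map(String::toUpperCase).collect(Collectors.joining(",")), pool);
            return List.of(plain.join(), upper.join());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The same caveat applies as in the answer: if one copy stops reading, its queue fills up and the producer, and therefore every other copy, stalls.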
Adrian