Straight answer: yes.

There's no built-in support for this, but one can implement it. The possible approaches that I see are these:

a. copy the entire stream data, then create the stream copies based on it -> the RAM consumption might be an impediment

b. read the stream and relay each of its elements to the copies -> I'll detail this approach below
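Approach a. is trivial to sketch; the method name `copyStreamEagerly` below is just an illustrative choice, not part of the solution detailed afterwards:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EagerStreamCopy {

    // Approach a.: buffer the entire stream data in RAM, then open a
    // fresh Stream over the buffer for each requested copy.
    public static <T> List<Stream<T>> copyStreamEagerly(int copiesCount, Stream<T> originalStream) {
        List<T> buffered = originalStream.collect(Collectors.toList()); // consumes the original
        return Stream.generate(buffered::stream)
                .limit(copiesCount)
                .collect(Collectors.toList());
    }
}
```

This works fine for small data sets, but the whole content must fit in memory, which is exactly the impediment mentioned above.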
The Concept

Let's detail solution b.: a method `<T> List<Stream<T>> copyStream(int copiesCount, Stream<T> originalStream)` allows one to create `copiesCount` copies of the `originalStream`.

To understand the solution one has to understand the difference between a stream and the data-elements that flow through it: for example, an apple, a carrot and a potato would be data-elements, while a pipe through which they move to reach some destination would be the stream. Copying a `Stream` is like creating more pipes: one then has to connect the original pipe (i.e. `originalStream`) to the additional ones (the `streamCopies`). While in the real world one can't pass an apple-object from one pipe into several pipes (i.e. the `streamCopies`), in programming this is possible: just pass the variable containing the apple-object reference to each of the stream copies.
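The pipe analogy also explains why copying takes any effort at all: a Java `Stream`, like a pipe, can be flowed through only once. A minimal demonstration (the class and method names are mine, just for illustration):

```java
import java.util.stream.Stream;

public class SingleTraversalDemo {

    // A Stream can be traversed only once; a second terminal
    // operation throws IllegalStateException.
    public static boolean secondTraversalFails() {
        Stream<String> pipe = Stream.of("apple", "carrot", "potato");
        pipe.forEach(element -> { }); // first traversal consumes the stream
        try {
            pipe.forEach(element -> { }); // second traversal is illegal
            return false;
        } catch (IllegalStateException e) {
            // "stream has already been operated upon or closed"
            return true;
        }
    }
}
```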
Implementation Details

The Java implementation of `Stream` has a great impact on the solution's shape. The first impact relates to what happens when data-elements flow through a stream (aka pipe): to actually read (and process) the elements of a `Stream`, a terminal method has to be used, e.g. `forEach`. In our case `originalStream.forEach` must be called so that each element is read and passed to the `streamCopies` (aka downstream pipes); this must happen before the `copyStream()` method returns, which is a problem because `forEach` would block until all `originalStream` elements are consumed. To solve this, the `copyStream()` implementation spawns a thread in which to call `originalStream.forEach`.

Consuming `originalStream` elements means passing them to the downstream pipes (i.e. the `streamCopies`); because there's no cache, one has to ensure that each `originalStream` element is transferred to every one of the `streamCopies` before moving on to the next element. This means that all `streamCopies` must consume at the same pace: if one of the `streamCopies` is not consuming, it will block all the others, because `originalStream` will stop transferring to the downstream pipes until everyone has consumed the current element (i.e. it caches nothing for the late `streamCopies` consumers). But consuming a `Stream` in Java implies calling a terminal operation on it (e.g. `forEach`), which blocks execution until the entire stream is consumed; because we need all `streamCopies` to be consumed in parallel, this must happen on a distinct thread for each! (As a side note, one of the `streamCopies` could in fact be consumed on the current/main thread.)

Summarizing, the solution's usage would look like below:
```java
List<Stream<?>> streamCopies = copyStream(copiesCount, originalStream);
// start a thread for each of the streamCopies in which to consume the corresponding
// stream copy (one of them could be consumed on the current thread though)
// optionally join the consuming threads
// continue with whatever business logic you have
```
Final Considerations

Some of the limitations apparent above can be circumvented:

- the copying process is destructive, i.e. `originalStream` will be unusable after calling `copyStream()` because it will be pending consumption. If one really wants to consume it anyway, one can create an additional copy, to be consumed perhaps on the current (main) thread (but only after starting the consumption of all the other copies)
- the `streamCopies` must consume all received `originalStream` elements, otherwise, if one stops, the others block too (read the "Implementation Details" part again to understand why). This means each `streamCopies` element consumption must occur in a `try...catch` to ensure the absence of failures (aka processing stops). A production implementation would in fact circumvent this by wrapping each `Stream` copy with something overriding the `close()` method such that it removes the closed/failed stream copy from the `originalStream`-to-`streamCopies` transfer logic (i.e. discards the underlying `blockingQueue` used for the communication between the `originalStream` thread and the thread consuming that copy -> see the implementation below). This implies that clients would be forced to `close()` the `Stream` copies, but that's not so uncommon, e.g. see Spring's `JdbcTemplate.queryForStream()` outcome having the same requirement.
- as pointed out before, each of the `streamCopies` terminal operations must be executed in a distinct thread - there's no workaround for this
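The `close()`-based cleanup from the second point can be sketched with `Stream.onClose` (an empty stream stands in for a real copy here; in the actual implementation below, the hook would be attached to each copy created by `copyStream()`):

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;

public class CloseableCopyDemo {

    // onClose() registers a hook that runs when the client closes the copy;
    // here it unregisters the copy's queue from the transfer logic.
    // CopyOnWriteArrayList is used because the relaying thread would iterate
    // the list while close() may concurrently remove from it.
    public static boolean closingUnregistersTheQueue() {
        List<BlockingQueue<Object>> blockingQueues = new CopyOnWriteArrayList<>();
        BlockingQueue<Object> bq = new LinkedBlockingQueue<>(1);
        blockingQueues.add(bq);
        Stream<String> copy = Stream.<String>empty()
                .onClose(() -> blockingQueues.remove(bq));
        copy.close(); // the client is done with this copy
        return blockingQueues.isEmpty();
    }
}
```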
The Code

Below is the code implementing solution b. and a test checking its correctness.
```java
// requires a static import of StreamCopiesFactory.copyStream
@Test
void streamCopyTest() throws ExecutionException, InterruptedException {
    // streamCopies are valid/normal Stream instances
    // (e.g. they are allowed to be infinite)
    List<Stream<String>> streamCopies = copyStream(3, Stream.of("a", "b", "c", "d"));
    // The 3 copies rely on the original stream, which can't be
    // consumed more than once! Consuming the copies one by one
    // in the same thread isn't possible because the 1st consumed
    // copy would leave nothing to consume for the others,
    // so they must be consumed in parallel.
    ExecutorService executorService = Executors.newCachedThreadPool();
    CompletableFuture<?>[] futures =
            streamCopies.stream().map(stream -> CompletableFuture.runAsync(() -> {
                // the same consumption logic for all streamCopies is
                // used here because this is just an example; the
                // actual consumption logic could be distinct (and anything)
                String outcome = stream.collect(Collectors.joining(", "));
                // check the thread name in the message to differentiate the outcome
                log.info("\n{}", outcome);
            }, executorService)).toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(futures).get();
    executorService.shutdown();
}
```
```java
@RequiredArgsConstructor
@Slf4j
public class StreamCopiesFactory {

    /**
     * The number of elements to be stored in the blockingQueue used
     * to transfer elements from the original stream to its copies.
     * This is very different from the cache used in the a. solution:
     * here it's about the transfer between the original stream and its
     * copies instead of a copy of the entire original stream data.
     * Change or make this configurable.
     */
    private static final int cacheSize = 1;

    /**
     * Each of these stream copies must execute (their terminal operation)
     * on a distinct thread! One of them could actually execute on the
     * main thread, but only after all the others were started on their
     * distinct thread.
     */
    public static <T> List<Stream<T>> copyStream(int copies, Stream<T> stream) {
        List<BlockingQueue<Object>> blockingQueues = new ArrayList<>(copies);
        // creating the queues used to relay the stream's elements to the stream's copies
        for (int i = 0; i < copies; i++) {
            blockingQueues.add(new LinkedBlockingQueue<>(cacheSize));
        }
        // relay the original stream's elements on a distinct thread, otherwise
        // bq.put (transferring to a stream copy) would block the current thread
        // because no stream copy is consuming yet
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            stream.forEach(streamElement -> blockingQueues.forEach(bq -> {
                try {
                    bq.put(streamElement);
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                    // nothing to do here other than maybe a simple optimization related to the
                    // failed bq.put (e.g. sending END_SIGNAL into bq then skipping its next put calls)
                }
            }));
            blockingQueues.forEach(bq -> {
                try {
                    bq.put(SpliteratorCopy.END_SIGNAL);
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                    // nothing to do here
                }
            });
        });
        executor.shutdown(); // the already submitted relay task still runs to completion
        // creating the copies
        // A production implementation would wrap each Stream copy with
        // something overriding close() in order to remove from blockingQueues
        // the blockingQueue corresponding to the closed Stream.
        return blockingQueues.stream()
                .map(bq -> new SpliteratorCopy<T>(bq))
                .map(spliterator -> StreamSupport.stream(spliterator, false))
                .collect(Collectors.toList());
    }
}
```
```java
@RequiredArgsConstructor
@Slf4j
public class SpliteratorCopy<T> implements Spliterator<T> {

    public static final Object END_SIGNAL = new Object();

    private final BlockingQueue<?> blockingQueue;

    @Override
    @SuppressWarnings("unchecked")
    public boolean tryAdvance(final Consumer<? super T> action) {
        Object nextElement;
        try {
            nextElement = blockingQueue.take();
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        if (nextElement == END_SIGNAL) {
            return false;
        }
        action.accept((T) nextElement);
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
        return Spliterator.ORDERED;
    }
}
```