6

I understand that in general Java streams do not split. However, we have an involved and lengthy pipeline, at the end of which we have two different types of processing that share the first part of the pipeline.

Due to the size of the data, storing the intermediate stream product is not a viable solution. Neither is running the pipeline twice.

Basically, what we are looking for is a solution that is an operation on a stream that yields two (or more) streams that are lazily filled and able to be consumed in parallel. By that, I mean that if stream A is split into streams B and C, when streams B and C consume 10 elements, stream A consumes and provides those 10 elements, but if stream B then tries to consume more elements, it blocks until stream C also consumes them.

Is there any pre-made solution for this problem, or any library we can look at? If not, where would we start if we want to implement this ourselves? Or is there a compelling reason not to implement it at all?
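To make the semantics concrete, here is a sketch of the kind of operation we are after (pseudocode; the `fork` operation is hypothetical and does not exist in the JDK):

```
// hypothetical operation -- nothing like this exists in the JDK
List<Stream<T>> forks = fork(streamA, 2);   // streamA is consumed only once
Stream<T> b = forks.get(0);                 // consumed by one thread
Stream<T> c = forks.get(1);                 // consumed by another thread
// b and c are filled lazily and in lockstep: if b runs ahead of c,
// it blocks until c has consumed the same elements
```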

Torque
    Java streams may not be the right tool for the job. You should look into using something like [`CyclicBarrier`](https://docs.oracle.com/javase/10/docs/api/java/util/concurrent/CyclicBarrier.html). – Andreas Jul 26 '18 at 18:30
    I would give up on using streams for this problem. – Louis Wasserman Jul 26 '18 at 18:33
    Sounds like a dangerous concept. Would `fork(stream).get(0).count()` block indefinitely? – shmosel Jul 26 '18 at 18:34
  • What is the source? A `Stream`? An `InputStream`? A `List`? Please elaborate. – Lino Jul 26 '18 at 18:40
  • Also, how would you know how many elements the `Stream` has? To block for N elements, you'd need to know that the `Stream` has more than N elements in it. – Lino Jul 26 '18 at 18:48
  • @shmosel With the OP’s proposal, *any* operation on a Stream will block indefinitely, unless someone is processing the other Stream in a different thread. And it would disallow any short-circuiting terminal operation. – Holger Jul 27 '18 at 15:48

2 Answers

4

I don't know of functionality that would fulfill your blocking requirement, but you might be interested in jOOλ's `Seq.duplicate()` method:

Stream<T> streamA = Stream.of(/* your data here */);
Tuple2<Seq<T>, Seq<T>> streamTuple = Seq.seq(streamA).duplicate();
Stream<T> streamB = streamTuple.v1();
Stream<T> streamC = streamTuple.v2();

The streams can be consumed absolutely independently (including consumption in parallel) thanks to the `SeqBuffer` class that's used internally by this method.

Note that:

  • `SeqBuffer` will cache even the elements that are no longer needed because they have already been consumed by both `streamB` and `streamC` (so if you cannot afford to keep them in memory, it's not a solution for you);
  • as I mentioned at the beginning, streamB and streamC will not block one another.

Disclaimer: I am the author of the SeqBuffer class.

Tomasz Linkowski
  • Please look at [answer] about links in answers. This could be a comment at this point. – AxelH Jul 27 '18 at 13:03
  • Looks interesting. To check if I understand you correctly, it means that the common part of the pipelines of the streams is also executed twice, right? – Malte Hartwig Jul 27 '18 at 13:43
    @MalteHartwig No, the pipeline is executed only once (through `streamA.spliterator()`), but all the elements that have been consumed by either `streamB` or `streamC` are being cached inside `SeqBuffer` in an `ArrayList`. This `ArrayList` is synchronized in such a way that it also guards the `Spliterator` obtained from `streamA.spliterator()`. – Tomasz Linkowski Jul 27 '18 at 14:08
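For illustration, the buffering mechanism described in the comment above can be sketched in plain JDK terms (a rough, simplified sketch, not jOOλ's actual `SeqBuffer`; the class and method names here are made up):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Rough plain-JDK illustration of the buffering idea: the source spliterator
// is advanced only once, and every element is cached so that each duplicate
// can replay it at its own pace. The cache also guards the source
// spliterator, as described in the comment above.
public class DuplicateSketch
{
    public static <T> List<Stream<T>> duplicate(Stream<T> source)
    {
        Spliterator<T> src = source.spliterator();
        List<T> cache = new ArrayList<>();

        class Fork extends Spliterators.AbstractSpliterator<T>
        {
            private int next = 0; // position of this duplicate in the cache

            Fork() { super(Long.MAX_VALUE, 0); }

            @Override
            public boolean tryAdvance(Consumer<? super T> action)
            {
                T element;
                synchronized (cache) // guards both the cache and src
                {
                    while (next >= cache.size())
                    {
                        if (!src.tryAdvance(cache::add))
                        {
                            return false; // source exhausted
                        }
                    }
                    element = cache.get(next++);
                }
                action.accept(element);
                return true;
            }
        }
        return List.of(StreamSupport.stream(new Fork(), false),
                       StreamSupport.stream(new Fork(), false));
    }
}
```

Note that, just like `SeqBuffer`, this sketch never evicts cached elements, which is exactly the memory caveat pointed out in the answer.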
3

You can implement a custom `Spliterator` to achieve this behavior. We split the stream into the common "source" and the different "consumers". The custom spliterator then forwards the elements from the source to each consumer. For this purpose, we use a `BlockingQueue` (see this question).

Note that the difficult part here is not the spliterator/stream, but the synchronization of the consumers around the queue, as the comments on your question already indicate. Still, however you implement the synchronization, a `Spliterator` lets you use streams on top of it.

import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@SafeVarargs
public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
{
    return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
}

private static class ForkingSpliterator<T>
    extends AbstractSpliterator<T>
{
    private Spliterator<T>   sourceSpliterator;

    private BlockingQueue<T> queue      = new LinkedBlockingQueue<>();

    private AtomicInteger    nextToTake = new AtomicInteger(0);
    private AtomicInteger    processed  = new AtomicInteger(0);

    private boolean          sourceDone;
    private int              consumerCount;

    @SafeVarargs
    private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        super(Long.MAX_VALUE, 0);

        sourceSpliterator = source.spliterator();
        consumerCount = consumers.length;

        for (int i = 0; i < consumers.length; i++)
        {
            int index = i;
            Consumer<Stream<T>> consumer = consumers[i];
            new Thread(() -> consumer.accept(StreamSupport.stream(new ForkedConsumer(index), false))).start();
        }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        // the action parameter is ignored; elements are forwarded to the queue instead
        sourceDone = !sourceSpliterator.tryAdvance(queue::offer);
        return !sourceDone;
    }

    private class ForkedConsumer
        extends AbstractSpliterator<T>
    {
        private int index;

        private ForkedConsumer(int index)
        {
            super(Long.MAX_VALUE, 0);

            this.index = index;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            // take next element when it's our turn
            while (!nextToTake.compareAndSet(index, index + 1))
            {
                // busy-wait until it is this consumer's turn
            }
            T element;
            while ((element = queue.peek()) == null)
            {
                if (sourceDone)
                {
                    // element is null, and there won't be any more, so "terminate" this sub stream
                    return false;
                }
            }

            // push to consumer pipeline
            action.accept(element);

            if (consumerCount == processed.incrementAndGet())
            {
                // start next round
                queue.poll();
                processed.set(0);
                nextToTake.set(0);
            }

            return true;
        }
    }
}

With the approach used, the consumers work on each element in parallel, but wait for each other before starting on the next element.

Known issue: If one of the consumers is "shorter" than the others (e.g. because it calls `limit()`), it will also stop the other consumers and leave their threads hanging.
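One possible mitigation (a sketch; the `cancelled` flag and helper below are hypothetical and not wired into the code above) would be a shared flag that a fork sets when it finishes early, and that the wait loops in `ForkedConsumer.tryAdvance` check in addition to `sourceDone`:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical mitigation sketch: a shared "cancelled" flag that a fork sets
// when it finishes early, checked by the wait loop so the remaining forks
// terminate instead of spinning forever.
public class CancellationSketch
{
    static final AtomicBoolean cancelled = new AtomicBoolean(false);

    // stands in for the wait loop in ForkedConsumer.tryAdvance
    static boolean awaitNextElement(BlockingQueue<String> queue, boolean sourceDone)
    {
        while (queue.peek() == null)
        {
            if (sourceDone || cancelled.get())
            {
                return false; // "terminate" this sub stream
            }
        }
        return true;
    }

    public static void main(String[] args)
    {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        cancelled.set(true); // a "short" fork finished, e.g. due to limit()
        System.out.println(awaitNextElement(queue, false)); // prints false
    }
}
```

The same flag would also have to be checked in the `nextToTake` spin loop, and a fork that terminates for any reason would need to set it (or decrement an active-consumer count) so that the remaining forks can drain and stop.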


Example

public static void sleep(long millis)
{
    // sleeps for millis plus up to 30 ms of random jitter
    try { Thread.sleep((long) (Math.random() * 30 + millis)); } catch (InterruptedException e) { }
}

streamForked(Stream.of("1", "2", "3", "4", "5"),
             source -> source.map(word -> { sleep(50); return "fast   " + word; }).forEach(System.out::println),
             source -> source.map(word -> { sleep(300); return "slow      " + word; }).forEach(System.out::println),
             source -> source.map(word -> { sleep(50); return "2fast        " + word; }).forEach(System.out::println));

fast   1
2fast        1
slow      1
fast   2
2fast        2
slow      2
2fast        3
fast   3
slow      3
fast   4
2fast        4
slow      4
2fast        5
fast   5
slow      5
Malte Hartwig
  • This is pretty much exactly what we were looking for, thank you very much. The issue with one stream being 'shorter' is a given with a design like this, and not a problem for us. – Torque Aug 09 '18 at 13:10
  • @Torque Glad to hear it! Browsing the code briefly revealed one potential issue (though probably not very likely): when kicking off the next round, it is probably better to first reset `processed` before `nextToTake`, because `nextToTake` is what everybody is waiting for. I'll change it above, just to let you know. – Malte Hartwig Aug 09 '18 at 14:37