Interesting problem. I'll answer the second question first, since it's a simpler issue.
Second question, how the heck is that stream on the BlockingQueue getting closed?
By "closed" I think you mean, the stream has certain number of elements and then it finishes, disregarding any elements that may be added to the queue in the future. The reason is that a stream on a queue represents only the current contents of a queue as of the time the stream is created. It doesn't represent any future elements, that is, those that some other thread might add in the future.
If you want a stream that represents the current and future contents of the queue, then you can use the technique described in this other answer: basically, use Stream.generate() to call queue.take(). I don't think this is what you want to do, though, so I won't discuss it further here.
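For reference, that technique might look something like the following sketch. The takeUninterruptibly helper name is my own, and the interruption policy shown (restore the interrupt flag and rethrow unchecked) is just one possible choice:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LiveQueueStream {
    // Wraps queue.take() so it can be used where checked exceptions aren't allowed.
    static <T> T takeUninterruptibly(BlockingQueue<T> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        queue.add(1);
        queue.add(2);
        queue.add(3);

        // An infinite stream over current *and* future elements;
        // limit(3) is here only so this demo terminates.
        List<Integer> result = Stream.generate(() -> takeUninterruptibly(queue))
                                     .limit(3)
                                     .collect(Collectors.toList());
        System.out.println(result); // [1, 2, 3]
    }
}
```

Without the limit(), the stream blocks in take() whenever the queue is empty, which is exactly why it never "finishes" on its own.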
Now to your larger issue.
You have a source of objects upon which you want to do some processing, including filtering. You then want to take the results and send them through two different downstream processing steps. Essentially you have a single producer and two consumers.
One of the fundamental issues you have to handle is how to deal with cases where the different processing steps run at different rates. Suppose we've solved the issue of how to get a stream from the queue without the stream terminating prematurely. If the producer can produce elements faster than the consumer can process them, the queue will accumulate elements until it fills all available memory.
You also have to decide how to deal with the different consumers processing elements at different rates. If one consumer is significantly slower than the other, either an arbitrary number of elements may need to be buffered (which might fill memory) or the faster consumer will have to be slowed down to match the average rate of the slower consumer.
Let me toss out a sketch of how you might proceed. I don't know your actual requirements, though, so I have no idea whether this will be satisfactory. One thing to note is that using parallel streams with this kind of application can be problematic, since parallel streams don't deal with blocking and load-balancing very well.
First, I'd start off with a stream that processes elements from the producer and accumulates them into an ArrayBlockingQueue:
BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity);

producer.map(...)
        .filter(...)
        .forEach(queue::put);
(Note that put throws InterruptedException, so you can't actually write queue::put here. You have to wrap the call in a try-catch block, or write a helper method instead. But it's not obvious what to do if InterruptedException is caught.)
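One possible shape for such a helper is sketched below. The putUninterruptibly name is mine, and the policy of restoring the interrupt flag and rethrowing an unchecked exception (which aborts the stream pipeline) is only one of several reasonable choices:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;

public class PutHelper {
    // Adapts the checked InterruptedException from put() so the method
    // can be called from a Consumer lambda in forEach.
    static <T> void putUninterruptibly(BlockingQueue<T> queue, T element) {
        try {
            queue.put(element);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt status
            throw new RuntimeException(e);        // abort the pipeline
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        Stream.of("a", "b", "c")
              .filter(s -> !s.equals("b"))
              .forEach(s -> putUninterruptibly(queue, s));
        System.out.println(queue); // [a, c]
    }
}
```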
If the queue fills up, this will block the pipeline. Either run this sequentially in its own thread, or if in parallel, in a dedicated thread pool, to avoid blocking up the common pool.
Next, the consumers:
while (true) {
    // wait until the queue is full, or a timeout has expired,
    // depending upon how frequently you want to continue
    // processing elements emitted by the producer

    List<T> list = new ArrayList<>();
    queue.drainTo(list);

    downstream1 = list.stream().filter(...).map(...).collect(...);
    downstream2 = list.stream().filter(...).map(...).collect(...);

    // deal with results downstream1 and downstream2
}
The key here is that the handoff from the producer to the consumers is done in batches with the drainTo method, which adds the current elements of the queue to the destination and atomically empties the queue. This way, the consumers don't have to wait for the producer to finish its processing (which won't happen if it's infinite). In addition, the consumers are operating on a known quantity of data and won't block in the midst of processing. Each consumer stream could thus conceivably be run in parallel, if that's helpful.
Here, I have the consumers running in lockstep. If you want the consumers to run at different rates, you'll have to construct additional queues (or something) to buffer up their workloads independently.
If the consumers are overall slower than the producer, the queue will eventually fill up and be blocked, slowing down the producer to the rate the consumers can accept. If the consumers are faster than the producer on average, then maybe you don't need to worry about the relative processing rates of the consumers. You can just have them loop and pick up whatever the producer has managed to put into the queue, or even have them block until something is available.
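If you want the "block until something is available, then grab whatever's there" behavior with a bounded wait, one way to sketch it is to combine a timed poll with drainTo. The nextBatch helper here is hypothetical, not a library method:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedBatch {
    // Blocks up to timeoutMillis for the first element, then drains
    // whatever else is already queued. Returns an empty list on timeout.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, long timeoutMillis)
            throws InterruptedException {
        List<T> batch = new ArrayList<>();
        T first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (first != null) {
            batch.add(first);
            queue.drainTo(batch);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("x");
        queue.add("y");
        System.out.println(nextBatch(queue, 100)); // [x, y]
        System.out.println(nextBatch(queue, 10));  // [] -- timed out, nothing queued
    }
}
```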
I should say that what I've outlined is a very simplistic approach to multi-stage pipelining. If your application is performance critical, you may find yourself doing a lot of work tuning memory consumption, load balancing, increasing throughput and reducing latency. There are other frameworks out there that may be more amenable to your application. You might take a look at the LMAX Disruptor, for example, though I don't have any experience with it myself.