Thanks for the responses. The comments pointed the way to a solution, which I will describe here. It's very much inspired by StreamEx, but adapted to my particular situation.
Firstly, I define an implementation of Stream called XdmStream which in general delegates all methods to an underlying Stream which it wraps.
This immediately gives me the opportunity to define new methods, so for example my users can do stream.last() instead of stream.reduce((first,second)->second), which is a useful convenience.
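To make the wrapping idea concrete, here is a cut-down sketch. It is not the real XdmStream: the class name WrappedStream is hypothetical, it delegates only filter() rather than implementing the whole Stream interface, and returning an Optional from last() is my assumption for the illustration.

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;

/** Hypothetical, cut-down wrapper; a full version would implement Stream<T>. */
class WrappedStream<T> {
    private final Stream<T> base;   // the wrapped stream

    WrappedStream(Stream<T> base) {
        this.base = base;
    }

    /** Delegated method: re-wrap the result so the extra methods stay available. */
    WrappedStream<T> filter(Predicate<? super T> predicate) {
        return new WrappedStream<>(base.filter(predicate));
    }

    /** Convenience method that java.util.stream.Stream does not offer. */
    Optional<T> last() {
        return base.reduce((first, second) -> second);
    }

    public static void main(String[] args) {
        Optional<String> last = new WrappedStream<>(Stream.of("a", "b", "c"))
                .filter(s -> !s.equals("c"))
                .last();
        System.out.println(last);   // prints Optional[b]
    }
}
```

Every delegated method that returns a stream has to re-wrap its result, otherwise the added methods are lost after the first intermediate operation.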
As an example of a short-circuiting method I have implemented XdmStream.untilFirst(Predicate) as follows (base is the wrapped Stream). The idea of this method is to return a stream that delivers the same results as the original stream, except that once an item satisfies the predicate, no results after that item are delivered.
    public XdmStream<T> untilFirst(Predicate<? super XdmItem> predicate) {
        Stream<T> stoppable = base.peek(item -> {
            if (predicate.test(item)) {
                base.close();
            }
        });
        return new XdmStream<T>(stoppable);
    }
When I first create the base Stream I call its onClose() method so that a call on close() triggers the supplier of data to stop supplying data.
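For anyone wanting to try the close() hook in isolation, here is a self-contained sketch using plain Integers instead of XdmItems. CountingSource and StoppableDemo are hypothetical names invented for this illustration; my real data supplier is different. The sketch assumes a sequential stream with no buffering between the source and the peek() step.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class StoppableDemo {

    /** Hypothetical supplier: counts upward from 1 until asked to stop. */
    static final class CountingSource extends Spliterators.AbstractSpliterator<Integer> {
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        private int next = 1;

        CountingSource() {
            super(Long.MAX_VALUE, Spliterator.ORDERED);
        }

        void stop() {
            stopped.set(true);
        }

        @Override
        public boolean tryAdvance(Consumer<? super Integer> action) {
            if (stopped.get()) {
                return false;           // supplier has been told to stop
            }
            action.accept(next++);
            return true;
        }
    }

    static Stream<Integer> untilFirst(Predicate<? super Integer> predicate) {
        CountingSource source = new CountingSource();
        // Register the close handler when the base stream is first created.
        Stream<Integer> base = StreamSupport.stream(source, false).onClose(source::stop);
        // Closing the base stream from inside peek() runs the handler above.
        return base.peek(item -> {
            if (predicate.test(item)) {
                base.close();
            }
        });
    }

    public static void main(String[] args) {
        List<Integer> result = untilFirst(n -> n >= 3).collect(Collectors.toList());
        System.out.println(result);     // prints [1, 2, 3]
    }
}
```

Note that the item satisfying the predicate is still delivered, because peek() passes it on before the supplier sees the stop flag.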
The close() mechanism doesn't seem particularly well documented (it relies on the concept of a "stream pipeline" and it's not entirely clear when a new stream returned by some method is part of the same pipeline as the original stream), but it's working for me. I guess I should probably ensure that this is only an optimization, so that the results will still be correct even if the flow of data isn't immediately turned off (e.g. if there is any buffering in the stream).