7

Is it possible to implement a Collector that stops processing of the stream as soon as an answer is available?

For example, if the Collector is computing an average, and one of the values is NaN, I know the answer is going to be NaN without seeing any more values, so further computation is pointless.

Jacob G.
Michael Kay
  • `Stream.collect` is not a short-circuiting operation, so the answer is *No*. However, you can emulate short-circuiting behavior by throwing an unchecked exception when you want to finish processing and then catching it. It's certainly quite a nasty workaround... – fps Mar 23 '18 at 19:25
  • Thanks to all who have responded. In the absence of new information it looks as if the answer is going to be no. I don't think exceptions will work because (apart from the general messiness of the approach) I want the short-circuiting to be an internal optimization that isn't visible to the user of my Collector. – Michael Kay Mar 23 '18 at 22:47
  • Take a look at [StreamEx.collect(...)](https://github.com/amaembo/streamex/blob/master/src/main/java/one/util/streamex/AbstractStreamEx.java). This implementation uses this approach, hiding it from the user. It uses a `Predicate` to check whether to "short-circuit" or not on each accumulation step. – fps Mar 23 '18 at 22:50
  • Thanks for the pointer to StreamEx - it looks very interesting. I'm not sure I want to bring in the whole of StreamEx as a dependency, but I think there might be ideas there that I can use. – Michael Kay Mar 24 '18 at 10:05

5 Answers

2

Thanks for the responses. The comments pointed the way to a solution, which I will describe here. It's very much inspired by StreamEx, but adapted to my particular situation.

Firstly, I define an implementation of Stream called XdmStream which in general delegates all methods to an underlying Stream which it wraps.

This immediately gives me the opportunity to define new methods, so for example my users can do stream.last() instead of stream.reduce((first,second)->second), which is a useful convenience.
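
As a minimal sketch of that convenience, the same behaviour can be written as a static helper over a plain Stream. The class name `StreamLast` and the method `last` are hypothetical and are not taken from XdmStream:

```java
import java.util.Optional;
import java.util.stream.Stream;

final class StreamLast {
    /** Returns the last element of the stream, or an empty Optional for an empty stream. */
    static <T> Optional<T> last(Stream<T> stream) {
        // The reduction keeps discarding the earlier element, so only the final one survives.
        return stream.reduce((first, second) -> second);
    }

    public static void main(String[] args) {
        System.out.println(last(Stream.of("a", "b", "c")));
    }
}
```

Note that this still consumes the whole stream; it is a convenience, not a short-circuit.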

As an example of a short-circuiting method I have implemented XdmStream.untilFirst(Predicate) as follows (base is the wrapped Stream). The idea of this method is to return a stream that delivers the same results as the original stream, except that when a predicate is satisfied, no more results are delivered.

public XdmStream<T> untilFirst(Predicate<? super XdmItem> predicate) {
    Stream<T> stoppable = base.peek(item -> {
        if (predicate.test(item)) {
            base.close();
        }
    });
    return new XdmStream<T>(stoppable);
}

When I first create the base Stream I call its onClose() method so that a call on close() triggers the supplier of data to stop supplying data.

The close() mechanism doesn't seem particularly well documented (it relies on the concept of a "stream pipeline" and it's not entirely clear when a new stream returned by some method is part of the same pipeline as the original stream) - but it's working for me. I guess I should probably ensure that this is only an optimization, so that the results will still be correct even if the flow of data isn't immediately turned off (e.g. if there is any buffering in the stream).
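
A minimal, self-contained sketch of how the close() hook might be wired to the data supplier. Everything here is an assumption for illustration: the class `StoppableStreams`, the flag-checking spliterator, and the use of an Iterator as the data source are not taken from XdmStream. It also assumes a sequential stream and relies on close() running the onClose handler while the pipeline is being traversed, which is the behaviour the answer reports.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class StoppableStreams {
    /** Wraps an iterator in a sequential stream whose close() stops further delivery. */
    static <T> Stream<T> stoppable(Iterator<T> source) {
        AtomicBoolean stopped = new AtomicBoolean();
        Spliterator<T> spliterator =
            new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
                @Override
                public boolean tryAdvance(Consumer<? super T> action) {
                    // Stop supplying data once close() has been called or the source is exhausted.
                    if (stopped.get() || !source.hasNext()) {
                        return false;
                    }
                    action.accept(source.next());
                    return true;
                }
            };
        // The close handler is registered on the base stream before any other operation.
        return StreamSupport.stream(spliterator, false).onClose(() -> stopped.set(true));
    }

    /** Same idea as untilFirst above: close the base stream when the predicate matches. */
    static <T> Stream<T> untilFirst(Stream<T> base, Predicate<? super T> predicate) {
        return base.peek(item -> {
            if (predicate.test(item)) {
                base.close();
            }
        });
    }

    public static void main(String[] args) {
        Stream<Integer> base = stoppable(List.of(1, 2, 3, 4, 5).iterator());
        List<Integer> result = untilFirst(base, i -> i == 3).collect(Collectors.toList());
        System.out.println(result);
    }
}
```

The matching item itself is still delivered, because peek passes it downstream before the source is asked for the next element.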

Michael Kay
1

In addition to Federico's comment, it is possible to emulate a short-circuiting Collector by ceasing accumulation once a certain condition has been met. This only helps if accumulation is expensive, because every element is still passed to the accumulator. Here's an example, but keep in mind that there are flaws with this implementation:

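import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class AveragingCollector implements Collector<Double, double[], Double> {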
    private final AtomicBoolean hasFoundNaN = new AtomicBoolean();

    @Override
    public Supplier<double[]> supplier() {
        return () -> new double[2];
    }

    @Override
    public BiConsumer<double[], Double> accumulator() {
        return (a, b) -> {
            if (hasFoundNaN.get()) {
                return;
            }

            if (b.equals(Double.NaN)) {
                hasFoundNaN.set(true);
                return;
            }

            a[0] += b;
            a[1]++;
        };
    }

    @Override
    public BinaryOperator<double[]> combiner() {
        return (a, b) -> {
            a[0] += b[0];
            a[1] += b[1];

            return a;
        };
    }

    @Override
    public Function<double[], Double> finisher() {
        // Report NaN if any element was NaN; otherwise return the mean.
        return average -> hasFoundNaN.get() ? Double.NaN : average[0] / average[1];
    }

    @Override
    public Set<Characteristics> characteristics() {
        return new HashSet<>();
    }
}

The following use-case returns Double.NaN, as expected:

public static void main(String[] args) {
    Double average = DoubleStream.of(1, 2, 3, 4, 5, 6, 7, Double.NaN)
                                 .boxed()
                                 .collect(new AveragingCollector());
    System.out.println(average);
}
Jacob G.
  • Collectors should be stateless. You should put your state into an object which as well holds the double[] and use that as an accumulator. – glglgl Mar 23 '18 at 19:59
  • This works, however it's not short-circuiting, because all the elements of the stream are passed to the accumulator, even after the `AtomicBoolean` is set to `true`. – fps Mar 23 '18 at 21:40
  • @FedericoPeraltaSchaffner Exactly. I don't think there's much else that can be done besides what you mentioned regarding throwing an unchecked `Exception`. – Jacob G. Mar 23 '18 at 21:45
  • I think this is the method used by Tagir Valeev in his StreamEx library. He once commented that one of his `Collector` implementations returned by `MoreCollectors.something(...)` was implemented this way. – fps Mar 23 '18 at 21:47
  • Interesting... I'll look into it because I'm curious of the implementation along with where the exception is even caught. – Jacob G. Mar 23 '18 at 21:48
  • [AbstractStreamEx, method collect](https://github.com/amaembo/streamex/blob/master/src/main/java/one/util/streamex/AbstractStreamEx.java) – fps Mar 23 '18 at 22:41
  • @FedericoPeraltaSchaffner I can't tell why the flag is an `AtomicBoolean` when a plain `boolean` would be enough; accumulators are safe in this respect. The definition could also be `Collector` so as not to expose the intermediary result (but this is nitpicking). And lastly, this is not really short-circuiting: you are still getting elements from the source of the stream (imagine this is a DB or File call...), as you mention. Throwing an Exception without a stack trace would make it short-circuiting. – Eugene Mar 24 '18 at 08:35
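
The suggestions in these comments (keep the state in the accumulation object, use a plain `boolean`, hide the intermediate type) can be sketched roughly as follows. The names `StatelessAveraging` and `Acc` are hypothetical, and like the original this still visits every element, so it is not a true short-circuit:

```java
import java.util.stream.Collector;
import java.util.stream.Stream;

final class StatelessAveraging {
    /** Mutable container created once per accumulation, so the collector itself holds no state. */
    private static final class Acc {
        double sum;
        long count;
        boolean sawNaN;
    }

    /** The wildcard hides the intermediate container type from callers. */
    static Collector<Double, ?, Double> averaging() {
        return Collector.of(
            Acc::new,
            (acc, value) -> {
                if (acc.sawNaN) {
                    return; // skip the arithmetic once the result is known
                }
                if (value.isNaN()) {
                    acc.sawNaN = true;
                    return;
                }
                acc.sum += value;
                acc.count++;
            },
            (left, right) -> {
                // Merge partial results, carrying the NaN flag across partitions.
                left.sum += right.sum;
                left.count += right.count;
                left.sawNaN |= right.sawNaN;
                return left;
            },
            acc -> acc.sawNaN ? Double.NaN : acc.sum / acc.count);
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1.0, 2.0, Double.NaN).collect(averaging()));
    }
}
```

Because the flag lives in the container, the same collector instance can be reused across streams, and the combiner propagates the flag if the stream is processed in parallel.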
1

Instead of using a Collector, you could use Stream.allMatch(..) to terminate the Stream early and use the util classes like LongSummaryStatistics directly. If all values (and at least one) were present, you return them, e.g.:

Optional<LongSummaryStatistics> toLongStats(Stream<OptionalLong> stream) {
    LongSummaryStatistics stat = new LongSummaryStatistics();
    boolean allPresent = stream.allMatch(opt -> {
        if (opt.isEmpty()) return false;
        stat.accept(opt.getAsLong());
        return true;
    });
    return allPresent && stat.getCount() > 0 ? Optional.of(stat) : Optional.empty();
}

Instead of a Stream<OptionalLong> you might use a DoubleStream and check for your NaN case.
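
A rough sketch of that DoubleStream variant, assuming a sequential stream (the predicate has a side effect, so it should not be used on a parallel stream). The class name `NaNShortCircuit` and the method `averageUnlessNaN` are hypothetical:

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.DoubleStream;

final class NaNShortCircuit {
    /**
     * Averages the stream, but stops pulling elements as soon as a NaN is seen.
     * Returns NaN in that case; an empty stream yields 0.0, as getAverage() does.
     */
    static double averageUnlessNaN(DoubleStream stream) {
        DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
        boolean noNaN = stream.allMatch(value -> {
            if (Double.isNaN(value)) {
                return false; // allMatch terminates the stream here
            }
            stats.accept(value);
            return true;
        });
        return noNaN ? stats.getAverage() : Double.NaN;
    }

    public static void main(String[] args) {
        System.out.println(averageUnlessNaN(DoubleStream.of(1, 2, Double.NaN, 4)));
    }
}
```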

benez
0

For the case of NaN, it might be acceptable to treat this as an exceptional outcome and throw a custom NaNAverageException, short-circuiting the collection operation. Using exceptions for normal control flow is usually bad practice, but it may be justified in this case.
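
A minimal sketch of that idea, assuming a sequential stream. The wrapper class `ExceptionShortCircuit` is hypothetical, and suppressing the stack trace is an extra assumption borrowed from a comment on another answer, made to keep the exception cheap:

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ExceptionShortCircuit {
    /** Signals that a NaN was seen; created without a stack trace so it is cheap to throw. */
    static final class NaNAverageException extends RuntimeException {
        NaNAverageException() {
            super(null, null, false, false);
        }
    }

    /** Catches the exception internally, so callers never see it. */
    static double average(Stream<Double> stream) {
        try {
            return stream.collect(Collectors.averagingDouble(value -> {
                if (value.isNaN()) {
                    throw new NaNAverageException(); // abandons the rest of the stream
                }
                return value;
            }));
        } catch (NaNAverageException e) {
            return Double.NaN;
        }
    }

    public static void main(String[] args) {
        System.out.println(average(Stream.of(1.0, Double.NaN, 3.0)));
    }
}
```

Wrapping the collect call in a method keeps the exception an internal detail, which addresses the concern about the short-circuit being visible to users.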

Gonen I
  • But this was only an example. Another example might be a Collector that returns true or false depending on whether the stream contains duplicate values; here early termination might be the more common case. – Michael Kay Mar 23 '18 at 22:52
  • I believe your question was answered here: https://stackoverflow.com/questions/20746429/limit-a-stream-by-a-predicate as well as here: https://stackoverflow.com/questions/23996454/terminate-or-break-java-8-stream-loop. That is not quite implementing a collector as you asked, because the built-in stream operation is not necessarily a loop, but it is probably what you were trying to achieve. – Gonen I Mar 24 '18 at 06:29
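
For the duplicate-detection example mentioned in the comments, a short-circuiting terminal operation can do the job without a Collector. This is only a sketch for sequential streams, and the names `Duplicates` and `hasDuplicates` are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;

final class Duplicates {
    /** Returns true as soon as an element is seen for the second time. */
    static <T> boolean hasDuplicates(Stream<T> stream) {
        Set<T> seen = new HashSet<>();
        // Set.add returns false for an element already present, which ends allMatch early.
        return !stream.allMatch(seen::add);
    }

    public static void main(String[] args) {
        System.out.println(hasDuplicates(Stream.of("a", "b", "a", "c")));
    }
}
```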
0
Stream<String> s = Stream.of("1", "2", "ABC", "3");
try {
    double result = s.collect(Collectors.averagingInt(n -> Integer.parseInt(n)));
    System.err.println("Average: " + result);
} catch (NumberFormatException e) {
    // Thrown when the collector reaches "ABC", so "3" is never processed.
    e.printStackTrace();
}
ddinde