I have a Stream<E> of elements that I cannot recreate or traverse twice, and I want to reduce the elements to a single value. With Collectors.reducing and similar collectors this is fairly easy. Now, however, I want to short-circuit the reduction based on a Predicate<E> so that the whole stream is not traversed. I provide some code below for explanation, but the question is: how do I implement this with Java streams?

Code required for MCVE:

static class Element<T> {

    private final boolean pred;
    private final T value;

    Element(
            final boolean pred,
            final T value) {
        this.pred = pred;
        this.value = value;
    }
}

@SuppressWarnings("unused")
static <E, T> T implementMe(
        final Stream<E> stream,
        final Predicate<E> shortCircuitCondition,
        final T shortCircuitValue,
        final E identityValue,
        final BinaryOperator<E> operator,
        final Function<E, T> finisher) {
    // how should I implement this?
    return null;
}

@Test
public void test() {
    final Collection<Element<Integer>> elements = Arrays.asList(
            new Element<>(true, 3),
            new Element<>(true, 3),
            new Element<>(false, 3),
            new Element<>(true, 3));

    final AtomicInteger counter = new AtomicInteger(0);
    final Stream<Element<Integer>> stream = elements.stream()
            .peek(t -> counter.incrementAndGet());
    final int value =
            ShortCircuit.<Element<Integer>, Integer> implementMe(
                    stream,
                    e -> !e.pred,
                    Integer.MIN_VALUE,
                    new Element<>(true, 0),
                    (l, r) -> new Element<>(true, l.value + r.value),
                    e -> e.value);

    Assert.assertEquals(Integer.MIN_VALUE, value);
    Assert.assertEquals(3, counter.get());
}

With a simple for loop, I can achieve the short-circuit behavior in the following way:

static <E, T> T implementMe(
        final Stream<E> stream,
        final Predicate<E> shortCircuitCondition,
        final T shortCircuitValue,
        final E identityValue,
        final BinaryOperator<E> operator,
        final Function<E, T> finisher) {
    E current = identityValue;
    for (final E t : (Iterable<E>) stream::iterator) {
        if (shortCircuitCondition.test(t)) {
            return shortCircuitValue;
        }
        current = operator.apply(current, t);
    }

    return finisher.apply(current);
}

But I would rather use the stream API, so I tried a custom collector. This approach does not short-circuit, though (the test fails with an AssertionError because the counter reaches 4):

static <E, T> T implementMe(
        final Stream<E> stream,
        final Predicate<E> shortCircuitCondition,
        final T shortCircuitValue,
        final E identityValue,
        final BinaryOperator<E> operator,
        final Function<E, T> finisher) {
    final AtomicBoolean shortCircuit = new AtomicBoolean(false);
    return stream.collect(
            Collectors.collectingAndThen(
                    Collectors.reducing(identityValue, (l, r) -> {
                        if (shortCircuitCondition.test(r)) {
                            shortCircuit.set(true);
                        }
                        return operator.apply(l, r);
                    }),
                    e -> shortCircuit.get() ? shortCircuitValue : finisher.apply(e)));
}

Now, I could use a custom spliterator to prevent advancing to the next element once the condition is met:

static <E, T> T implementMe(
        final Stream<E> stream,
        final Predicate<E> shortCircuitCondition,
        final T shortCircuitValue,
        final E identityValue,
        final BinaryOperator<E> operator,
        final Function<E, T> finisher) {

    final AtomicBoolean shortCircuit = new AtomicBoolean(false);
    final Iterator<E> underlyingIterator = stream.iterator();
    final Stream<E> wrappedStream = StreamSupport.stream(
            new Spliterators.AbstractSpliterator<E>(Long.MAX_VALUE, 0) { // size unknown

                @Override
                public boolean tryAdvance(final Consumer<? super E> action) {
                    if (!underlyingIterator.hasNext()) {
                        return false;
                    }
                    final E next = underlyingIterator.next();

                    if (shortCircuitCondition.test(next)) {
                        shortCircuit.set(true);
                        return false;
                    }

                    action.accept(next);
                    return true;
                }

            }, false);

    return wrappedStream.collect(
            Collectors.collectingAndThen(
                    Collectors.reducing(
                            identityValue,
                            operator),
                    e -> shortCircuit.get() ? shortCircuitValue : finisher.apply(e)));
}
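A possible variant of the wrapper above, as a sketch under assumptions: it delegates to stream.spliterator() instead of stream.iterator() and keeps the flag inside the spliterator rather than in an AtomicBoolean. The names StopAtSpliterator and reduceUntil are made up for illustration, and the sketch assumes the wrapped stream is processed sequentially.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical wrapper: forwards elements until one matches the stop condition.
final class StopAtSpliterator<E> extends Spliterators.AbstractSpliterator<E> {

    private final Spliterator<E> source;
    private final Predicate<? super E> stopCondition;
    private boolean stopped; // not thread-safe; sequential use only

    StopAtSpliterator(final Spliterator<E> source, final Predicate<? super E> stopCondition) {
        // The source estimate is only an upper bound here; keep ORDERED and drop the rest.
        super(source.estimateSize(), source.characteristics() & Spliterator.ORDERED);
        this.source = source;
        this.stopCondition = stopCondition;
    }

    @Override
    public boolean tryAdvance(final Consumer<? super E> action) {
        if (stopped) {
            return false;
        }
        final boolean advanced = source.tryAdvance(e -> {
            if (stopCondition.test(e)) {
                stopped = true; // swallow the matching element and end the traversal
            } else {
                action.accept(e);
            }
        });
        return advanced && !stopped;
    }

    // Same parameters as implementMe in the question.
    static <E, T> T reduceUntil(
            final Stream<E> stream,
            final Predicate<E> shortCircuitCondition,
            final T shortCircuitValue,
            final E identityValue,
            final BinaryOperator<E> operator,
            final Function<E, T> finisher) {
        final StopAtSpliterator<E> wrapped =
                new StopAtSpliterator<>(stream.spliterator(), shortCircuitCondition);
        final E reduced = StreamSupport.stream(wrapped, false).reduce(identityValue, operator);
        return wrapped.stopped ? shortCircuitValue : finisher.apply(reduced);
    }

    public static void main(final String[] args) {
        final int result = reduceUntil(
                Stream.of(3, 3, -1, 3), e -> e < 0, Integer.MIN_VALUE, 0, Integer::sum, e -> e);
        System.out.println(result);
    }
}
```

Keeping the flag as a field of the spliterator avoids sharing mutable state between the wrapper and the finisher through a captured variable.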

None of these approaches feels simple enough. My questions are:

  • Is there any easy way to implement this?
  • After seeing Sink.cancellationRequested: Is there any way to provide a custom Sink implementation, or a custom collector/terminal operation, so that the traversal of the stream is short-circuited?
  • Does my last approach even make sense, or should I just use the for-loop?

EDIT: I realize that the takeWhile operation is probably what I want in this case (and a Java 8 solution is mentioned here). My concerns remain and I would still like to hear good answers to them, but my updated question is whether we can also stop the iteration from within a terminal operation (takeWhile being a short-circuiting stateful intermediate operation), e.g. when the stop condition depends on some stateful accumulator.
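For reference, a minimal sketch of how takeWhile (Java 9+) might fit the implementMe signature. The class name TakeWhileReduce and method name reduceUntil are hypothetical. Note that the predicate passed to takeWhile sets a flag as a side effect, although the API documentation asks for a stateless predicate, so this sketch assumes a sequential stream and forces one.

```java
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

final class TakeWhileReduce {

    // Hypothetical helper with the same parameters as implementMe in the question.
    static <E, T> T reduceUntil(
            final Stream<E> stream,
            final Predicate<E> shortCircuitCondition,
            final T shortCircuitValue,
            final E identityValue,
            final BinaryOperator<E> operator,
            final Function<E, T> finisher) {
        // One-element array used as a mutable flag; assumed safe only for sequential streams.
        final boolean[] stopped = {false};
        final E reduced = stream.sequential()
                .takeWhile(e -> {
                    if (shortCircuitCondition.test(e)) {
                        stopped[0] = true; // remember that the stop element was seen
                        return false;      // end the stream before this element
                    }
                    return true;
                })
                .reduce(identityValue, operator);
        return stopped[0] ? shortCircuitValue : finisher.apply(reduced);
    }

    public static void main(final String[] args) {
        final int result = reduceUntil(
                Stream.of(3, 3, -1, 3), e -> e < 0, Integer.MIN_VALUE, 0, Integer::sum, e -> e);
        System.out.println(result);
    }
}
```

The flag is still needed because takeWhile drops the matching element, so the reduction alone cannot tell a short-circuited stream from one that simply ended.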

Example code:

class Acc implements Consumer<Integer> {

    int value = 0;

    @Override
    public void accept(Integer i) {
        value += i;
    }

    public Acc merge(Acc other) {
        value += other.value;
        return this;
    }
}

Predicate<Acc> threshold = a -> a.value > 6;
Arrays.asList(3, 4, 5).stream()
        .collect(Acc::new, (acc, e) -> {
            if (threshold.test(acc)) {
                // How do I stop the traversal here?
            }
            acc.accept(e);
        }, Acc::merge);
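One way to sidestep the terminal operation, sketched under assumptions: pull elements from the stream's spliterator by hand and test the accumulator before each pull. This is not a stream collector; the helper name collectUntil is made up for illustration, it only handles sequential processing, and it uses an int[] as a stand-in for the Acc class.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

final class CollectUntil {

    // Hypothetical helper: mutable reduction that stops requesting elements
    // once the accumulator satisfies the stop condition.
    static <E, A> A collectUntil(
            final Stream<E> stream,
            final Supplier<A> supplier,
            final BiConsumer<A, ? super E> accumulator,
            final Predicate<? super A> stopCondition) {
        final A acc = supplier.get();
        final Spliterator<E> source = stream.sequential().spliterator();
        // The condition is checked before each element is requested;
        // tryAdvance returns false once the source is exhausted.
        while (!stopCondition.test(acc)
                && source.tryAdvance(e -> accumulator.accept(acc, e))) {
            // all work happens in the loop condition
        }
        return acc;
    }

    public static void main(final String[] args) {
        final int[] sum = collectUntil(
                Arrays.asList(3, 4, 5).stream(),
                () -> new int[1],          // stand-in for Acc::new
                (a, e) -> a[0] += e,       // stand-in for Acc::accept
                a -> a[0] > 6);            // same threshold as above
        System.out.println(sum[0]);
    }
}
```

With the values 3, 4, 5 and the threshold of 6, the loop should stop after the second element, so the third is never requested from the source.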
sfiss
  • This problem comes up quite frequently on this site, and so far I have not seen simple solutions. Check out this related question: [How to short-circuit a reduce() operation on a Stream?](https://stackoverflow.com/questions/32495069/how-to-short-circuit-a-reduce-operation-on-a-stream). Basically, you have to keep state about whether you found the short-circuiting element as well as the reduction result. You can see that this causes issues, e.g. with your use of the `AtomicBoolean`. I would think a for loop or a `Spliterator` that encapsulates the state are the best approaches. – Malte Hartwig Jul 23 '19 at 01:20
  • It’s a strange inconsistency to implement a Stream via `Spliterator` but request an `Iterator` from the source Stream. – Holger Jul 23 '19 at 09:18
  • If you happen to be using Java 9+, [takeWhile](https://docs.oracle.com/javase/9/docs/api/java/util/stream/Stream.html#takeWhile-java.util.function.Predicate-) could help you cut short the stream (something like stream.takeWhile(val -> !predicate.test(val)); ). However, this is generally a very annoying operation to implement otherwise. – Avi Aug 07 '19 at 16:18
