I have a Stream<E> of some elements (I cannot recreate the stream or traverse it twice) and I want to reduce the elements to a single value. With Collectors.reducing etc. this task is fairly easy. But now I want to short-circuit the reduction based on some Predicate<E> and prevent the whole stream from being traversed. Below I provide some code for further explanation, but the question is the following: how do I implement this with Java streams?
Code required for MCVE:
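```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.junit.Assert;
import org.junit.Test;

// The imports also cover the implementMe variants below.
public class ShortCircuit {

    static class Element<T> {
        private final boolean pred;
        private final T value;

        Element(
                final boolean pred,
                final T value) {
            this.pred = pred;
            this.value = value;
        }
    }

    @SuppressWarnings("unused")
    static <E, T> T implementMe(
            final Stream<E> stream,
            final Predicate<E> shortCircuitCondition,
            final T shortCircuitValue,
            final E identityValue,
            final BinaryOperator<E> operator,
            final Function<E, T> finisher) {
        // how should I implement this?
        return null;
    }

    @Test
    public void test() {
        final Collection<Element<Integer>> elements = Arrays.asList(
                new Element<>(true, 3),
                new Element<>(true, 3),
                new Element<>(false, 3),
                new Element<>(true, 3));
        final AtomicInteger counter = new AtomicInteger(0);
        final Stream<Element<Integer>> stream = elements.stream()
                .peek(t -> counter.incrementAndGet());
        final int value =
                ShortCircuit.<Element<Integer>, Integer> implementMe(
                        stream,
                        e -> !e.pred,
                        Integer.MIN_VALUE,
                        new Element<>(true, 0),
                        (l, r) -> new Element<>(true, l.value + r.value),
                        e -> e.value);
        // the short-circuit value must win...
        Assert.assertEquals(Integer.MIN_VALUE, value);
        // ...and only the elements up to the first pred == false may be pulled
        Assert.assertEquals(3, counter.get());
    }
}
```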
With a simple for loop, I can achieve the short-circuit behavior in the following way:
```java
static <E, T> T implementMe(
        final Stream<E> stream,
        final Predicate<E> shortCircuitCondition,
        final T shortCircuitValue,
        final E identityValue,
        final BinaryOperator<E> operator,
        final Function<E, T> finisher) {
    E current = identityValue;
    for (final E t : (Iterable<E>) stream::iterator) {
        if (shortCircuitCondition.test(t)) {
            return shortCircuitValue;
        }
        current = operator.apply(current, t);
    }
    return finisher.apply(current);
}
```
But I would rather use the stream API, so I tried it with a custom reducing collector. This approach does not short-circuit, though (the test fails with an AssertionError because the counter reaches 4):
```java
static <E, T> T implementMe(
        final Stream<E> stream,
        final Predicate<E> shortCircuitCondition,
        final T shortCircuitValue,
        final E identityValue,
        final BinaryOperator<E> operator,
        final Function<E, T> finisher) {
    final AtomicBoolean shortCircuit = new AtomicBoolean(false);
    return stream.collect(
            Collectors.collectingAndThen(
                    Collectors.reducing(identityValue, (l, r) -> {
                        if (shortCircuitCondition.test(r)) {
                            shortCircuit.set(true);
                        }
                        return operator.apply(l, r);
                    }),
                    e -> shortCircuit.get() ? shortCircuitValue : finisher.apply(e)));
}
```
Now, I could use a custom spliterator to prevent advancing to the next element once the condition is met:
```java
static <E, T> T implementMe(
        final Stream<E> stream,
        final Predicate<E> shortCircuitCondition,
        final T shortCircuitValue,
        final E identityValue,
        final BinaryOperator<E> operator,
        final Function<E, T> finisher) {
    final AtomicBoolean shortCircuit = new AtomicBoolean(false);
    final Iterator<E> underlyingIterator = stream.iterator();
    final Stream<E> wrappedStream = StreamSupport.stream(
            new Spliterators.AbstractSpliterator<E>(0, 0) {
                @Override
                public boolean tryAdvance(final Consumer<? super E> action) {
                    if (!underlyingIterator.hasNext()) {
                        return false;
                    }
                    final E next = underlyingIterator.next();
                    if (shortCircuitCondition.test(next)) {
                        shortCircuit.set(true);
                        return false;
                    }
                    action.accept(next);
                    return true;
                }
            }, false);
    return wrappedStream.collect(
            Collectors.collectingAndThen(
                    Collectors.reducing(
                            identityValue,
                            operator),
                    e -> shortCircuit.get() ? shortCircuitValue : finisher.apply(e)));
}
```
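A possibly simpler variant of the spliterator idea, sketched here under the assumption of a sequential stream: instead of wrapping the stream's iterator in a new spliterator and then collecting, drive the stream's own Spliterator directly with tryAdvance and stop the loop once the condition matches. SpliteratorLoopSketch is a hypothetical class name; the implementMe signature is the one from the MCVE.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

class SpliteratorLoopSketch {

    static <E, T> T implementMe(
            final Stream<E> stream,
            final Predicate<E> shortCircuitCondition,
            final T shortCircuitValue,
            final E identityValue,
            final BinaryOperator<E> operator,
            final Function<E, T> finisher) {
        final Spliterator<E> spliterator = stream.spliterator();
        final AtomicReference<E> current = new AtomicReference<>(identityValue);
        final AtomicBoolean shortCircuit = new AtomicBoolean(false);
        // Handles one element: either flag the short circuit or fold it in.
        final Consumer<E> step = e -> {
            if (shortCircuitCondition.test(e)) {
                shortCircuit.set(true);
            } else {
                current.set(operator.apply(current.get(), e));
            }
        };
        // tryAdvance hands at most one element to step per call; the loop ends
        // when the source is exhausted or the condition has matched.
        while (!shortCircuit.get() && spliterator.tryAdvance(step)) {
            // all work happens inside step
        }
        return shortCircuit.get() ? shortCircuitValue : finisher.apply(current.get());
    }
}
```

Because tryAdvance pulls at most one element per call, the element that triggers the condition should be the last one read, so the MCVE's counter would end at 3. This is essentially the for loop from above written against Spliterator instead of Iterator.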
None of these approaches feels simple enough. My questions are the following:
- Is there an easy way to implement this?
- After seeing Sink.cancellationRequested: is there any way to provide a custom Sink implementation, or a custom collector/terminal operation, so that the traversal of the stream is short-circuited?
- Does my last approach even make sense, or should I just use the for loop?
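Regarding Sink: as far as I can tell, Sink is a package-private interface in java.util.stream, so there is no supported way to plug in a custom implementation, and the Collector interface has no way to signal cancellation either. A workaround that stays within the public API is to piggyback on a terminal operation that already short-circuits, such as anyMatch, and perform the reduction as a side effect of its predicate. This is only a sketch (AnyMatchSketch is a hypothetical name), and it bends the rules: anyMatch expects a stateless, non-interfering predicate, so the trick is only reasonable on a sequential stream, which the sketch forces with sequential().

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

class AnyMatchSketch {

    static <E, T> T implementMe(
            final Stream<E> stream,
            final Predicate<E> shortCircuitCondition,
            final T shortCircuitValue,
            final E identityValue,
            final BinaryOperator<E> operator,
            final Function<E, T> finisher) {
        // Mutable accumulator that the predicate below updates as a side effect.
        final AtomicReference<E> current = new AtomicReference<>(identityValue);
        // anyMatch stops pulling elements as soon as the predicate returns true,
        // so returning true doubles as the "stop the traversal" signal.
        final boolean shortCircuited = stream.sequential().anyMatch(e -> {
            if (shortCircuitCondition.test(e)) {
                return true;
            }
            current.set(operator.apply(current.get(), e));
            return false;
        });
        return shortCircuited ? shortCircuitValue : finisher.apply(current.get());
    }
}
```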
EDIT: I realize that the takeWhile operation is probably what I want in this case (and a Java 8 solution is mentioned here). While my concerns remain and I would still like to hear good answers to them, my updated question is whether we can also stop the iteration in a terminal operation (takeWhile being a short-circuiting stateful intermediate operation), e.g. if we want to base the stop condition on some stateful accumulator.
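For the original MCVE, the takeWhile route could look roughly like the sketch below. It assumes Java 9 or later (Stream.takeWhile was added in Java 9), and TakeWhileSketch is a hypothetical name. takeWhile alone loses the information whether the stream ended naturally or because of the condition, so the sketch records that in an AtomicBoolean from inside the predicate; strictly speaking, that side effect goes against the stateless-predicate requirement, so treat it as a sequential-only trick.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

class TakeWhileSketch {

    static <E, T> T implementMe(
            final Stream<E> stream,
            final Predicate<E> shortCircuitCondition,
            final T shortCircuitValue,
            final E identityValue,
            final BinaryOperator<E> operator,
            final Function<E, T> finisher) {
        // Records whether takeWhile stopped because of the condition
        // rather than because the source ran out of elements.
        final AtomicBoolean shortCircuit = new AtomicBoolean(false);
        final E reduced = stream
                .sequential()
                .takeWhile(e -> {
                    if (shortCircuitCondition.test(e)) {
                        shortCircuit.set(true);
                        return false; // drop this element and end the stream
                    }
                    return true;
                })
                .reduce(identityValue, operator);
        return shortCircuit.get() ? shortCircuitValue : finisher.apply(reduced);
    }
}
```

On a sequential stream, takeWhile reads the first element that fails its predicate and then stops requesting more, so in the MCVE the counter should stop at 3.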
Example code for the stateful-accumulator case:
```java
class Acc implements Consumer<Integer> {
    int value = 0;

    @Override
    public void accept(Integer i) {
        value += i;
    }

    public Acc merge(Acc other) {
        value += other.value;
        return this;
    }
}

Predicate<Acc> threshold = a -> a.value > 6;
Arrays.asList(3, 4, 5).stream()
        .collect(Acc::new, (acc, e) -> {
            if (threshold.test(acc)) {
                // How do I stop the traversal here?
            }
            acc.accept(e);
        }, Acc::merge);
```
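To my knowledge, collect offers no way to stop from inside the accumulator, because Collector has no cancellation signal. One way to get the stateful stop condition anyway is to leave the stream API at the terminal step and pull elements through the stream's spliterator, checking the threshold before each pull. The sketch below assumes a sequential traversal; accumulateUntil and AccTraversalSketch are hypothetical names, and Acc is the class from the question without merge, which a sequential traversal never calls.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;

class AccTraversalSketch {

    // The accumulator from the question (merge omitted, it is never needed here).
    static class Acc implements Consumer<Integer> {
        int value = 0;

        @Override
        public void accept(Integer i) {
            value += i;
        }
    }

    // Feeds elements into a fresh Acc until the threshold holds or the stream runs out.
    static Acc accumulateUntil(Stream<Integer> stream, Predicate<Acc> threshold) {
        final Spliterator<Integer> spliterator = stream.spliterator();
        final Acc acc = new Acc();
        // The threshold is checked before each pull, so no element is read once it holds.
        while (!threshold.test(acc) && spliterator.tryAdvance(acc)) {
            // tryAdvance calls acc.accept(...) for the pulled element
        }
        return acc;
    }

    public static void main(String[] args) {
        Predicate<Acc> threshold = a -> a.value > 6;
        Acc result = accumulateUntil(Arrays.asList(3, 4, 5).stream(), threshold);
        System.out.println(result.value); // prints 7
    }
}
```

With the input 3, 4, 5, the threshold becomes true after 3 + 4 = 7, so 5 is never read from the stream.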