I am trying to understand whether there is a way to terminate a reduction operation without examining the whole stream, and I cannot figure one out.

The use case is roughly as follows: there is a long list of Integers that needs to be folded into an Accumulator. Examining each element is potentially expensive, so within the accumulator function I check the incoming Accumulator to see whether the expensive operation is needed at all; if it isn't, I simply return the accumulator.

This is obviously a fine solution for small(er) lists, but for huge lists it incurs the cost of visiting every remaining stream element, which I'd like to avoid.

Here's a code sketch - assume serial reductions only.

class Accumulator {
    // package-private so that ResultSupplier can read them directly
    final Set<A> setA = new HashSet<>();
    final Set<B> setB = new HashSet<>();
}

class ResultSupplier implements Supplier<Result> {

    private final List<Integer> ids;

    @Override
    public Result get() {
        Accumulator acc = ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);

        return (acc.setA.size() > 1) ? Result.invalid() : Result.valid(acc.setB);
    }

    private static BiFunction<Accumulator, Integer, Accumulator> f() {
        return (acc, element) -> {
            if (acc.setA.size() <= 1) {
                // perform expensive ops and accumulate results
            }
            return acc;
        };
    }
}

In addition to having to traverse the whole Stream, there is another fact I dislike - I have to check the same condition twice (namely, setA size check).

I have considered the map() and collect() operations, but they seemed like more of the same: they don't materially change the fact that I can't finish the fold operation without examining the entire stream.

Additionally, my thinking is that imaginary takeWhile(p : (A) => boolean) Stream API correspondent would also buy us nothing, as the terminating condition depends on the accumulator, not stream elements per se.

Bear in mind I am a relative newcomer to FP so - is there a way to make this work as I expect it? Have I set up the whole problem improperly or is this limitation by design?

quantum
  • Your entire use of `reduce` is wrong as you are using a mutable container. You should use `collect` for that. – Holger Jun 09 '15 at 19:28
  • As stated in question, I tried that as well - I made my `Accumulator` an `IntConsumer` and later on a fully-fledged `Collector`, but I couldn't see a way to break full stream examination. Can you hint further at what should be done? – quantum Jun 09 '15 at 20:19
  • Related (maybe even duplicate, but not strictly) : http://stackoverflow.com/questions/20746429/java-8-limit-infinite-stream-by-a-predicate – Marco13 Jun 09 '15 at 20:51
  • btw, your `f()` could be rewritten as `private static final BiFunction f = (acc, element) -> {...}`, or just as a static 2-argument function and passing it as `ResultSupplier::f` – the8472 Jun 09 '15 at 21:46
  • @Quantum: I didn’t say that using `collect` will solve the problem, just that using `reduce` with a mutable container is wrong in general, regardless of which problem you want to solve. If you plan to do more with streams in the future, “mutable” combined with “`reduce`” should trigger a “no good” reflex. – Holger Jun 10 '15 at 08:34
  • Can you easily filter out those elements you do not want to examine? – Thorbjørn Ravn Andersen Jan 14 '16 at 15:21

6 Answers

Instead of starting with ids.stream(), you can:

  1. use ids.spliterator()
  2. wrap the resulting spliterator in a custom spliterator that has a volatile boolean flag
  3. have the custom spliterator's tryAdvance return false once the flag has been set
  4. turn your custom spliterator into a stream with StreamSupport.stream(Spliterator<T>, boolean)
  5. continue your stream pipeline as before
  6. shut down the stream by toggling the boolean when your accumulator is full

Add some static helper methods to keep it functional.

The resulting API could look something like this:

Accumulator acc = terminateableStream(ids, (stream, terminator) ->
   stream.reduce(new Accumulator(terminator), f(), (x, y) -> null));
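As a rough sketch of how such a helper could be put together: the names `TerminateableStreams` and `Terminator` below are hypothetical and not part of the JDK, the demo uses forEach instead of the question's reduce to stay short, and it assumes a sequential stream only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class TerminateableStreams {

    /** Hypothetical handle that the consumer of the stream flips to stop it. */
    static final class Terminator {
        private volatile boolean stopped;
        void terminate() { stopped = true; }
        boolean isStopped() { return stopped; }
    }

    /** Wraps the source spliterator and hands the stream plus its terminator to the body. */
    static <T, R> R terminateableStream(
            Collection<T> source,
            BiFunction<Stream<T>, Terminator, R> body) {
        Terminator terminator = new Terminator();
        Spliterator<T> delegate = source.spliterator();
        Spliterator<T> wrapper = new Spliterators.AbstractSpliterator<T>(
                delegate.estimateSize(), Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                // Refuse to advance once the flag is set; otherwise delegate.
                return !terminator.isStopped() && delegate.tryAdvance(action);
            }
        };
        // false = sequential; this sketch makes no claims about parallel use
        return body.apply(StreamSupport.stream(wrapper, false), terminator);
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6);
        List<Integer> seen = new ArrayList<>();
        terminateableStream(ids, (stream, terminator) -> {
            stream.forEach(id -> {
                seen.add(id);
                if (seen.size() == 3) {
                    terminator.terminate(); // "accumulator is full"
                }
            });
            return null;
        });
        System.out.println(seen); // prints [1, 2, 3]
    }
}
```

The wrapper inherits the default forEachRemaining, which keeps calling tryAdvance until it returns false, so setting the flag stops the traversal before the next element is pulled from the list.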

"Additionally, my thinking is that imaginary takeWhile(p : (A) => boolean) Stream API correspondent would also buy us nothing"

It does work if the condition is dependent on the accumulator state and not on the stream members. That's essentially the approach I've outlined above.

It probably would be forbidden in a takeWhile provided by the JDK but a custom implementation using spliterators is free to take a stateful approach.

the8472
  • Hm, this is an interesting proposal - it does sound a little too elaborate and low-level for this but I'll give it a try. – quantum Jun 09 '15 at 20:22

Of course, there will be an interesting, purely FP answer that might help solve this problem in the way you intend.

In the meantime, why use FP at all when the simple solution is pragmatically imperative? Your original data source is a List anyway, which is already fully materialised, and you will use serial reduction, not parallel reduction. Write this instead:

@Override
public Result get() {
    Accumulator acc = new Accumulator();

    for (Integer id : ids) {
        if (acc.setA.size() <= 1) {
            // perform expensive ops and accumulate results
        }

        // Easy: `enough` stands for whatever stopping condition applies,
        // e.g. acc.setA.size() > 1
        if (enough)
            break;
    }

    return (acc.setA.size() > 1) ? Result.invalid() : Result.valid(acc.setB);
}
Lukas Eder

As mentioned in the comments: The usage scenario sounds a bit dubious. On the one hand, because of the usage of reduce instead of collect, on the other hand because of the fact that the condition that should be used for stopping the reduction also appears in the accumulator. It sounds like simply limiting the stream to a certain number of elements, or based on a condition, as shown in another question, may be more appropriate here.

Of course, in the real application, it might be that the condition is in fact unrelated to the number of elements that have been processed. For this case, I sketched a solution here that basically corresponds to the answer by the8472, and is very similar to the solution from the question mentioned above: It uses a Stream that is created from a Spliterator that simply delegates to the original Spliterator, unless the stopping condition is met.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StopStreamReduction
{
    public static void main(String[] args)
    {
        ResultSupplier r = new ResultSupplier();
        System.out.println(r.get());
    }
}

class Accumulator
{
    final Set<Integer> set = new HashSet<Integer>();
}

class ResultSupplier implements Supplier<String>
{
    private final List<Integer> ids;
    ResultSupplier()
    {
        ids = new ArrayList<Integer>(Collections.nCopies(20, 1));
    }

    public String get()
    {
        //return getOriginal();
        return getStopping();
    }

    private String getOriginal()
    {
        Accumulator acc =
            ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);
        return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set);
    }

    private String getStopping()
    {
        Spliterator<Integer> originalSpliterator = ids.spliterator();
        Accumulator accumulator = new Accumulator();
        Spliterator<Integer> stoppingSpliterator = 
            new Spliterators.AbstractSpliterator<Integer>(
                originalSpliterator.estimateSize(), 0)
            {
                @Override
                public boolean tryAdvance(Consumer<? super Integer> action)
                {
                    return accumulator.set.size() > 10 ? false : 
                        originalSpliterator.tryAdvance(action);
                }
            };
        Stream<Integer> stream = 
            StreamSupport.stream(stoppingSpliterator, false);
        Accumulator acc =
            stream.reduce(accumulator, f(), (x, y) -> null);
        return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set);
    }

    private static int counter = 0;
    private static BiFunction<Accumulator, Integer, Accumulator> f()
    {
        return (acc, element) -> {

            System.out.print("Step " + counter);
            if (acc.set.size() <= 10)
            {
                System.out.print(" expensive");
                acc.set.add(counter);
            }
            System.out.println();
            counter++;
            return acc;
        };
    }
}

Edit in response to the comments:

Of course, it is possible to write it "more functional". However, due to the vague description in the question and the rather "sketchy" code example, it's hard to find "THE" most appropriate solution here. (And "appropriate" refers to the specific caveats of the actual task, and to the question of how functional it should be without sacrificing readability).

Possible functionalization steps might include the creation of a generic StoppingSpliterator class that operates on a delegate Spliterator and has a Supplier<Boolean> as its stopping condition, and feeding this with a Predicate on the actual Accumulator, together with using some utility methods and method references here and there.

But again: It is debatable whether this is actually an appropriate solution, or whether one should not rather use the simple and pragmatic solution from the answer by Lukas Eder...

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

public class StopStreamReduction
{
    public static void main(String[] args)
    {
        List<Integer> collection = 
            new ArrayList<Integer>(Collections.nCopies(20, 1));
        System.out.println(compute(collection));
    }

    private static String compute(List<Integer> collection)
    {
        Predicate<Accumulator> stopCondition = (a) -> a.set.size() > 10;
        Accumulator result = reduceStopping(collection, 
            new Accumulator(), StopStreamReduction::accumulate, stopCondition);
        return (result.set.size() > 11) ? "invalid" : String.valueOf(result.set);
    }

    private static int counter;
    private static Accumulator accumulate(Accumulator a, Integer element)
    {
        System.out.print("Step " + counter);
        if (a.set.size() <= 10)
        {
            System.out.print(" expensive");
            a.set.add(counter);
        }
        System.out.println();
        counter++;
        return a;
    }

    static <U, T> U reduceStopping(
        Collection<T> collection, U identity,
        BiFunction<U, ? super T, U> accumulator,
        Predicate<U> stopCondition)
    {
       // This assumes that the accumulator always returns
       // the identity instance (with the accumulated values).
       // This may not always be true!
       return StreamSupport.stream(
           new StoppingSpliterator<T>(
               collection.spliterator(), 
               () -> stopCondition.test(identity)), false).
                   reduce(identity, accumulator, (x, y) -> null);
    }
}

class Accumulator
{
    final Set<Integer> set = new HashSet<Integer>();
}

class StoppingSpliterator<T> extends Spliterators.AbstractSpliterator<T>
{
    private final Spliterator<T> delegate;
    private final Supplier<Boolean> stopCondition;

    StoppingSpliterator(Spliterator<T> delegate, Supplier<Boolean> stopCondition)
    {
        super(delegate.estimateSize(), 0);
        this.delegate = delegate;
        this.stopCondition = stopCondition;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        if (stopCondition.get())
        {
            return false;
        }
        return delegate.tryAdvance(action);
    }
}
Marco13
  • Just for the record - your supposition is correct: stopping the reduction is unrelated to number of stream elements visited and, in fact, cannot be a priori inferred without inspecting the `Accumulator` state. That is exactly why I suspected there was something wrong with my solution design and was hoping someone would present a 'more-FP' solution sketch or hint. At any rate, thanks for your effort and your code sketch, I'll give it a shot as soon as possible and see how it looks. – quantum Jun 09 '15 at 21:11
  • That is some nifty functional programming right there. Now I understand why they say it's so concise ;) – Lukas Eder Jun 09 '15 at 21:21
  • I'm fairly certain it could be made more concise and more functional in appearance. – the8472 Jun 09 '15 at 21:33
  • @the8472 There's not much going on in this method. A few objects are created, and apart from "hiding" these creations or making 2 or 3 lines a tad more concise by combining them into a utility method, I don't see much room for a "more functional" solution. Originally, I created a class `StoppableSpliterator` that received a `Supplier` for supplying the stopping condition, which might "look" more functional at the first glance, but effectively boils down to the same thing. However, I'm open for suggestions.... – Marco13 Jun 10 '15 at 08:42
  • *but effectively boils down to the same thing*, just like streams effectively are a pile of objects and loops that you could all write out manually. I'm considering the StoppableSpliterator as an external helper (to be written once) that can then be used to write the partial reduction more concisely. Also, as written in another comment, `f()` could be shortened. In fact, since OP was asking for a more functional approach, the whole `ResultSupplier` might be reducible into a single static function with a few things in its local scope as effectively final variables. Higher order functions... – the8472 Jun 10 '15 at 09:00
  • @the8472 I'm not sure how far one should go here. Java is not Lisp or Haskell. But I have added an "edit", for those who are interested... – Marco13 Jun 10 '15 at 10:37
  • yeah, that's about how I would have done it, give or take a few changes. – the8472 Jun 10 '15 at 10:57

There is no real FP solution, simply because your entire accumulator isn’t FP. We can’t help you in this regard as we don’t know what it is actually doing. All we see is that it relies on two mutable collections and hence, can’t be a part of a pure FP solution.

If you accept the limitations and that there is no clean way using the Stream API you might strive for the simple way. The simple way incorporates a stateful Predicate which is not the best thing around but sometimes unavoidable:

public Result get() {
    int limit = 1;
    Set<A> setA=new HashSet<>();
    Set<B> setB=new HashSet<>();
    return ids.stream().anyMatch(i -> {
        // perform expensive ops and accumulate results
        return setA.size() > limit;
    })? Result.invalid(): Result.valid(setB);
}
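To see the short-circuiting in action, here is a self-contained variant of the snippet above. The types A, B and Result are not given in the question, so this sketch substitutes Integer sets and an Optional, and uses `id % 10` as a stand-in for the expensive operation. All of that is assumption, and like the original it is meant for sequential streams only.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

class AnyMatchShortCircuit {

    // Counts how many elements the predicate actually examined.
    static int visited = 0;

    /**
     * Collects the distinct values of id % 10, or returns empty
     * as soon as more than `limit` distinct values have been seen.
     */
    static Optional<Set<Integer>> collectUpTo(List<Integer> ids, int limit) {
        Set<Integer> seen = new HashSet<>();
        boolean exceeded = ids.stream().anyMatch(id -> {
            visited++;
            seen.add(id % 10);           // stand-in for the expensive operation
            return seen.size() > limit;  // returning true stops the traversal
        });
        return exceeded ? Optional.empty() : Optional.of(seen);
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(collectUpTo(ids, 2).isPresent()); // prints false
        System.out.println(visited);                         // prints 3
    }
}
```

Only three of the eight elements are examined here, because anyMatch stops at the first element for which the stateful predicate returns true.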

But I want to note that given your specific logic, i.e. that your result is considered invalid when the set grows too large, your attempt to avoid processing too many elements is an optimization of the erroneous case. You shouldn’t waste effort on optimizing that. If a valid result is the result of processing all elements, then process all elements…

Holger

I think it's possible to throw a RuntimeException of a special type from your custom collector (or reduce operation), which carries the result inside the exception object, and to catch it outside of the collect operation, unwrapping the result. I know that using exceptions for non-exceptional control flow is not idiomatic, but it should work in your case even for parallel streams.
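A minimal sequential sketch of that idea follows. The class `Found` and the method `sumUntilExceeds` are hypothetical names chosen for illustration, and the running-sum task is just an example; this version uses forEach with a mutable local and therefore makes no claim about parallel streams.

```java
import java.util.stream.IntStream;

class ShortCircuitByException {

    /** Hypothetical carrier exception that holds the partial result. */
    static final class Found extends RuntimeException {
        final long partialSum;

        Found(long partialSum) {
            // No message, no cause, no suppression, no stack trace:
            // this is control flow, not an error.
            super(null, null, false, false);
            this.partialSum = partialSum;
        }
    }

    /**
     * Sums non-negative values and stops as soon as the running sum
     * exceeds the limit; returns the sum reached at that point,
     * or the total if the limit is never exceeded.
     */
    static long sumUntilExceeds(IntStream values, long limit) {
        long[] sum = {0};
        try {
            values.forEach(v -> {
                sum[0] += v;
                if (sum[0] > limit) {
                    throw new Found(sum[0]); // abort the traversal
                }
            });
            return sum[0];
        } catch (Found f) {
            return f.partialSum;
        }
    }

    public static void main(String[] args) {
        // Works even on an infinite stream, because the exception ends it.
        System.out.println(sumUntilExceeds(IntStream.iterate(1, i -> i + 1), 100)); // prints 105
        System.out.println(sumUntilExceeds(IntStream.of(1, 2, 3), 100));            // prints 6
    }
}
```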

Actually, there are many cases where short-circuit reduction could be useful. For example, when collecting enum values into an EnumSet, you can stop as soon as you discover that all the possible enum values have already been collected. Or, when intersecting all the elements of a Stream<Set>, you can stop if the resulting set becomes empty after some step, since continuing the reduction is useless. Internally there's a SHORT_CIRCUIT flag used in stream operations like findFirst, but it's not exposed to the public API.

Tagir Valeev
  • @Holger: it will be rethrown by `ForkJoinTask.join` and other tasks will be cancelled as soon as possible. See this proof-of-concept [gist](https://gist.github.com/amaembo/c0e24c7dbc9f1728e638). It solves another popular task: check if sum of non-negative numbers is greater than the limit (you can stop summing as soon as limit is reached). – Tagir Valeev Jun 10 '15 at 08:41
  • “as soon as possible” does not imply “immediately”, thus, currently executing accumulators might still complete. That’s ok if you just want to check for a condition, but it doesn’t work if you want to *enforce a limit*. In other words, it can’t solve the OP’s problem. But it doesn’t even work with your example: all threads might sum up to a local sum below the limit, and when the overflow is detected in the combiner, the work of all threads has been completed already, so the throwing doesn’t really save anything. – Holger Jun 10 '15 at 08:49
  • @Holger: sure. You will have the same problems with standard `findFirst` and `findAny` methods: you may process many more elements in parallel execution than in sequential. The same with `anyMatch` in your solution (suppose that you are using thread-safe sets and switch to parallel): even if the limit is hit, other threads may continue doing expensive ops for a while. That's the common limitation of all the short-circuit operations for parallel streams. My code is not worse than standard methods. – Tagir Valeev Jun 10 '15 at 08:57
  • The difference is that I never claimed that my solution works with `parallel` streams. – Holger Jun 10 '15 at 09:01

I agree with all previous answers. You're doing it wrong by forcing a reduction on a mutable accumulator. Also, the process that you're describing can't be expressed as a pipeline of transformations and reductions.

If you really, really need to do it FP style, I'd do as @the8472 points out.

Anyway, here is a more compact alternative, similar to @lukas-eder's solution, using an Iterator:

Function<Integer, Integer> costlyComputation = Function.identity();

Accumulator acc = new Accumulator();

Iterator<Integer> ids = Arrays.asList(1, 2, 3).iterator();

while (!acc.hasEnough() && ids.hasNext())
  costlyComputation.andThen(acc::add).apply(ids.next());
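For the snippet above to compile, the Accumulator needs a `hasEnough()` method and an `add` method that returns a value, since a void method reference cannot be passed to `andThen`. The question does not define either, so the following is only a sketch under those assumptions, with a capacity-based `hasEnough()` and `x -> x * 10` standing in for the costly computation.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Function;

class IteratorFold {

    /** Hypothetical accumulator; hasEnough() and add() are assumed, not from the question. */
    static final class Accumulator {
        private final Set<Integer> values = new HashSet<>();
        private final int capacity;

        Accumulator(int capacity) { this.capacity = capacity; }

        boolean hasEnough() { return values.size() >= capacity; }

        // Returns a value (not void) so that acc::add fits Function in andThen.
        boolean add(Integer v) { return values.add(v); }

        Set<Integer> values() { return values; }
    }

    /** Folds elements into the accumulator and stops pulling once it has enough. */
    static Accumulator fold(Iterable<Integer> source,
                            Function<Integer, Integer> costlyComputation,
                            int capacity) {
        Accumulator acc = new Accumulator(capacity);
        Iterator<Integer> ids = source.iterator();
        while (!acc.hasEnough() && ids.hasNext()) {
            costlyComputation.andThen(acc::add).apply(ids.next());
        }
        return acc;
    }

    public static void main(String[] args) {
        // Only the first two elements are ever passed to the costly computation.
        Accumulator acc = fold(Arrays.asList(1, 2, 3, 4, 5), x -> x * 10, 2);
        System.out.println(acc.values().size()); // prints 2
    }
}
```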

You have two different concerns regarding FP here:

How to stop iterating

As you're depending on mutable state, FP is only going to make your life harder. You can iterate over the collection externally or use an Iterator as I propose.

Then, use an if() to stop iteration.

You can think of different strategies, but at the end of the day, this is what you're using.

I prefer the iterator because it is more idiomatic (it expresses your intention better in this case).

How to design the Accumulator and the costly operation

This is the most interesting for me.

A pure function can't have state, must receive something and must return something, and always the same something for the same input (like a mathematical function). Can you express your costly operation like this?

Does it need some shared state with the Accumulator? Maybe that shared state doesn't belong to either of them.

Will you transform your input and then append it in the Accumulator or is that the Accumulator's responsibility? Does it make sense to inject the function into the Accumulator?

ggalmazor