
I'm trying to work out how to implement a custom intermediate operation on a Java 8 Stream. And it seems that I'm locked out :(

Specifically I want to take a stream and return every entry up to and including the first one that has a particular value. And I want to stop generating any after that - making it short-circuiting.

It's running a series of validation checks on input data. I want to stop on the first Error, if there is one, but I want to collate Warnings on the way. And because these validation checks might be expensive - involving database lookups, for example - I want to only run the minimum set needed.

So the code would be something like:

ValidationResult result = validators.stream()
    .map(validator -> validator.validate(data))
    .takeUntil(r -> r.isError()) // This is the bit I can't do
    .reduce(new ValidationResult(), this::mergeResults); // assuming mergeResults is an instance method

It seems that I should be able to do something with ReferencePipeline.StatefulOp, except that it's all package-private, so I can't extend it. So what is the correct way to achieve this? Or is it even possible?

Note as well - this needs to be in Java 8, and not 9+ since we're not there yet for various unrelated reasons.

Cheers

Graham
  • Looking for `takeWhile` in Java 9? – Naman Jun 14 '19 at 06:01
  • Maybe [this](https://stackoverflow.com/a/20765715/9662601) answer will help you create your own `takeWhile()` in Java 8. – Samuel Philipp Jun 14 '19 at 06:02
  • What values do you have in `ValidationResult`? Is it possible to ignore its fields & only care about `isError` and which validators are left? If so, check my answer... – buræquete Jun 14 '19 at 06:49
  • https://stackoverflow.com/questions/32290278/picking-elements-of-a-list-until-condition-is-met-with-java-8-lambdas/32291089#32291089 this seems to be what you are trying to achieve. – Angel Koh Jun 14 '19 at 07:26
  • @Naman `takeWhile` would not work because of the *and including* part of the question – Eugene Jun 14 '19 at 08:40
  • Did you check any of the answers? I had an answer with a different approach, I'd love to hear your feedback on that! – buræquete Aug 02 '19 at 09:54

4 Answers


Generally, custom operations will need to deal with the Spliterator interface. It extends the concept of the Iterator by adding characteristics and size information and the ability to split off a part of the elements as another spliterator (hence its name). It also simplifies the iteration logic by needing only one method, tryAdvance, instead of the Iterator's hasNext/next pair.
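A minimal sketch of that single-method protocol (the class name `TryAdvanceDemo` is just illustrative): each `tryAdvance` call either hands exactly one element to the consumer and returns `true`, or returns `false` once the spliterator is exhausted, so draining it needs no separate `hasNext`/`next` pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class TryAdvanceDemo {

    // Drains a spliterator using only tryAdvance; no hasNext()/next() pair is needed.
    public static <T> List<T> drain(Spliterator<T> spliterator) {
        List<T> out = new ArrayList<>();
        // Each call passes at most one element to the consumer and reports whether it did.
        while (spliterator.tryAdvance(out::add)) {
            // nothing to do here: the element was already handed to out::add
        }
        return out;
    }

    public static void main(String[] args) {
        Spliterator<String> sp = Arrays.asList("a", "b", "c").spliterator();
        System.out.println(drain(sp)); // prints [a, b, c]
    }
}
```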

public static <T> Stream<T> takeWhile(Stream<T> s, Predicate<? super T> condition) {
    boolean parallel = s.isParallel();
    Spliterator<T> spliterator = s.spliterator();
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(
        spliterator.estimateSize(),
        spliterator.characteristics()&~(Spliterator.SIZED|Spliterator.SUBSIZED)) {
            boolean active = true;
            Consumer<? super T> current;
            Consumer<T> adapter = t -> {
                if((active = condition.test(t))) current.accept(t);
            };

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if(!active) return false;
                current = action;
                try {
                    return spliterator.tryAdvance(adapter) && active;
                }
                finally {
                    current = null;
                }
            }
        }, parallel).onClose(s::close);
}

To keep the stream’s properties, we query the parallel status first, to reestablish it for the new stream. Also, we register a close action that will close the original stream.

The main work is to implement a Spliterator decorating the previous stream state’s spliterator.

The characteristics are kept, except for SIZED and SUBSIZED, as our operation results in an unpredictable size. The original size is still passed through; it will now be used only as an estimate.
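The property-preserving steps described above (parallel flag, close handler, masked size characteristics) can be isolated in a pass-through decorator. This is a sketch with hypothetical names (`PropertyPreservingDecorator`, `decorate`, `passThrough`) that forwards every element unchanged, so the only observable difference is in the characteristics: the size is still available from `estimateSize()`, but `getExactSizeIfKnown()` returns -1 because `SIZED` is no longer reported.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PropertyPreservingDecorator {

    // Forwards every element, keeps the characteristics except SIZED/SUBSIZED,
    // and keeps the original size only as an estimate.
    public static <T> Spliterator<T> decorate(Spliterator<T> source) {
        return new Spliterators.AbstractSpliterator<T>(
                source.estimateSize(),
                source.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED)) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                return source.tryAdvance(action);
            }
        };
    }

    // Re-establishes the parallel flag and delegates close() to the original stream.
    public static <T> Stream<T> passThrough(Stream<T> s) {
        return StreamSupport.stream(decorate(s.spliterator()), s.isParallel()).onClose(s::close);
    }

    public static void main(String[] args) {
        Spliterator<String> sp = decorate(Stream.of("a", "b", "c", "d", "e").spliterator());
        System.out.println(sp.estimateSize());        // prints 5: the size survives as an estimate
        System.out.println(sp.getExactSizeIfKnown()); // prints -1: SIZED is no longer reported
    }
}
```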

This solution stores the Consumer passed to tryAdvance for the duration of the call, so that the same adapter consumer can be reused instead of creating a new one for each element. This works because tryAdvance is guaranteed never to be invoked concurrently on the same spliterator.

Parallelism is done via splitting, which is inherited from AbstractSpliterator. This inherited implementation will buffer some elements, which is reasonable, as implementing a better strategy for an operation like takeWhile is really complicated.
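To see the inherited splitting in isolation, this sketch (class and method names are hypothetical) defines an `AbstractSpliterator` that implements only `tryAdvance` and calls `trySplit()` once. The inherited implementation copies a batch of elements into an array-backed prefix spliterator; the exact batch size is a JDK implementation detail, so the sketch only relies on the prefix and the remainder together yielding every element in the original order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

public class InheritedSplitDemo {

    // Implements only tryAdvance; trySplit is inherited from AbstractSpliterator.
    static <T> Spliterator<T> sequentialOnly(Iterator<T> it, long estimate) {
        return new Spliterators.AbstractSpliterator<T>(estimate, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!it.hasNext()) return false;
                action.accept(it.next());
                return true;
            }
        };
    }

    // Splits once, then returns [elements of the prefix, elements of the remainder].
    public static <T> List<List<T>> splitOnce(List<T> source) {
        Spliterator<T> rest = sequentialOnly(source.iterator(), source.size());
        Spliterator<T> prefix = rest.trySplit(); // inherited: buffers a batch into an array
        List<T> first = new ArrayList<>();
        List<T> second = new ArrayList<>();
        if (prefix != null) prefix.forEachRemaining(first::add);
        rest.forEachRemaining(second::add);
        List<List<T>> parts = new ArrayList<>();
        parts.add(first);
        parts.add(second);
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(splitOnce(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```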

So you can use it like

takeWhile(Stream.of("foo", "bar", "baz", "hello", "world"), s -> s.length() == 3)
    .forEach(System.out::println);

which will print

foo
bar
baz

or

takeWhile(Stream.of("foo", "bar", "baz", "hello", "world")
    .peek(s -> System.out.println("before takeWhile: "+s)), s -> s.length() == 3)
    .peek(s -> System.out.println("after takeWhile: "+s))
    .forEach(System.out::println);

which will print

before takeWhile: foo
after takeWhile: foo
foo
before takeWhile: bar
after takeWhile: bar
bar
before takeWhile: baz
after takeWhile: baz
baz
before takeWhile: hello

which shows that it does not process more than necessary. Before the takeWhile stage, we have to encounter the first non-matching element; after it, we only encounter the elements preceding that one.
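As Eugene points out in the comments on the question, `takeWhile` drops the first non-matching element, whereas the question asks for everything up to and including the first error. A sketch of that variant in the same adapter-consumer style, assuming the predicate is the stop condition (the name `takeUntilIncluding` is hypothetical, mirroring the question's `takeUntil`): every element is forwarded, and the predicate only decides whether to end the stream afterwards.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class TakeUntilIncluding {

    // Emits every element up to and including the first one matching `stop`, then ends.
    public static <T> Stream<T> takeUntilIncluding(Stream<T> s, Predicate<? super T> stop) {
        boolean parallel = s.isParallel();
        Spliterator<T> spliterator = s.spliterator();
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(
            spliterator.estimateSize(),
            spliterator.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED)) {
                boolean done = false;
                Consumer<? super T> current;
                // Unlike takeWhile, the element is forwarded even when it matches.
                Consumer<T> adapter = t -> {
                    done = stop.test(t);
                    current.accept(t);
                };

                @Override
                public boolean tryAdvance(Consumer<? super T> action) {
                    if (done) return false; // never pull from upstream after the match
                    current = action;
                    try {
                        return spliterator.tryAdvance(adapter);
                    } finally {
                        current = null;
                    }
                }
            }, parallel).onClose(s::close);
    }

    public static void main(String[] args) {
        takeUntilIncluding(Stream.of("foo", "bar", "baz", "hello", "world"), s -> s.length() != 3)
            .forEach(System.out::println); // prints foo, bar, baz and hello, one per line
    }
}
```

For the question's case, this would be applied as `takeUntilIncluding(validators.stream().map(validator -> validator.validate(data)), r -> r.isError())`, followed by the reduction.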

Holger
  • `Spliterator` is a perfect example of how badly broken the Java streams design is. You want a simple streams "intermediate operation" like, say, filtering away null elements of a stream? You get your choice of using `mapMulti` - and good luck reading that inline in your stream pipeline! - _or_ you can implement it reusably with _this_ monstrosity! _Everyone_ has to deal with the super-duper ability to _split_ streams for parallelism even though that's a feature not needed - or _wanted_ - in 98.5% of use cases. Compare to implementing same thing in C# LINQ w/ extension methods. Good job, Java! – davidbak Nov 20 '22 at 23:01
  • @davidbak you “filter away null elements of a stream” using `.filter(x -> x != null)` or `.filter(Objects::nonNull)` if you prefer. There is no need to implement custom processing for such a trivial task. – Holger Nov 21 '22 at 09:07
  • @davidbak `mapMulti` is just an optimization for `flatMap` with a low number of elements. Before JDK 16 introduced it, you didn’t even know that you “need `mapMulti`”. That’s just another discussion around contrived scenarios. If you think, C# is great, fine, use C#, be happy, and stop bothering me with your rants. – Holger Nov 21 '22 at 14:51

I admit that, code-wise, Holger's answer is a lot sexier, but maybe this is somewhat easier to read:

public static <T> Stream<T> takeUntilIncluding(Stream<T> s, Predicate<? super T> condition) {

    // holds the element most recently handed over by the original spliterator
    class Box implements Consumer<T> {

        boolean stop = false;

        T t;

        @Override
        public void accept(T t) {
            this.t = t;
        }
    }

    Box box = new Box();

    Spliterator<T> original = s.spliterator();

    // explicit <T>: Java 8 does not allow the diamond operator with anonymous classes
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(
        original.estimateSize(),
        original.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED)) {

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {

            if (box.stop || !original.tryAdvance(box)) {
                box.stop = true;
                return false;
            }

            // stop after the first element matching the condition, but still emit it
            box.stop = condition.test(box.t);
            action.accept(box.t);

            return true;
        }
    }, s.isParallel()).onClose(s::close);

}
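If even a spliterator feels too low-level, an alternative (a different technique from the spliterator-based code in these answers) is to express the rule as a plain `Iterator` and turn it back into a stream with `Spliterators.spliteratorUnknownSize`. This sketch assumes a sequential stream and uses hypothetical names; the strings stand in for validation results, and the `peek` log shows that nothing after the first error is ever produced, which is what matters when each result comes from an expensive validator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorTakeUntil {

    // Iterator-based variant: elements up to and including the first match of `stop`.
    // The resulting stream is sequential; parallel splitting is not a goal here.
    public static <T> Stream<T> takeUntilIncluding(Stream<T> s, Predicate<? super T> stop) {
        Iterator<T> source = s.iterator();
        Iterator<T> limited = new Iterator<T>() {
            private boolean done = false;

            @Override
            public boolean hasNext() {
                // once done, never ask the source again, so no further upstream work happens
                return !done && source.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                T t = source.next();
                done = stop.test(t); // the matching element is still returned
                return t;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(limited, Spliterator.ORDERED), false)
            .onClose(s::close);
    }

    public static void main(String[] args) {
        List<String> produced = new ArrayList<>(); // records which "results" were actually computed
        Stream<String> results = Stream.of("warn-1", "warn-2", "ERROR", "warn-3").peek(produced::add);
        List<String> kept = new ArrayList<>();
        takeUntilIncluding(results, r -> r.startsWith("ERROR")).forEach(kept::add);
        System.out.println(kept);     // prints [warn-1, warn-2, ERROR]
        System.out.println(produced); // prints [warn-1, warn-2, ERROR]; warn-3 is never produced
    }
}
```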
Eugene

You can do it with a trick:

List<ValidationResult> res = new ArrayList<>(); // Can modify it with your `mergeResults` instead of list

Optional<ValidationResult> result = validators.stream()
    .map(validator -> validator.validate(data))
    .map(v -> {
       res.add(v);
       return v;
    })
    .filter(r -> r.isError()) // a lambda parameter named `result` would clash with the variable being declared
    .findFirst();

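The List<ValidationResult> res will then contain the values you're interested in: every result up to and including the first error (or all of them, if there is no error). Since res is modified from inside the pipeline, this relies on the stream being sequential.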
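A self-contained version of this trick, under stated assumptions: `ValidationResult` here is a hypothetical stand-in with only an error flag, and each validator is modelled as a `Function<String, ValidationResult>` instead of the question's `validator.validate(data)`. Because `findFirst()` short-circuits on a sequential stream, validators after the first error are never run, and `res` ends up holding every result up to and including that error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class SideEffectCollect {

    // Hypothetical stand-in for the question's ValidationResult; only the error flag matters here.
    public static final class ValidationResult {
        public final String message;
        private final boolean error;

        public ValidationResult(String message, boolean error) {
            this.message = message;
            this.error = error;
        }

        public boolean isError() {
            return error;
        }
    }

    // Runs validators in order and records every result up to and including the first error.
    // The side-effecting map is only reliable on a sequential stream.
    public static List<ValidationResult> runUntilError(
            List<Function<String, ValidationResult>> validators, String data) {
        List<ValidationResult> res = new ArrayList<>();
        validators.stream()
            .map(validator -> validator.apply(data))
            .map(v -> {
                res.add(v); // every result that is actually computed ends up here
                return v;
            })
            .filter(ValidationResult::isError)
            .findFirst(); // short-circuits: later validators are never applied
        return res;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        List<Function<String, ValidationResult>> validators = new ArrayList<>();
        validators.add(d -> { calls.incrementAndGet(); return new ValidationResult("warning", false); });
        validators.add(d -> { calls.incrementAndGet(); return new ValidationResult("error", true); });
        validators.add(d -> { calls.incrementAndGet(); return new ValidationResult("never run", false); });

        List<ValidationResult> res = runUntilError(validators, "input");
        System.out.println(res.size() + " results, " + calls.get() + " validators run"); // prints 2 results, 2 validators run
    }
}
```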

Mạnh Quyết Nguyễn

You can use the following structure:

AtomicBoolean gateKeeper = new AtomicBoolean(true);
Optional<Foo> result = validators.stream()
    .filter(validator -> gateKeeper.get()
                && gateKeeper.compareAndSet(true, !validator.validate(data).isError())
                && gateKeeper.get())
    .reduce(...); // only the first n non-error validators reach this point

The filter with gateKeeper acts as short-circuiting logic: it keeps going until it encounters the first isError() == true case, rejects it, and then shuts the door on validate() calls from then on. The remaining validators still pass through the filter, but validate() is no longer invoked for them. It looks a bit crazy, but it is much simpler than other custom implementations and might work perfectly if it suits your requirements.

I'm not 100% sure this is helpful, since I ignore the result of validator.validate(data) apart from its isError() value and which validator in the list produced it.
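A runnable sketch of the gate, with the validator call abstracted into a hypothetical `check` predicate (true meaning "no error"). It shows the two properties described above: the failing validator itself is rejected, and the check is never invoked again once the gate is closed, although the remaining validators still flow through the filter. Like the original, it relies on a sequential stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class GateKeeperDemo {

    // `check` is a hypothetical stand-in for `validator -> !validator.validate(data).isError()`.
    public static <V> List<V> passingPrefix(List<V> validators, Predicate<? super V> check) {
        AtomicBoolean gateKeeper = new AtomicBoolean(true);
        return validators.stream()
            .filter(v -> gateKeeper.get()                        // gate closed: skip without checking
                && gateKeeper.compareAndSet(true, check.test(v)) // run the check; close the gate on failure
                && gateKeeper.get())                             // keep v only if its check passed
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        List<String> kept = passingPrefix(Arrays.asList("ok-1", "ok-2", "bad", "ok-3"),
            v -> { calls.incrementAndGet(); return v.startsWith("ok"); });
        System.out.println(kept + " after " + calls.get() + " checks"); // prints [ok-1, ok-2] after 3 checks
    }
}
```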

buræquete