
I am trying to train my mind to think the functional way. Recently I faced a situation in which I needed to pick elements from a list until a condition was met (including the element that meets it), and I could not find an easy, natural way of achieving this. Obviously I am still learning.

Say I have this list:

List<String> tokens = Arrays.asList("pick me", "Pick me", "pick Me",
    "PICK ME", "pick me and STOP", "pick me", "pick me and Stop", "pick me");

// Without lambdas you would do it like this:
List<String> myTokens = new ArrayList<>();
for (String token : tokens) {
    myTokens.add(token);
    if (token.toUpperCase().endsWith("STOP")) {
        break;
    }
}
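For reference, the loop above can be wrapped in a small generic method so it can be reused with any stop predicate. This is a minimal sketch; the class and method names (TakeUntilLoop, takeUntilInclusive) are hypothetical and not part of any library. Like the loop, it keeps the element that matches the stop condition and returns the whole list if nothing matches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class TakeUntilLoop {

    // Hypothetical helper: copies elements up to and including the first one
    // that matches stop; if nothing matches, the whole list is copied.
    static <T> List<T> takeUntilInclusive(List<T> source, Predicate<? super T> stop) {
        List<T> result = new ArrayList<>();
        for (T item : source) {
            result.add(item);
            if (stop.test(item)) {
                break; // stop right after adding the matching element
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tokens = Arrays.asList("pick me", "Pick me", "pick Me",
            "PICK ME", "pick me and STOP", "pick me", "pick me and Stop", "pick me");
        List<String> myTokens = takeUntilInclusive(tokens, t -> t.toUpperCase().endsWith("STOP"));
        System.out.println(myTokens);
    }
}
```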

Thank you in advance for your input.

NOTE: Before publishing this I read "Limit a stream by a predicate", but I could not see how to adapt that answer to my problem. Any help would be appreciated, thanks.

Community
Julian
  • I had read that question before I published mine, but I thought, and still think, that Java would have some out-of-the-box way of achieving this. Say something like `myTokens = tokens.stream().collect(toListWhile(...))`, or a nice and easy way of implementing a custom `toListWhile()` collector – Julian Aug 29 '15 at 20:30
  • I don't think there's a single operation in the Streams API which would assume an ordered stream, and fail for an unordered one. Such is your imagined operation. – Marko Topolnik Aug 29 '15 at 20:56
  • possible duplicate of [Limit a stream by a predicate](http://stackoverflow.com/questions/20746429/limit-a-stream-by-a-predicate) – Anid Monsur Aug 29 '15 at 21:14
  • @AnidMonsur, This is not an exact duplicate as here OP wants to take the STOP-element as well. It's a slightly different problem. – Tagir Valeev Aug 30 '15 at 02:58
  • @MarkoTopolnik, `takeWhile` is actually something like this. The [JavaDoc statement](http://hg.openjdk.java.net/jdk9/dev/jdk/file/cb39fd2ecf8a/src/java.base/share/classes/java/util/stream/Stream.java#l494) for unordered streams is especially wonderful! Any subset, sic! In my opinion it would actually be better if it threw an exception... – Tagir Valeev Aug 30 '15 at 03:59
  • @tagir Ah, so they're adding it back: many of these were planned before the release of Java 8, but then dropped. Java usually has fail-fast semantics as opposed to GIGO. This is sure to generate a lot of dumbfounded SO questions. – Marko Topolnik Aug 30 '15 at 07:18
  • It's a good and interesting question, but I disagree with its premise. I don't think streams are intended to be used for what you are trying to use them for, especially because your operation depends on the stream being ordered and, most likely, not parallelized. What I think you should do is manually (stream-less) build a valid collection and then stream it to process it. – Dariusz Oct 06 '15 at 08:22

5 Answers


If you really must use Streams API, keep it simple and use a stream of indexes:

int lastIdx = IntStream.range(0, tokens.size())
        .filter(i -> tokens.get(i).toUpperCase().endsWith("STOP"))
        .findFirst()
        .orElse(tokens.size() - 1); // no match: take the whole list, as the loop does

List<String> myTokens = tokens.subList(0, lastIdx + 1);

Or make a new List out of the sublist if you want an independent copy that's not backed by the original list.
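A sketch combining the index-stream idea with the independent copy, packaged as a helper (IndexTakeUntil.takeUntilInclusive is a hypothetical name for illustration). It assumes a random-access list such as ArrayList or Arrays.asList, because get(i) on a LinkedList costs linear time. If no element matches, it falls back to the whole list, which is what the loop in the question does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.IntStream;

class IndexTakeUntil {

    // Hypothetical helper built on the index-stream idea: find the index of the
    // first matching element, then copy the prefix into an independent ArrayList.
    static <T> List<T> takeUntilInclusive(List<T> list, Predicate<? super T> stop) {
        int lastIdx = IntStream.range(0, list.size())
                .filter(i -> stop.test(list.get(i)))
                .findFirst()
                .orElse(list.size() - 1); // no match: keep everything, like the loop
        // new ArrayList<>(...) detaches the result from the original list
        return new ArrayList<>(list.subList(0, lastIdx + 1));
    }

    public static void main(String[] args) {
        List<String> tokens = Arrays.asList("pick me", "PICK ME", "pick me and STOP", "pick me");
        List<String> myTokens = takeUntilInclusive(tokens, t -> t.toUpperCase().endsWith("STOP"));
        System.out.println(myTokens);
    }
}
```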

Misha
  • A very clean and nice solution instead – Julian Aug 30 '15 at 23:28
  • Won't this result in iterating through the entire list? The benefit of the simple for loop approach that OP wrote was that it breaks the loop when the condition is met. Or do I have this wrong? – Ghos3t Nov 16 '22 at 23:02
  • The specification doesn't require it, but any reasonable implementation of `findFirst` would short-circuit and terminate once the condition is met. – Misha Nov 17 '22 at 02:41

In JDK 9 there will be a new Stream operation called takeWhile which does something similar to what you need. I backported this operation to my StreamEx library, so you can use it even in Java 8:

List<String> list = StreamEx.of(tokens)
                            .takeWhile(t -> !t.toUpperCase().endsWith("STOP"))
                            .toList();

Unfortunately it does not take the "STOP" element itself, so the second pass is necessary to add it manually:

list.add(StreamEx.of(tokens).findFirst(t -> t.toUpperCase().endsWith("STOP")).get());

Note that both takeWhile and findFirst are short-circuit operations (they will not process the whole input stream if unnecessary), so you can use them with very long or even infinite streams.
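Since Java 9, Stream.takeWhile is part of the JDK, so the same two-pass approach can be written without StreamEx. A minimal sketch assuming Java 9 or later; Jdk9TakeWhile.takeUntilStop is a hypothetical name. Using ifPresent instead of get() avoids a NoSuchElementException when no token ends with "STOP"; in that case takeWhile already returns every token.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class Jdk9TakeWhile {

    // Two-pass version using only the JDK (Java 9+): takeWhile collects the
    // elements before the stop element, then findFirst locates the stop element.
    static List<String> takeUntilStop(List<String> tokens) {
        Predicate<String> isStop = t -> t.toUpperCase().endsWith("STOP");
        List<String> result = tokens.stream()
                .takeWhile(isStop.negate())
                .collect(Collectors.toCollection(ArrayList::new)); // mutable, so we can add to it
        // ifPresent leaves the result unchanged when no element ends with "STOP"
        tokens.stream().filter(isStop).findFirst().ifPresent(result::add);
        return result;
    }

    public static void main(String[] args) {
        List<String> tokens = Arrays.asList("pick me", "Pick me", "pick me and STOP", "pick me");
        System.out.println(takeUntilStop(tokens));
    }
}
```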

However, using StreamEx you can solve it in a single pass with a trick based on groupRuns. The groupRuns method groups adjacent Stream elements into Lists based on the supplied predicate, which tells whether two given adjacent elements should be grouped or not. We can consider that a group ends with the element ending with "STOP". Then we just need to take the first group:

List<String> list = StreamEx.of(tokens)
                            .groupRuns((a, b) -> !a.toUpperCase().endsWith("STOP"))
                            .findFirst().get();

This solution also will not do extra work when the first group is finished.

Tagir Valeev
  • Thanks Tagir. Your latest solution using groupRuns would be the nicest way of solving this. A pity they did not include takeWhile in the stream API in the first place. It is a basic feature of functional programming. – Julian Aug 30 '15 at 23:39
  • @Julian, Stream API was developed to support both sequential and parallel processing, but the `takeWhile` operation is sequential by nature. It's not so easy to parallelize it properly. Probably that was the reason why it was not included in JDK 8. – Tagir Valeev Aug 31 '15 at 01:44
  • @TagirValeev It's possible to parallelize but it potentially involves a lot of buffering. We mainly ran out of time to do it in Java 8. – Stuart Marks Aug 31 '15 at 03:15
  • @StuartMarks, yes, I studied the implementation. Nevertheless, for an ordered stream parallelization would rarely improve performance (except in cases where the condition fails close to the end of the stream or is always true). For an unordered parallel stream this operation is unlikely to be useful. I cannot imagine any scenario where "any subset" would be a satisfactory result. After all, you can use `filter`, which would also return some subset. My backport implementation is a rather lame `AbstractSpliterator` subclass with the default `trySplit`: I decided that special parallel handling was a waste of time. – Tagir Valeev Aug 31 '15 at 03:48
  • @TagirValeev Unordered `takeWhile` is indeed mostly useless for the conventional collection-based problems that we see a lot. It turns out to be useful if the predicate is based on external state, e.g. processing a stream of events for a fixed amount of time. – Stuart Marks Aug 31 '15 at 15:09
  • @StuartMarks, good point! I never thought about using the Stream API this way. It would probably be good to add such an example to the `takeWhile` JavaDoc... – Tagir Valeev Aug 31 '15 at 16:26

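Using only the Java 8 API (note that, unlike the loop in the question, the element that satisfies the stop condition is not included in the result):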

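import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <R> Stream<? extends R> takeUntil(Iterator<R> iterator, Predicate<? super R> stopFilter) {
    final boolean isParallelStream = false;

    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<R>() {
        private R next = null;                      // element fetched ahead by hasNext()
        private boolean conditionSatisfied = false; // true once the stop element is seen
        private boolean hasTaken = true;            // true if `next` was already handed out

        @Override
        public boolean hasNext() {
            if (conditionSatisfied) {
                return false;
            }
            if (hasTaken) {
                // Only consult the source when no fetched element is pending;
                // otherwise the last element of the source would be lost.
                if (!iterator.hasNext()) {
                    return false;
                }
                next = iterator.next();
                conditionSatisfied = stopFilter.test(next);
                hasTaken = false;
            }
            return !conditionSatisfied;
        }

        @Override
        public R next() {
            if (!hasNext()) {
                throw new NoSuchElementException("There are no more items to consume");
            }
            hasTaken = true;
            return next;
        }
    }, Spliterator.ORDERED), isParallelStream);
}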

You can then specialize it in the following ways:

For streams

public static <R> Stream<? extends R> takeUntil(Stream<R> stream, Predicate<? super R> stopFilter) {
    return takeUntil(stream.iterator(), stopFilter);
}

For collections

public static <R> Stream<? extends R> takeUntil(Collection<R> col, Predicate<? super R> stopFilter) {
    return takeUntil(col.iterator(), stopFilter);
}
smac89

One option uses `collect` with a supplier and two functions: one that adds strings to a list and another that combines lists potentially created in parallel. Each adds the string (or the whole list) only if the partial result so far doesn't already end with an element that ends with STOP:

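List<String> myTokens = tokens.stream().collect(() -> new ArrayList<String>(), (l, e) -> {
    // accumulator: add only while the last collected element doesn't end with STOP
    if (l.isEmpty() || !l.get(l.size() - 1).toUpperCase().endsWith("STOP"))
        l.add(e);
}, (l1, l2) -> {
    // combiner: append the right-hand partial result only if the left one isn't finished
    if (l1.isEmpty() || !l1.get(l1.size() - 1).toUpperCase().endsWith("STOP"))
        l1.addAll(l2);
});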
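As suggested in the comments on this answer, the same mutable reduction can be packaged as a reusable Collector with Collector.of, close to the toListWhile() collector imagined under the question. A sketch only: UntilCollector and toListUntil are hypothetical names, not JDK API. It keeps the same accumulator/combiner logic, so it is still not short-circuiting: the entire stream is traversed and elements after the first match are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collector;

class UntilCollector {

    // Hypothetical collector: accumulates elements until (and including) the first
    // element matching stop. Like the three-argument collect() version, it does NOT
    // short-circuit: the whole stream is traversed, later elements are just ignored.
    static <T> Collector<T, ?, List<T>> toListUntil(Predicate<? super T> stop) {
        return Collector.<T, List<T>>of(
                ArrayList::new,
                (list, e) -> {
                    if (!endsWithStop(list, stop)) {
                        list.add(e);
                    }
                },
                (left, right) -> {
                    // right already ends at its own first stop element, if any
                    if (!endsWithStop(left, stop)) {
                        left.addAll(right);
                    }
                    return left;
                });
    }

    // True if the partial result already ends with a stop element.
    private static <T> boolean endsWithStop(List<T> list, Predicate<? super T> stop) {
        return !list.isEmpty() && stop.test(list.get(list.size() - 1));
    }

    public static void main(String[] args) {
        List<String> tokens = Arrays.asList("pick me", "PICK ME", "pick me and STOP", "pick me");
        List<String> myTokens = tokens.stream()
                .collect(toListUntil(t -> t.toUpperCase().endsWith("STOP")));
        System.out.println(myTokens);
    }
}
```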
WillShackleford
  • So using reduce was the answer. Thank you very much. – Julian Aug 29 '15 at 22:37
  • Note that this solution is not short-circuit: you will still need to iterate over the whole input stream, ignoring everything after the `"STOP"`. This might be problematic if you have a very long (or infinite) stream. – Tagir Valeev Aug 30 '15 at 02:23
  • Just realized that this solution would actually be very slow even for not very long inputs, as it copies the whole list every time a new element is processed. If you prefer a non-short-circuiting solution, it's better to implement a `Collector` instead. – Tagir Valeev Aug 30 '15 at 03:45
  • I would definitely not settle for an O(n²) solution here. – Marko Topolnik Aug 30 '15 at 07:20
  • @TagirValeev I don't see how using a Collector would make any difference you still have to implement an accumulator and combiner which are essentially the same two lambdas passed to reduce. – WillShackleford Aug 30 '15 at 10:42
  • @WillShackleford, a collector performs a *mutable* reduction, thus it can modify the existing accumulator. You don't have to copy anything, just check the current accumulator and add the new element if necessary. The linked-list-like solution in the update is surely overengineered. – Tagir Valeev Aug 30 '15 at 10:46
  • @WillShackleford, [here's](https://gist.github.com/amaembo/0bc8e982d179f2e45d7a) collect-based solution. I don't like it either, as it's non short-circuit, but it's at least shorter than yours and O(n). Use it for your answer if you like :-) – Tagir Valeev Aug 30 '15 at 10:53
  • @TagirValeev tried to add some explanation and yet another option. – WillShackleford Aug 30 '15 at 11:22
  • Hah, here comes the `peek`. Actually, it's never guaranteed that `findFirst` will stop the underlying stream processing *exactly* when the solution is found (even for sequential streams). Try replacing `tokens.stream()` with `Stream.of(tokens).flatMap(List::stream)`. It's also a perfectly legal stream which produces the same elements, but your solution will not work with it. Stream API is harsh. – Tagir Valeev Aug 30 '15 at 11:23

Although the above answers are perfectly valid, they require collecting and/or pre-fetching the elements before processing them (both can be an issue if the Stream is very long).

For my needs, I therefore took Louis's answer to the question pointed out by Julian and adapted it to keep the stop/break item. See the keepBreak parameter:

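import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T> Spliterator<T> takeWhile(final Spliterator<T> splitr, final Predicate<? super T> predicate, final boolean keepBreak) {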
    return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
        boolean stillGoing = true;

        @Override
        public boolean tryAdvance(final Consumer<? super T> consumer) {
            if (stillGoing) {
                final boolean hadNext = splitr.tryAdvance(elem -> {
                    if (predicate.test(elem)) {
                        consumer.accept(elem);
                    } else {
                        if (keepBreak) {
                            consumer.accept(elem);
                        }
                        stillGoing = false;
                    }
                });
                return hadNext && (stillGoing || keepBreak);
            }
            return false;
        }
    };
}

public static <T> Stream<T> takeWhile(final Stream<T> stream, final Predicate<? super T> predicate, final boolean keepBreak) {
    return StreamSupport.stream(takeWhile(stream.spliterator(), predicate, keepBreak), false);
}

Usage:

public List<String> values = Arrays.asList("some", "words", "before", "BREAK", "AFTER");

@Test
public void testStopAfter() {
    Stream<String> stream = values.stream();
    // keep elements while they are not "BREAK"; keepBreak = true also emits "BREAK" itself
    stream = takeWhile(stream, s -> !"BREAK".equals(s), true);
    final List<String> actual = stream.collect(Collectors.toList());

    final List<String> expected = Arrays.asList("some", "words", "before", "BREAK");
    assertEquals(expected, actual);
}

Disclaimer: I am not 100% sure this will work on parallel (the new Stream is certainly not parallel) or non-sequential streams. Please comment/edit if you have some hints on this.

Community
Benoît