I've seen some takeWhile implementations for the Java 8 stream API but they all seem to turn the stream into a non-parallel stream. For example this one:

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) {
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) {
      if (stillGoing) {
        boolean hadNext = splitr.tryAdvance(elem -> {
          if (predicate.test(elem)) {
            consumer.accept(elem);
          } else {
            stillGoing = false;
          }
        });
        return hadNext && stillGoing;
      }
      return false;
    }
  };
}

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}

Here, `StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false)` turns the stream passed to `takeWhile` into a sequential stream. Is anyone aware of an implementation that supports parallel streams, or how could I modify this code so that it preserves parallelism?

Johan
  • 3
    You can't, really. Sorry, but you'll have to deal with that; it's really an inherently sequential operation. You can use the default very limited parallelism that works with everything, which is what you'd get as a result of using `.parallel` on that stream, but that's as good as you can get. – Louis Wasserman Jan 14 '17 at 19:18
  • In order to extract any real parallelism here, the predicate would have to be insanely expensive (e.g., trying to factor very large numbers). It's not impossible, but it's pretty unlikely. – Brian Goetz Jan 16 '17 at 01:43
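
The "very limited parallelism" mentioned in the first comment can be sketched by passing `stream.isParallel()` instead of `false` to the question's own code. This is only an illustration: the class name `LimitedParallelTakeWhile` and the `demo` method are hypothetical. The default `trySplit` of `Spliterators.AbstractSpliterator` is documented as permitting limited parallelism; it pulls elements one by one through `tryAdvance` and hands them off in array batches, so the predicate is still evaluated sequentially. Because the wrapper reports no `ORDERED` characteristic, the demo collects into a `Set` and makes no claim about order.

```java
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LimitedParallelTakeWhile {

    // Same spliterator as in the question; splitting is inherited from AbstractSpliterator.
    static <T> Spliterator<T> takeWhile(Spliterator<T> splitr, Predicate<? super T> predicate) {
        return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
            boolean stillGoing = true;

            @Override
            public boolean tryAdvance(Consumer<? super T> consumer) {
                if (stillGoing) {
                    boolean hadNext = splitr.tryAdvance(elem -> {
                        if (predicate.test(elem)) {
                            consumer.accept(elem);
                        } else {
                            stillGoing = false; // first failing element ends the stream
                        }
                    });
                    return hadNext && stillGoing;
                }
                return false;
            }
        };
    }

    // Only change from the question: keep the parallel flag of the incoming stream.
    static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
        return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), stream.isParallel());
    }

    // Hypothetical demo: collects the elements taken from a parallel source.
    static Set<Integer> demo() {
        return takeWhile(IntStream.rangeClosed(1, 1000).boxed().parallel(), i -> i < 10)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Downstream operations may run in parallel on the handed-off batches, but the `takeWhile` step itself stays sequential, which matches the point made in the comments.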

1 Answer

If your source is known to be unordered, then the following implementation should work:

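// Needs imports: java.util.Comparator, java.util.Spliterator, java.util.Spliterators,
// java.util.concurrent.atomic.AtomicBoolean, java.util.function.Consumer, java.util.function.Predicate.
static final class UnorderedTakeWhileSpliterator<T> implements Spliterator<T>, Consumer<T>, Cloneable {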
    private final Predicate<? super T> predicate;
    private final AtomicBoolean checked = new AtomicBoolean();
    private Spliterator<T> source;
    private T cur;

    UnorderedTakeWhileSpliterator(Spliterator<T> source, Predicate<? super T> predicate) {
        this.predicate = predicate;
        this.source = source;
    }

    @Override
    public void accept(T t) {
        this.cur = t;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!checked.get() && source.tryAdvance(this)) {
            if (predicate.test(cur)) {
                action.accept(cur);
                return true;
            } else {
                checked.set(true);
            }
        }
        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        Spliterator<T> prefix = source.trySplit();
        if(prefix == null) {
            return null;
        }
        if(checked.get()) {
            return Spliterators.emptySpliterator();
        }
        UnorderedTakeWhileSpliterator<T> clone;
        try {
            clone = (UnorderedTakeWhileSpliterator<T>) clone();
        } catch (CloneNotSupportedException e) {
            throw new InternalError(e);
        }
        clone.source = prefix;
        return clone;
    }

    @Override
    public long estimateSize() {
        return source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source.characteristics() & (DISTINCT | SORTED | NONNULL);
    }

    @Override
    public Comparator<? super T> getComparator() {
        return source.getComparator();
    }
}

Create the stream with the following method:

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
    // Keep the parallel flag of the incoming stream instead of forcing a sequential one.
    return StreamSupport.stream(
        new UnorderedTakeWhileSpliterator<>(stream.spliterator(), predicate), stream.isParallel());
}

An ordered implementation would be much trickier, as it would have to buffer non-prefix items and propagate the cancellation to the suffixes. Something like this is implemented in JDK 9 (not as a spliterator, but as a regular stream operation), but I doubt that even this tricky implementation beats a sequential stream in many cases.
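
For comparison, here is a minimal sketch of the built-in `Stream.takeWhile` that ships with JDK 9 and later (so it will not compile on Java 8). The class and method names are made up for the illustration. On an ordered stream the operation returns the longest matching prefix even when run in parallel; on an unordered stream the Javadoc only promises some subset of the matching elements, so the second method makes no claim about which ones come back.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Jdk9TakeWhileDemo {

    // Ordered source: the result is the longest prefix that matches the predicate.
    static List<Integer> orderedPrefix() {
        return IntStream.rangeClosed(1, 1000).boxed()
                .parallel()
                .takeWhile(i -> i < 10)
                .collect(Collectors.toList());
    }

    // Unordered source: any subset of the matching elements may be returned.
    static List<Integer> unorderedSubset() {
        return IntStream.rangeClosed(1, 1000).boxed()
                .unordered()
                .parallel()
                .takeWhile(i -> i < 10)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(orderedPrefix());   // [1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(unorderedSubset()); // contents may vary between runs
    }
}
```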

Tagir Valeev
  • Thanks but I don't seem to have a class named UnorderedTDOfRef? Where can I find this? – Johan Jan 15 '17 at 06:13
  • 1
    @Johan, fixed, sorry. – Tagir Valeev Jan 15 '17 at 06:29
  • It works :) Off topic, but is this what you will include in the StreamEx library? That would be awesome! – Johan Jan 15 '17 at 06:49
  • 2
    @Johan, basically yes, I just need to extend it to `dropWhile` and `takeWhileInclusive` and write more tests. – Tagir Valeev Jan 15 '17 at 08:38
  • 1
    This doesn’t look right to me. You are treating hitting the end the same way as a negative result from the predicate, in other words, once any of the spliterators hits its end, all others stop traversal. – Holger Jan 16 '17 at 10:50
  • @Holger, should be better now – Tagir Valeev Jan 22 '17 at 08:36
  • I don’t think that it is a good idea to return an empty spliterator in `trySplit`. That may cause the caller to waste more resources in trying to actually process it. Denying the `split` in the `checked.get()` case is straightforward (afaik, all other spliterators will also return `null` from `trySplit` when there are no more elements left). And you may do that check first, before attempting to split the source. Btw, I don’t think that using `clone` here has any benefit. Having a dedicated `private` constructor is much simpler… – Holger Jan 23 '17 at 10:22
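
The suggestions in the last comment could look roughly like the sketch below. It is not the answer's code but a variant under stated assumptions: the names `UnorderedTakeWhileSketch`, `TakeWhileSpliterator` and `stopped` are invented here, `trySplit` checks the shared flag first and returns `null` instead of an empty spliterator, and a private constructor replaces `clone`. As in the answer, it is only meant for sources where encounter order does not matter.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class UnorderedTakeWhileSketch {

    static final class TakeWhileSpliterator<T> implements Spliterator<T>, Consumer<T> {
        private final Predicate<? super T> predicate;
        private final AtomicBoolean stopped; // shared by every split of the same stream
        private final Spliterator<T> source;
        private T cur;

        TakeWhileSpliterator(Spliterator<T> source, Predicate<? super T> predicate) {
            this(source, predicate, new AtomicBoolean());
        }

        // Private constructor used by trySplit; it passes the shared flag along.
        private TakeWhileSpliterator(Spliterator<T> source, Predicate<? super T> predicate,
                                     AtomicBoolean stopped) {
            this.source = source;
            this.predicate = predicate;
            this.stopped = stopped;
        }

        @Override
        public void accept(T t) {
            cur = t;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (!stopped.get() && source.tryAdvance(this)) {
                if (predicate.test(cur)) {
                    action.accept(cur);
                    return true;
                }
                stopped.set(true); // a failed predicate cancels all splits
            }
            return false; // reaching the end of this split does not touch the flag
        }

        @Override
        public Spliterator<T> trySplit() {
            if (stopped.get()) {
                return null; // refuse to split once the predicate has failed somewhere
            }
            Spliterator<T> prefix = source.trySplit();
            return prefix == null ? null : new TakeWhileSpliterator<>(prefix, predicate, stopped);
        }

        @Override
        public long estimateSize() {
            return source.estimateSize();
        }

        @Override
        public int characteristics() {
            return source.characteristics() & (DISTINCT | SORTED | NONNULL);
        }

        @Override
        public Comparator<? super T> getComparator() {
            return source.getComparator();
        }
    }

    static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
        return StreamSupport.stream(
                new TakeWhileSpliterator<>(stream.spliterator(), predicate), stream.isParallel());
    }

    public static void main(String[] args) {
        List<Integer> taken = takeWhile(IntStream.range(0, 1000).boxed().parallel(), i -> i < 100)
                .collect(Collectors.toList());
        // Every collected element satisfies the predicate; which ones are collected may vary.
        System.out.println(taken.stream().allMatch(i -> i < 100));
    }
}
```

In a parallel run the result is some subset of the matching elements, because other splits may already have passed elements downstream, or may be cut short, when the flag is set.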