1) How can I use a Supplier (supplier) to create a sized stream of N values in parallel, while ensuring that no more than N calls are made to the supplier? I need this because I have a supplier with a costly supplier.get() operation.

2) The 'obvious' answer to my question, Stream.generate(supplier).limit(N), does not work: in a parallel stream it often results in more than N calls being made to the supplier. Why is this?

As 'proof' that generate(supplier).limit(N) can result in more than N calls to the supplier, consider the following code:

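import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class MWE {
    static final int N_ELEMENTS = 100000;
    // Each call to mySupplier.get() returns a fresh counter that yields 0, 1, 2, ...
    static Supplier<IntSupplier> mySupplier = () -> new IntSupplier() {
        AtomicInteger ai = new AtomicInteger(-1);
        @Override
        public int getAsInt() {
            return ai.incrementAndGet();
        }
    };
    public static void main(String[] args) {
        // Sequential: yields 0 .. N_ELEMENTS-1.
        int[] a = IntStream.generate(mySupplier.get()).limit(N_ELEMENTS).toArray();
        // Parallel: may contain values >= N_ELEMENTS, i.e. extra supplier calls were made.
        int[] b = IntStream.generate(mySupplier.get()).parallel().limit(N_ELEMENTS).toArray();
    }
}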

a is equal to [0, 1, ..., N_ELEMENTS-1] as expected, but contrary to what you might expect, b does not contain the same elements as a. Instead, b often contains elements that are greater than or equal to N_ELEMENTS, which indicates that the supplier was called more than N_ELEMENTS times.

Another illustration: Stream.generate(new Random(0)::nextDouble).limit(5) does not always produce the same set of numbers when the stream is run in parallel.
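The over-calling described above can be observed directly by counting supplier invocations instead of inspecting the generated values. The following is a minimal sketch; the class name GenerateCallCounter and the method countCalls are hypothetical names introduced only for this illustration, and the exact parallel count will vary between runs and machines.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical helper, not part of the original question.
class GenerateCallCounter {

    /** Returns how often the supplier was invoked to produce a stream limited to n elements. */
    static int countCalls(int n, boolean parallel) {
        AtomicInteger calls = new AtomicInteger();
        // Every supplier invocation bumps the counter.
        IntStream stream = IntStream.generate(calls::incrementAndGet);
        if (parallel) {
            stream = stream.parallel();
        }
        int[] result = stream.limit(n).toArray();
        if (result.length != n) {
            throw new IllegalStateException("limit(n) should yield exactly n elements");
        }
        return calls.get();
    }

    public static void main(String[] args) {
        int n = 100_000;
        System.out.println("sequential calls: " + countCalls(n, false));
        // The parallel count is typically somewhat larger than n.
        System.out.println("parallel calls:   " + countCalls(n, true));
    }
}
```

The result always has exactly n elements; only the number of supplier calls needed to produce them differs.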

Semafoor
  • How many elements does b contain that are larger than or equal to N_ELEMENTS? How many cores do you have? – Thorbjørn Ravn Andersen May 26 '16 at 20:37
  • Have you tried putting the `.parallel()` call after the `limit` instead of before? – Louis Wasserman May 26 '16 at 21:08
  • I don't think it should matter @LouisWasserman (in theory). – Tunaki May 26 '16 at 21:14
  • @Tunaki I wouldn't be sure. So I asked if they'd tried. (If that doesn't work I wouldn't expect anything to work.) – Louis Wasserman May 26 '16 at 21:16
  • I think this is because `IntStream.generate` returns an unordered Stream, like here http://stackoverflow.com/questions/30843279/stream-skip-behavior-with-unordered-terminal-operation With `IntStream.iterate(0, i -> i + 1)`, which returns an ordered Stream, the arrays are equal. – Tunaki May 26 '16 at 21:20
  • b does not contain many elements larger than `N_ELEMENTS`; using the value for `N_ELEMENTS` that is in the code I don't seem to regularly get values greater than 10006. I have 4 cores / 8 threads. Moving the `.parallel()` around does not seem to matter. – Semafoor May 26 '16 at 21:25
  • @Louis The ordered/unordered part of the documentation is indeed what should lead me to expect this behavior and should be part of an answer. Thanks for the useful link. The question, however, still stands: what is the best way to generate the b that I want in parallel? – Semafoor May 26 '16 at 22:16
  • somewhat related: https://stackoverflow.com/questions/30825708 – tkruse Jul 03 '19 at 14:19

2 Answers

The stream API does not guarantee that IntStream.generate() will call the generator exactly the specified number of times. Also this call does not respect ordering.

If you actually need a parallel stream of increasing numbers, it's much better to use IntStream.range(0, N_ELEMENTS).parallel(). This not only ensures that you will actually have all the numbers from 0 to N_ELEMENTS-1, but also greatly reduces contention and guarantees order. If you need to generate something more complex, consider using a custom source by defining your own Spliterator class.
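As a small check of the range-based suggestion, the sketch below compares the array produced by a parallel IntStream.range with the sequential one. The class name ParallelRangeDemo and the method names are hypothetical and used only for this illustration.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original answer.
class ParallelRangeDemo {

    /** Builds 0 .. n-1 using a parallel, ordered, sized source. */
    static int[] parallelRange(int n) {
        return IntStream.range(0, n).parallel().toArray();
    }

    /** Checks that the parallel result matches the sequential one element by element. */
    static boolean matchesSequential(int n) {
        int[] sequential = IntStream.range(0, n).toArray();
        return Arrays.equals(sequential, parallelRange(n));
    }

    public static void main(String[] args) {
        // range() is an ordered source, so toArray() preserves encounter order even in parallel.
        System.out.println(matchesSequential(100_000));
    }
}
```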

Note that the proposed IntStream.iterate solution may not parallelize well, as it is an inherently sequential source.

Tagir Valeev
  • “*Also this call does not respect ordering*” is a misleading sentence. It sounds like there was something that is disrespected, but actually, `Stream.generate` produces an *unordered* stream by definition. Since the stream has no order, there is nothing to disrespect. The problem here is the stateful `Supplier`. – Holger May 27 '16 at 08:52
  • I realise now that generate() generates an unordered stream. However, I do not actually need a stream of increasing numbers - the stream of increasing numbers was just to illustrate the problem. I actually need a stream of objects that take a long time to construct. I will look into the Spliterator class. – Semafoor May 27 '16 at 13:48
  • @Semafoor, you should probably ask about your actual problem instead of inventing a new problem whose solution is not suitable for you. – Tagir Valeev May 29 '16 at 15:24
  • @Tagir I updated my question and implemented my own Spliterator like you suggested. Thanks! – Semafoor May 29 '16 at 22:57

Calling .limit() is not guaranteed to result in a stream of the first N elements generated by the supplier because Stream.generate() creates an unordered stream, which leaves limit() free to decide on what 'part' of the stream to keep. Actually, it is not even semantically sound to refer to "the first N elements" or "(the first) part of the stream", because the stream is unordered. This behavior is clearly laid out in the API documentation; many thanks to everyone who pointed this out to me!
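Whether a stream source reports an encounter order can be inspected through the ORDERED characteristic of its spliterator. The sketch below does this for generate, iterate and range; the class name OrderedCheck and the method isOrdered are hypothetical helpers, and the comments describe what I would expect on a standard JDK.

```java
import java.util.Spliterator;
import java.util.stream.BaseStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical helper, introduced only to illustrate the ordered/unordered distinction.
class OrderedCheck {

    /** Reports whether the stream's spliterator declares an encounter order. */
    static boolean isOrdered(BaseStream<?, ?> stream) {
        // Note: spliterator() is a terminal operation, so the stream cannot be reused afterwards.
        return stream.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        // generate() is documented as producing an unordered stream.
        System.out.println("generate: " + isOrdered(Stream.generate(() -> "x")));
        // iterate() and range() are ordered sources.
        System.out.println("iterate:  " + isOrdered(IntStream.iterate(0, i -> i + 1)));
        System.out.println("range:    " + isOrdered(IntStream.range(0, 10)));
    }
}
```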

Since asking this question, I have come up with two solutions to my own question. My thanks go to Tagir who set me off in the right direction.

Solution 1: Misusing IntStream.range()

A simple and fairly efficient way of creating an unordered, sized, parallel stream backed by a supplier that makes no more calls to the supplier than is absolutely necessary is to (mis)use IntStream.range() like this:

IntStream.range(0, N_ELEMENTS).parallel().mapToObj(i -> supplier.get())

Basically, we are using IntStream.range() only to create a sized stream that can be processed in parallel.
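To check that this approach calls the supplier exactly N times, the invocations can be counted with an AtomicInteger. This is a minimal sketch: the class name RangeBackedSupply and the methods supplyInParallel and callsFor are hypothetical, and the supplier is assumed to be safe to call from several threads at once.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical wrapper around the range-based approach from Solution 1.
class RangeBackedSupply {

    /** Collects n supplied values in parallel; the range index itself is ignored. */
    static <T> List<T> supplyInParallel(Supplier<T> supplier, int n) {
        return IntStream.range(0, n)
                .parallel()
                .mapToObj(i -> supplier.get())
                .collect(Collectors.toList());
    }

    /** Returns how many times the supplier was invoked to produce n values. */
    static int callsFor(int n) {
        AtomicInteger calls = new AtomicInteger();
        List<Integer> values = supplyInParallel(calls::incrementAndGet, n);
        if (values.size() != n) {
            throw new IllegalStateException("expected " + n + " values");
        }
        return calls.get();
    }

    public static void main(String[] args) {
        // One mapToObj invocation per range element, so this should print 100000.
        System.out.println(callsFor(100_000));
    }
}
```

Because the source is sized, each of the N range elements is mapped once, so the supplier is never invoked for elements that are later discarded.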

Solution 2: Custom spliterator

Because we never actually use the integers inside of the stream created by IntStream.range(), it seems like we can do slightly better by creating a custom Spliterator:

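import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Note: the supplier is shared between splits, so it must be safe to call from multiple threads.
final class SizedSuppliedSpliterator<T> implements Spliterator<T> {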
    private int remaining;

    private final Supplier<T> supplier;

    private SizedSuppliedSpliterator(Supplier<T> supplier, int remaining) {
        this.remaining = remaining;
        this.supplier = supplier;
    }

    static <T> SizedSuppliedSpliterator<T> of(Supplier<T> supplier, int limit) {
        return new SizedSuppliedSpliterator<>(supplier, limit);
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer);
        if (remaining > 0) {
            remaining--;
            final T supplied = supplier.get();
            consumer.accept(supplied);
            return true;
        }
        return false;
    }

    @Override
    public void forEachRemaining(final Consumer<? super T> consumer) {
        while (remaining > 0) {
            consumer.accept(supplier.get());
            remaining--;
        }
    }

    @Override
    public SizedSuppliedSpliterator<T> trySplit() {
        int split = remaining / 2;
        if (split == 0) {
            return null; // too few elements left to split
        }
        remaining -= split;
        return new SizedSuppliedSpliterator<>(supplier, split);
    }

    @Override
    public long estimateSize() {
        return remaining;
    }

    @Override
    public int characteristics() {
        return SIZED | SUBSIZED | IMMUTABLE;
    }
}

We can use this spliterator to create the stream as follows:

StreamSupport.stream(SizedSuppliedSpliterator.of(supplier, N_ELEMENTS), true)

Of course, computing a couple of integers is hardly expensive, and I have not been able to notice or even measure any improvement in performance over solution 1.

Community
Semafoor