It seems that when using ordered Streams to process a short-circuiting operation on a difficult to bound numeric range, parallel() cannot be used. E.g.:

import java.util.stream.IntStream;

public class InfiniteTest {

    private static boolean isPrime(int x) {
        if (x < 2) {
            return false;
        }
        if (x % 2 == 0 && x > 2) {
            return false;
        }
        // loop while i <= sqrt(x), using multiply for speedup
        for (int i = 3; i * i <= x; i += 2) {
            if (x % i == 0) {
                return false;
            }
        }
        return true;
    }

    private static int findNthPrime(final int n) {
        // must not use infinite stream, causes OOME
        // but even big size causes huge slowdown
        return IntStream.range(1, 1_000_000_000)
                // .parallel()
                .filter(InfiniteTest::isPrime)
                .skip(n - 1)
                .findFirst()
                .getAsInt();
    }

    public static void main(String[] args) {
        int n = 1000; // find the nth prime number
        System.out.println(findNthPrime(n));
    }
}

This sequential stream works fine. But when I add parallel(), it seems to run forever (or at least for a very long time). I assume this is because the stream threads work on arbitrary parts of the range instead of starting with the first numbers in the stream. I cannot usefully bound the range of integers to scan for prime numbers.

So is there any simple trick to run this problem in parallel with streams without that trap, such as forcing the spliterator to serve chunks of work from the beginning of the stream? Or building the stream from substreams that cover increasing number ranges? Or somehow setting up the multithreading as a producer/consumer pattern, but with streams?

Similar questions all just seem to try to discourage use of parallel:

tkruse
  • A parallel stream does not run in any particular order, so how are you going to find the nth prime using that? – RobOhRob Jul 03 '19 at 15:43
  • The stream is ordered, the specification says `skip(n-1).findFirst()` returns the nth element even when processing in parallel. parallel does not change ordered characteristics of stream. See https://stackoverflow.com/questions/29709140/why-parallel-stream-get-collected-sequentially-in-java-8 – tkruse Jul 03 '19 at 16:08
  • I think you are asking for more expressiveness than the stream API can reasonably provide. You can do better with a little math by first computing a close overestimate of the n-th prime and then go `.parallel()` over the now reduced range. – President James K. Polk Jul 03 '19 at 21:18
  • It turns out that the n-th prime is guaranteed to be less than n*(log(n)+log(log(n))) for n >= 6. For n=1000 this gives an upper bound of 8_841, much much less than 1_000_000_000. By the way, your `isPrime()` method incorrectly reports that 2 is not a prime. – President James K. Polk Jul 03 '19 at 21:57
  • Fixed bug for small numbers. I can try using the bounds, though this won't help for similar problems like find the nth prime with an odd number of digits. – tkruse Jul 03 '19 at 23:07
  • Well, it definitely doesn't answer the question in general, but I bet I (or someone smarter than me) can find a decent upper bound for the n-th prime with an odd number of digits. It might be worth looking at the source code for `.parallel()`. – President James K. Polk Jul 04 '19 at 00:12
  • Well, the interesting part of this question is how to achieve this without an upper bound. But if you provided an answer with bounds, I would probably have to accept it and ask a similar question with a modification that makes an upper bound unsuitable. – tkruse Jul 04 '19 at 00:24

2 Answers

Apart from 2 and 3, all prime numbers are of the form 6n-1 or 6n+1. You already treat 2 as a special case in your code. You might want to try also treating 3 as special:

if (x % 3 == 0) {
    return x == 3;
}

And then run two parallel streams, one testing numbers of the form 6n-1, starting at 5, and the other testing numbers of the form 6n+1, starting at 7. Each stream can skip six numbers at a time.

You can use the Prime Number theorem to estimate the value of the nth prime and set the limit of your search slightly above that estimate for safety.
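A minimal sketch of the bounded approach, assuming the bound n*(log(n)+log(log(n))) for n >= 6 quoted in the comments on the question. The class name `BoundedNthPrime`, the helper `upperBound`, and the fallback limit of 13 for n < 6 are illustrative choices, not part of the original code. Because the range is small and finite, `parallel()` no longer wanders off into numbers far beyond the result.

```java
import java.util.stream.IntStream;

// Hypothetical example class; names are illustrative only.
final class BoundedNthPrime {

    // Trial division that handles 2 and 3 separately and then only tests
    // divisors of the form 6k-1 and 6k+1.
    static boolean isPrime(int x) {
        if (x < 2) return false;
        if (x % 2 == 0) return x == 2;
        if (x % 3 == 0) return x == 3;
        for (int i = 5; (long) i * i <= x; i += 6) {
            if (x % i == 0 || x % (i + 2) == 0) return false;
        }
        return true;
    }

    // Upper bound for the n-th prime: n * (ln n + ln ln n), valid for n >= 6.
    // For n < 6 a fixed limit of 13 is used (assumption: the 5th prime is 11).
    static int upperBound(int n) {
        if (n < 6) return 13;
        double ln = Math.log(n);
        // The cast saturates at Integer.MAX_VALUE for very large n.
        return (int) Math.ceil(n * (ln + Math.log(ln)));
    }

    static int findNthPrime(int n) {
        return IntStream.rangeClosed(2, upperBound(n))
                .parallel()                 // safe: the range is tightly bounded
                .filter(BoundedNthPrime::isPrime)
                .skip(n - 1)                // ordered stream: skips the first n-1 primes
                .findFirst()
                .getAsInt();
    }

    public static void main(String[] args) {
        System.out.println(findNthPrime(1000)); // prints 7919
    }
}
```

Whether this actually beats the sequential version for small n depends on the machine; for n = 1000 the range is so short that thread overhead may dominate.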

rossum
  • It's not a bad answer, but I think the code for the parallel bit should be included to be the accepted answer, because the devil is in the details. – tkruse Jul 05 '19 at 13:02
  • Unfortunately, my skill in parallel programming is minimal -- Algol 60 didn't do parallel when I first learned to program. Anything I posted would probably be incorrect. You are right about the devil being in the details. – rossum Jul 05 '19 at 13:16

TL;DR: It is not possible.

It seems that processing an unbounded stream in parallel with a short-circuiting operation to find the earliest occurrences (in stream order) of anything is not possible in a useful way ("useful" meaning faster than a sequential stream at finding the result).

Explanation: I tried a custom implementation of AbstractIntSpliterator that splits the stream not into contiguous partitions (1-100, 101-200, ...) but into interleaved sequences ([0, 2, 4, 6, 8, ...], [1, 3, 5, 7, ...]). This works correctly in the sequential case:

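import java.util.Comparator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

/**
 * Provides numbers starting at a given value. On split, this spliterator and
 * the child spliterator each provide every other number (interleaved).
 */
public class InterleaveSplitIntSplitIterator extends Spliterators.AbstractIntSpliterator {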

    private int current;
    private int increment;

    protected InterleaveSplitIntSplitIterator(int start, int increment) {
        super(Integer.MAX_VALUE,
                        Spliterator.DISTINCT
                        // splitting is interleaved, not prefixing
                        // | Spliterator.ORDERED
                        | Spliterator.NONNULL
                        | Spliterator.IMMUTABLE
                        // SORTED must imply ORDERED
                        // | Spliterator.SORTED
        );
        if (increment == 0) {
            throw new IllegalArgumentException("Increment must be non-zero");
        }
        this.current = start;
        this.increment = increment;
    }

    @Override
    public boolean tryAdvance(IntConsumer action) {
        // Don't benchmark with this on
        // System.out.println(Thread.currentThread() + " " + current);
        action.accept(current);
        current += increment;
        return true;
    }

    // this is consulted when SORTED is reported, even if sorted() is never called
    @Override
    public Comparator<? super Integer> getComparator() {
        if (increment > 0) {
            return null;
        }
        return Comparator.<Integer>naturalOrder().reversed();
    }

    @Override
    public OfInt trySplit() {
        if (increment >= 2) {
            return null;
        }
        int newIncrement = this.increment * 2;
        int oldIncrement = this.increment;

        this.increment = newIncrement;
        return new InterleaveSplitIntSplitIterator(current + oldIncrement, newIncrement);
    }

    // for convenience
    public static IntStream asIntStream(int start, int increment) {
        return StreamSupport.intStream(
                new InterleaveSplitIntSplitIterator(start, increment),
                /* no, never set parallel here */ false);
    }
}

However, such streams cannot have the Spliterator.ORDERED characteristic, because its Javadoc says:

If so, this Spliterator guarantees that method {@link #trySplit} splits a strict prefix of elements

and this also means such a stream cannot keep its SORTED characteristic, because

A Spliterator that reports {@code SORTED} must also report {@code ORDERED}

So in parallel my spliterator ends up delivering (somewhat) jumbled numbers, which would have to be fixed by sorting before applying a limit, and sorting does not work well with infinite streams (in the general case).

So all solutions to this must use a spliterator that splits off chunks or prefixes of the data, which are then consumed in roughly arbitrary order. That causes many number ranges beyond the actual result to be processed, which in general is (much) slower than a sequential solution.

So other than bounding the number range to test, it seems there cannot be a solution using a parallel stream. The problem is that the specification requires a spliterator with the ORDERED characteristic to split off a prefix, instead of providing a different means of reassembling ordered stream results from multiple spliterators.

However, a solution using a sequential stream whose inputs are processed (and buffered) in parallel may still be possible, though it is not as simple as calling parallel().
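One way this could look, as a rough sketch rather than a tested solution: walk over fixed-size chunks of the number line sequentially, and evaluate each chunk with a bounded parallel stream. The class name `ChunkedNthPrime` and the `chunkSize` parameter are made up for illustration. Since each chunk is a finite ordered range, `toArray()` returns its primes in ascending order, and at most one chunk beyond the result is ever touched.

```java
import java.util.stream.IntStream;

// Hypothetical example class; names and the chunking scheme are assumptions.
final class ChunkedNthPrime {

    static boolean isPrime(int x) {
        if (x < 2) return false;
        if (x % 2 == 0) return x == 2;
        for (int i = 3; (long) i * i <= x; i += 2) {
            if (x % i == 0) return false;
        }
        return true;
    }

    static int findNthPrime(int n, int chunkSize) {
        if (n < 1 || chunkSize < 1) {
            throw new IllegalArgumentException("n and chunkSize must be positive");
        }
        int remaining = n; // primes still needed, including the target itself
        // Outer loop is sequential; 'start' is a long so it cannot overflow.
        for (long start = 2; start <= Integer.MAX_VALUE; start += chunkSize) {
            int from = (int) start;
            int to = (int) Math.min(start + chunkSize - 1, Integer.MAX_VALUE);
            // Inner stream is bounded, so parallel() only works on this chunk.
            int[] primes = IntStream.rangeClosed(from, to)
                    .parallel()
                    .filter(ChunkedNthPrime::isPrime)
                    .toArray(); // encounter order is preserved
            if (primes.length >= remaining) {
                return primes[remaining - 1];
            }
            remaining -= primes.length;
        }
        throw new ArithmeticException("n-th prime does not fit in an int");
    }

    public static void main(String[] args) {
        System.out.println(findNthPrime(1000, 4096)); // prints 7919
    }
}
```

The chunk size is a tuning knob: too small and the parallel overhead dominates, too large and more numbers past the result get tested. This also works for predicates without a known bound, such as the "odd number of digits" variant mentioned in the comments, because no estimate of the result is needed.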

tkruse
  • You can use the *segmented* sieve of Eratosthenes. Have an ordered queue of segments, and process N of them in parallel, pulling a new one from the queue each time an older one is finished and its primes have been counted. When the segment with the target nth prime is identified, get that prime from it instead of just counting all the primes in it as usual. What that means in terms of Java specifics, I don't know. (It's easy to (over-)estimate the nth prime's magnitude, as `n (log n + log log n)` IIRC; see Wikipedia for that.) – Will Ness Jul 16 '19 at 18:40