
This is probably very basic, but I am not a Java person. Here is my processing code, which simply prints and sleeps:

    private static void myProcessings(int value)
    {
        System.out.println("Processing " + value);
    
        try
        {
            Thread.sleep(2000);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    
        System.out.println("Finished processing " + value);
    }

Now, this `IntStream` does run in parallel:

    IntStream iit = IntStream.rangeClosed(1,3);
    iit.parallel().forEach(Main::myProcessings);
    
    // output:

    // Processing 2
    // Processing 1
    // Processing 3
    // Finished processing 3
    // Finished processing 2
    // Finished processing 1

But this one (made from an Iterator) does not:

    static class MyIter implements Iterator<Integer>
    {
        private int max;
        private int current;
    
        public MyIter(int maxVal)
        {
            max = maxVal;
            current = 1;
        }
    
        @Override
        public boolean hasNext()
        {
            return current <= max;
        }
    
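        @Override
        public Integer next()
        {
            // Iterator contract: throw once the sequence is exhausted
            if (!hasNext())
                throw new NoSuchElementException();
            return current++;
        }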
    }
    
    MyIter it = new MyIter(3);
    StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), true)
                 .forEach(Main::myProcessings);

    // output:

    // Processing 1
    // Finished processing 1
    // Processing 2
    // Finished processing 2
    // Processing 3
    // Finished processing 3

What am I doing wrong in the custom Iterator version? (I am using Java 8)

Stefan Zobel
mshsayem
  • You have `parallel()` in the one but not in the other? – akuzminykh Jun 30 '20 at 08:42
  • @akuzminykh I used `.parallel()` in the latter one too; it made no difference, because the second parameter of `StreamSupport.stream()` already makes it parallel. – mshsayem Jun 30 '20 at 08:47
  • @akuzminykh In `stream(Spliterators.spliteratorUnknownSize(it, 0), true)`, the second parameter, `true`, is a flag indicating whether the resulting stream should be parallel. – Amongalen Jun 30 '20 at 08:47
  • From the Javadoc for [Spliterators#spliteratorUnknownSize](https://docs.oracle.com/javase/8/docs/api/java/util/Spliterators.html#spliteratorUnknownSize-java.util.Iterator-int-): "The spliterator is not late-binding, inherits the fail-fast properties of the iterator, and implements trySplit to permit **limited** parallelism." That could be the factor here: the "limited parallelism", whatever it means in this case. – Amongalen Jun 30 '20 at 08:53
  • @Amongalen Can you suggest a way? – mshsayem Jun 30 '20 at 09:01
  • Related and possibly a duplicate: https://stackoverflow.com/questions/46709455/understanding-sequential-vs-parallel-stream-spliterators-in-java-8-and-java-9, which mentions that the `true` parameter does not make the stream `parallel`. – Nikos M. Jun 30 '20 at 09:04
  • Check this answer, which explains exactly what the problem is here: https://stackoverflow.com/a/48308511/4949750. Spliterators will split the work only with big enough collections, like 10000+ elements. If you decrease the sleep time and increase the number of elements, your code works just fine. – Amongalen Jun 30 '20 at 09:05

2 Answers


There was a flaw in the `Spliterators.spliteratorUnknownSize()` implementation. I fixed it in Java 19; see JDK-8280915. Since the 19-ea+19-1283 early-access build, the problem no longer reproduces, and your code is parallelized correctly without an explicit size:

    Processing 2
    Processing 3
    Processing 1
    Finished processing 3
    Finished processing 1
    Finished processing 2
Tagir Valeev

One way is to give an estimate of the size of the stream:

    Spliterators.spliterator(it, 3, 0);

The number (3 here) doesn't have to be precise, but if you give, say, 10000, only one thread will be used for an actual size of 3. If you give, say, 10, multiple threads will be used, even with a size of 3.

The estimate (3 in my example) is used to determine the size of the batches (number of tasks sent to one thread before moving on to the next thread). If you provide a large estimated number and only submit a few tasks, they will probably all be grouped and run on the first thread and nothing will be sent to the second thread.
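A self-contained sketch of the sized-spliterator approach from this answer, for anyone who wants to run it. It passes the real element count (3) as the size, which also stays within the contract Tagir Valeev describes in the comments (the size is trusted as exact). `MyIter` mirrors the question's iterator; `SizedIteratorDemo`, `slowWork` and `processSized` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.StreamSupport;

public class SizedIteratorDemo {

    // Same behaviour as MyIter in the question: yields 1..max, then stops.
    static class MyIter implements Iterator<Integer> {
        private final int max;
        private int current = 1;

        MyIter(int max) {
            this.max = max;
        }

        @Override
        public boolean hasNext() {
            return current <= max;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current++;
        }
    }

    // Hypothetical stand-in for myProcessings: simulates slow work.
    static void slowWork(int value) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
        }
    }

    // Processes 1..count in a parallel stream built from a *sized* spliterator,
    // prints how many distinct threads took part, and returns the processed
    // values in ascending order.
    static List<Integer> processSized(int count) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Queue<Integer> done = new ConcurrentLinkedQueue<>();

        StreamSupport.stream(Spliterators.spliterator(new MyIter(count), count, 0), true)
                .forEach(v -> {
                    threads.add(Thread.currentThread().getName());
                    slowWork(v);
                    done.add(v);
                });

        System.out.println("threads used: " + threads.size());
        List<Integer> result = new ArrayList<>(done);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(processSized(3));
    }
}
```

The number of threads reported depends on the machine and on the common pool's parallelism, so treat it as indicative only; the returned list just confirms that every element was processed exactly once.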

assylias
  • An `Iterator` doesn’t have to be thread safe, which is part of the problem. The Stream has to build parallel processing atop a source that doesn’t support parallel processing. – Holger Jun 30 '20 at 10:11
  • The problem is that my objects are a bit bulky: say, images stored in a DB, 80 kB each. I can't use 10000, not even 1000. I am planning to fetch 250 objects from the DB; the driver provides an iterator to avoid memory issues. I have a processor function that processes those objects (CPU-bound). I just wanted to use the CPU cores. – mshsayem Jun 30 '20 at 10:13
  • It seems that if I have `c` CPU cores, an estimate `n` and an actual number of elements `m`: if `n >= m`, it spawns `c` threads (or `n`, whichever is lower), and those threads complete all the processing. But if `n` … – mshsayem Jun 30 '20 at 11:24
  • @assylias Now the question is: if I provide a large number as an estimate but the actual number of elements is much lower, will there be any negative impact (memory/performance)? – mshsayem Jun 30 '20 at 11:26
  • @mshsayem my understanding is that the number is used to determine the size of the batches (number of tasks sent to one thread before moving on to the next thread). If you provide a large estimated number and only submit a few tasks, they will probably all run on the same thread. – assylias Jun 30 '20 at 14:37
  • You are absolutely right! Now that makes sense; many thanks! You could update the answer to reflect this _batch_ issue. – mshsayem Jun 30 '20 at 14:47
  • In my experience, with the reference implementation, specifying an entirely bogus number still works much better than streams of unknown size. For example, being off by a factor of five usually is no problem. With unknown size, you’re running into the problem described in [this comment](https://stackoverflow.com/questions/48307102/parallelism-with-streams-created-from-iterators/48308511#comment83607190_48308595). Even after all source elements have been buffered, the implementation will not understand that it now knows the exact size. – Holger Jul 02 '20 at 13:09
  • The advice to specify an imprecise size is incorrect. `Spliterators.spliterator` accepts the exact size, rather than an estimate, and subsequent stream operations may trust it. For example, replace the stream with `List<Integer> result = StreamSupport.stream(Spliterators.spliterator(it, 10, 0), true).toList();` and you'll see that the list is oversized. – Tagir Valeev Apr 22 '22 at 08:26
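The split behaviour discussed in this thread can be approximated with a small arithmetic model. This is a sketch based on my reading of the OpenJDK 8 reference implementation: the internal `AbstractTask.suggestTargetSize` (declared size divided by four times the common-pool parallelism) and the iterator spliterator's first batch of up to 1024 elements. Neither is a public API. It only covers the case asked about here, at most 1024 elements and a declared size at least as large as the real count. `SplitThresholdModel`, `splitThreshold` and `chunkIsSplitFurther` are hypothetical names, and a parallelism of 7 is an assumed value.

```java
public class SplitThresholdModel {

    // Assumed model of OpenJDK's internal AbstractTask.suggestTargetSize:
    // leaf target = common-pool parallelism * 4; threshold is at least 1.
    static long splitThreshold(long declaredSize, int parallelism) {
        long leafTarget = (long) parallelism << 2;
        long est = declaredSize / leafTarget;
        return est > 0L ? est : 1L;
    }

    // Assumes actualCount <= 1024 and declaredSize >= actualCount, so the
    // iterator spliterator's first trySplit() buffers every element into one
    // array-backed chunk. That chunk is split again (and can be spread over
    // several threads) only if it is larger than the threshold derived from
    // the *declared* size, not the real one.
    static boolean chunkIsSplitFurther(int actualCount, long declaredSize, int parallelism) {
        return actualCount > splitThreshold(declaredSize, parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 7;            // hypothetical: common pool on an 8-core machine
        long unknown = Long.MAX_VALUE;  // size reported by spliteratorUnknownSize
        System.out.println(chunkIsSplitFurther(3, unknown, parallelism)); // false
        System.out.println(chunkIsSplitFurther(3, 10_000, parallelism));  // false
        System.out.println(chunkIsSplitFurther(3, 10, parallelism));      // true
    }
}
```

With these inputs the model matches the observations in this thread: unknown size and a declared 10000 keep all 3 elements in one chunk (one thread), while 10 lets the chunk be split. It describes pre-Java 19 unknown-size behaviour only, and it does not make a mismatched size safe, given that the size is trusted as exact.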