
Why does the code below not print any output, whereas if we remove parallel(), it prints 0 and 1?

IntStream.iterate(0, i -> (i + 1) % 2)
         .parallel()
         .distinct()
         .limit(10)
         .forEach(System.out::println);

I know that ideally limit should be placed before distinct, but my question is more about the difference caused by adding parallel processing.

Tagir Valeev
krmanish007
  • On my machine, this locks up 3/4 cores at 100% CPU usage without producing an answer! I think this may be a bug in the interaction between limit and parallel. – Straw1239 Feb 04 '16 at 00:15
  • @Straw1239 - agree that this should be considered a bug; there's no apparent reason why this code does not print something promptly. – ZhongYu Feb 04 '16 at 00:17
  • It tries to buffer the entire stream contents in the `distinct` op. That happens even without the `limit`. While it's clear that threads processing subsequent chunks have to wait for their predecessors in an ordered stream, the first one does not need to wait... – Holger Feb 04 '16 at 00:17
  • @Holger -- if the buffering theory is true, and we use LongStream and an `i->i+1` generator, we should see an OutOfMemoryError pretty soon, right? I did that experiment, and waited to no avail. I had to kill it because the CPU was melting. – ZhongYu Feb 04 '16 at 00:38
  • We can't get an OutOfMemoryError in a Stream, as it has fixed memory and never tries to store the full content, just enough to fill its own buffer; the full content is only ever stored in a collection. – krmanish007 Feb 04 '16 at 00:41
  • The only numbers being stored in the buffer are 0 and 1. – Straw1239 Feb 04 '16 at 00:44
  • @bayou.io: it's a known phenomenon not to see an OOME but to have a process hanging, doing GC activity only. Just monitor the JVM's activity, the ratio between used memory and max heap, and the ratio between CPU usage and GC activity. Though, when GC starts going mad, most monitoring methods stop working as well… – Holger Feb 04 '16 at 08:35
  • @Straw1239: bayou.io talked about using an `i->i+1` function rather than `i->(i+1)%2`. When not using `%2`, the buffer really starts eating up all memory (see the sketch after these comments). – Holger Feb 04 '16 at 08:45
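
For illustration, here is a minimal sketch of the experiment described in these comments, assuming a plain increasing generator; since every generated value is distinct, the ordered parallel distinct() buffers elements without bound:

    import java.util.stream.LongStream;

    // Every value is distinct here, so the ordered parallel distinct()
    // buffers the infinite upstream and the heap steadily fills up
    // (typically GC thrashing long before a clean OutOfMemoryError).
    LongStream.iterate(0, i -> i + 1)
              .parallel()
              .distinct()
              .limit(10)
              .forEach(System.out::println); // never prints anything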

4 Answers


The real cause is that ordered parallel .distinct() is a full-barrier operation, as described in the documentation:

Preserving stability for distinct() in parallel pipelines is relatively expensive (requires that the operation act as a full barrier, with substantial buffering overhead), and stability is often not needed.

The "full barrier operation" means that all the upstream operations must be performed before the downstream can start. There are only two full barrier operations in Stream API: .sorted() (every time) and .distinct() (in ordered parallel case). As you have non short-circuit infinite stream supplied to the .distinct() you end up with infinite loop. By contract .distinct() cannot just emit elements to the downstream in any order: it should always emit the first repeating element. While it's theoretically possible to implement parallel ordered .distinct() better, it would be much more complex implementation.

As for a solution, @user140547 is right: add .unordered() before .distinct(). This switches distinct() to the unordered algorithm (which just uses a shared ConcurrentHashMap to store all the observed elements and emits every newly seen element to the downstream). Note that adding .unordered() after .distinct() will not help.
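
A rough model of the unordered algorithm (illustrative only, not the actual JDK implementation) can be built from a concurrent set:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.stream.IntStream;

    // Illustrative stand-in for unordered distinct(): every worker adds
    // to a shared concurrent set and forwards only first-seen elements.
    Set<Integer> seen = ConcurrentHashMap.newKeySet();
    IntStream.iterate(0, i -> (i + 1) % 2)
             .parallel()
             .unordered()
             .filter(seen::add) // passes an element only on its first appearance
             .limit(2)          // only two distinct values ever get through
             .forEach(System.out::println);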

Tagir Valeev

Stream.iterate returns 'an infinite sequential ordered Stream'. Therefore, making a sequential stream parallel is not too useful.

According to the description of the Stream package:

For parallel streams, relaxing the ordering constraint can sometimes enable more efficient execution. Certain aggregate operations, such as filtering duplicates (distinct()) or grouped reductions (Collectors.groupingBy()) can be implemented more efficiently if ordering of elements is not relevant. Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism. In cases where the stream has an encounter order, but the user does not particularly care about that encounter order, explicitly de-ordering the stream with unordered() may improve parallel performance for some stateful or terminal operations. However, most stream pipelines, such as the "sum of weight of blocks" example above, still parallelize efficiently even under ordering constraints.

This seems to be the case here: using unordered(), it prints 0 and 1.

    IntStream.iterate(0, i -> (i + 1) % 2)
            .parallel()
            .unordered()
            .distinct()
            .limit(10)
            .forEach(System.out::println);
user140547

I know the code is not correct; as suggested in the other answers too, if we move the limit before the distinct, we won't have an infinite loop.
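
A sketch of that reordering (note Holger's caveat in the comments below: this changes the semantics, because limit(10) now caps the generated elements rather than the distinct ones):

    // Reordered pipeline: limit(10) truncates the infinite source first,
    // so distinct() operates on a finite stream and the pipeline terminates.
    IntStream.iterate(0, i -> (i + 1) % 2)
             .parallel()
             .limit(10)   // the first ten generated values: 0,1,0,1,...
             .distinct()  // reduces them to 0 and 1
             .forEach(System.out::println);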

The parallel() operation uses the fork/join framework to distribute the work, which allocates all the available threads to the work rather than a single thread.

We should rightly expect an infinite loop, as multiple threads are processing the data indefinitely and nothing stops them: the limit of 10 is never reached after distinct.

It might be that it keeps trying to fork and never joins to move forward. Still, I believe this is a defect in Java more than anything else.

krmanish007
  • Moving the `limit` before the `distinct` will change the semantics. Besides that, you can make your code correct by using `limit(2)`, but the problem still remains (see the sketch below). – Holger Feb 04 '16 at 09:01
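
To illustrate Holger's point, a sketch of the limit(2) variant, which the two distinct values could satisfy, yet which still never completes:

    // Although 0 and 1 alone could satisfy limit(2), the ordered parallel
    // distinct() still tries to buffer the entire infinite upstream first,
    // so nothing is ever printed.
    IntStream.iterate(0, i -> (i + 1) % 2)
             .parallel()
             .distinct()
             .limit(2)
             .forEach(System.out::println); // hangs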

This code has a major problem, even without parallel(): after .distinct(), the stream will only ever contain two elements, so the limit never kicks in; it will print the two elements and then continue wasting your CPU time indefinitely. This might have been what you intended, though.

With parallel and limit, I believe the problem is exacerbated due to the way the work is divided. I haven't traced all the way through the parallel stream code, but here is my guess:

The parallel code divides the work between multiple threads, which all chug along indefinitely, as they never fill their quota. The system probably waits for each thread to finish so that it can combine their results to ensure distinctness in order, but this will never happen in the case you provide.

Without the ordering requirement, results from each worker thread can be used immediately after being checked against a global distinctness set.

Without limit, I suspect that different code is used to handle infinite streams: instead of waiting for the required 10 to fill up, results are reported as they are discovered. It's a little like an iterator that reports hasNext() = true, first produces 0, then 1, and then hangs forever in next() without producing a result; in the parallel case, something is waiting for several reports so that it can properly combine/order them before outputting, while in the serial case it does what it can and then hangs.
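
A hypothetical iterator (illustrative, not anything from the JDK) capturing that analogy:

    import java.util.HashSet;
    import java.util.PrimitiveIterator;
    import java.util.Set;

    // A "distinct view" of the infinite 0,1,0,1,... sequence: hasNext()
    // always claims another element exists; nextInt() delivers 0, then 1,
    // then spins forever searching for a third distinct value.
    PrimitiveIterator.OfInt distinctView = new PrimitiveIterator.OfInt() {
        private final Set<Integer> seen = new HashSet<>();
        private int value = 0;

        @Override public boolean hasNext() { return true; } // cannot know it is exhausted

        @Override public int nextInt() {
            while (!seen.add(value)) {   // skip already-emitted values...
                value = (value + 1) % 2; // ...forever, once 0 and 1 are taken
            }
            return value;
        }
    };
    distinctView.nextInt(); // 0
    distinctView.nextInt(); // 1
    // distinctView.nextInt(); // would never return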

I'll try to find the exact difference in the call stack with and without distinct() or limit(), but so far it seems very difficult to navigate the rather complex calling sequence of the stream library.

Straw1239
  • You must have tried a different implementation. As of Oracle's `1.8.0_60` and `1.8.0_65`, it doesn't matter whether you use `.limit(10)` or `.limit(2)`, and even omitting `limit` won't change the behavior. – Holger Feb 04 '16 at 08:58