4

From the question "Will inner parallel streams be processed fully in parallel before considering parallelizing outer stream?", I understood that streams perform work-stealing. However, I've noticed that it often doesn't seem to occur. For example, if I have a List of, say, 100,000 elements and I attempt to process it with parallelStream(), I often notice towards the end that most of my CPU cores are sitting idle in the "waiting" state. (Note: of the 100,000 elements in the list, some elements take a long time to process, whereas others are fast; and the list is not balanced, which is why some threads may get "unlucky" and have lots to do, whereas others get lucky and have little to do.)

So, my theory is that the JIT compiler does an initial division of the 100,000 elements into the 16 threads (because I have 16 cores), but then within each thread it just does a simple (sequential) for-loop (as that would be the most efficient), and therefore no work stealing would ever occur (which is what I'm seeing).

I think the reason why "Will inner parallel streams be processed fully in parallel before considering parallelizing outer stream?" showed work stealing is that there was an OUTER loop that was streaming and an INNER loop that was streaming, so in that case each inner loop got evaluated at run time and would create new tasks that could, at runtime, be assigned to "idle" threads. Thoughts? Is there something I'm doing wrong that would "force" a simple list.parallelStream() to use work-stealing? (My current workaround is to attempt to balance the list based on various heuristics so that each thread usually sees about the same amount of work; but it's hard to predict that....)
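To make this concrete, here is a minimal sketch of the kind of skewed workload I mean (the class name, the 95,000 cutoff, and the loop counts are made up purely for illustration):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SkewedWorkloadDemo {
    public static void main(String[] args) {
        // 100,000 elements; the ones near the end are far more expensive
        List<Integer> work = IntStream.range(0, 100_000)
            .boxed()
            .collect(Collectors.toList());

        long start = System.nanoTime();
        long result = work.parallelStream()
            .mapToLong(SkewedWorkloadDemo::process)
            .sum();
        System.out.printf("result=%d, took %d ms%n",
            result, (System.nanoTime() - start) / 1_000_000);
    }

    // Simulates uneven per-element cost: most elements are cheap, but the
    // last few thousand are roughly 1000x slower. If the initial split puts
    // them into a handful of chunks, those workers keep running long after
    // the other cores have gone idle.
    static long process(int i) {
        long iterations = (i >= 95_000) ? 10_000_000L : 10_000L;
        long x = 0;
        for (long k = 0; k < iterations; k++) {
            x += k ^ i;
        }
        return x;
    }
}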

Misha
Jonathan Sylvester
  • I would be surprised if you could change the default strategy, which is about as simplistic as you describe. – Louis Wasserman May 11 '18 at 00:01
  • 2
    I don't think it has anything to do with the JIT compiler, as it is not supposed to alter the behaviour of the application. This is probably more related to how the fork-join pool behaves, and if the `Splitterator` actually allows splitting. – Didier L May 11 '18 at 15:58
  • Do you have a code example that demonstrates this behavior? – Sean Van Gorder May 11 '18 at 18:56

1 Answer

1

This has nothing to do with the JIT compiler but with the implementation of the Stream API. It will divide the workload into chunks which are then processed sequentially by the worker threads. The general strategy is to have more jobs than worker threads to enable work-stealing; see for example ForkJoinTask.getSurplusQueuedTaskCount(), which can be used to implement such an adaptive strategy.
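As a rough illustration of such an adaptive strategy, here is a sketch in the spirit of the pattern suggested by the getSurplusQueuedTaskCount() documentation (this is not the Stream API's actual code; the class name and the threshold of 3 are arbitrary):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.RecursiveAction;
import java.util.function.Consumer;

// Keeps forking the upper half of its range while this worker thread has only
// a few unstolen tasks queued locally, so idle workers always find something
// to steal; the rest of the range is processed sequentially.
class AdaptiveAction<T> extends RecursiveAction {
    private final List<T> list;
    private final int from, to;                  // half-open range [from, to)
    private final Consumer<? super T> action;

    AdaptiveAction(List<T> list, int from, int to, Consumer<? super T> action) {
        this.list = list;
        this.from = from;
        this.to = to;
        this.action = action;
    }

    @Override
    protected void compute() {
        int lo = from, hi = to;
        Deque<AdaptiveAction<T>> forked = new ArrayDeque<>();
        while (hi - lo > 1 && getSurplusQueuedTaskCount() <= 3) {
            int mid = (lo + hi) >>> 1;
            AdaptiveAction<T> right = new AdaptiveAction<>(list, mid, hi, action);
            right.fork();                        // make the upper half stealable
            forked.push(right);
            hi = mid;
        }
        for (int i = lo; i < hi; i++) {          // process the lower part ourselves
            action.accept(list.get(i));
        }
        while (!forked.isEmpty()) {              // wait for the forked halves
            forked.pop().join();
        }
    }
}

A caller could then run ForkJoinPool.commonPool().invoke(new AdaptiveAction<>(list, 0, list.size(), action)); whether this beats the default chunking depends entirely on how uneven the per-element costs are.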

The following code can be used to detect how many elements were processed sequentially when the source is an ArrayList:

// Each sequentially processed chunk gets its own one-element counter list [0];
// the accumulator bumps the counter once per element and the combiner
// concatenates the per-chunk lists, so the printed list has one entry per
// chunk, holding the number of elements that chunk processed.
List<Object> list = new ArrayList<>(Collections.nCopies(10_000, ""));
System.out.println(System.getProperty("java.version"));
System.out.println(Runtime.getRuntime().availableProcessors());
System.out.println( list.parallelStream()
    .collect(
        () -> new ArrayList<>(Collections.singleton(0)), // per-chunk counter
        (l,x) -> l.replaceAll(i -> i + 1),               // count this element
        List::addAll) );                                 // concatenate counters

On my current test machine, it prints:

1.8.0_60
4
[625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625]

So there are more chunks than cores, to allow work-stealing. However, once the sequential processing of a chunk has started, it can’t be split further, so this implementation has limitations when the per-element execution times differ significantly. This is always a trade-off.
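One way to sidestep that limitation when the per-element costs are as skewed as in the question is to give up the Stream API's chunking and submit each element as its own task to the common pool. The following is only a sketch (the class and method names are invented here), viable when individual elements are expensive enough to outweigh the per-task overhead:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;

class PerElementProcessing {
    // Runs the mapper over every element as an independent common-pool task.
    // Since each element is its own unit of work, a few slow elements can no
    // longer pin an entire pre-split chunk to a single worker thread.
    static <T, R> List<R> mapPerElement(List<T> input, Function<? super T, ? extends R> mapper)
            throws InterruptedException, ExecutionException {
        List<Callable<R>> tasks = input.stream()
            .map(e -> (Callable<R>) () -> mapper.apply(e))
            .collect(Collectors.toList());
        List<R> results = new ArrayList<>(input.size());
        // invokeAll blocks until every task has completed
        for (Future<R> f : ForkJoinPool.commonPool().invokeAll(tasks)) {
            results.add(f.get());
        }
        return results;
    }
}

For cheap elements, the chunked processing of the Stream implementation is usually faster, so this only pays off when the per-element work dominates the task-management overhead.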

Holger
  • I've been doing some research, but I'm confused as to how to "have more jobs than worker threads", per your suggestion above. Is there a parameter (in the streaming API) that controls how many "jobs/tasks" to split a list into? So, if I had a list of 1,000,000 items and 10 threads, I could control whether there are jobs of 100,000 items each or, preferably, jobs of say 500 items each (so that jobs can run on whichever thread is idle at the time)? Or, must I use ForkJoinTask / Futures in order to code this level of control / load balancing? – Jonathan Sylvester May 28 '18 at 10:33
  • There are several factors determining the actual strategy: implementation details, stream source characteristics, the actual terminal operation, etc. One strategy I’ve seen is to simply use four times the intended parallelism. Another strategy is based on the `getSurplusQueuedTaskCount()` method I’ve linked in my answer. Its documentation already suggests the use case. Each worker forks until this method reports a value above a certain threshold (which should be a small number). In a balanced execution, this creates threshold×workers jobs, but more if some are already stolen. – Holger May 28 '18 at 10:48
  • @Holger minor question, does this mean that if my implementation of `trySplit` is really bad (or the source is not really splittable), I might not benefit from work stealing at all? Also, I don't *really* understand what this `625` is supposed to prove - well, in my understanding it means that this 10_000-element list will be split into `16` Spliterators, each taking 625 elements to handle sequentially... – Eugene Oct 09 '18 at 17:09
  • @Eugene the printed results show that the test was run on a four-core machine and ended up with 16 chunks (yes, `Spliterator`s), so the splitting did not stop at one spliterator per core, but produced significantly more. That’s crucial, as once the sequential processing has started in a `forEachRemaining` manner, it can’t be stopped to split again for work-stealing. But when each of the four cores has four spliterators to process, it’s possible to steal them when the core has not reached the fourth yet. It’s even imaginable that further splitting will be done once that has happened. – Holger Oct 09 '18 at 17:18
  • @Holger excellent! I was really in pain proving this at work, your example made it super simple, unfortunately I cannot upvote twice – Eugene Oct 09 '18 at 17:29