
I have a problem with Java 8 streams: the data is processed in sudden bulks rather than when it is requested. I have a rather complex stream flow which has to be parallelised because I use concat to merge two streams.

My issue stems from the fact that data seems to be parsed in large bulks minutes - and sometimes even hours - apart. I would expect this processing to happen as soon as the Stream reads incoming data, to spread the workload. Bulk processing seems counterintuitive in almost every way.

So, the question is why this bulk-collection occurs and how I can avoid it.

My input is a Spliterator of unknown size and I use a forEach as the terminal operation.

Tagir Valeev
Jens Egholm
  • Without more detail it's going to be difficult to answer your question. Do you have nested parallel streams by any chance? – assylias Sep 11 '15 at 15:25
  • What's the question? – Sleiman Jneidi Sep 11 '15 at 15:26
  • Sounds like you want push streams (e.g. RxJava), not pull streams (such as Java 8's) – the8472 Sep 11 '15 at 15:44
  • Bulk processing might seem counter-intuitive to you but is the only way to avoid large per-item synchronization overhead. It seems you have an inappropriate trySplit implementation. The default implementation of `AbstractSpliterator` isn’t well suited for unknown sizes and large initial per-item overhead as it will buffer quite a lot of items then. You may implement a more suitable strategy, which, by the way, will likely require an unordered spliterator, so you are not required to return a strict prefix. You may also consider the possibility that `Stream`s are not the right tool for your job. – Holger Sep 11 '15 at 17:04
  • I added an actual question. Yes, I have nested parallel streams. But as far as I can read in the docs, that shouldn't be a problem. Regarding per-item synchronisation, I assumed this would be specified in the spliterator. It's a simple implementation that always returns true and blocks when reading input. I have considered using RxJava, but Streams just seem to have a much simpler API. And as far as I can see they're quite efficient. – Jens Egholm Sep 11 '15 at 17:30
  • Java 8 streams are _not_ the same thing as reading in streaming data from elsewhere, and are not likely to be usable _at all_ for that use case. – Louis Wasserman Sep 11 '15 at 17:40

1 Answer


It’s a fundamental principle of parallel streams that the encounter order doesn’t have to match the processing order. This enables concurrent processing of items of sublists or subtrees while assembling a correctly ordered result, if necessary. This explicitly allows bulk processing and even makes it mandatory for the parallel processing of ordered streams.
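The difference between encounter order and processing order can be observed with a small sketch. The class and method names below are hypothetical and chosen only for illustration; the sketch assumes nothing beyond the standard `forEach` and `forEachOrdered` terminal operations.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

class EncounterOrderDemo {
    /** Records elements in whatever order the worker threads happen to process them. */
    static List<Integer> visitUnordered(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).boxed().parallel().forEach(seen::add);
        return seen;
    }

    /** Records elements in encounter order, even though the pipeline is parallel. */
    static List<Integer> visitOrdered(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).boxed().parallel().forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        // The first line may differ from run to run; the second is always ascending.
        System.out.println(visitUnordered(16));
        System.out.println(visitOrdered(16));
    }
}
```

Both methods see the same elements; only `forEachOrdered` is required to deliver them in encounter order.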

This behavior is determined by the particular implementation of the Spliterator’s trySplit method. The specification says:

If this Spliterator is ORDERED, the returned Spliterator must cover a strict prefix of the elements

API Note:

An ideal trySplit method efficiently (without traversal) divides its elements exactly in half, allowing balanced parallel computation.
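To see the prefix rule in action, the following sketch splits the spliterator of an `ArrayList` once and lists what each half covers. `PrefixSplitDemo` and `splitOnce` are hypothetical names used only for this illustration; the even halving shown here is the behavior of `ArrayList`'s own spliterator, and other sources may split differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class PrefixSplitDemo {
    /** Splits the spliterator once and returns [elements of the split-off part, remaining elements]. */
    static List<List<Integer>> splitOnce(List<Integer> source) {
        // Copy into an ArrayList so that its array-based spliterator is used.
        Spliterator<Integer> rest = new ArrayList<>(source).spliterator();
        Spliterator<Integer> prefix = rest.trySplit(); // null if no split is possible

        List<Integer> prefixItems = new ArrayList<>();
        List<Integer> restItems = new ArrayList<>();
        if (prefix != null) {
            prefix.forEachRemaining(prefixItems::add);
        }
        rest.forEachRemaining(restItems::add);
        return Arrays.asList(prefixItems, restItems);
    }

    public static void main(String[] args) {
        // prints [[0, 1, 2, 3], [4, 5, 6, 7]]
        System.out.println(splitOnce(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7)));
    }
}
```

The returned spliterator covers the first half and the original one keeps the second half, which is exactly the "strict prefix" the specification asks for.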


Why was this strategy fixed in the specification and not, e.g. an even/odd split?

Well, consider a simple use case. A list will be filtered and collected into a new list, thus the encounter order must be retained. With the prefix rule, it’s rather easy to implement: split off a prefix, filter both chunks concurrently, then add the result of the prefix filtering to the new list, followed by the filtered suffix.

With an even/odd strategy, that’s impossible. You may filter both parts concurrently, but afterwards, you don’t know how to join the results correctly unless you track each item’s position throughout the entire operation.

Even then, joining these interleaved items would be much more complicated than performing an addAll per chunk.
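The prefix-based filter-and-join described above can be sketched by hand. This is only an illustration of the idea, not how the stream implementation is actually written; `PrefixJoinDemo`, `filterChunk` and `filterInTwoChunks` are hypothetical names, and a single split plus `CompletableFuture` stands in for the fork/join machinery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

class PrefixJoinDemo {
    /** Sequentially filters one chunk, keeping the chunk's internal order. */
    static <T> List<T> filterChunk(List<T> chunk, Predicate<? super T> keep) {
        List<T> out = new ArrayList<>();
        for (T item : chunk) {
            if (keep.test(item)) {
                out.add(item);
            }
        }
        return out;
    }

    /** Splits into prefix and suffix, filters both concurrently, then joins prefix-first. */
    static <T> List<T> filterInTwoChunks(List<T> source, Predicate<? super T> keep) {
        int mid = source.size() / 2;
        List<T> prefix = source.subList(0, mid);
        List<T> suffix = source.subList(mid, source.size());

        // The prefix is filtered on another thread while this thread handles the suffix.
        CompletableFuture<List<T>> prefixResult =
                CompletableFuture.supplyAsync(() -> filterChunk(prefix, keep));
        List<T> suffixResult = filterChunk(suffix, keep);

        // Joining is a plain addAll per chunk; no per-item positions are tracked.
        List<T> result = new ArrayList<>(prefixResult.join());
        result.addAll(suffixResult);
        return result;
    }

    public static void main(String[] args) {
        // prints [2, 4, 6, 8, 10]
        System.out.println(filterInTwoChunks(
                Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), n -> n % 2 == 0));
    }
}
```

Because each chunk is a contiguous range, the order in which the two filters finish does not matter; the join only has to append the chunks in their original sequence.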


You might have noticed that all of this applies only if you have an encounter order that might have to be retained. If your spliterator doesn’t report an ORDERED characteristic, it is not required to return a prefix. Nevertheless, the default implementation you might have inherited from AbstractSpliterator is designed to be compatible with ordered spliterators. Thus, if you want a different strategy, you have to implement the split operation yourself.
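One possible shape for such a hand-written split is sketched below. It is not the code from the question and makes several assumptions: `FixedBatchSpliterator` is a hypothetical class, the source is modeled as a plain `Iterator`, the spliterator reports no ORDERED characteristic, and each `trySplit` hands off a small fixed batch instead of a growing buffer. It relies on the stream framework using each spliterator from one thread at a time, as the `Spliterator` documentation requires.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

class FixedBatchSpliterator<T> implements Spliterator<T> {
    private final Iterator<T> source; // hypothetical input; hasNext() may block
    private final int batchSize;      // number of items handed off per split

    FixedBatchSpliterator(Iterator<T> source, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!source.hasNext()) {
            return false;
        }
        action.accept(source.next());
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        if (!source.hasNext()) {
            return null; // nothing left to hand off
        }
        // Copy at most batchSize items into an array and return them as their own spliterator.
        Object[] batch = new Object[batchSize];
        int n = 0;
        do {
            batch[n++] = source.next();
        } while (n < batchSize && source.hasNext());
        return Spliterators.spliterator(batch, 0, n, 0);
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // size unknown
    }

    @Override
    public int characteristics() {
        return 0; // deliberately neither ORDERED nor SIZED
    }

    public static void main(String[] args) {
        Iterator<Integer> source = IntStream.range(0, 20).boxed().iterator();
        StreamSupport.stream(new FixedBatchSpliterator<>(source, 1), true)
                .forEach(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
    }
}
```

With a batch size of 1, every split passes on a single item as soon as it is available, which trades higher per-item overhead for lower latency. Whether that trade is acceptable depends on how expensive the per-item work is.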

Or you use a different way of implementing an unordered stream, e.g.

Stream.generate(()->{
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    return Thread.currentThread().getName();
}).parallel().forEach(System.out::println);

might be closer to what you expected.

Holger