
I'm failing to understand the exact use case for Collectors.groupingByConcurrent. From the JavaDocs:

Returns a concurrent Collector implementing a cascaded "group by" operation on input elements of type T...
This is a concurrent and unordered Collector.
...

Maybe the keywords here are cascaded "group by". Does that point to something in how the collector actually performs the accumulation? (Looking at the source, it gets intricate very quickly.)


When I test it with a fake ConcurrentMap

class FakeConcurrentMap<K, V> extends HashMap<K, V> 
    implements ConcurrentMap<K, V> {}

I see that it breaks (gives wrong aggregations as the map isn't thread-safe) with parallel streams:

Map<Integer, Long> counts4 = IntStream.range(0, 1000000)
        .boxed()
        .parallel()
        .collect(
            Collectors.groupingByConcurrent(i -> i % 10, 
                                          FakeConcurrentMap::new, 
                                          Collectors.counting()));

Without .parallel(), the results are consistently correct. So it seems that groupingByConcurrent is meant for parallel streams.

But, as far as I can see, the following parallel stream collected with groupingBy always produces correct results:

Map<Integer, Long> counts3 = IntStream.range(0, 1000000)
        .boxed()
        .parallel()
        .collect(
            Collectors.groupingBy(i -> i % 10, 
                                  HashMap::new,
                                  Collectors.counting()));

So when is it correct to use groupingByConcurrent instead of groupingBy (surely the reason can't just be to get the groupings as a concurrent map)?

ernest_k
  • You'll find that `groupingByConcurrent` doesn't **merge** the contents of the individual aggregations. Normally a `Collector` does not need to be thread safe - the API splits streams, then merges the resultant bits. So, the short answer is that for large streams it _may_ be more performant. – Boris the Spider Mar 02 '19 at 19:27
  • So in the OP's final example, the result is only consistent by happenstance? @BoristheSpider – markspace Mar 02 '19 at 19:29
  • I don't think so @markspace, it's correct by design - the final example is a `parallel()` with a normal `groupingBy`. A "normal" (non-concurrent) `Collector` is not required to be threadsafe. – Boris the Spider Mar 02 '19 at 19:31
  • See [`Collector.Characteristics.CONCURRENT`](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.Characteristics.html#CONCURRENT) – Boris the Spider Mar 02 '19 at 19:32
  • I'm not sure I follow. @BoristheSpider Is `Collector.Characteristics.CONCURRENT` obviated somehow in the stream? I think `Collectors.counting()` in the OP's final example might be responsible. If that method is thread safe and accumulates (and merges, if required) all results, then that could explain why the result is consistent. That IS just a guess though. – markspace Mar 02 '19 at 19:36
  • Sorry, @markspace - don't follow. `Collectors.counting` doesn't declare itself concurrent so the API ensures that it isn't called concurrently. – Boris the Spider Mar 02 '19 at 19:39
  • Oh, so does that one method make the entire stream not parallel? That might account for the consistent results, at least. @BoristheSpider – markspace Mar 02 '19 at 19:42
  • I would guess it prevents concurrency within a key, but still processes keys concurrently. Some experimentation required - you ask difficult questions @markspace. – Boris the Spider Mar 02 '19 at 19:44
  • The JavaDoc has a pretty good explanation. "Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads." – LppEdd Mar 02 '19 at 19:50
  • @LppEdd yeah - the question is what happens when the downstream collector is _not_ concurrent. – Boris the Spider Mar 02 '19 at 19:52
  • `groupingBy` will work just fine for parallel streams, but it won't be as _efficient._ It'll have to resort to cruder locking approaches and merging things later. – Louis Wasserman Mar 02 '19 at 20:01
  • @louiswasserman you happen to know where the behaviour of downstream non-concurrent collectors is documented? – Boris the Spider Mar 02 '19 at 20:04
  • @BoristheSpider I actively doubt that it's documented. The stream framework would probably consider it an implementation detail. – Louis Wasserman Mar 02 '19 at 20:06
  • @LouisWasserman there are no „cruder locking approaches" needed for a non-concurrent collector. The costs of merging are the relevant aspect, which has to be compared to the costs of contention in the concurrent collector use. There is no simple answer which is more efficient. This depends on the actual stream operation and even the actual input data. See [Why should I use concurrent characteristic in parallel stream with collect?](https://stackoverflow.com/q/41041698/2711488)… – Holger Mar 02 '19 at 22:39
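The point raised in the comments above, that a non-concurrent downstream collector is never asked to be thread-safe, can be sketched as follows (class name is illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NonConcurrentDownstreamDemo {
    public static void main(String[] args) {
        // Collectors.toList() accumulates into plain ArrayLists, which are
        // not thread-safe. The result is still correct on a parallel stream,
        // because each worker thread fills its own per-thread lists and the
        // framework merges those partial results afterwards.
        Map<Integer, List<Integer>> groups = IntStream.range(0, 100_000)
                .boxed()
                .parallel()
                .collect(Collectors.groupingBy(i -> i % 10, Collectors.toList()));

        System.out.println(groups.size());        // 10 groups, keys 0..9
        System.out.println(groups.get(3).size()); // 10000 elements per group
    }
}
```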

1 Answer


All Collectors work just fine for parallel streams, but Collectors supporting direct concurrency (with Collector.Characteristics.CONCURRENT) are eligible for optimizations that others are not. groupingByConcurrent falls into this category.

(Roughly: a non-concurrent collector breaks the input into per-thread pieces, creates an accumulator per thread, and merges the partial results at the end. A concurrent (and unordered) collector creates a single accumulator and has several worker threads concurrently accumulating elements into that same accumulator.)
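To make this concrete, here is a minimal sketch (class name is illustrative) showing that both collectors produce the same correct result on a parallel stream; they differ only in how the work is combined, not in correctness:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class GroupingDemo {
    public static void main(String[] args) {
        // Non-concurrent: per-thread HashMaps are built and merged at the end.
        Map<Integer, Long> merged = IntStream.range(0, 1_000_000)
                .boxed()
                .parallel()
                .collect(Collectors.groupingBy(i -> i % 10, Collectors.counting()));

        // Concurrent: all worker threads accumulate into one shared
        // ConcurrentHashMap, with no merge step.
        ConcurrentMap<Integer, Long> shared = IntStream.range(0, 1_000_000)
                .boxed()
                .parallel()
                .collect(Collectors.groupingByConcurrent(i -> i % 10, Collectors.counting()));

        System.out.println(merged.equals(shared)); // true
        System.out.println(merged.get(0));         // 100000
    }
}
```

Which variant is faster depends on the workload: the concurrent version avoids the merge but pays for contention on the shared map.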

Louis Wasserman
  • Is there any way to extract this kind of analysis from Java or a JVM? Is there a JVM flag or compiler directive that will provide some analysis where the parallel components of a stream are and where extra locking has to be added? Just curious, mostly, but might be something to actually add if it's generally useful. – markspace Mar 02 '19 at 20:38
  • Not to my knowledge. It's an implementation detail users aren't usually supposed to care about. – Louis Wasserman Mar 02 '19 at 20:48