
Why should I use the concurrent characteristic in a parallel stream with collect:

List<Integer> list =
        Collections.synchronizedList(new ArrayList<>(Arrays.asList(1, 2, 4)));

Map<Integer, Integer> collect = list.stream().parallel()
        .collect(Collectors.toConcurrentMap(k -> k, v -> v, (c, c2) -> c + c2));

And not:

Map<Integer, Integer> collect = list.stream().parallel()
        .collect(Collectors.toMap(k -> k, v -> v, (c, c2) -> c + c2));

In other words, what are the side effects of not using this characteristic? Is it useful for the internal stream operations?

codewario
0xh3xa
  • Side effects include nausea, loss of data, broken maps and other concurrency related problems. – Kayaman Dec 08 '16 at 14:32
  • First, don’t use `synchronizedList`. There is no reason to use that. Well, there’s also no reason to copy the contents of the `List` returned by `Arrays.asList(1, 2, 4)` to another `ArrayList`. – Holger Dec 08 '16 at 17:14
  • @Holger because `Arrays.asList` returns an abstract list, and I want to make some modifications; calling remove on that abstract list throws an exception, so I wrapped it in an `ArrayList`. – 0xh3xa Dec 09 '16 at 08:07
  • Right, when you want to `add` or `remove` you need an `ArrayList`. But in this question you’re not doing that, so it isn’t necessary. – Holger Dec 09 '16 at 08:33
  • btw how about accepting Holger's answer here? – Eugene Aug 28 '18 at 13:26

3 Answers


These two collectors operate in a fundamentally different way.

First, the Stream framework splits the workload into independent chunks that can be processed in parallel (that’s why you don’t need a special collection as the source; synchronizedList is unnecessary).

With a non-concurrent collector, each chunk is processed by creating a local container (here, a Map) using the Collector’s supplier and accumulating the chunk’s elements into that local container (putting entries). These partial results then have to be merged, i.e. one map has to be put into the other, to get the final result.
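
To make the supplier, accumulator and merge steps visible, here is a minimal sketch of a hand-written non-concurrent collector that behaves roughly like `toMap(k -> k, v -> v, Integer::sum)`. The class name, the method names and the two counters are hypothetical and exist only for illustration; how many local containers get created depends on how the framework happens to split the source, so the counts vary between runs and machines.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;

class NonConcurrentCollectSketch {
    // Hypothetical counters, only here to observe what the framework calls.
    static final AtomicInteger containers = new AtomicInteger();
    static final AtomicInteger merges = new AtomicInteger();

    // No CONCURRENT characteristic is declared, so each chunk gets its own HashMap.
    static Collector<Integer, Map<Integer, Integer>, Map<Integer, Integer>> summingToMap() {
        return Collector.of(
                // supplier: creates one local container per chunk
                () -> { containers.incrementAndGet(); return new HashMap<Integer, Integer>(); },
                // accumulator: puts an entry, summing values for duplicate keys
                (map, i) -> map.merge(i, i, Integer::sum),
                // combiner: puts one partial map into the other
                (left, right) -> {
                    merges.incrementAndGet();
                    right.forEach((k, v) -> left.merge(k, v, Integer::sum));
                    return left;
                });
    }

    static Map<Integer, Integer> collect(List<Integer> source) {
        return source.parallelStream().collect(summingToMap());
    }

    public static void main(String[] args) {
        Map<Integer, Integer> result = collect(Arrays.asList(1, 2, 4, 1, 2, 4, 1, 2));
        System.out.println(result);
        System.out.println("containers=" + containers + ", merges=" + merges);
    }
}
```

The local HashMaps need no synchronization because each one is only touched by the thread that processes its chunk; the cost moves into the merge step instead.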

A concurrent collector supports accumulating concurrently, so only one ConcurrentMap will be created and all threads accumulate into that map at the same time. So after completion, no merging step is required, as there is only one map.
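
The single-container behaviour can be observed with a small sketch that declares the `CONCURRENT` and `UNORDERED` characteristics through `Collector.of`. The class name, method names and counter are hypothetical. That the supplier is called exactly once is what I would expect from the OpenJDK implementation for a parallel stream; treat it as an observation, not as a guarantee of the specification.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;

class ConcurrentCollectSketch {
    // Hypothetical counter, only here to observe how often the supplier runs.
    static final AtomicInteger containers = new AtomicInteger();

    static Collector<Integer, ConcurrentMap<Integer, Integer>, ConcurrentMap<Integer, Integer>>
            summingToConcurrentMap() {
        return Collector.of(
                // supplier: the one shared container
                () -> { containers.incrementAndGet(); return new ConcurrentHashMap<Integer, Integer>(); },
                // accumulator: may be called from several threads on the same map,
                // so it relies on the atomic merge of ConcurrentHashMap
                (map, i) -> map.merge(i, i, Integer::sum),
                // combiner: still required by the API, used if the stream runs non-concurrently
                (left, right) -> {
                    right.forEach((k, v) -> left.merge(k, v, Integer::sum));
                    return left;
                },
                Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.UNORDERED);
    }

    static ConcurrentMap<Integer, Integer> collect(List<Integer> source) {
        return source.parallelStream().collect(summingToConcurrentMap());
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, Integer> result = collect(Arrays.asList(1, 2, 4, 1, 2, 4, 1, 2));
        System.out.println(result);
        System.out.println("containers=" + containers);
    }
}
```

Declaring `CONCURRENT` is a promise by the programmer that the accumulator is safe to call concurrently on the same container; declaring it for a collector backed by a plain HashMap would break that promise.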


So both collectors are thread-safe, but they may exhibit entirely different performance characteristics, depending on the task. If the Stream’s workload before collecting the result is heavy, the differences might be negligible. If, as in your example, there is no relevant work before the collect operation, the outcome depends heavily on how often mappings have to be merged, i.e. how often the same key occurs, and on how the actual target ConcurrentMap deals with contention in the concurrent case.

If you mostly have distinct keys, the merging step of a non-concurrent collector can be as expensive as the previous putting, destroying any benefit of the parallel processing. But if you have lots of duplicate keys, requiring merging of the values, the contention on the same key may degrade the concurrent collector’s performance.

So there’s no simple “which is better” answer (if there were such an answer, why bother adding the other variant?). It depends on your actual operation. You can use the expected scenario as a starting point for selecting one, but you should then measure with real-life data. Since both are equivalent, you can change your choice at any time.
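
As a rough starting point for such a measurement, the following sketch times both collectors on two made-up data sets, one with mostly distinct keys and one with many duplicates. The class name, method names, data sizes and round count are all assumptions, and `System.nanoTime` timing like this is only a crude indication; a proper benchmark harness and your real data would give more trustworthy numbers.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CollectorTimingSketch {
    static Map<Integer, Integer> withToMap(List<Integer> data) {
        return data.parallelStream()
                .collect(Collectors.toMap(k -> k, v -> v, Integer::sum));
    }

    static Map<Integer, Integer> withToConcurrentMap(List<Integer> data) {
        return data.parallelStream()
                .collect(Collectors.toConcurrentMap(k -> k, v -> v, Integer::sum));
    }

    // Runs the task once and returns the elapsed wall-clock time in milliseconds.
    static long millis(Supplier<?> task) {
        long start = System.nanoTime();
        task.get();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Hypothetical workloads: one million distinct keys vs. only ten distinct keys.
        List<Integer> distinct = IntStream.range(0, 1_000_000).boxed()
                .collect(Collectors.toList());
        List<Integer> duplicates = IntStream.range(0, 1_000_000).map(i -> i % 10).boxed()
                .collect(Collectors.toList());

        // Several rounds, so that later rounds are less distorted by JIT warm-up.
        for (int round = 0; round < 5; round++) {
            System.out.println("distinct   toMap=" + millis(() -> withToMap(distinct))
                    + "ms toConcurrentMap=" + millis(() -> withToConcurrentMap(distinct)) + "ms");
            System.out.println("duplicates toMap=" + millis(() -> withToMap(duplicates))
                    + "ms toConcurrentMap=" + millis(() -> withToConcurrentMap(duplicates)) + "ms");
        }
    }
}
```

Because both methods return a `Map<Integer, Integer>` with the same contents, swapping one for the other after measuring is a one-line change.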

Naman
Holger
  • I don't understand. You said in [this question](https://stackoverflow.com/questions/52054008/why-my-collector-method-is-not-processing-data-parallely) that, irrespective of whether the collector's container is concurrent or not, the data will be collected sequentially in encounter order. But here you say **A concurrent collector supports accumulating concurrently, so only one ConcurrentMap will be created and all threads accumulate into that map at the same time. So after completion, no merging step is required, as there is only one map**, so how are a collector's characteristics determined? – amarnath harish Sep 17 '18 at 17:32
  • @amarnathharish it’s Eugene saying it, not me, but still, it’s correct. The important point is, a `Collector` is concurrent when it *declares* to be concurrent by [returning the `CONCURRENT` characteristic in the set](https://docs.oracle.com/javase/9/docs/api/java/util/stream/Collector.html#characteristics--). It’s not influenced by the actual type returned by the `Supplier`, but always the deliberate choice of the programmer. In the Q&A you’ve linked, Eugene shows one way to declare that characteristic (with `Collector.of`). The other is to use `toConcurrentMap` instead of `toMap`. – Holger Sep 17 '18 at 17:42

First of all, I gave a +1 to Holger's answer; it is a good one. I would try to simplify it just a bit by saying that:

CONCURRENT -> multiple threads throw data at the same container in no particular order (ConcurrentHashMap)

NON-CONCURRENT -> multiple threads combine their intermediate results.

The easiest way to understand it (IMHO) is to write a custom collector and play with each of its methods: supplier, accumulator, combiner.

This was already sort-of covered here

Community
Eugene

Because of this: "Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a ConcurrentMap as a key or value happen-before actions subsequent to the access or removal of that object from the ConcurrentMap in another thread."

Galeixo