3

After going through the following answers which talks about using concurrent data structures in streams and different between using concurrent map and converting to a map, can someone explain what will happen if I am using the other syntax of collect i.e.

    Stream<Integer> integers = Stream.iterate(1, n -> n + 1).parallel(); 
    Map<Integer, Boolean> resultMap = integers
                                        .limit(1000)
                                        .collect(HashMap::new,
                                                (map, value) -> map.put(value, false),
                                                HashMap::putAll);

As per the documentation, supplier will be invoked depending on number of threads spawned. What if i use ConcurrentHashMap instead of HashMap ?

When executed in parallel, multiple intermediate results may be instantiated, populated, and merged so as to maintain isolation of mutable data structures. Therefore, even when executed in parallel with non-thread-safe data structures (such as ArrayList), no additional synchronization is needed for a parallel reduction.

Timir
  • 1,395
  • 8
  • 16
Adithya
  • 2,923
  • 5
  • 33
  • 47

2 Answers2

7

There will be no behavioral change when you use ConcurrentHashMap instead of HashMap with the three-arg collect method. To change the behavior, you need a Collector which reports the CONCURRENT characteristic and there is no way to specify characteristics with the ad-hoc collector.

Further, the operation must be unordered to enable a parallel collect operation where all threads accumulate into a single container. The operation may be unordered due to the stream properties, either intrinsically, e.g. when streaming over an unordered source like a HashSet, or explicitly via unordered(), e.g.

Map<Integer, Boolean> resultMap = integers.limit(1000)
    .unordered()
    .collect(Collector.of(
        () -> new ConcurrentHashMap<>(),
        (map, value) -> map.put(value, false),
        (m1,m2) -> { m1.putAll(m2); return m1; },
        Collector.Characteristics.CONCURRENT));

or due to the UNORDERED characteristic of the collector:

Map<Integer, Boolean> resultMap = integers.limit(1000)
    .collect(Collector.of(
        () -> new ConcurrentHashMap<>(),
        (map, value) -> map.put(value, false),
        (m1,m2) -> { m1.putAll(m2); return m1; },
        Collector.Characteristics.CONCURRENT, Collector.Characteristics.UNORDERED));

The latter is what you get when using the builtin collector:

Map<Integer, Boolean> resultMap = integers.limit(1000)
    .collect(Collectors.toConcurrentMap(Function.identity(), i -> Boolean.FALSE));

toConcurrentMap will always be CONCURRENT and UNORDERED and requires a ConcurrentMap when you use a map supplier, whereas toMap is never CONCURRENT, even if you provide a supplier which creates instances of a ConcurrentMap implementation.

Holger
  • 285,553
  • 42
  • 434
  • 765
  • 2
    Quite elaborative! appreciated. – Vinay Prajapati Apr 18 '18 at 13:14
  • So, it's essential that the stream is `parallel` as well as `unordered` for parallel collection to work ? How will i achieve my result if i specify `Stream integers = Stream.iterate(1, n -> n + 1).parallel(); integers.unordered().limit(1000).collect(Collectors.toConcurrentMap(Function.identity(), i -> Boolean.FALSE ));` as i want to have numbers from 1 to 1000. How do parallel and unordered work together in the above case ? – Adithya Apr 18 '18 at 18:32
  • 1
    @Adithya first of all, `ConcurrentHashMap` has no defined order, so the result is always unordered. That’s why the `toConcurrentMap` collector has the `UNORDERED` characteristic, which makes explicit `.unordered()` unnecessary (like in my last example). – Holger Apr 19 '18 at 14:46
  • 1
    @Adithya But if you make the stream unordered, take care to do it like in my first example, `.limit(1000).unordered()`, not `.unordered().limit(1000)`. Doing `.limit(1000)` first, selects the first 1000 elements from the ordered stream, followed by releasing the ordering for subsequent processing. Doing `.unordered()` first, makes the subsequent `.limit(1000)` select *some* thousand elements from the now-unordered, infinite stream (there is no “first” in an unordered stream). – Holger Apr 19 '18 at 14:47
  • @Holger Appreciate your effort ! After trying few approaches it seems right order needs to be `.limit(10).parallel().unordered()` (figured out with help of `.peek()`). Although, i am still unclear how `unordered` works. Some intermediate operations wait till all the input is processed like `sorted()`. So, even if i do give `unordered` before `limit` i.e. `.parallel().unordered().limit(10).forEach(System.out::println)` , it does process limit and print correct values as well ! Maybe not the right forum but i will need to explore how `.parallel()` and `.unordered()` work together. – Adithya Apr 19 '18 at 19:29
  • Mind that the actual terminal operation is the strongest factor in determining the execution strategy. `forEach` may behave entirely different than `collect`. And particular implementations may not utilize all optimization opportunities yet, so it’s important to care for the *semantics* of the operation. `.unordered().limit(10)` selects *some* ten elements, not necessarily the first ten elements, but it may happen to be the first ten in some constellation depending on, e.g. the actual terminal operation or whether the limit is smaller than the source’s splitting capability. – Holger Apr 20 '18 at 06:55
  • Can you elaborate your last point ? What i meant is certain intermediate operations can hang your code like calling `sorted` on an infinite stream where the terminal operation has no control. Calling `unordered` on an infinite stream doesn't cause the stream to hang. That's where i wasn't clear. – Adithya Apr 20 '18 at 13:04
  • 1
    `sorted` is *not* the opposite of `unordered`. With `unordered`, you are just saying that you don’t care about the order and allow the implementation to take performance advantage of the unordered nature of the operation. In contrast, with `sorted` you are requesting the stream to sort all elements and that can only be done by first receiving all elements, as it is possible that the last encountered element is the minimum, i.e. will be the first after sorting. You don’t know until you have seen all values. So you must apply a `limit` before `sorted` for infinite streams. – Holger Apr 20 '18 at 13:56
2

To be precise, even, if you use ConcurrentHashMap threads, won't be sharing data and there would be as many ConcurrentHashMap as many threads are there i.e. supplier would be called that no. of times. The combiner i.e. the last parameter which is a BiConsumer will be doing the merge operation but in no order i.e. whichever thread finishes throws data at it and it is then merged.

When you explicitly say Collectors.toConcurrentMap then the Collectors behavior is that one a single container(i.e. ConcurrentHashMap)all threads will push the data & no combining efforts would be required.

Vinay Prajapati
  • 7,199
  • 9
  • 45
  • 86