Are java.util.stream.Collectors::joining implementations thread-safe? Can I do something like

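import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public final class SomeClass {
  // One collector instance shared by every call to someMethod
  private static final Collector<CharSequence, ?, String> jc = Collectors.joining(",");

  public String someMethod(List<String> someList) {
    return someList.parallelStream().collect(jc);
  }
}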

without fear of running into concurrency issues?

voho
Roy Stark
    From what I remember, thread safety is not the responsibility of Collectors. Collectors simply provides a way of describing *where* to collect, but it is the Stream that handles *when* to collect (which should also be thread-safe). – Pshemo Jun 25 '15 at 13:43
  • 1
    @Pshemo In this case multiple streams are using the same shared collector. – assylias Jun 25 '15 at 13:52
  • 1
    @Pshemo looking at the implementation it looks like it would work - I don't know if it is specific to this collector or if it is a derived result of their thread safety. – assylias Jun 25 '15 at 13:53
  • 2
    @assylias You are right. I was under the false impression that the `jc` `Collector` uses one `StringJoiner` for every joining operation, so I thought each stream should have its own separate `Collector`. But now I see that each call of `collect(jc)` will eventually create a `new StringJoiner`, so using it in the way shown in the question should be OK (unless someone modifies `someList` before the stream finishes collecting, but that is a problem of `someList`, not of the stream). – Pshemo Jun 25 '15 at 14:33
  • 7
    Short answer: yes. – Brian Goetz Jun 25 '15 at 14:44

1 Answer

You can use this collector, like any other collector provided by the Collectors class, without fear of running into concurrency issues. A Collector need not care about thread safety unless it has the CONCURRENT characteristic. It only needs its operations to be non-interfering, stateless and associative. The rest is handled by the Stream pipeline itself, which calls the collector functions in a way that requires no additional synchronization. In particular, when the accumulator or combiner function is called, it is guaranteed that no other thread is operating on the same accumulated value at that moment. This is specified in the Collector documentation:

Libraries that implement reduction based on Collector, such as Stream.collect(Collector), must adhere to the following constraints:

<...>

  • For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner functions must be serially thread-confined. This enables collection to occur in parallel without the Collector needing to implement any additional synchronization. The reduction implementation must manage that the input is properly partitioned, that partitions are processed in isolation, and combining happens only after accumulation is complete.
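
The constraint quoted above can be illustrated by driving a collector by hand. The following is only a minimal sketch, not how the Stream implementation is actually written: the class `PartitionSketch` and the method `collectInTwoPartitions` are hypothetical names, and the fixed two-way split is an assumption made for illustration. Each partition gets its own container from the supplier, the partitions are accumulated in isolation, and the combiner runs only after both are complete.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public final class PartitionSketch {

    // Hypothetical helper: mimics what a reduction over two partitions has to do.
    static <T, A, R> R collectInTwoPartitions(Collector<T, A, R> collector,
                                              List<? extends T> left,
                                              List<? extends T> right) {
        // Every partition receives a fresh container from the supplier.
        A leftContainer = collector.supplier().get();
        A rightContainer = collector.supplier().get();

        // Each container is touched only while its own partition is accumulated.
        BiConsumer<A, T> accumulator = collector.accumulator();
        for (T item : left) {
            accumulator.accept(leftContainer, item);
        }
        for (T item : right) {
            accumulator.accept(rightContainer, item);
        }

        // Combining happens only after accumulation is complete.
        A merged = collector.combiner().apply(leftContainer, rightContainer);
        return collector.finisher().apply(merged);
    }

    public static void main(String[] args) {
        String joined = collectInTwoPartitions(
                Collectors.joining(","),
                Arrays.asList("a", "b"),
                Arrays.asList("c", "d"));
        System.out.println(joined); // prints "a,b,c,d"
    }
}
```

In a real parallel stream the two accumulation loops could run on different threads, but each container would still be handled by only one thread at a time, which is why the collector needs no locking of its own.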

Note that the collector itself is stateless, as are the functions it provides, so it is also safe to keep it in a static field. The state lives in the external accumulator object, which is returned by the supplier and passed back to the accumulator, combiner and finisher. So even if the same collector is reused by several stream operations, they do not interfere with each other.
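
To make the reuse scenario concrete, here is a small sketch in which several threads run parallel streams against one shared collector at the same time. The class `SharedCollectorDemo`, the methods `join` and `joinConcurrently`, and the pool size of 4 are hypothetical choices for illustration. A passing run is not a proof of thread safety; the guarantee comes from the Collector contract described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public final class SharedCollectorDemo {

    // A single collector instance shared by all callers and all threads.
    private static final Collector<CharSequence, ?, String> JOINER = Collectors.joining(",");

    static String join(List<String> items) {
        return items.parallelStream().collect(JOINER);
    }

    // Hypothetical driver: submits several joins that all use JOINER concurrently.
    static List<String> joinConcurrently(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int n = i;
                Callable<String> task = () -> join(Arrays.asList("a" + n, "b" + n, "c" + n));
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Each result contains only the elements of its own input list.
        System.out.println(joinConcurrently(3));
    }
}
```

Because every `collect` call obtains its own `StringJoiner` containers from the supplier, the elements of one list never leak into the result of another, even though all the calls go through the same `JOINER` field.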

Community
Tagir Valeev