
I am a beginner in Java 8.

Non-interference is important for consistent Java stream behaviour. Imagine we are processing a large stream of data and the source is changed during processing. The result will be unpredictable. This holds irrespective of the processing mode of the stream, parallel or sequential.

The source can be modified until the terminal operation is invoked. Beyond that point the source should not be modified until the stream execution completes. So handling concurrent modification of the stream source is critical for consistent stream behaviour.

The above quotations are taken from here.

Can someone give a simple example that sheds light on why mutating the stream source causes such big problems?

Naman

1 Answer


Well, the Oracle example is self-explanatory here. The first one is this:

List<String> l = new ArrayList<>(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
// collect is called on the stream sl, not on the list l
String s = sl.collect(Collectors.joining(" "));

If you change l by adding one more element to it before you call the terminal operation (collect with Collectors.joining), you are fine; but notice that the Stream then consists of three elements, not the two that the list held at the time you created the Stream via l.stream().
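A self-contained sketch of this first case, in case you want to run it. The class and method names (LateBindingDemo, joinAfterAdd) are made up for illustration, and the extra map step is only there to show that intermediate operations do not read the source either:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the Oracle example.
class LateBindingDemo {

    // Modifies the list after the pipeline is built but before the terminal operation runs.
    static String joinAfterAdd() {
        List<String> l = new ArrayList<>(Arrays.asList("one", "two"));
        // Building the pipeline (including map) does not traverse the list yet.
        Stream<String> sl = l.stream().map(String::toUpperCase);
        l.add("three");
        // The source is only read here, so the third element is included.
        return sl.collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        System.out.println(joinAfterAdd()); // ONE TWO THREE
    }
}
```

This relies on ArrayList being a well-behaved source whose stream binds to the data only when the terminal operation starts; other sources may not offer that.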

On the other hand doing this:

  List<String> list = new ArrayList<>();
  list.add("test");
  list.forEach(x -> list.add(x));

will throw a ConcurrentModificationException, since you can't change the source while it is being traversed.
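The same thing can be sketched with an actual stream, modifying the list from inside the terminal operation. The class and method names here are hypothetical, and keep in mind that fail-fast detection is documented as best-effort, so the exception is a symptom of the bug rather than something to rely on:

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo class for illustration only.
class StreamInterferenceDemo {

    // Returns true if interfering with the source during forEach was detected.
    static boolean throwsOnInterference() {
        List<String> list = new ArrayList<>();
        list.add("test");
        try {
            // The lambda mutates the very list the stream is traversing.
            list.stream().forEach(x -> list.add(x));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + throwsOnInterference());
    }
}
```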

And now suppose you have an underlying source that can handle concurrent adds:

ConcurrentHashMap<String, Integer> cMap = new ConcurrentHashMap<>();
cMap.put("one", 1);
cMap.forEach((key, value) -> cMap.put(key + key, value + value));
System.out.println(cMap);

What should the output be here? When I run this it is:

 {oneoneoneoneoneoneoneone=8, one=1, oneone=2, oneoneoneone=4}

Changing the key to zx (cMap.put("zx", 1)), the result is now:

{zxzx=2, zx=1}

The result is not consistent.
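Here is a rough stream-based variant of the same experiment, going through entrySet().stream(). The names (ChmInterferenceDemo, run) and the key-length cap of 64 are my own assumptions; the cap is only there so the sketch is guaranteed to terminate. How many entries you end up with depends on the hash of the starting key and the table size, so no expected output is shown:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class for illustration only.
class ChmInterferenceDemo {

    // Starts with one entry and inserts new entries while streaming over the map.
    static Map<String, Integer> run(String startKey) {
        ConcurrentHashMap<String, Integer> cMap = new ConcurrentHashMap<>();
        cMap.put(startKey, 1);
        cMap.entrySet().stream().forEach(e -> {
            // Assumption: cap the key length so the experiment always ends.
            if (e.getKey().length() < 64) {
                cMap.put(e.getKey() + e.getKey(), e.getValue() + e.getValue());
            }
        });
        return cMap;
    }

    public static void main(String[] args) {
        // Same code, different starting keys; the number of entries may differ.
        System.out.println(run("one"));
        System.out.println(run("zx"));
    }
}
```

No exception is thrown, because the traversal is weakly consistent, but whether a newly added entry is visited is not something the code controls.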

Eugene
  • 7
    I just thought about the ambiguity of `list.forEach(x -> list.add(x));`, if it didn’t throw a `ConcurrentModificationException`, but it’s a great idea to demonstrate it by comparing with a `ConcurrentHashMap`… – Holger Jun 01 '17 at 12:05
  • @Holger I *really* wanted to generate a Key that would make this die with an OutOfMemory; in theory this could happen. That would make the example great I think. Thank you for the link. – Eugene Jun 01 '17 at 12:08
  • 6
    In the absence of updates made by other threads, CHM `forEach` will not notice new state when the capacity has been increased. You may try `ConcurrentHashMap<Integer,Integer> cMap = new ConcurrentHashMap<>(1000, 1f); cMap.put(1, 1); cMap.forEach((key, value) -> cMap.put(key + 1, 0));` and play with the initial capacity. You’ll see that the iteration stops when the capacity is increased (it’s rounded up to the next power of two). – Holger Jun 01 '17 at 12:30
  • 6
    If you want more than 65535 iterations, you have to adapt the key to the effect of CHM’s `spread` function, e.g. `ConcurrentHashMap<Integer,Integer> cMap = new ConcurrentHashMap<>(200_000_000, 1f); cMap.put(1, 1); cMap.forEach((key, value) -> cMap.put(++value^(value>>>16), value));` runs for minutes now on my machine, already consuming several GB, don’t know whether or when it will bail out… – Holger Jun 01 '17 at 12:56
  • 1
    Finally got the OOME after ~ten minutes… – Holger Jun 01 '17 at 13:11
  • @Holger this is the best comment I have seen so far. Thank you so much! – Eugene Jun 01 '17 at 13:13
  • 3
    It is worth noting that `ConcurrentHashMap` allows this because its iteration is [weakly consistent](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly) – although this is not explicitly specified for `forEach()` – Didier L Jun 01 '17 at 14:02
  • @Eugene, hi! I find your answer to be very interesting, but apart from the example taken from the stream package docs, I'm failing to see what the non-interference requirement for streams has to do with your examples. You are not even using streams with your `ArrayList`, neither with your CHM. Besides, if I use a `CopyOnWriteArrayList` instead of an `ArrayList`, I observe consistent behavior. So I think that the inconsistent behavior has more to do with the internals of CHM than with streams interference. I'm sure that your examples go in the right direction, maybe I'm just missing the link... – fps Jun 01 '17 at 14:21
  • 5
    @Didier L: [the specification](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentMap.html#forEach-java.util.function.BiConsumer-) says that this is equivalent to the loop `for (Map.Entry<K, V> entry : map.entrySet()) action.accept(entry.getKey(), entry.getValue());`, which implies that the behavior should be [as *weakly consistent* as the entryset iterator](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#entrySet--). – Holger Jun 01 '17 at 14:21
  • 4
    @Federico Peralta Schaffner: the behavior doesn’t change when you rewrite the examples to use `stream().forEach(…)` resp. `entrySet().stream().forEach(…)` instead of `forEach(…)`. These are just simplified examples of interference. When you use a `CopyOnWriteArrayList`, you don’t have interference. That’s even stated in [its class documentation](https://docs.oracle.com/javase/8/docs/api/?java/util/concurrent/CopyOnWriteArrayList.html): “*interference is impossible*”. – Holger Jun 01 '17 at 14:27
  • 1
    @FedericoPeraltaSchaffner would it make more sense if I used `l.stream().forEach` or `CHM.entrySet().stream().forEach...`? – Eugene Jun 01 '17 at 14:28
  • 1
    Eugene, First of all, let me clarify that I have upvoted your answer, because it clearly shows interference in collections and maps. I'm not saying that it would make more sense to add `.stream()` to your examples, it's just that I don't fully understand the connection between the interference you are showing with a list and a map and the requirement of non-interference for streams, which mandates to not modify the stream source from within an intermediate or terminal operation. I thought it was something intrinsic of streams, i.e. that the stream would explode by its own means. – fps Jun 01 '17 at 14:37
  • 1
    @Holger Thanks for the link, it's clear that there's no interference with `CopyOnWriteArrayList`. But I thought that the requirement of non-interference of streams had something to do with streams, and not with the source collections/maps. – fps Jun 01 '17 at 14:41
  • 2
    @Federico Peralta Schaffner: Interference is a broad term, but the OP provided a link/cite that shows that “*interference with the data source*” is meant, just like in the [stream’s package documentation](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#NonInterference). Keep in mind that a Stream operation *is* a single-iteration over a source, dependent on the source’s iteration policy. – Holger Jun 01 '17 at 14:53
  • 3
    @Holger That was the exact piece of information I was lacking: *a Stream operation is a single-iteration over a source, dependent on the source’s iteration policy*. Now, with that in mind, everything makes perfect sense. Thanks to both you and Eugene! – fps Jun 01 '17 at 14:57
  • 5
    @Eugene: well, you could also have interference between functions, but that’s discussed in the Stream API by discouraging “stateful behavioral parameters” and “side-effects in behavioral parameters” in general. – Holger Jun 01 '17 at 14:58