9

I actually tried to answer this question How to skip even lines of a Stream<String> obtained from the Files.lines. So I though this collector wouldn't work well in parallel:

private static Collector<String, ?, List<String>> oddLines() {
    int[] counter = {1};
    return Collector.of(ArrayList::new,
            (l, line) -> {
                if (counter[0] % 2 == 1) l.add(line);
                counter[0]++;
            },
            (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            });
}

but it works.

EDIT: It didn't actually work; I got fooled by the fact that my input set was too small to trigger any parallelism; see discussion in comments.

I thought it wouldn't work because of the two following plans of executions comes to my mind.


1. The counter array is shared among all threads.

Thread t1 read the first element of the Stream, so the if condition is satisfied. It adds the first element to its list. Then the execution stops before he has the time to update the array value.

Thread t2, which says started at the 4th element of the stream add it to its list. So we end up with a non-wanted element.

Of course since this collector seems to works, I guess it doesn't work like that. And the updates are not atomic anyway.


2. Each Thread has its own copy of the array

In this case there is no more problems for the update, but nothing prevents me that the thread t2 will not start at the 4th element of the stream. So he doesn't work like that either.


So it seems that it doesn't work like that at all, which brings me to the question... how the collector is used in parallel?

Can someone explain me basically how it works and why my collector works when ran in parallel?

Thank you very much!

Community
  • 1
  • 1
user2336315
  • 15,697
  • 10
  • 46
  • 64
  • When you think about it, if the collector was fed the elements in a non-serial fashion (or serialized but in a different order), your collector wouldn't work correctly, even without the counter. – biziclop May 11 '15 at 15:33
  • As it turns out, (2) is the most accurate explanation without going into to much about the implementation details. If you chose a simple `Collections.toList` as the collector. The parallel stream will do a map/reduce to collect all of the elements. It uses the ForkJoin pool to offer the fork/join work when executing in parallel. – John Vint May 11 '15 at 15:47
  • @JohnVint `toList` does not share a common variable between threads, so I understand how this works. It was just when you have a stateful condition. – user2336315 May 11 '15 at 15:56
  • Oh right. I am surprised that it works then. The accumulator is definitely executed in multiple threads. – John Vint May 11 '15 at 16:00
  • @user2336315 I just ran your example with `100000` elements and predictably the size wasn't the expected output of a non-paralllel execution. – John Vint May 11 '15 at 16:03
  • 3
    @JohnVint Yes this is was because my input file was too small. I'm actually happy that it doesn't produce the expected results, it kinds of confirms my knowledge about the execution. – user2336315 May 11 '15 at 16:30
  • So, as a thought experiment, supposing that you only had a stream of strings, not knowing its full size, how would you go about it? – biziclop May 12 '15 at 09:32
  • @biziclop I don't know. If the stream was sequential it's easy but due to the auto possible parallelism, I don't know. That's why I guess there is no `zip` method on the Stream API. – user2336315 May 12 '15 at 09:39
  • Well, you can definitely turn (or wrap) the stream into a sequential one, lazily decorate it with indexes, then switch back to parallel processing. But whether it's worth it or not depends on how expensive that further processing step is. – biziclop May 12 '15 at 09:55
  • @biziclop You mean `.sequential().map(...).parallel()`? If yes it won't work are those are intermediate operations so the stream will be `parallel` when applying the mapping when calling a final operation – user2336315 May 12 '15 at 10:23
  • Yes, but it doesn't matter because you do `.sequential().map().parallel().filter().map()...`, the final operation is just a simple collection to a list. – biziclop May 12 '15 at 10:42
  • @biziclop What I mean is you should think of this like a flag calling `sequential().map().parallel()` will execute the map call in parallel when the terminal operation is called, because the last call wins (in this case parallel). Just try with this http://pastebin.com/u7yGd6rY. You'll see that this not always yields the same result. Sometimes you have even that are printed when it should only print odd numbers and the index associated is not the one it should in some cases also. – user2336315 May 12 '15 at 10:54

2 Answers2

5

Passing a parallel() source stream into your collector is enough to break the logic because your shared state (counter) may be incremented from different tasks. You can verify that, because it is never returning the correct result for any finite stream input:

    Stream<String> lines = IntStream.range(1, 20000).mapToObj(i -> i + "");
    System.out.println(lines.isParallel());
    lines = lines.parallel();
    System.out.println(lines.isParallel());

    List<String> collected = lines.collect(oddLines());

    System.out.println(collected.size());

Note that for infinite streams (e.g. when reading from Files.lines()) you need to generate some significant amount of data in the stream, so it actually forks a task to run some chunks concurrently.

Output for me is:

false
true
12386

Which is clearly wrong.


As @Holger in the comments correctly pointed out, there is a different race that can happen when your collector is specifying CONCURRENT and UNORDERED, in which case they operate on a single shared collection across tasks (ArrayList::new called once per stream), where-as with only parallel() it will run the accumulator on a collection per task and then later combine the result using your defined combiner.

If you'd add the characteristics to the collector, you might run into the following result due to the shared state in a single collection:

false
true
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 73
    at java.util.ArrayList.add(ArrayList.java:459)
    at de.jungblut.stuff.StreamPallel.lambda$0(StreamPallel.java:18)
    at de.jungblut.stuff.StreamPallel$$Lambda$3/1044036744.accept(Unknown Source)
    at java.util.stream.ReferencePipeline.lambda$collect$207(ReferencePipeline.java:496)
    at java.util.stream.ReferencePipeline$$Lambda$6/2003749087.accept(Unknown Source)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
    at de.jungblut.stuff.StreamPallel.main(StreamPallel.java:32)12386
Thomas Jungblut
  • 20,854
  • 6
  • 68
  • 91
  • I don't get it. You first state that it will never be executed concurrently and then you demonstrate me that in fact it is executed concurrently and that it fails. I tried to execute this in a loop 1_000_000 times and I didn't get an error (although the results are not the one expected). Is this normal? – user2336315 May 11 '15 at 15:55
  • 1
    Why do you insist on keeping that first line in your answer while [correctly saying otherwise](http://stackoverflow.com/questions/30171322/how-collectors-are-used-when-turning-the-stream-in-parallel#comment48450434_30171909) in the comments of another answer? – Holger May 11 '15 at 16:15
  • 1
    By reading your comments I thought you understood that without these characteristics, the streams are still processed in parallel but in a different mode. So if you didn’t, notice that this sentence is simply wrong. Without these two flags, accumulators can run in parallel but will use different containers which are then combined using the combiner function. When specifying `CONCURRENT` and `UNORDERED`, the accumulator may be invoked concurrently on *one* container without any combiner invocation. – Holger May 11 '15 at 16:24
  • 1
    You're right ;) I thought I was going through the sequential code part. – Thomas Jungblut May 11 '15 at 16:29
  • I still don't get how you had an IOOBE, it shouldnt happen. – user2336315 May 11 '15 at 16:31
  • 1
    @user2336315 welcome to the wonderful world of race conditions, I would think that an addition happened after the size was updated and before it was resized internally to hold the new element. – Thomas Jungblut May 11 '15 at 16:34
  • @Holger is that clearer now? please feel free to suggest an edit if you find an easier formulation. – Thomas Jungblut May 11 '15 at 16:48
  • 2
    It is. But note that you don’t even need large data sets to spot the problem. It’s only the file I/O case where large data is needed. I managed to produce wrong results with an expression as simple as `Stream.of("1", "2", "3").parallel().collect(oddLines())`… – Holger May 11 '15 at 16:51
  • 1
    @Holger true, the FileIO returns an unspecified size of the stream, so it will only decide to run it in parallel if the approx. size is estimated to be large enough. In the finite stream it can split the stream without a heuristic estimate. – Thomas Jungblut May 11 '15 at 16:53
  • @ThomasJungblut Hmm my understanding about the collector is that each thread gets a list via the supplier, then use the accumulator to accumulate the data in its list. Then each task is merged via the combiner function. So I don't see how it could produce an IOOBE because add and addAll should append the content to the end of each list. I tried to run your code 1 millions times, and I didn't get an error. – user2336315 May 11 '15 at 16:55
  • 1
    @user2336315 `parallel()` on the source stream *does not generate* a single shared collection, which might fail on `l.add(line)`. You need to provide the characteristics to the collector to expose that issue. – Thomas Jungblut May 11 '15 at 16:57
  • This is exactly how the `toList()` collector is implemented (except it has the identity finish characteristic but it shouldnt change the behavior), and it has always worked for me when turning a stream in parallel and then collecting the content via its `toList()` method. So I'm not really sure about this :/ – user2336315 May 11 '15 at 17:03
  • 1
    @user2336315 `toList()` does not provide CONCURRENT/UNORDERED, so the accumulator will run with different lists and the finalizer (`addAll()` over all the parallel resulting lists) will be ran sequentially- which is why it works. – Thomas Jungblut May 11 '15 at 17:06
  • @ThomasJungblut Sorry, I forgot to provide the characteristics.... My apologies :/ – user2336315 May 11 '15 at 17:09
  • @ThomasJungblut Just you just looked at the source code (which I find not very readable) to tell me this informations (speaking about the last comments on the shared collection) ? If not where :-) ? I don't see where it's mentioned in the javadoc page. – user2336315 May 11 '15 at 17:11
  • @user2336315 yep, read and stepped through the sources with a debugger. – Thomas Jungblut May 11 '15 at 17:12
  • Gosh... I would have been happy if you said no. The source code is really not very readable... – user2336315 May 11 '15 at 17:13
  • @user2336315 It isn't but I've seen worse. :) – biziclop May 12 '15 at 09:11
3

Actually it's just a coincidence that this collector work. It doesn't work with custom data source. Consider this example:

List<String> list = IntStream.range(0, 10).parallel().mapToObj(String::valueOf)
        .collect(oddLines());
System.out.println(list);

This produces always different result. The real cause is just because when BufferedReader.lines() stream is split by at least java.util.Spliterators.IteratorSpliterator.BATCH_UNIT number of lines which is 1024. If you have substantially bigger number of lines, it may fail even with BufferedReader:

String data = IntStream.range(0, 10000).mapToObj(String::valueOf)
    .collect(Collectors.joining("\n"));
List<String> list = new BufferedReader(new StringReader(data)).lines().parallel()
    .collect(oddLines());
list.stream().mapToInt(Integer::parseInt).filter(x -> x%2 != 0)
    .forEach(System.out::println);

Were collector working normally this should not print anything. But sometimes it prints.

Tagir Valeev
  • 97,161
  • 19
  • 222
  • 334