
Suppose I have this custom collector:

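    import java.util.ArrayList;
    import java.util.EnumSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;

    public class CustomToListCollector<T> implements Collector<T, List<T>, List<T>> {

        @Override
        public Supplier<List<T>> supplier() {
            return ArrayList::new;
        }

        @Override
        public BiConsumer<List<T>, T> accumulator() {
            return List::add;
        }

        // Merges two partial results: appends the right list to the left one.
        @Override
        public BinaryOperator<List<T>> combiner() {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<List<T>, List<T>> finisher() {
            return Function.identity();
        }

        // Same as Collectors#toList, plus UNORDERED.
        @Override
        public Set<Characteristics> characteristics() {
            return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
        }
    }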

This is exactly the Collectors#toList implementation with one minor difference: the UNORDERED characteristic is added as well.
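
As a side note, the same collector can probably be written more compactly with the `Collector.of` factory instead of a dedicated class. The following is only a minimal sketch; the class name `UnorderedToListSketch` and the method name `unorderedToList` are made up for illustration. The four-argument overload of `Collector.of` adds IDENTITY_FINISH on its own, so only UNORDERED is passed explicitly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

// Hypothetical helper class, not part of the original question.
final class UnorderedToListSketch {

    // Builds a toList-like collector that additionally reports UNORDERED.
    static <T> Collector<T, ?, List<T>> unorderedToList() {
        return Collector.<T, List<T>>of(
                ArrayList::new,                       // supplier: fresh local container
                List::add,                            // accumulator: append one element
                (left, right) -> {                    // combiner: append right to left
                    left.addAll(right);
                    return left;
                },
                Collector.Characteristics.UNORDERED); // IDENTITY_FINISH is implied by this overload
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        Collector<Integer, ?, List<Integer>> collector = unorderedToList();
        System.out.println(collector.characteristics());
        System.out.println(list.parallelStream().collect(collector));
    }
}
```
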

I would assume that running this code:

    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

    for (int i = 0; i < 100_000; i++) {
        List<Integer> result = list.parallelStream().collect(new CustomToListCollector<>());
        if (!result.equals(list)) {
            System.out.println(result);
            break;
        }
    }

should eventually print a result, i.e. a list whose order differs from that of the source list. But it does not.

I've looked under the hood a bit. ReferencePipeline#collect first checks whether the stream is parallel, whether the collector is concurrent and whether the collector is unordered. CONCURRENT is missing, so it delegates to the evaluate method, creating a TerminalOp out of this collector. Under the hood this is a ReduceOp backed by a ReducingSink, and it actually cares whether the collector is unordered or not:

    return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        @Override
        public int getOpFlags() {
            return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                   ? StreamOpFlag.NOT_ORDERED
                   : 0;
        }
    };

I have not debugged further, since it gets complicated pretty fast.

So maybe there is a shortcut here and someone could explain what I am missing. This is a parallel stream that collects elements with a non-concurrent, unordered collector. Shouldn't the order in which the threads combine their results be arbitrary? If not, how is the order imposed here, and by whom?

Eugene
  • According to the [Stream.collect](http://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#collect-java.util.stream.Collector-) contract, the fact that the collector is not concurrent, not doing any unsafe operations to make it unordered and [this](http://stackoverflow.com/a/29711206/1960816) and [this](http://stackoverflow.com/a/29713386/1960816) answer, it seems normal that the initial intrinsic encounter order is preserved. Under the hood, the `ArrayListSpliterator` will be used and everything is carefully arranged to preserve encounter order from the source to the destination. – Nick Vanderhoven Nov 30 '16 at 09:43
  • “should actually produce some result”…based on what? Declaring `UNORDERED` implies that the order is irrelevant so always returning the original order is no contradiction. – Holger Nov 30 '16 at 09:51
  • By the way, there is no need to manually implement the `Collector` interface, you can simply use `Collector.of(…)` specifying the functions and characteristics. But for injecting a characteristic into an existing collector like `toList()`, you even don’t need to re-implement the functions. See [here](http://stackoverflow.com/a/37116251/2711488). – Holger Nov 30 '16 at 10:46
  • @Holger your first comment makes a lot of sense, thank you. I really hoped that setting UNORDERED will *actually* make it unordered. Those threads would not need to somehow sync to preserve that order, since, well, I said that I don't care about the order. Another way to put it (and I assume this is actually done in StreamOpFlag) is : the input is ORDERED, the collect is UNORDERED, let's not clear the flag, but keep whatever we have in the stream, since you don't care about it anyway. – Eugene Nov 30 '16 at 13:01
  • @Holger I sound like I'm ranting here :) and I am honestly not. Is it fair to say that this might change and the resulting List might not be ordered in a future implementation? I mean you can't really rely on this, right? Also please make that an answer. – Eugene Nov 30 '16 at 13:05
  • @NickVanderhoven thx for the comment, but the ArraySpliterator's ordered flag will be removed from the resulting stream. See my sort-of-answer. You actually triggered me to look into it. – Eugene Dec 01 '16 at 10:13

2 Answers


Note that the result is the same when using list.parallelStream().unordered().collect(Collectors.toList()); in either case, the unordered property is not used within the current implementation.

But let’s change the setup a little bit:

List<Integer> list = Collections.nCopies(10, null).stream()
    .flatMap(ig -> IntStream.range(0, 100).boxed())
    .collect(Collectors.toList());
List<Integer> reference = new ArrayList<>(new LinkedHashSet<>(list));

for (int i = 0; i < 100_000; i++) {
    List<Integer> result = list.parallelStream()
      .distinct()
      .collect(characteristics(Collectors.toList(), Collector.Characteristics.UNORDERED));
    if (!result.equals(reference)) {
        System.out.println(result);
        break;
    }
}

This uses the characteristics collector factory of this answer.
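
Since the linked helper is not reproduced here, the following is only a guess at what such a `characteristics` factory might look like, built on the public `Collector.of` method. The class name `CollectorCharacteristicsSketch` is hypothetical and the linked answer's actual code may differ.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical reconstruction; the real helper lives in the linked answer.
final class CollectorCharacteristicsSketch {

    // Returns a collector with the same functions as 'base' whose
    // characteristics are those of 'base' plus the ones in 'extra'.
    static <T, A, R> Collector<T, A, R> characteristics(
            Collector<T, A, R> base, Collector.Characteristics... extra) {
        Set<Collector.Characteristics> merged =
                EnumSet.noneOf(Collector.Characteristics.class);
        merged.addAll(base.characteristics());
        merged.addAll(Arrays.asList(extra));
        return Collector.of(base.supplier(), base.accumulator(),
                base.combiner(), base.finisher(),
                merged.toArray(new Collector.Characteristics[0]));
    }

    public static void main(String[] args) {
        List<Integer> result = Arrays.asList(3, 1, 2).stream()
                .collect(characteristics(Collectors.toList(),
                        Collector.Characteristics.UNORDERED));
        System.out.println(result);
    }
}
```

The helper only changes what the collector reports; the supplier, accumulator, combiner and finisher are reused unchanged.
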
The interesting outcome is that in Java 8 versions prior to 1.8.0_60, this has a different outcome: the loop finds a result that differs from the reference. If we use objects with distinct identities instead of the canonical Integer instances, we could detect that in these earlier versions, not only the order of the list differs, but also that the objects in the result list are not the first encountered instances.

So the unordered characteristic of a terminal operation was propagated to the stream, affecting the behavior of distinct(), similar to that of skip and limit, as discussed here and here.

As discussed in the second linked thread, the back-propagation has been removed completely, which is reasonable when thinking about it a second time. For distinct, skip and limit, the order of the source is relevant and ignoring it just because the order will be ignored in subsequent stages is not right. So the only remaining stateful intermediate operation that could benefit from back-propagation would be sorted, which would be rendered obsolete when the order is being ignored afterwards. But combining sorted with an unordered sink is more like a programming error anyway…

For stateless intermediate operations the order is irrelevant anyway. The stream processing works by splitting the source into chunks, applying all stateless intermediate operations to their elements independently and collecting into a local container, before merging into the result container. So the merging step is the only place where respecting or ignoring the order (of the chunks) will have an impact on the result and perhaps on the performance.

But the impact isn’t very big. When you implement such an operation, e.g. via ForkJoinTasks, you simply split a task into two, wait for their completion and merge them. Alternatively, a task may split off a chunk into a sub-task, process its remaining chunk in-place, wait for the sub-task and merge. In either case, merging the results in order comes naturally, because the initiating task already holds references to the adjacent tasks. To merge with different chunks instead, the associated sub-tasks would first have to be found somehow.
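
The second variant described above (split off a sub-task, process the rest in place, join, merge) could be sketched roughly as follows. This is an illustration written for this answer, not the JDK's stream implementation; the names `OrderedMergeSketch`, `CollectTask` and `THRESHOLD` are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical illustration of ordered merging with Fork/Join.
final class OrderedMergeSketch {

    // Collects source[from, to) into a list by recursive splitting.
    static final class CollectTask extends RecursiveTask<List<Integer>> {
        private static final int THRESHOLD = 4; // arbitrary chunk size for the sketch
        private final List<Integer> source;
        private final int from;
        private final int to;

        CollectTask(List<Integer> source, int from, int to) {
            this.source = source;
            this.from = from;
            this.to = to;
        }

        @Override
        protected List<Integer> compute() {
            if (to - from <= THRESHOLD) {
                // Small enough: collect the chunk into a local container.
                return new ArrayList<>(source.subList(from, to));
            }
            int mid = (from + to) >>> 1;
            CollectTask left = new CollectTask(source, from, mid);
            CollectTask right = new CollectTask(source, mid, to);
            left.fork();                                  // split off the left chunk as a sub-task
            List<Integer> rightResult = right.compute();  // process the remaining chunk in place
            List<Integer> leftResult = left.join();       // wait for the sub-task
            leftResult.addAll(rightResult);               // merge adjacent chunks, left before right
            return leftResult;
        }
    }

    static List<Integer> collect(List<Integer> source) {
        return ForkJoinPool.commonPool().invoke(new CollectTask(source, 0, source.size()));
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 32; i++) {
            source.add(i);
        }
        System.out.println(collect(source).equals(source));
    }
}
```

Each task only ever merges with the sub-task it created itself, so the result is in source order no matter which chunk finishes first, and no extra bookkeeping is needed to achieve that.
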

The only benefit from merging with a different task would be that you can merge with the first completed task, if the tasks take different amounts of time to complete. But when waiting for a sub-task in the Fork/Join framework, the thread won’t be idle; the framework will use the thread for working on other pending tasks in-between. So as long as the main task has been split into enough sub-tasks, there will be full CPU utilization. Also, the spliterators attempt to split into even chunks to reduce the differences between the computing times. It’s very likely that the benefit of an alternative unordered merging implementation doesn’t justify the code duplication, at least with the current implementation.

Still, reporting an unordered characteristic allows the implementation to utilize it when beneficial and implementations can change.

Holger
  • I think I said the same thing in my answer : *Bottom-line: that UNORDERED flag is preserved in the resulting Stream, but internally nothing is done with it. They could probably, but they choose not to.* – Eugene Dec 02 '16 at 12:52
  • Exactly, the bottom line is the same. I only went into detail regarding the potential benefits, the history and why the potential performance improvement is considered not worth additional code in the current implementation. I’d expect either, a major rewrite or the appearance of additional stateful operations, to come before this will change (again). – Holger Dec 02 '16 at 13:01
  • yes. +1 and will accept your answer. U made me happy (again) ;) thx a lot – Eugene Dec 02 '16 at 13:02

This is not an actual answer per se, but it would not fit into comments if I added more code and explanation.

Here is another interesting thing; it actually made me realize that I was wrong in the comments.

A spliterator's flags need to be merged with the flags of all intermediate operations and of the terminal operation.

Our spliterator's flags (as reported by StreamOpFlag) are 95; this can be seen by debugging AbstractPipeline#sourceSpliterator(int terminalFlags).

That is why the line below reports true:

 System.out.println(StreamOpFlag.ORDERED.isKnown(95)); // true

At the same time, the flags contributed by our terminal collector are 32 (StreamOpFlag.NOT_ORDERED):

System.out.println(StreamOpFlag.ORDERED.isKnown(32)); // false

The result:

int result = StreamOpFlag.combineOpFlags(32, 95); // 111
System.out.println(StreamOpFlag.ORDERED.isKnown(result)); // false

If you think about this, it makes complete sense: the List has an order, my custom collector does not, so the ORDERED flag is not preserved.

Bottom-line: that UNORDERED flag is preserved in the resulting Stream, but internally nothing is done with it. They could probably, but they choose not to.
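
StreamOpFlag is package-private, so the snippets above can only be evaluated from a debugger or from code inside java.util.stream. A comparable effect can be observed through the public API with the intermediate unordered() operation and Spliterator characteristics. This is only a rough sketch with a made-up class name, `OrderedFlagSketch`; it checks the intermediate operation rather than the collector's UNORDERED characteristic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;

// Hypothetical demo class, using only public API.
final class OrderedFlagSketch {

    // Does the plain source stream report ORDERED?
    static boolean sourceIsOrdered(List<?> list) {
        return list.stream().spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    // Does the stream still report ORDERED after unordered()?
    static boolean unorderedStageIsOrdered(List<?> list) {
        return list.stream().unordered().spliterator()
                .hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println("source ordered:          " + sourceIsOrdered(list));
        System.out.println("after unordered():       " + unorderedStageIsOrdered(list));
        // The order of this list is whatever the implementation produces;
        // with an unordered stream it is not guaranteed by the API.
        System.out.println("parallel unordered list: "
                + list.parallelStream().unordered().collect(Collectors.toList()));
    }
}
```
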

Eugene