2

I am trying to test running a collector in parallel with the following code. The "combiner" method is never called, so it looks like the collector is still being run sequentially. Why is that? How do I make it run in parallel?

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

/**
 * Demo program that pushes a parallel stream of {@code Staff} objects through a
 * hand-written {@link Collector} and prints diagnostics from the collector's
 * combiner and finisher so it is visible which stages actually run.
 */
public class TestCollector {

    /** Mutable bean carrying a staff member's numeric ID and full name. */
    private static class Staff {

        private int staffID;

        private String staffFullName;

        public int getStaffID() {
            return this.staffID;
        }

        public void setStaffID(int staffID) {
            this.staffID = staffID;
        }

        public String getStaffFullName() {
            return this.staffFullName;
        }

        public void setStaffFullName(String staffFullName) {
            this.staffFullName = staffFullName;
        }
    }

    /**
     * Collector that gathers {@code Staff} elements into a
     * {@link ConcurrentLinkedQueue} and finishes by copying them into an array.
     *
     * <p>It advertises both {@code CONCURRENT} and {@code UNORDERED}. Per the
     * {@code Collector.Characteristics} contract, a parallel stream is then
     * allowed to accumulate from several threads into one shared container,
     * in which case the combiner is not needed and may never be invoked.
     */
    private static class StaffCollector implements Collector<Staff, Queue<Staff>, Staff[]> {

        /** Creates the (thread-safe) queue used as the mutable result container. */
        @Override
        public Supplier<Queue<Staff>> supplier() {
            return () -> new ConcurrentLinkedQueue<>();
        }

        /** Appends a single element to the container. */
        @Override
        public BiConsumer<Queue<Staff>, Staff> accumulator() {
            return Queue::add;
        }

        /**
         * Merges the second container into the first and returns the first,
         * announcing on standard output that a merge took place.
         */
        @Override
        public BinaryOperator<Queue<Staff>> combiner() {
            return (target, source) -> {
                System.out.print("combiner called");
                target.addAll(source);
                return target;
            };
        }

        /** Reports how many elements were gathered, then converts them to an array. */
        @Override
        public Function<Queue<Staff>, Staff[]> finisher() {
            return gathered -> {
                final int total = gathered.size();
                System.out.println("Number of staff: " + total);
                return gathered.toArray(new Staff[0]);
            };
        }

        /** Declares this collector as concurrent and unordered. */
        @Override
        public Set<Characteristics> characteristics() {
            return EnumSet.of(Characteristics.UNORDERED, Characteristics.CONCURRENT);
        }
    }

    /** Builds {@code count} staff entries with IDs {@code 0..count-1} and names "Staff &lt;id&gt;". */
    private static ArrayList<Staff> buildRoster(int count) {
        ArrayList<Staff> roster = new ArrayList<>();
        int id = 0;
        while (id < count) {
            Staff member = new Staff();
            member.setStaffFullName("Staff " + id);
            member.setStaffID(id);
            roster.add(member);
            id++;
        }
        return roster;
    }

    /**
     * Creates 10000 staff entries and collects them from a parallel stream with
     * {@link StaffCollector}; the resulting array is discarded, only the
     * diagnostic output matters.
     */
    public static void main(String[] args) throws Exception {
        ArrayList<Staff> roster = buildRoster(10000);
        roster.parallelStream().collect(new StaffCollector());
    }

}
Naman
  • 27,789
  • 26
  • 218
  • 353
user3573403
  • 1,780
  • 5
  • 38
  • 64
  • It *does* run in parallel, otherwise the order of the instances in the array would follow the insertion to the original list. – Nikolas Charalambidis Aug 27 '20 at 08:37
  • How can it be? The System.out.print("combiner called"); in combiner() is not even printed. I can only see the println() from finisher(). – user3573403 Aug 27 '20 at 08:50
  • Did you see the content of the `Staff[]`? – Nikolas Charalambidis Aug 27 '20 at 08:57
  • Yes, I did. It's no longer in order, which means that it is run in parallel. If I remove Characteristics.CONCURRENT or Collector.Characteristics.UNORDERED, then combiner() would be called, and the order of Staff[] would be the same as insertion into original list, which means that it is run sequentially. But I thought combiner() is used only when it is run in parallel, but it seems to be the opposite. – user3573403 Aug 27 '20 at 09:03
  • I am not entirely sure, but are you confusing the `Collector` characteristics with the `Spliterator` characteristics? As the documentation states, the characteristics of a Collector are used to optimize the reduction implementation. If you dive into the implementation of `collect`, you can see a check on the `UNORDERED` && `CONCURRENT` characteristics that bypasses the `combiner`. – Naman Aug 27 '20 at 09:30
  • Remove the concurrent characteristic. See the linked duplicate – Michael Aug 27 '20 at 09:36
  • 1
    See [this comparison of `toMap` and `toConcurrentMap`](https://stackoverflow.com/a/41045442/2711488), which explains the difference between parallel processing with `CONCURRENT` and without. Note that it is rarely necessary to implement the `Collector` interface manually. You could simply use `Collector.of(ConcurrentLinkedQueue::new, (list, staff) -> list.add(staff), (left, right) -> { System.out.print("combiner called"); left.addAll(right); return left; }, staffList -> staffList.toArray(new Staff[0]), ⦗Characteristics.CONCURRENT,⦘ Characteristics.UNORDERED)` to achieve the same. – Holger Aug 27 '20 at 12:09

0 Answers0