I am trying to test running a collector in parallel with the following codes. The "combiner" method is never called, so it looks like the collector is still run sequentially. Why is that so? How do I make it parallel?
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
public class TestCollector {
private static class Staff {
private String staffFullName;
private int staffID;
public String getStaffFullName() {
return staffFullName;
}
public void setStaffFullName(String staffFullName) {
this.staffFullName = staffFullName;
}
public int getStaffID() {
return staffID;
}
public void setStaffID(int staffID) {
this.staffID = staffID;
}
}
private static class StaffCollector implements Collector<Staff, Queue<Staff>,Staff[]> {
@Override
public Supplier<Queue<Staff>> supplier() {
return ConcurrentLinkedQueue::new;
}
@Override
public BiConsumer<Queue<Staff>, Staff> accumulator() {
return (list, staff) -> list.add(staff);
}
@Override
public BinaryOperator<Queue<Staff>> combiner() {
return (left, right) -> {
System.out.print("combiner called");
left.addAll(right);
return left;
};
}
@Override
public Function<Queue<Staff>, Staff[]> finisher() {
return staffList -> {
System.out.println("Number of staff: " + staffList.size());
Staff[] staffArray = staffList.toArray(new Staff[0]);
return staffArray;
};
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.CONCURRENT, Collector.Characteristics.UNORDERED);
}
}
public static void main(String[] args) throws Exception {
ArrayList<Staff> staffList = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
Staff staff = new Staff();
staff.setStaffFullName("Staff " + i);
staff.setStaffID(i);
staffList.add(staff);
}
staffList.parallelStream().collect(new StaffCollector());
}
}