I am running an experiment on varying thread count for java 8 parallel stream's reduction operation: summation over 11.5 million numbers. I run it on 8 core intel xeon processor. I do not see much variation in the total execution time and total CPU time, when I vary the number of threads from 16 to 1. I do get speed-up over sequential version which uses 'streams' instead of `parallel streams'. I want to understand the correlation between no of threads and the speed-up over sequential reduction.
Can somebody help me with the reasoning for this please? Is my code incorrect somewhere?
Source code is here, line 74 performs parallel reduction.
Relevant part of the code is pasted below as well:
class RedOperator implements BinaryOperator<MonResult>{
@Override
public MonResult apply(MonResult t, MonResult u) {
// TODO Auto-generated method stub
if(t != null && u != null)
t.count = t.count + u.count;
return t;
}
}
class FOFinisher implements Function<ConcurrentHashMap<ArrayList<String>, ArrayList<MonResult>>, ArrayList<MonResult>>{
protected MonResult id;
protected boolean isParallel;
public FOFinisher(boolean isparallel){
id = new MonResult();
id.count = 0;
this.isParallel = isparallel;
}
public long getJVMCpuTime() {
long lastProcessCpuTime = 0;
try {
if (ManagementFactory.getOperatingSystemMXBean() instanceof OperatingSystemMXBean) {
lastProcessCpuTime=((com.sun.management.OperatingSystemMXBean)ManagementFactory.getOperatingSystemMXBean()).getProcessCpuTime();
}
}
catch ( ClassCastException e) {
System.out.println(e.getMessage());
}finally{
return lastProcessCpuTime;
}
}
@Override
public ArrayList<MonResult> apply(ConcurrentHashMap<ArrayList<String>, ArrayList<MonResult>> t) {
// TODO Auto-generated method stub
long beg = System.nanoTime();
long begCPU = getJVMCpuTime();
ArrayList<MonResult> ret;
RedOperator x = new RedOperator();
if(isParallel){
ret = new ArrayList<MonResult>(t.values().stream().map((alist)->{
return alist.parallelStream().reduce(id,x);
}).collect(Collectors.toList()));
}
else{
ret = new ArrayList<MonResult>(t.values().stream().map((alist)->{
return alist.stream().reduce(id,x);
}).collect(Collectors.toList()));
}
long end = System.nanoTime();
long endCPU = getJVMCpuTime();
System.out.println("Exec Time:" + TimeUnit.MILLISECONDS.convert((end-beg),TimeUnit.NANOSECONDS));
System.out.println("CPU Time : " + TimeUnit.MILLISECONDS.convert((endCPU-begCPU),TimeUnit.NANOSECONDS));
return ret;
}
}