I am trying to use parallelStream with a custom ForkJoinPool. Each task performs network calls (simulated in the program below with Thread.sleep). When I use the following style:
    pool.submit(() -> {
        ioDelays.parallelStream().forEach(n -> {
            induceRandomSleep(n);
        });
    }).get();
This takes almost 11 times as long as looping over the list and submitting the tasks to the same pool one by one, as shown below:
    for (final Integer num : ioDelays) {
        ForkJoinTask<Integer> task = pool.submit(() -> {
            return induceRandomSleep(num);
        });
        tasks.add(task);
    }
    int count = 0;
    final List<Integer> returnVals = new ArrayList<>();
    tasks.forEach(task -> {
        try {
            returnVals.add(task.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
Is ForkJoinPool.commonPool() involved in some way when parallelStream is used inside a custom pool?

Here is the entire program, which simulates both of the styles above:
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;

    public class FJTPExperiment {

        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ForkJoinPool pool = new ForkJoinPool(200);

            // 2000 simulated I/O delays, each between 200 and 499 ms
            List<Integer> ioDelays = new ArrayList<>();
            for (int i = 0; i < 2000; i++) {
                ioDelays.add((int) (300 * Math.random() + 200));
            }
            int originalCount = 0;
            for (Integer val : ioDelays) {
                originalCount += val;
            }
            System.out.println("Expected " + originalCount);
            System.out.println(Thread.currentThread().getName() + " ::::Common pool parallelism :" + ForkJoinPool.getCommonPoolParallelism());

            // Style 1: run a parallel stream from inside the custom pool
            long beginTimestamp = System.currentTimeMillis();
            pool.submit(() -> {
                ioDelays.parallelStream().forEach(n -> {
                    induceRandomSleep(n);
                });
            }).get();
            long endTimestamp = System.currentTimeMillis();
            System.out.println("Took " + (endTimestamp - beginTimestamp) + " ms");

            // Style 2: submit one task per element to the same pool
            List<ForkJoinTask<Integer>> tasks = new ArrayList<>();
            beginTimestamp = System.currentTimeMillis();
            for (final Integer num : ioDelays) {
                ForkJoinTask<Integer> task = pool.submit(() -> {
                    return induceRandomSleep(num);
                });
                tasks.add(task);
            }
            int count = 0;
            final List<Integer> returnVals = new ArrayList<>();
            tasks.forEach(task -> {
                try {
                    returnVals.add(task.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
            endTimestamp = System.currentTimeMillis();
            for (Integer val : returnVals) {
                count += val;
            }
            System.out.println("Count " + count);
            System.out.println("Took " + (endTimestamp - beginTimestamp) + " ms");
        }

        public static int induceRandomSleep(int sleepInterval) {
            System.out.println(Thread.currentThread().getName() + " ::::sleeping for " + sleepInterval + " ms");
            try {
                Thread.sleep(sleepInterval);
                return sleepInterval;
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();
                return sleepInterval;
            }
        }
    }
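
To narrow down whether the common pool is involved, one option is to record which threads actually execute the stream's elements. The probe below is only a sketch under stated assumptions: StreamThreadProbe, threadNamesUsed and sleepQuietly are hypothetical names that are not part of the program above, and the 20 ms sleep is an arbitrary stand-in for the network call. It also assumes the default thread naming, where common pool workers have names starting with "ForkJoinPool.commonPool-worker-" and workers of other pools do not. The number of distinct threads it reports may differ between JDK versions.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical diagnostic class, not part of the original program.
class StreamThreadProbe {

    // Runs a parallel stream from inside the given pool and returns the names
    // of all threads that processed at least one element.
    static Set<String> threadNamesUsed(ForkJoinPool pool, int elements) {
        List<Integer> items = IntStream.range(0, elements).boxed().collect(Collectors.toList());
        Set<String> names = ConcurrentHashMap.newKeySet();
        pool.submit(() -> {
            items.parallelStream().forEach(n -> {
                names.add(Thread.currentThread().getName());
                sleepQuietly(20); // arbitrary stand-in for a blocking network call
            });
        }).join(); // join() waits for completion without checked exceptions
        return names;
    }

    // Sleeps for the given time and restores the interrupt flag if interrupted.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(200);
        Set<String> names = threadNamesUsed(pool, 2000);
        long fromCommonPool = names.stream()
                .filter(name -> name.startsWith("ForkJoinPool.commonPool"))
                .count();
        System.out.println("Distinct threads used: " + names.size());
        System.out.println("Of those, common pool workers: " + fromCommonPool);
        System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        pool.shutdown();
    }
}
```

Comparing "Distinct threads used" with the custom pool's parallelism of 200 and with the common pool parallelism should show how many workers the stream really keeps busy, which is the quantity that matters when every element blocks.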