Managing that many threads from a single application on one machine is usually not a good option unless the machine has 16 or more cores.
Consider whether your work is I/O-intensive or CPU-intensive and size your pool accordingly. Read here and here.
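A common rule of thumb from *Java Concurrency in Practice* is about Ncpu + 1 threads for compute-bound work and, more generally, Ncpu × Ucpu × (1 + W/C), where Ucpu is the target CPU utilization and W/C is the ratio of time spent waiting (e.g. on I/O) to time spent computing. The sketch below only computes those numbers; `PoolSizing` is a hypothetical helper, and the wait/compute ratio is an assumption you would have to measure for your own workload.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper illustrating the sizing heuristic Nthreads = Ncpu * Ucpu * (1 + W/C).
final class PoolSizing {

    private PoolSizing() {}

    // Compute-bound work: roughly one thread per core plus one spare.
    static int cpuBoundPoolSize(int cores) {
        return cores + 1;
    }

    // General case: cores * targetUtilization * (1 + waitTime / computeTime), at least 1.
    // waitToComputeRatio is an assumed, measured property of your tasks.
    static int ioBoundPoolSize(int cores, double targetUtilization, double waitToComputeRatio) {
        if (targetUtilization <= 0 || targetUtilization > 1) {
            throw new IllegalArgumentException("targetUtilization must be in (0, 1]");
        }
        int size = (int) Math.round(cores * targetUtilization * (1 + waitToComputeRatio));
        return Math.max(1, size);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Example assumption: tasks wait on I/O about 9x longer than they compute.
        int ioThreads = ioBoundPoolSize(cores, 1.0, 9.0);
        ExecutorService ioPool = Executors.newFixedThreadPool(ioThreads);
        System.out.println("CPU-bound pool: " + cpuBoundPoolSize(cores)
                + ", I/O-bound pool: " + ioThreads);
        ioPool.shutdown();
    }
}
```

For an I/O-heavy workload the formula can give a pool several times larger than the core count, which is why the CPU-bound and I/O-bound cases are worth distinguishing.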
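I usually use a bounded pool like this:
```java
import java.util.concurrent.*;

int maxThreadCount = Runtime.getRuntime().availableProcessors();
ExecutorService executor =
        new ThreadPoolExecutor(
                0,                                 // no core threads are kept alive when idle
                Math.max(1, maxThreadCount - 1),   // leave one core free; must be at least 1
                1, TimeUnit.SECONDS,               // idle threads die after 1 second
                new LinkedBlockingDeque<>(maxThreadCount * 2), // bounded work queue
                // note: with 0 core threads, a second worker only starts once the queue is full
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // when saturated, the caller runs the task
```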
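With a bounded queue like the one above, once every worker is busy and the queue is full, `CallerRunsPolicy` makes the submitting thread run the task itself, which naturally slows the producer down. The sketch below makes that observable with a one-thread pool and a one-slot queue; `CallerRunsDemo` is a hypothetical name and the sizes are chosen only to force saturation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CallerRunsDemo {
    // Fills a one-thread pool and its one-slot queue, submits a third task,
    // and returns the name of the thread that actually ran that third task.
    static String threadThatRanOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1,                          // exactly one worker thread
                1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),  // room for one waiting task
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocker = () -> {
            try {
                release.await();               // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        executor.execute(blocker);             // runs on the single worker
        executor.execute(blocker);             // waits in the queue
        String[] ranOn = new String[1];
        // Worker busy and queue full: CallerRunsPolicy runs this on the calling thread.
        executor.execute(() -> ranOn[0] = Thread.currentThread().getName());
        release.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("Overflow task ran on: " + threadThatRanOverflowTask()
                + " (caller is " + Thread.currentThread().getName() + ")");
    }
}
```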
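Now submit your tasks and wait until everything has finished:
```java
while (moreTasksToDo) {
    Callable<?> task = ...; // build your task here
    executor.submit(task);
}
executor.shutdown();                                              // stop accepting new tasks
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); // block until queued tasks finish (throws InterruptedException)
```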
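`awaitTermination` only tells you that the pool has finished; it neither returns results nor surfaces exceptions thrown by tasks. A sketch that keeps the `Future`s instead, here via `ExecutorService.invokeAll`; the sum-of-squares workload and the `SubmitAndWait` name are placeholders for your own tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SubmitAndWait {
    // Placeholder workload: sums i * i for 1 <= i <= n, one Callable per number.
    static long sumOfSquares(int n, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (long i = 1; i <= n; i++) {
                final long value = i;
                tasks.add(() -> value * value);
            }
            long total = 0;
            // invokeAll blocks until every task has completed, normally or exceptionally.
            for (Future<Long> future : executor.invokeAll(tasks)) {
                total += future.get(); // a failed task surfaces here as ExecutionException
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10, 4));
    }
}
```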
With Java 8+ in place, you can also consider other, potentially more efficient ways of doing this.
I ran a small benchmark myself. The code below is inspired by an article; you can read more in the Java 8 Handbook.
Consider these functions, each of which computes the same total:
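```java
// Imports needed by the benchmark methods and helper classes
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Approach 1: old school (plain loop)
private static void findingTotalOldSchool() {
    long total = 0;
    long start = System.nanoTime();
    for (long i = 1; i < LIMIT; i++) {
        total = total + (i * FACTOR);
    }
    long duration = (System.nanoTime() - start) / 1_000_000; // nanoseconds -> milliseconds
    System.out.println("Duration: " + duration);
    System.out.println("Total: " + total);
}
```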
```java
public static Range range(int max) {
    return new Range(max);
}

// Approach 2: custom iterator (Range yields 0 .. max - 1 as boxed Integers)
private static void findingTotalCustomIterator() {
    long total = 0;
    long start = System.nanoTime();
    for (long i : range(LIMIT)) { // each Integer is unboxed and widened to long
        total = total + i * FACTOR;
    }
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: " + duration);
    System.out.println("Total: " + total);
}
```
```java
// Approach 3: using streams
private static void findingTotalStream() {
    long start = System.nanoTime();
    long total = LongStream.range(1, LIMIT)
            .map(t -> t * FACTOR)
            .sum();
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: " + duration);
    System.out.println("Total: " + total);
}
```
```java
// Approach 4: using parallel streams
private static void findingTotalParallelStream() {
    long start = System.nanoTime();
    long total = LongStream.range(1, LIMIT)
            .parallel() // splits the work across ForkJoinPool.commonPool()
            .map(t -> t * FACTOR)
            .sum();
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: " + duration);
    System.out.println("Total: " + total);
}
```
```java
// Approach 5: using CompletableFutures alone
private static void findingTotalCFS() {
    long start = System.nanoTime();
    // One future per element; without an executor argument, supplyAsync uses the
    // default async executor (normally ForkJoinPool.commonPool()).
    List<CompletableFuture<Long>> futures =
            LongStream.range(1, LIMIT).boxed()
                    .map(t -> CompletableFuture.supplyAsync(() -> t * FACTOR))
                    .collect(Collectors.toList());
    // Code here could run ahead of the futures, hence joining on them
    long total = futures.stream().mapToLong(CompletableFuture::join).sum();
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Futures used: " + futures.size());
    System.out.println("Duration: " + duration);
    System.out.println("Total: " + total);
}
```
```java
// Approach 6: using CompletableFutures managed by an ExecutorService
private static void findingTotalCFSE() {
    long start = System.nanoTime();
    ExecutorService executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    List<CompletableFuture<Long>> futures =
            LongStream.range(1, LIMIT).boxed()
                    .map(t -> CompletableFuture.supplyAsync(() -> t * FACTOR, executor))
                    .collect(Collectors.toList());
    long total = futures.stream().mapToLong(CompletableFuture::join).sum();
    executor.shutdownNow(); // every future has completed, so nothing is left running
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Futures used: " + futures.size());
    System.out.println("Duration: " + duration);
    System.out.println("Total: " + total);
}
```
```java
// Approach 7: using an ExecutorService alone
private static void findingTotalES() {
    long start = System.nanoTime();
    ExecutorService executorService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    // Streams are lazy: each task is submitted and then waited on with get() before
    // the next one is submitted, so the tasks effectively run one at a time.
    long total = LongStream.range(1, LIMIT)
            .boxed()
            .map(i -> executorService.submit(new Operation(i, FACTOR)))
            .mapToLong((Future<Long> future) -> {
                try {
                    return future.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for a result", e);
                } catch (ExecutionException e) {
                    // Unwrap the exception actually thrown by the task instead of hiding it
                    throw new IllegalStateException(e.getCause());
                }
            })
            .sum();
    executorService.shutdown();
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: " + duration);
    System.out.println("Total: " + total);
}

// Static nested class so it can be instantiated from the static methods above
static class Operation implements Callable<Long> {
    private final long i;
    private final int j;

    Operation(long i, int j) { this.i = i; this.j = j; }

    @Override
    public Long call() {
        return i * j;
    }
}
```
```java
// Static nested class: an Iterable over the ints 0, 1, ..., limit - 1
static class Range implements Iterable<Integer> {
    private final int limit;

    public Range(int limit) {
        this.limit = limit;
    }

    @Override
    public Iterator<Integer> iterator() {
        final int max = limit;
        return new Iterator<Integer>() {
            private int current = 0;

            @Override
            public boolean hasNext() {
                return current < max;
            }

            @Override
            public Integer next() {
                if (hasNext()) {
                    return current++;
                } else {
                    throw new NoSuchElementException("Range reached the end");
                }
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Can't remove values from a Range");
            }
        };
    }
}
```
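Approach 7 blocks on each `get()` before the next task is even submitted, so the pool never works on more than one task at a time. A sketch of the same work that submits every task first (the pattern approaches 5 and 6 already use) and only then collects the results; `SubmitAllThenGet` is a hypothetical name, and this variant has not been benchmarked here.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

final class SubmitAllThenGet {
    // Sum of i * factor for 1 <= i < limit, with every task submitted before any get().
    static long total(int limit, int factor) {
        ExecutorService executor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
        try {
            // collect() is a terminal operation, so all tasks are submitted here.
            List<Future<Long>> futures = LongStream.range(1, limit)
                    .mapToObj(i -> executor.submit(() -> i * factor))
                    .collect(Collectors.toList());
            long total = 0;
            for (Future<Long> future : futures) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(total(10_000, 1));
    }
}
```

It still creates one task per number, so the per-task overhead remains; the change only lets the tasks overlap.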
We ran the tests with two sets of data. Each test should be run on its own rather than as part of a single combined run, because JIT compilation triggered by earlier tests can change the results of later ones.
```java
// First run
static final int FACTOR = 1;
static final int LIMIT = 10_000;

// Second run
static final int FACTOR = 9876;
static final int LIMIT = 1_000_000;
```
```java
System.out.println("-----Traditional Loop-----");
findingTotalOldSchool();
// first run: 0 ms, second run: 4 ms

System.out.println("-----Custom Iterator----");
findingTotalCustomIterator();
// first run: 1 ms, second run: 15 ms

System.out.println("-----Streams-----");
findingTotalStream();
// first run: 38 ms, second run: 33 ms

System.out.println("-----Parallel Streams-----");
findingTotalParallelStream();
// first run: 29 ms, second run: 64 ms

System.out.println("-----Completable Futures with Streams-----");
findingTotalCFS();
// first run: 77 ms, second run: 635 ms

System.out.println("-----Executor Service with Streams-----");
findingTotalES();
// first run: 323 ms, second run: 12632 ms

System.out.println("-----Completable Futures with Executor Service with Streams-----");
findingTotalCFSE();
// first run: 77 ms, second run: 844 ms
```
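Running each test in its own JVM is the simplest way to keep JIT warm-up from skewing the numbers. If you would rather time several approaches in one run, a rough alternative is to warm each one up before measuring. This is a minimal hand-rolled sketch (`WarmUpTimer` and the run counts are arbitrary choices, not part of the benchmark above); a dedicated harness such as JMH handles warm-up, dead-code elimination and forking far more reliably.

```java
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

final class WarmUpTimer {
    // Results are published here, which makes it harder for the JIT to discard the work.
    static volatile long blackhole;

    // Runs the workload warmUpRuns times untimed, then returns the average
    // wall-clock duration of measuredRuns timed runs, in nanoseconds.
    static long averageNanos(LongSupplier workload, int warmUpRuns, int measuredRuns) {
        if (measuredRuns <= 0) {
            throw new IllegalArgumentException("measuredRuns must be positive");
        }
        long sink = 0;
        for (int i = 0; i < warmUpRuns; i++) {
            sink += workload.getAsLong();
        }
        long start = System.nanoTime();
        for (int i = 0; i < measuredRuns; i++) {
            sink += workload.getAsLong();
        }
        long elapsed = System.nanoTime() - start;
        blackhole = sink;
        return elapsed / measuredRuns;
    }

    public static void main(String[] args) {
        long avg = averageNanos(
                () -> LongStream.range(1, 1_000_000).map(t -> t * 9876).sum(), 20, 10);
        System.out.println("Average: " + avg / 1_000_000 + " ms");
    }
}
```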
Observations:
- The traditional loop is the fastest in most cases (it was in both runs here).
- Use parallel streams when performance matters or I/O operations are involved.
- For simple iterations (involving substitutions or simple numeric calculations), go for a traditional loop.
- CompletableFutures with an ExecutorService are flexible and a go-to option when you need more control over the number of threads, etc.
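Approaches 5 and 6 create one `CompletableFuture` per number, close to a million in the second run. A sketch that keeps the control over the thread count mentioned in the last observation but creates only one future per chunk of the range; the chunk-per-thread split and the `ChunkedTotal` name are assumptions of this example, and it has not been benchmarked here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.LongStream;

final class ChunkedTotal {
    // Sum of i * factor for 1 <= i < limit, computed with one CompletableFuture per chunk.
    static long total(long limit, long factor, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            long count = Math.max(0, limit - 1);                            // values in [1, limit)
            long chunkSize = Math.max(1, (count + threads - 1) / threads);  // ceil(count / threads)
            List<CompletableFuture<Long>> futures = new ArrayList<>();
            for (long from = 1; from < limit; from += chunkSize) {
                final long lo = from;
                final long hi = Math.min(limit, from + chunkSize);
                // Each chunk is summed sequentially on one pool thread.
                futures.add(CompletableFuture.supplyAsync(
                        () -> LongStream.range(lo, hi).map(t -> t * factor).sum(), executor));
            }
            // join() waits for each chunk; a failure surfaces as CompletionException.
            return futures.stream().mapToLong(CompletableFuture::join).sum();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int threads = Runtime.getRuntime().availableProcessors() + 1;
        System.out.println(total(1_000_000, 9876, threads));
    }
}
```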
If what you're doing is complex, go for higher-level frameworks that help you distribute the work horizontally, such as Akka or Vert.x.