
Recently a use case came up where I had to kick off several blocking IO tasks at the same time and use them in sequence. I did not want to change the order of operations on the consumption side, and since this was a web app with short-lived tasks in the request path, I didn't want to bottleneck on a fixed thread pool; I was looking to mirror the .NET async/await coding style. FutureTask<> seemed ideal for this but requires an ExecutorService. This is an attempt to remove the need for one.

Order of operation:

  1. Kick off tasks
  2. Do some stuff
  3. Consume Task 1
  4. Do some other stuff
  5. Consume Task 2
  6. Finish up

    ...

I wanted to spawn a new thread for each FutureTask<> but simplify the thread management: once run() completes, the spawned thread can be joined.

The solution I came up with was:

package com.staples.search.util;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class FutureWrapper<T> extends FutureTask<T> implements Future<T> {

    private Thread myThread;

    public FutureWrapper(Callable<T> callable) {
        super(callable);
        myThread = new Thread(this);
        myThread.start();
    }

    @Override
    public T get() {
        T val = null;
        try {
            val = super.get();
            myThread.join();
        } catch (Exception ex) {
            this.setException(ex);
        }
        return val;
    }
}
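
To make the intended usage concrete, here is a minimal sketch of the consumption pattern from the list above. It assumes FutureWrapper is on the classpath; slowCall is a hypothetical stand-in for a blocking IO call, not part of the actual code:

public class UsageSketch {
    public static void main(String[] args) throws Exception {
        // kick off both tasks; each FutureWrapper starts its own thread immediately
        FutureWrapper<String> task1 = new FutureWrapper<>(() -> slowCall("task1"));
        FutureWrapper<String> task2 = new FutureWrapper<>(() -> slowCall("task2"));

        // ... do some stuff ...
        String r1 = task1.get(); // consume task 1 (blocks only if it hasn't finished yet)
        // ... do some other stuff ...
        String r2 = task2.get(); // consume task 2
        System.out.println(r1 + ", " + r2);
    }

    // hypothetical stand-in for a blocking IO call
    private static String slowCall(String name) throws InterruptedException {
        Thread.sleep(500);
        return name + " done";
    }
}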

Here are a couple of JUnit tests I created to compare FutureWrapper to CachedThreadPool.

@Test
public void testFutureWrapper() throws InterruptedException, ExecutionException {
    long startTime = System.currentTimeMillis();
    int numThreads = 2000;

    List<FutureWrapper<ValueHolder>> taskList = new ArrayList<FutureWrapper<ValueHolder>>();

    System.out.printf("FutureWrapper: Creating %d tasks\n", numThreads);

    for (int i = 0; i < numThreads; i++) {
        taskList.add(new FutureWrapper<ValueHolder>(new Callable<ValueHolder>() {
            public ValueHolder call() throws InterruptedException {
                int value = 500;
                Thread.sleep(value);
                return new ValueHolder(value);
            }
        }));
    }

    for (int i = 0; i < numThreads; i++) {
        FutureWrapper<ValueHolder> wrapper = taskList.get(i);
        ValueHolder v = wrapper.get();
    }

    System.out.printf("Test took %d ms\n", System.currentTimeMillis() - startTime);

    Assert.assertTrue(true);
}

@Test
public void testCachedThreadPool() throws InterruptedException, ExecutionException {
    long startTime = System.currentTimeMillis();
    int numThreads = 2000;

    List<Future<ValueHolder>> taskList = new ArrayList<Future<ValueHolder>>();
    ExecutorService esvc = Executors.newCachedThreadPool();

    System.out.printf("CachedThreadPool: Creating %d tasks\n", numThreads);

    for (int i = 0; i < numThreads; i++) {
        taskList.add(esvc.submit(new Callable<ValueHolder>() {
            public ValueHolder call() throws InterruptedException {
                int value = 500;
                Thread.sleep(value);
                return new ValueHolder(value);
            }
        }));
    }

    for (int i = 0; i < numThreads; i++) {
        Future<ValueHolder> wrapper = taskList.get(i);
        ValueHolder v = wrapper.get();
    }

    System.out.printf("Test took %d ms\n", System.currentTimeMillis() - startTime);

    Assert.assertTrue(true);
}

class ValueHolder {
    private int value;
    public ValueHolder(int val) { value = val; }
    public int getValue() { return value; }
    public void setValue(int val) { value = val; }
}

Repeated runs put the FutureWrapper at ~925 ms vs. ~935 ms for the CachedThreadPool. Both tests bump into OS thread limits.

Things seem to work and the thread spawning is pretty fast (10k threads with random sleeps in ~4s). Does anyone see something wrong with this implementation?

saarp
  • Must task2 start execution *after* task1 completes, or can they execute in parallel (and you just get their results in order from the main thread)? – Bohemian Apr 01 '15 at 22:05
  • @Bohemian - No, execution of tasks is independent. It's just the consumption that has a specific order. – saarp Apr 01 '15 at 22:22
  • "Does anyone see something wrong with this implementation?" JCIP has quite a bit to say about the problems of leaking references to partially-initialized instances from the constructor. (A leak-free sketch follows these comments.) – Andy Turner Apr 01 '15 at 22:27
  • In addition to @AndyTurner's comment, this seems like an awful lot of work. If you really don't want to be constrained by a shared thread pool (and a CachedThreadPool should be heaps faster than spawning your own threads), why not just use a per-call ThreadPool, instead of rolling your own? As a bonus thought: if your code is executing in a Java EE container, spawning your own threads in this manner is explicitly frowned upon: http://stackoverflow.com/questions/533783/why-spawning-threads-in-java-ee-container-is-discouraged – Sbodd Apr 01 '15 at 22:31
  • What does this solution buy you over, say, submitting callables to an executor created via `Executors.newCachedThreadPool`? – Andy Turner Apr 01 '15 at 22:31
  • @AndyTurner - I have found that the FixedThreadPool starves the process of threads under load and CachedThreadPool can be slow to ramp up/down the number of threads. I think the OS does a better job of managing the threads than the JVM, which is why I'm exploring this route. Where is resource leakage an issue? – saarp Apr 01 '15 at 22:51
  • If you really want to re-invent the wheel, make sure it is at least as round as the existing one... – isnot2bad Apr 01 '15 at 23:06
  • @isnot2bad - Seems like this happens a lot in Java anyways. – saarp Apr 02 '15 at 08:10
  • @saarp Your test code is invalid. You simulate CPU load by calling `Thread.sleep(...)`. This is not valid here, as thousands of threads can sleep in parallel, but they cannot all work in parallel. Use a real CPU load here! – isnot2bad Apr 02 '15 at 09:38
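
For illustration, the constructor leak Andy Turner refers to can be avoided by moving thread creation out of the constructor into a static factory method. This is a hypothetical sketch, not the original code:

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

// Hypothetical leak-free variant: the thread is created and started only after
// the object is fully constructed, so 'this' never escapes the constructor.
public class SafeFutureWrapper<T> extends FutureTask<T> {

    private SafeFutureWrapper(Callable<T> callable) {
        super(callable);
    }

    public static <T> SafeFutureWrapper<T> start(Callable<T> callable) {
        SafeFutureWrapper<T> wrapper = new SafeFutureWrapper<>(callable);
        new Thread(wrapper).start(); // safe: construction finished above
        return wrapper;
    }
}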

1 Answer


Creating and starting thousands of threads is usually a very bad idea: thread creation is expensive, and having more threads than processors brings no performance gain while the resulting context switches consume CPU cycles. (See the note at the very end.)

So in my opinion, your test code contains a big error in reasoning: you are simulating CPU load by calling Thread.sleep(500), but that does not actually make the CPU do anything. Any number of threads can sleep in parallel, no matter how many processors you have, but no more CPU-consuming tasks can run in true parallel than there are processors.

If you simulate real CPU load, you'll see that more threads just increase the thread-management overhead without decreasing the total processing time.


So let's compare different ways to run CPU consuming tasks in parallel!

First, let's assume we've got some CPU-consuming task that always takes the same amount of time:

public Integer task() throws Exception {
    // do some computations here (e.g. Fibonacci, primes, cipher, ...)
    return 1;
}
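
The actual computation is not shown in the answer; purely as an illustration, a CPU-bound body could look like the following (the benchmark numbers below correspond to whatever workload was really used, not to this sketch):

// Hypothetical CPU-bound stand-in: deterministic integer arithmetic keeps the CPU busy.
public Integer task() throws Exception {
    long x = 0;
    for (int i = 0; i < 2_000_000; i++) {
        x = 31 * x + i;
    }
    // depend on x so the JIT cannot remove the loop; in practice this always returns 1
    return (x == 42) ? 0 : 1;
}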

Our goal is to run this task NUM_TASKS times using different execution strategies. For our tests, we set NUM_TASKS = 2000.

(1) Using a thread-per-task strategy

This strategy is very comparable to your approach, with the difference that it is not necessary to subclass FutureTask and fiddle around with threads. Instead, you can use FutureTask directly, as it is both a Runnable and a Future:

@Test
public void testFutureTask() throws InterruptedException, ExecutionException {
    List<RunnableFuture<Integer>> taskList = new ArrayList<RunnableFuture<Integer>>();

    // run NUM_TASKS FutureTasks in NUM_TASKS threads
    for (int i = 0; i < NUM_TASKS; i++) {
        RunnableFuture<Integer> rf = new FutureTask<Integer>(this::task);
        taskList.add(rf);
        new Thread(rf).start();
    }

    // now wait for all tasks
    int sum = 0;
    for (Future<Integer> future : taskList) {
        sum += future.get();
    }

    Assert.assertEquals(NUM_TASKS, sum);
}

Running this test with JUnitBenchmarks (10 test iterations + 5 warmup iterations) yields the following result:

ThreadPerformanceTest.testFutureTask: [measured 10 out of 15 rounds, threads: 1 (sequential)]
 round: 0.66 [+- 0.01], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 66, GC.time: 0.06, time.total: 10.59, time.warmup: 4.02, time.bench: 6.57

So one round (one full execution of the test method, i.e. all 2000 tasks) takes about 0.66 seconds.

(2) Using a thread-per-cpu strategy

This strategy uses a fixed number of threads to execute all tasks. Therefore, we create an ExecutorService via Executors.newFixedThreadPool(...). The number of threads should be equal to the number of CPUs (Runtime.getRuntime().availableProcessors()), which is 8 in my case.

To collect the results, we simply use a CompletionService. It automatically takes care of the results, no matter in which order they arrive.

@Test
public void testFixedThreadPool() throws InterruptedException, ExecutionException {
    ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<Integer> ecs = new ExecutorCompletionService<Integer>(exec);

    // submit NUM_TASKS tasks
    for (int i = 0; i < NUM_TASKS; i++) {
        ecs.submit(this::task);
    }

    // now wait for all tasks
    int sum = 0;
    for (int i = 0; i < NUM_TASKS; i++) {
        sum += ecs.take().get();
    }

    Assert.assertEquals(NUM_TASKS, sum);
}

Again we run this test with JUnitBenchmarks with the same settings. The results are:

ThreadPerformanceTest.testFixedThreadPool: [measured 10 out of 15 rounds, threads: 1 (sequential)]
 round: 0.41 [+- 0.01], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 22, GC.time: 0.04, time.total: 6.59, time.warmup: 2.53, time.bench: 4.05

Now one round takes only 0.41 seconds (almost 40% runtime reduction)! Also note the fewer GC calls.

(3) Sequential execution

For comparison we should also measure the non-parallelized execution:

@Test
public void testSequential() throws Exception {
    int sum = 0;
    for (int i = 0; i < NUM_TASKS; i++) {
        sum += this.task();
    }

    Assert.assertEquals(NUM_TASKS, sum);
}

The results:

ThreadPerformanceTest.testSequential: [measured 10 out of 15 rounds, threads: 1 (sequential)]
 round: 1.50 [+- 0.01], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 244, GC.time: 0.15, time.total: 22.81, time.warmup: 7.77, time.bench: 15.04

Note that 1.5 seconds is for 2000 executions, so a single execution of task() takes 0.75 ms.

Interpretation

According to Amdahl's law, the time T(n) to execute an algorithm on n processors is:

T(n) = T(1) * (B + (1 - B) / n)

B is the fraction of the algorithm that cannot be parallelized and must run sequentially. For pure sequential algorithms, B is 1, for pure parallel algorithms it would be 0 (but this is not possible as there is always some sequential overhead).

T(1) can be taken from our sequential execution: T(1) = 1.5 s

If we had no overhead (B = 0), on 8 CPUs we'd get: T(8) = 1.5 / 8 = 0.1875 s.

But we do have overhead! Solving Amdahl's equation for B gives B = (n * T(n) / T(1) - 1) / (n - 1), so with n = 8 and T(1) = 1.5 s we can compute B for our two strategies:

  • B(thread-per-task) = (8 * 0.66 / 1.5 - 1) / 7 = 0.36
  • B(thread-per-cpu) = (8 * 0.41 / 1.5 - 1) / 7 = 0.17

In other words: The thread-per-task strategy has twice the overhead!

Finally, let's compute the speedup S(n). That's the number of times an algorithm runs faster on n CPUs compared to sequential execution (S(1) = 1):

S(n) = 1 / (B + (1 - B) / n)

Applied to our two strategies, we get:

  • thread-per-task: S(8) = 2.27
  • thread-per-cpu: S(8) = 3.66

So the thread-per-cpu strategy has about 60% more speedup than thread-per-task.
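
As a quick sanity check of these numbers, the following snippet recomputes B and S(8) from the measured round times above:

// Recompute B and S(8) from the measured round times on n = 8 CPUs.
public class AmdahlCheck {
    public static void main(String[] args) {
        int n = 8;
        double t1 = 1.50;     // sequential round time [s]
        double tTask = 0.66;  // thread-per-task round time [s]
        double tCpu = 0.41;   // thread-per-cpu round time [s]

        // B = (n * T(n) / T(1) - 1) / (n - 1)
        double bTask = (n * tTask / t1 - 1) / (n - 1); // ~0.36
        double bCpu = (n * tCpu / t1 - 1) / (n - 1);   // ~0.17

        // S(n) = 1 / (B + (1 - B) / n)
        System.out.printf("S(8) thread-per-task: %.2f%n", 1 / (bTask + (1 - bTask) / n)); // ~2.27
        System.out.printf("S(8) thread-per-cpu:  %.2f%n", 1 / (bCpu + (1 - bCpu) / n));   // ~3.66
    }
}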

TODO

We should also measure and compare memory consumption.


Note: All of this is only true for CPU-consuming tasks. If your tasks instead perform a lot of I/O, you may benefit from having more threads than CPUs, as a thread waiting for I/O goes idle and the CPU can run another thread in the meantime. But even then there is a reasonable upper limit, which is usually far below 2000 on a PC.
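
For the I/O-bound case, a bounded pool sized at some multiple of the CPU count is a common middle ground between a CPU-sized pool and unbounded thread creation. A minimal sketch; the factor 4 is an arbitrary illustration, not a recommendation:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IoPool {
    // Sketch: bounded pool for blocking-I/O tasks; the factor 4 is an
    // assumption to tune per workload, not a rule.
    public static ExecutorService create() {
        int cpus = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(cpus * 4);
    }
}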

isnot2bad
  • Some say that m+n threads is optimal where m is the number of CPUs, and n is a smallish number (anywhere between 1 and m depending who you ask.) The reason being, that sooner or later, one of your threads is going to be blocked by something (e.g., paging) and when that happens, another application thread will be ready to take its place. – Solomon Slow Apr 02 '15 at 13:25
  • Yes, this was intended for IO-bound, not CPU-bound workloads where threads are spending most of their time blocking and are generally short-lived. I'm not sure I agree that thread creation is expensive. It seems to happen quickly and have low overhead (both mem+CPU). – saarp Apr 03 '15 at 00:19