What is the best-practice approach to launching a pool of thousands of tasks (of which up to 4 should be able to execute in parallel) and automatically timing each one out if it takes more than 3 seconds?

While I found that ExecutorService seems helpful (see the SSCCE from another post below), I don't see how to make this work for multiple tasks running in parallel. Because future.get(3, TimeUnit.SECONDS) blocks the same thread that launches the tasks, there is no opportunity to launch several tasks in parallel:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Test {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new Task());

        try {
            System.out.println("Started..");
            System.out.println(future.get(3, TimeUnit.SECONDS));
            System.out.println("Finished!");
        } catch (TimeoutException e) {
            future.cancel(true);
            System.out.println("Terminated!");
        }

        executor.shutdownNow();
    }
}

class Task implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(4000); // Just to demo a long running task of 4 seconds.
        return "Ready!";
    }
}

Thanks!

Tom
  • Well, don't use `Executors.newSingleThreadExecutor()` - use `Executors.newFixedThreadPool(4)`. And submit them all before you start trying to call `Future.get()`. – Andy Turner May 23 '16 at 20:07
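
The suggestion in this comment could be sketched roughly as follows. The class name `SubmitThenCollect`, the helper `runAll`, and its parameters are hypothetical and not part of the original post. Note that the timeout here is measured from the `get` call rather than from the moment a task actually starts, so the effective limit per task is only approximate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SubmitThenCollect {
    // Hypothetical helper: runs every task on a pool of `parallelism` threads and
    // returns the results of the tasks that finished in time.
    static <T> List<T> runAll(List<Callable<T>> tasks, int parallelism, long timeoutMillis)
            throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(parallelism);
        try {
            // Submit everything first so the pool can run up to `parallelism` tasks at once.
            List<Future<T>> futures = new ArrayList<>();
            for (Callable<T> task : tasks) {
                futures.add(executor.submit(task));
            }
            // Only then start waiting on the results, one future at a time.
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                try {
                    results.add(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
                } catch (TimeoutException e) {
                    future.cancel(true); // interrupts the worker if the task is still running
                } catch (ExecutionException e) {
                    // The task itself threw an exception; skipped in this sketch.
                }
            }
            return results;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "Ready!");
        tasks.add(() -> { Thread.sleep(4000); return "Too slow"; });
        System.out.println(runAll(tasks, 4, 3000));
    }
}
```

Because `cancel(true)` only interrupts the worker thread, a task that ignores interruption keeps occupying one of the 4 pool threads.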

2 Answers

If you have to monitor each task to kill it when it exceeds the timeout period, either

  1. the task itself has to keep track of time and quit appropriately, OR
  2. you have to create a second watchdog thread for every task. The watchdog thread sets a timer and sleeps, waking up after the timeout interval expires and then terminating the task if it's still running.
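
One possible way to sketch option 2 without a dedicated thread per task is to let a single scheduler thread play the watchdog for all tasks. This is an illustration under assumptions, not a definitive implementation: `TimedPool`, `TimedTask`, and `runAll` are hypothetical names, and the tasks are assumed to respond to interruption. Each task arms its own timer when a worker thread starts running it, so the limit applies to execution time rather than to time spent waiting in the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class TimedPool {
    // A FutureTask that schedules its own cancellation at the moment it starts running.
    static final class TimedTask<T> extends FutureTask<T> {
        private final ScheduledExecutorService watchdog;
        private final long timeoutMillis;

        TimedTask(Callable<T> work, ScheduledExecutorService watchdog, long timeoutMillis) {
            super(work);
            this.watchdog = watchdog;
            this.timeoutMillis = timeoutMillis;
        }

        @Override
        public void run() {
            Runnable timeout = () -> cancel(true); // interrupts the thread running this task
            ScheduledFuture<?> timer =
                watchdog.schedule(timeout, timeoutMillis, TimeUnit.MILLISECONDS);
            try {
                super.run();
            } finally {
                timer.cancel(false); // finished (or was cancelled): disarm the timer
            }
        }
    }

    // Runs all tasks with at most `parallelism` in flight; returns results of those in time.
    static <T> List<T> runAll(List<Callable<T>> work, int parallelism, long timeoutMillis)
            throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(parallelism);
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        try {
            List<TimedTask<T>> submitted = new ArrayList<>();
            for (Callable<T> callable : work) {
                TimedTask<T> task = new TimedTask<>(callable, watchdog, timeoutMillis);
                workers.execute(task);
                submitted.add(task);
            }
            List<T> results = new ArrayList<>();
            for (TimedTask<T> task : submitted) {
                try {
                    results.add(task.get());
                } catch (CancellationException | ExecutionException e) {
                    // Timed out or failed; skipped in this sketch.
                }
            }
            return results;
        } finally {
            workers.shutdownNow();
            watchdog.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> work = new ArrayList<>();
        work.add(() -> "Ready!");
        work.add(() -> { Thread.sleep(4000); return "Too slow"; });
        System.out.println(runAll(work, 4, 3000));
    }
}
```

`FutureTask.cancel(true)` does nothing once the task has completed, which avoids interrupting a worker thread that has already moved on to another task.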
Jim Garrison
  • Option 1 is not feasible, as the timeout is needed for the case where the thread freezes (due to external dependencies). What would be the best approach for option 2? – Tom May 23 '16 at 20:28
  • I'm not sure how to explain option 2 any better. When you submit the task you must create a second thread (maybe in another executor) whose job it is to sleep for the timeout period, wake up, and if the primary task is still running, terminate it. – Jim Garrison May 23 '16 at 20:30
  • I figured out that Timer and TimerTask are best to use here. What I'm unclear about is how to kill the thread, since Thread.stop() is deprecated. – Tom May 23 '16 at 20:54
  • Ah, there's the rub. You have to set a termination flag in the thread and interrupt it. If it's blocked non-interruptibly there's not much you can do. Maybe you'll have to use non-blocking NIO operations, which complicates things. [Here's the canonical answer on StackOverflow](http://stackoverflow.com/q/671049/18157) – Jim Garrison May 23 '16 at 20:58
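
The flag-plus-interrupt idea from this comment might look like the minimal sketch below. `StoppableTask` and `requestStop` are hypothetical names, and `Thread.sleep(50)` merely stands in for one unit of interruptible work. A task blocked in a non-interruptible call would not react to either signal.

```java
import java.util.concurrent.Callable;

class StoppableTask implements Callable<String> {
    // Termination flag; volatile so the worker thread sees the update promptly.
    private volatile boolean stopRequested = false;

    void requestStop() {
        stopRequested = true;
    }

    @Override
    public String call() {
        int steps = 0;
        // Check both the flag and the interrupt status between units of work.
        while (!stopRequested && !Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(50); // placeholder for one unit of interruptible work
                steps++;
            } catch (InterruptedException e) {
                // Restore the interrupt status so the loop condition ends the task.
                Thread.currentThread().interrupt();
            }
        }
        return "stopped after " + steps + " steps";
    }

    public static void main(String[] args) throws Exception {
        StoppableTask task = new StoppableTask();
        Thread worker = new Thread(() -> System.out.println(task.call()));
        worker.start();
        Thread.sleep(300);
        task.requestStop();  // set the termination flag...
        worker.interrupt();  // ...and wake the thread if it is blocked
        worker.join();
    }
}
```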

This is a tricky one. Here’s what I came up with:

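import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TaskQueue<T> {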
    private static final Logger logger =
        Logger.getLogger(TaskQueue.class.getName());

    private final Collection<Callable<T>> tasks;

    private final int maxTasks;

    private int addsPending;

    private final Collection<T> results = new ArrayList<T>();

    private final ScheduledExecutorService executor;

    public TaskQueue() {
        this(4);
    }

    public TaskQueue(int maxSimultaneousTasks) {
        maxTasks = maxSimultaneousTasks;
        tasks = new ArrayDeque<>(maxTasks);
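        // Caution: the wrapper tasks submitted by add() block on threads of this
        // same pool, so many pending adds can leave no free thread for the real tasks.
        executor = Executors.newScheduledThreadPool(maxTasks * 3);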
    }

    private void addWhenAllowed(Callable<T> task)
    throws InterruptedException,
           ExecutionException {

        synchronized (tasks) {
            while (tasks.size() >= maxTasks) {
                tasks.wait();
            }
            tasks.add(task);

            if (--addsPending <= 0) {
                tasks.notifyAll();
            }
        }

        Future<T> future = executor.submit(task);
        executor.schedule(() -> future.cancel(true), 3, TimeUnit.SECONDS);
        try {
            T result = future.get();
            synchronized (tasks) {
                results.add(result);
            }
        } catch (CancellationException e) {
            logger.log(Level.FINE, "Canceled", e);
        } finally {
            synchronized (tasks) {
                tasks.remove(task);
                if (tasks.isEmpty()) {
                    tasks.notifyAll();
                }
            }
        }
    }

    public void add(Callable<T> task) {
        synchronized (tasks) {
            addsPending++;
        }

        executor.submit(new Callable<Void>() {
            @Override
            public Void call()
            throws InterruptedException,
                   ExecutionException {
                addWhenAllowed(task);
                return null;
            }
        });
    }

    public Collection<T> getAllResults()
    throws InterruptedException {
        synchronized (tasks) {
            while (addsPending > 0 || !tasks.isEmpty()) {
                tasks.wait();
            }
            return new ArrayList<T>(results);
        }
    }

    public void shutdown() {
        executor.shutdown();
    }
}

I suspect it could be done more cleanly using Locks and Conditions instead of synchronization.

VGR