1

I have a fixed pool with a single thread. When I submit new task I want to stop all old threads except last one.

private class MyPool extends ThreadPoolExecutor {

    public MyPool(long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(1, 1, keepAliveTime, unit, workQueue);
    }

    public boolean isReady() {
        return semaphore;
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // Iterate all existed task and stop
        Future<T> future = super.submit(task);
        return future;
    }

    private volatile boolean semaphore;

}

Code of running task:

private class MyTask implements Runnable {

    private volatile boolean isRun = true;
    private int id;

    public MyTask(int id) {
        this.id = id;
    }

    public void stop() {
        isRun = false;
    }

    @Override
    public void run() {
        try {
            System.out.println("Start " + id);
            if (isRun) {
                Thread.sleep(1000);
                System.out.println("Stop " + id);
            }
        } catch(Exception e) {
            e.printStackTrace();
        }
    }       
}

I created my own class, but It doesn't correctly work because semaphore effects on a new task as well. What is the best way to do it?

Mr. Polywhirl
  • 42,981
  • 12
  • 84
  • 132
mystdeim
  • 4,802
  • 11
  • 49
  • 77

1 Answers1

1

This ThreadPoolExecutor kills the running thread if a new Callable is submitted:

class MyPool extends ThreadPoolExecutor {
    private volatile Thread activeThread = null;
    private static final Field FutureTask$runner;

    static {
        try {
            FutureTask$runner = FutureTask.class.getDeclaredField("runner");
            FutureTask$runner.setAccessible(true);
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }

    private static Thread getThread(FutureTask<?> task) {
        try {
            return (Thread) FutureTask$runner.get(task);
        } catch (IllegalAccessException e) {
            throw new Error(e);
        }
    }

    public MyPool() {
        super(1, 1,
            //whatever here
            5000, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>());
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        if(activeThread != null) {
            activeThread.stop(); //kill
        }
        FutureTask<T> activeTask = (FutureTask<T>)super.submit(task);
        activeThread = getThread(activeTask); //steal thread reference for killing
        return activeTask;
    }
}
Binkan Salaryman
  • 3,008
  • 1
  • 17
  • 29