
I have a fixed thread pool in my Java web application.

 App.aes = Executors.newFixedThreadPool(3);

It is used for executing asynchronous tasks, and those tasks can take hours to finish. So if I need to reload the application, I need to check whether any asynchronous tasks are running; if so, I need to store the tasks in the waiting queue somewhere and restore them after the application reloads.

I ran a test:

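import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RestoreTest {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 6; i++) {               // 6 tasks, matching the output below
            final int c = i + 1;
            es.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("current running " + c);
                    try {
                        Thread.sleep(10000);    // 10 sec
                    } catch (InterruptedException e) {
                        System.out.println("interrupted  " + c);
                    }
                }
            });
        }

        Thread.sleep(15000);                        // task 1 has finished, task 2 is active
        List<Runnable> rems = es.shutdownNow();     // interrupts task 2, returns queued tasks 3..6
        System.out.println("Remaining " + rems.size());
        System.out.println("--------- restore remaining task ----------");

        es = Executors.newFixedThreadPool(1);
        for (Runnable r : rems) {
            es.submit(r);
        }
        es.shutdown();                              // let the restored tasks finish, then exit
    }
}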

The output is:

current running 1
current running 2
interrupted  2
Remaining 4
--------- restore remaining task ----------
current running 3
current running 4
current running 5
current running 6

This is not the result I am looking for: the interrupted task (task 2) is never recovered. The shutdownNow() API docs confirm this:

Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution. 
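A small sketch that makes the documented behaviour visible: the task that is active when shutdownNow() is called gets interrupted and is not in the returned list; only the queued tasks are. The class name ShutdownNowDemo is made up for illustration. The sketch uses execute(), so the returned list holds the original Runnable objects; with submit(), as in the test above, the queued elements are the FutureTask wrappers that submit() creates.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowDemo {

    /** Runs one active task plus three queued ones, then calls shutdownNow(). */
    public static List<Runnable> runDemo() throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);

        // Task 1 occupies the only worker thread until it is interrupted.
        es.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                System.out.println("active task interrupted");
            }
        });

        // Tasks 2..4 have to wait in the queue.
        for (int i = 2; i <= 4; i++) {
            final int c = i;
            es.execute(() -> System.out.println("running " + c));
        }

        started.await();                           // make sure task 1 is really active
        List<Runnable> waiting = es.shutdownNow(); // interrupts task 1, drains the queue
        es.awaitTermination(10, TimeUnit.SECONDS); // shutdownNow() itself does not block
        return waiting;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Runnable> waiting = runDemo();
        System.out.println("returned " + waiting.size() + " waiting tasks");
    }
}
```

The returned list should hold only the three queued tasks; the interrupted task 1 is simply gone, which is exactly the problem described above.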

How can I safely store and restore unfinished tasks in a Java thread pool? My real tasks are fail-safe, meaning each task can be rerun again and again, and the order of execution does not matter.

My web application is deployed on WebLogic. The thread pool is started by a servlet, and a ServletContextListener is registered to shut down the thread pool.


I see two options:

  • Option 1: don't interrupt active tasks; wait for them to finish, then save all waiting tasks and shut down the thread pool.

    Pros: no need to worry about unpredictable conditions caused by interruption.
    Cons: I have to wait for all actively running tasks to finish. Depending on the thread pool size and the time each task takes, the wait can be long.

  • Option 2: interrupt active tasks, save all unfinished tasks, then
    shut down the thread pool.

Option 1 is the ideal solution for me.

WestFarmer
  • Do you need to store progress, or can your tasks just be re-run? And if you need to store the progress of your tasks, can you actually do that in your code? – Yogesh_D Jun 27 '16 at 12:02
  • @Yogesh_D just re-run is enough for me. – WestFarmer Jun 27 '16 at 12:04
  • I would 1) save all tasks when you add them to the pool 2) remove them as the last thing it does when successful. This way there is nothing to be done when a system dies. – Peter Lawrey Jun 27 '16 at 12:40
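A minimal sketch of that "journal first, delete on success" idea, under several assumptions: the ConcurrentHashMap only stands in for durable storage (across a WebLogic redeploy it would have to be a database table or files), each task is described by a String payload, and a caller-supplied factory turns a payload back into a Runnable. JournaledExecutor, recover() and the payload format are hypothetical names, not an existing API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical wrapper: every task is recorded before it runs and deleted after it succeeds. */
public class JournaledExecutor {
    private final ExecutorService pool;
    private final Map<String, String> journal;            // taskId -> payload (stand-in for durable storage)
    private final Function<String, Runnable> taskFactory; // rebuilds a task from its payload

    public JournaledExecutor(int threads, Map<String, String> journal,
                             Function<String, Runnable> taskFactory) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.journal = journal;
        this.taskFactory = taskFactory;
    }

    /** Save the task first, then hand it to the pool. */
    public void submit(String payload) {
        String id = UUID.randomUUID().toString();
        journal.put(id, payload);
        schedule(id, payload);
    }

    /** Call once at startup: re-runs everything that was recorded but never completed. */
    public void recover() {
        for (Map.Entry<String, String> e : journal.entrySet()) {
            schedule(e.getKey(), e.getValue());
        }
    }

    private void schedule(String id, String payload) {
        pool.execute(() -> {
            taskFactory.apply(payload).run();
            journal.remove(id); // reached only if run() returned normally
        });
    }

    /** Option-1 style shutdown: no interruption, just wait for the pool to drain. */
    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
        pool.shutdown();
        return pool.awaitTermination(timeout, unit);
    }
}
```

With this design nothing has to be saved at shutdown time: whatever is still in the journal after a crash or a redeploy is simply re-run by recover(). Because a record is removed only when run() returns normally, a task should throw rather than return quietly if it is interrupted before finishing. Running a task twice after a crash is harmless here, since the tasks in the question can be rerun safely.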


How about creating a list of interrupted tasks? You already have the catch block, which lets you run code for a specific interrupted task:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RestoreInterrupted {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        // The list of interrupted tasks. Edited after the correct comment by @Luke Lee:
        // AbstractList cannot be instantiated, and the add() in the catch block must be thread-safe.
        final List<Runnable> interruptedTasks = new CopyOnWriteArrayList<Runnable>();
        for (int i = 0; i < 6; i++) {
            final int c = i + 1;
            es.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("current running " + c);
                    try {
                        Thread.sleep(10000);    // 10 sec
                    } catch (InterruptedException e) {
                        System.out.println("interrupted  " + c);
                        interruptedTasks.add(this); // add this interrupted instance to the list
                    }
                }
            });
        }

        Thread.sleep(15000);
        List<Runnable> rems = es.shutdownNow();
        // shutdownNow() does not block: wait until the interrupted task has run its catch block
        es.awaitTermination(60, TimeUnit.SECONDS);
        System.out.println("Remaining " + rems.size());
        System.out.println("--------- restore remaining task ----------");

        es = Executors.newFixedThreadPool(1);
        for (Runnable r : interruptedTasks) { // add the interrupted Runnables to the new pool
            es.submit(r);
        }
        for (Runnable r : rems) {
            es.submit(r);
        }
        es.shutdown();                        // let the restored tasks finish, then exit
    }
}

I didn't test this, but I believe it should work. If a task needs some cleanup before it is rerun, you can of course do that in the catch block, where the instance is added to the interrupted-tasks list.

This obviously only covers the simplified test, but you can apply the same idea in a more complex design: instead of an anonymous class, make the Runnable a named class that receives the pool object as a constructor parameter, and submit the instance to the pool only after creating it. That way the task holds a reference it can use to re-add itself if it is interrupted. You then need to add a setPool(ExecutorService) method to the Runnable and call it, pointing the task at the new pool, before rerunning it (in the for loop I added, just before the submit line).
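A sketch of the named-class variant, with one deliberate change from the description above: instead of the pool, the task receives the shared, thread-safe list of interrupted tasks in its constructor, so no setPool(...) call is needed when switching to a new pool. RestartableTask, restore() and the workMillis/completed fields are hypothetical names for illustration; Thread.sleep stands in for the real work.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** A named task that records itself in a shared list when it is interrupted. */
public class RestartableTask implements Runnable {
    private final int id;
    private final long workMillis;
    private final List<Runnable> interrupted; // shared by all tasks, must be thread-safe
    private final AtomicInteger completed;    // counts successful runs (demo only)

    public RestartableTask(int id, long workMillis,
                           List<Runnable> interrupted, AtomicInteger completed) {
        this.id = id;
        this.workMillis = workMillis;
        this.interrupted = interrupted;
        this.completed = completed;
    }

    @Override
    public void run() {
        System.out.println("current running " + id);
        try {
            Thread.sleep(workMillis);           // stands in for the real long-running work
            completed.incrementAndGet();
        } catch (InterruptedException e) {
            System.out.println("interrupted  " + id);
            interrupted.add(this);              // remember this task so it can be rerun
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    /** Resubmits every recorded task; each is removed before it can be re-added by a new interrupt. */
    public static void restore(List<Runnable> tasks, ExecutorService pool) {
        for (Runnable r : tasks) {              // CopyOnWriteArrayList iterates over a snapshot
            tasks.remove(r);
            pool.execute(r);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Runnable> interrupted = new CopyOnWriteArrayList<>();
        AtomicInteger completed = new AtomicInteger();

        ExecutorService es = Executors.newFixedThreadPool(1);
        es.execute(new RestartableTask(1, 500, interrupted, completed));
        Thread.sleep(100);
        es.shutdownNow();
        es.awaitTermination(10, TimeUnit.SECONDS); // wait until the catch block has run

        ExecutorService next = Executors.newFixedThreadPool(1);
        restore(interrupted, next);
        next.shutdown();
        next.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("completed runs: " + completed.get());
    }
}
```

The awaitTermination() call after shutdownNow() matters: without it, restore() can run before the interrupted task has added itself to the list.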


Edit: I just saw your edit about the options. My suggestion is clearly the second option. For the first option, I think you can use the example code in the ExecutorService documentation; look specifically at awaitTermination.

The relevant section:

void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
}
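The documentation example still falls back to shutdownNow() after a timeout. A sketch that stays closer to option 1, assuming the pool is built directly as a ThreadPoolExecutor (the same configuration Executors.newFixedThreadPool(n) uses) and tasks are added with execute() so the queue holds your own Runnables rather than FutureTask wrappers: shutdown() stops new submissions without interrupting anyone, getQueue().drainTo(...) pulls the waiting tasks out so they can be persisted, and the loop waits for the active tasks to finish. GracefulDrain and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class GracefulDrain {

    /** Same settings Executors.newFixedThreadPool(n) uses, but typed as ThreadPoolExecutor. */
    public static ThreadPoolExecutor newFixedPool(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Option 1: active tasks run to completion; waiting tasks are removed from the
     * queue and returned so they can be saved and resubmitted after the reload.
     */
    public static List<Runnable> drainAndWait(ThreadPoolExecutor pool) throws InterruptedException {
        pool.shutdown();                  // reject new tasks; nothing is interrupted
        List<Runnable> waiting = new ArrayList<>();
        pool.getQueue().drainTo(waiting); // each queued task goes either here or to a worker, never both
        while (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            // active tasks are still running; keep waiting (log progress here if needed)
        }
        return waiting;
    }
}
```

A worker that becomes free between shutdown() and drainTo() may still pick up one more queued task, but that task simply runs to completion, so nothing is lost or run twice. The ThreadPoolExecutor Javadoc says getQueue() is intended mainly for monitoring and debugging, and the unbounded wait can be long (the con listed for option 1), so the container's undeploy timeout may need to allow for it.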
et_l
  • Two small issues: 1) you cannot instantiate an `AbstractList`; 2) there is a race condition when adding interrupted tasks. – xiaofeng.li Jun 27 '16 at 12:48
  • @LukeLee Well, there is a visibility issue but no race condition, although the OP should be using awaitTermination because shutdownNow doesn't block. –  Jun 27 '16 at 13:14
  • @xTrollxDudex Can you elaborate? IMHO, when the thread pool size is larger than 1 and more than one thread is interrupted, my understanding is that they all enter the catch block without synchronization. – xiaofeng.li Jun 27 '16 at 13:23
  • You're right, it is a single thread pool, but if there were more threads, ordering doesn't really matter (but the question doesn't specify whether there are more threads OR that order even matters). –  Jun 27 '16 at 13:44
  • When you are computing heterogeneous work, the tasks beyond the number of available threads are queued and then handed to the worker threads in any order, so there is no correct "ordering" to synchronize on. Every order is right and every order is wrong, so synchronizing on which task continues first after a restart is plain unnecessary overhead. Unless all of your tasks are synchronized, there is no race condition on the interrupted-tasks list. TL;DR: this all assumes the OP doesn't care about the order. –  Jun 27 '16 at 13:59
  • @et_l Your suggestion about option 1 confuses me: shutdownNow() will attempt to stop all actively executing tasks; in other words, it will interrupt actively running threads, so it should not be a solution for option 1. Correct me if I am wrong. – WestFarmer Jun 27 '16 at 14:31
  • @WestFarmer You're correct, of course. shutdownNow() will interrupt - but that's why it is called only after awaitTermination() has waited after calling shutdown(). It's an example taken straight from the documentation (as noted in the answer) and it shows the way to try shutdown and wait for the active threads to finish for a specified amount of time, before trying to force them to stop (interrupt - what we hope to avoid). – et_l Jun 27 '16 at 16:50
  • @Luke Lee Thanks for the corrections. I've edited my answer and changed the AbstractList into a CopyOnWriteArrayList, which is both non-abstract and thread-safe. As xTrollxDudex pointed out, ordering isn't an issue (the way I understand the question), but the list should still be thread-safe, as you pointed out. – et_l Jun 27 '16 at 16:54
  • @et_l You will find that race conditions are quite difficult to understand, I tried to elaborate the best I could here https://gist.githubusercontent.com/AgentTroll/db0da5b6c7f99bae4bc2796fda737938/raw/8666362b65ccbce7dc21f4b1cbfb617f1ba987ac/Race%2520conditions.txt –  Jun 27 '16 at 17:04
  • @xTrollxDudex The original answer didn't use a thread-safe list. My definition for "race condition" is explained here http://stackoverflow.com/questions/34510/what-is-a-race-condition – xiaofeng.li Jun 28 '16 at 01:20
  • I have to assume you just like to lecture. :) – xiaofeng.li Jun 28 '16 at 01:21
  • You are right, when it comes to threading, I love to talk :p However, I believe I am not mistaken when addressing your point that there is a race condition, which is when "the result of the change in data is dependent on the thread scheduling algorithm." When you say race condition, I assume you refer to the order that the threads write to the shared interrupted tasks list is dependent on the thread scheduler, which I address in my reply. If I am mistaken, my bad, but I hope you've learned something by reading what I've written anyways :) –  Jun 28 '16 at 03:19