0

I am trying to make an ExecutorService implementation that can be provided with a timeout or interrupt for each thread.

In my below example, suppose I am spawning 2 threads (in actual scenario, this number will be high), then I need to make sure each thread should be running for 10 minutes. That means, Thread1 will run for 10 minutes and Thread2 will run for 10 minutes as well. If 10 minutes is over then I need to interrup the thread or timeout.

Below is the code I have so far and I am not able to understand how should I add this interrupt or timeout functionality here in such a clean way so that if I am making this no of threads parameter configurable in my code then it should work properly there as well.

public static void main(String[] args) {

    final int noOfThreads = 2;
    final long exp_time_millis = 600000; //10 minutes

    //create thread pool with given size 
    ExecutorService service = Executors.newFixedThreadPool(noOfThreads);


    for (int i = 0, i< noOfThreads; i++) {
        service.submit(new ThreadTask());
    }
}


class ThreadTask implements Runnable {

    @Override
    public void run() {

        while(true) {
            System.out.println("Thread running...");
            try {

        /* make a select sql to the database 
         * and measure how much time it is taking in 
         * returning the response
         */

            } catch (InterruptedException e) {

            }
        }
    }
}

Any suggestions will be of great help.

I have already seen few articles on the SO but I was not able to find anything which matches my scenario and I can implement that easily.

Updated Code:-

I am trying the below code but it gives me error on the catch block in the run method. Not sure if I am doing anything wrong. Can anyone help me?

public class ThreadTimeout {

    public static void main(String[] args) {

        final int noOfThreads = 2;

        //create thread pool with given size 
        ExecutorService service = Executors.newFixedThreadPool(noOfThreads);

        ScheduledExecutorService scheduleService = Executors.newScheduledThreadPool(noOfThreads);
        for (int i = 0; i< noOfThreads; i++) {
            final Future future = service.submit(new ThreadTask());
            scheduleService.schedule(new Runnable(){
                public void run(){
                    future.cancel(true);
                }
            }, 10, TimeUnit.MINUTES);
        }
    }
}

class ThreadTask implements Runnable {

    @Override
    public void run() {

           //make a database connection

        while (true) {
            System.out.println("Thread running...");
            try {
                /*
                 * make a select sql to the database and measure
                 * how much time it is taking in returning the
                 * response
                 */
            } catch (InterruptedException e) {

            }
        }
    }
}
AKIWEB
  • 19,008
  • 67
  • 180
  • 294

2 Answers2

1

I would recommend using a second ScheduledExecutorService. You can submit the Future returned from your original submissions to the ScheduledExecutorService to cancel.

ScheduledExecutorService scheduleService =   Executors.newScheduledThreadPool(n);
for (int i = 0, i< noOfThreads; i++) { 
   final Future future = service.submit(new ThreadTask());
   scheduleService.schedule(new Runnable(){
       public void run(){
           future.cancel(true);
       }
  }, 10, TimeUnits.MINUTES);
}

Now the ThreadTask needs to respond to interruption or else this will not help.

AKIWEB
  • 19,008
  • 67
  • 180
  • 294
John Vint
  • 39,695
  • 7
  • 78
  • 108
  • Thanks John for the help. I have updated my question with the snippet from your code and my code. But I am getting error in the catch block asking me to remove the exception. But I think you mentioned to me that `ThreadTask` needs to respond to `interruption`. Right? I know, I can add `Thread.sleep` but in actual code, I will be doing `select sql calls` to the database. Do you know how can I do that? Or if you can provide me the full flow by combining your code and mine code, that will be of great help to me as well. By that I can understand better. – AKIWEB Feb 25 '13 at 04:07
  • And one more thing ,`newScheduledThreadPool(n)` here n should be same as number of threads for `newFixedThreadPool(noOfThreads);` right? – AKIWEB Feb 25 '13 at 04:11
  • Hi John, can you please help me here with my questions? Thanks in advance. – AKIWEB Feb 25 '13 at 19:18
  • `And one more thing ,newScheduledThreadPool(n)` not necessarily. You can get away with 1 or a couple threads. All they will do is signal to the other threads they should cancel. That thread usually wouldn't be doing much work. – John Vint Feb 25 '13 at 19:23
  • `Do you know how can I do that?` depends on if the SQL driver itself is responsive to interruption. You can also look at this question http://stackoverflow.com/questions/9492777/stopping-a-non-looping-java-thread. You can instead have `if(!Thread.currentThread().isInterrupted())` as a condition. – John Vint Feb 25 '13 at 19:27
  • If you can't do the latter suggestion and your driver is responsive to interruption via `Statement.cancel` there is a sort of hack you can do to achieve interruption with `Future.cancel` – John Vint Feb 25 '13 at 19:30
0

What I would recommend is to use the ExecutorService.awaitTermination(...); method and then the ExecutorService.shutdownNow() method.

For example:

for (int i = 0; i < noOfThreads; i++) {
    service.submit(new ThreadTask());
}
// we always need to shutdown the service _after_ we've submitted all jobs
service.shutdown();
// now we wait for those tasks to finish for 10 minutes
if (!service.awaitTermination(10, TimeUnit.MINUTES)) {
    // if we timed out waiting for the tasks to finish, forcefully interrupt them
    service.shutdownNow();
}

Note that this will interrupt the threads but that will only cause certain methods such as Thread.sleep(), Object.wait(), and some others to throw InterruptedException. It also sets the interrupt bit on the thread which can be tested with Thread.currentThread().isInterrupted(). It will not "kill" the thread like you would a unix process.

Gray
  • 115,027
  • 24
  • 293
  • 354