0

I have a Job class that triggers 3 different Runnable tasks inside an executor service. The threads may take a long time (sometimes even 1 hour) to finish their tasks. If they take longer than expected, I want to stop them. Is there an efficient way to stop the async process?

The job gets triggered at some point in time (using a scheduler cron expression), and it needs to be stopped if it is taking too long.

public class MyJob {
   . . . . . . . . . .
   . . . . . . . . . .
   public void process(..) {
     . . . . . . . . . .
     . . . . . . . . . .
     Runnable r1 = () -> task1();
     Runnable r2 = () -> task2();
     Runnable r3 = () -> task3();

     runAsync(List.of(r1, r2, r3));
   }

   private void task1() {
     . . . . . . . . . .
   }

   private void task2() {
     . . . . . . . . . .
   }

   private void task3() {
     . . . . . . . . . .
   }

   private void runAsync(List<Runnable> tasks) {
     . . . . . . . . . .
     myExecutorService.submitAll(tasks);
     . . . . . . . . . .
   }
}

I read about interrupts, but am not sure if that fits my requirement. I want to define an API endpoint through which I would stop the async process.

  • Also a duplicate of [this](https://stackoverflow.com/q/59464183/642706) and [this](https://stackoverflow.com/q/12032674/642706). – Basil Bourque Apr 26 '23 at 00:04

3 Answers

1

In Java there are basically 4 primitives to support inter-thread communication:

  1. Volatile variables.
  2. Interrupt system.
  3. Synchronized (methods or blocks).
  4. wait() + notify() system.

For completeness, there are also the Thread.stop() and Thread.suspend() methods, but per the documentation they are inherently unsafe and should not be used. They are deprecated for eventual removal. See Why are Thread.stop, Thread.suspend and Thread.resume Deprecated?.

Additionally, you should review the java.util.concurrent package. My understanding, however, is that it is built on top of the 4 primitives listed above, so it does not provide any capabilities that you couldn't implement yourself using those primitives.

The first two primitives listed above can be viewed as mechanisms to pause/abort running Threads. The last two are more related to synchronization/correctness and performance optimization (not having idle threads burning CPU).

The simplest case for a volatile variable would be a boolean flag, shouldTerminate, initialized to false. The main processing loop in the worker thread would then typically look something like:

while(!shouldTerminate) {
  // Do something.
}

Hence, if you want to signal a worker thread to terminate, you can simply set shouldTerminate = true; and the worker thread will break out of its loop. How many flags you have, and which worker threads use which flags, determines the level of control you have over when the workers will be terminated.
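
As a minimal sketch of the flag approach, assuming a worker whose loop body is short enough that the flag is checked often (the FlagWorker class and requestStop() method are hypothetical names, not part of the question's code):

```java
// Hypothetical worker used only to illustrate the volatile-flag pattern.
class FlagWorker implements Runnable {
    // volatile makes the write from the stopping thread visible to the worker thread.
    private volatile boolean shouldTerminate = false;

    @Override
    public void run() {
        while (!shouldTerminate) {
            // One short unit of real work would go here.
            Thread.onSpinWait(); // placeholder so the loop is not completely empty
        }
    }

    // Called from any other thread to ask the worker to finish its loop.
    void requestStop() {
        shouldTerminate = true;
    }
}
```

The worker only notices the request between iterations, so the delay before it stops is bounded by the duration of one iteration.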

The core issue with (only) using volatile variables is that Threads can get blocked in several ways:

  • Waiting on a monitor (synchronized block) - possibly during a wait()
  • Executing a Thread.sleep()
  • Performing IO.

If you only use volatile variables the Thread won't abort until whatever situation is blocking its execution is cleared - once the block is cleared the Thread will then be able to check the variable and if required shut down.

This is where notify() and interrupt() come in:

  • notify() allows you to instruct another thread to break out of a wait().
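  • interrupt() will typically break the other thread out of a sleep(), wait() or join(), and out of IO that is interruptible (such as NIO channels); a thread blocked on a classic java.io stream generally does not react to it.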

Note: notify() wakes a (single) arbitrarily chosen thread, so if you have multiple threads waiting on a single monitor it is generally good practice to use notifyAll().
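
To illustrate how interrupt() reaches a thread that a flag alone would not, here is a small sketch, assuming the task is simply blocked in Thread.sleep() (SleepyWorker and wasInterrupted() are hypothetical names):

```java
// Hypothetical task that blocks in sleep(); only an interrupt can end it early.
class SleepyWorker implements Runnable {
    private volatile boolean interruptedWhileSleeping = false;

    @Override
    public void run() {
        try {
            Thread.sleep(60_000); // while blocked here, a volatile flag is never checked
        } catch (InterruptedException e) {
            interruptedWhileSleeping = true;
            // Restore the interrupt status so code further up the stack can see it.
            Thread.currentThread().interrupt();
        }
    }

    boolean wasInterrupted() {
        return interruptedWhileSleeping;
    }
}
```

Calling interrupt() on the thread running this task makes sleep() throw InterruptedException, so the task can clean up and return instead of waiting out the full minute.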

Is there an efficient way to stop the async process?

You can call cancel(true) on the Future you get back when submitting a job to the executor. If the job is already running, this is implemented as an interrupt() of the thread running it.

Alternatively you can define a variable to signal to the job it should terminate - this is typically a cleaner solution - but it has the drawbacks outlined above.
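
A sketch of the Future-based option, assuming the caller is willing to block while it waits for the time limit (the TimeoutCancel class and runWithTimeout method are hypothetical helpers, not an existing API):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a task up to a limit, then cancels it.
class TimeoutCancel {
    /** Returns true if the task finished in time, false if it was cancelled. */
    static boolean runWithTimeout(ExecutorService executor, Runnable task,
                                  long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        Future<?> future = executor.submit(task);
        try {
            future.get(timeout, unit); // blocks the caller until done or timed out
            return true;
        } catch (TimeoutException e) {
            // mayInterruptIfRunning = true: the worker thread gets interrupted.
            future.cancel(true);
            return false;
        }
    }
}
```

Cancellation is cooperative: the task still has to react to the interrupt (for example by letting InterruptedException end its work) for the thread to actually become free.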

Edit - Atomic Classes

As @Basil Bourque points out in the comments, the Atomic... classes are a grey area. I don't consider them primitives themselves, but they do leverage a primitive that I did not mention in my list above (the CAS operations from the VarHandle class).

The primitives listed above are typically something you might choose to use directly, whereas I would recommend using the Atomic... classes in preference to directly invoking CAS operations.
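
For example, a stop flag built on AtomicBoolean could look roughly like this (StopSignal is a hypothetical name; compareAndSet is used here only so that the first stop request can be told apart from repeated ones):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stop flag backed by AtomicBoolean instead of a volatile field.
class StopSignal {
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    /** Returns true only for the first caller that requests the stop. */
    boolean requestStop() {
        // CAS: flips false -> true atomically, and fails if already true.
        return stopped.compareAndSet(false, true);
    }

    /** Workers poll this in their loop, exactly like a volatile flag. */
    boolean isStopped() {
        return stopped.get();
    }
}
```

For a plain on/off flag this behaves the same as a volatile boolean; the atomic variant only pays off when you need the check-and-set to happen as one step.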

To be fair, since java.util.concurrent was added I haven't worked with interrupt() or wait/notify much. I have usually found that one of the concurrent classes meets my needs, but I do still use volatile booleans to terminate execution of background threads. This is because I don't know how well third-party libraries (typically used in worker threads) would cope with an interrupt, so I would rather accept a delay while any IO completes before termination can occur.

DavidT
  • I would add the `Atomic…` classes to item # 1, such as [`AtomicBoolean`](https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/concurrent/atomic/AtomicBoolean.html). – Basil Bourque Apr 25 '23 at 23:47
  • "not having idle threads burning CPU" seems like the wrong choice of words. An idle thread by definition is not using CPU cycles. – Basil Bourque Apr 25 '23 at 23:58
  • With respect to "busy idle threads" it really becomes a question of how you would implement functionality similar to `wait()` + `notify()` if you didn't have support from the JVM - you would most likely have a loop with a sleep in it that just kept waking up to see if it had something to do - you would have to trade off responsiveness (short sleeps) against the work the thread did each time it woke up - bluntly I just didn't have a succinct way of saying that (and it was a bit off topic to the main answer) - happy to take an edit if you have a better way of saying it. – DavidT Apr 26 '23 at 03:52
0

It depends on whether you want to control the stop or not.

When you submit() a Runnable to the executor service, you get back a Future that declares a method cancel(boolean mayInterruptIfRunning).

You then have two options.

Either you expose this list of futures and let the caller decide what to do. In that case, your void process() becomes a List<Future<?>> process(), so whoever calls this method has the list of running tasks and can decide to stop some of them.

Or, you map these tasks internally and expose a method to stop the task, for example, by name. Something like this:

Map<String, Future<?>> tasksMap = new HashMap<>();
tasksMap.put("task1", executorService.submit(runnable1));
tasksMap.put("task2", executorService.submit(runnable2));
//etc.

and they can call you from outside like this:

public void stopTask(String taskName) {
    Future<?> future = tasksMap.get(taskName);
    // Cancel only if the task is known and has not already finished.
    if (future != null && !future.isDone()) {
        future.cancel(true); // true = interrupt the thread if the task is running
    }
}
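
Put together, the second option might be sketched as below. This assumes the stop request arrives on a different thread (for example from an API endpoint), which is why a ConcurrentHashMap is used instead of a HashMap; TaskRegistry and its method names are hypothetical:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical registry that maps task names to their futures.
class TaskRegistry {
    private final ExecutorService executor = Executors.newFixedThreadPool(3);
    // Concurrent map: start() and stop() may be called from different threads.
    private final Map<String, Future<?>> tasks = new ConcurrentHashMap<>();

    void start(String name, Runnable task) {
        tasks.put(name, executor.submit(task));
    }

    /** Returns true if a task with that name was found and cancelled. */
    boolean stop(String name) {
        Future<?> future = tasks.remove(name);
        // cancel() returns false when the task has already completed.
        return future != null && future.cancel(true);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

An endpoint handler would then only need to call stop("task1") and report the boolean back to the caller.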
Matteo NNZ
0

I would personally save the start time of the tasks in a global variable and create another scheduler that checks at a specified interval (depending on your needs) whether the time elapsed since that start time exceeds 1 hour. If it does, I would call the myExecutorService.shutdownNow() method.
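
A simplified sketch of that idea follows. Note that it is a variation, not exactly what is described above: instead of storing the start time and polling, it schedules a single delayed check at the time limit. Watchdog and guard are hypothetical names:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical watchdog: one delayed check instead of periodic polling.
class Watchdog {
    /** After the limit elapses, shuts the workers down if they are still running. */
    static ScheduledFuture<?> guard(ExecutorService workers,
                                    ScheduledExecutorService timer,
                                    long limit, TimeUnit unit) {
        return timer.schedule(() -> {
            if (!workers.isTerminated()) {
                // Interrupts running tasks and discards queued ones.
                workers.shutdownNow();
            }
        }, limit, unit);
    }
}
```

Keep in mind that shutdownNow() stops the whole executor, so a job that runs again on the next cron trigger would need a fresh executor, and the tasks must respond to interruption for the shutdown to take effect.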

Uta Alexandru