
I have a Callable that starts a Thread (the thread runs a ping process), and I want to allow the user to cancel the tasks:

public class PingCallable implements Callable<PingResult> {

private final String ip;
private final ProcThread processThread;

public PingCallable(String ip) {
    this.ip = ip; // kept so call() can log it and build the result
    this.processThread = new ProcThread(ip);
}

@Override
public PingResult call() throws Exception {
    log.trace("Checking if the ip " + ip + " is alive");
    try {
        processThread.start();
        try {
            processThread.join();
        } catch (InterruptedException e) {
            log.error("The callable thread was interrupted for " + processThread.getName());
            processThread.interrupt();
            // Good practice to reset the interrupt flag.
            Thread.currentThread().interrupt();
        }
    } catch (Throwable e) {
        System.out.println("Throwable ");
    }
    return new PingResult(ip, processThread.isPingAlive());
  }
}

The ProcThread looks something like this:

@Override
public void run() {
    try {
        process = Runtime.getRuntime().exec("the long ping", null, workDirFile);
        /* Get process input and error stream, not here to keep it short*/ 

        // waitFor is InterruptedException sensitive
        exitVal = process.waitFor();
    } catch (InterruptedException ex) {
        log.error("interrupted " + getName(), ex);
        process.destroy();
        /* Stop the input and error stream handlers, not here */
        // Reset the status, good practice
        Thread.currentThread().interrupt();
    } catch (IOException ex) {
        log.error("Exception while execution", ex);
    }
}

And the test:

    @Test
    public void test() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(15);
        List<Future<PingResult>> futures = new ArrayList<>();

        for (int i= 0; i < 100; i++) {
            PingCallable pingTask = new PingCallable("10.1.1.142");
            futures.add(executorService.submit(pingTask));
        }

        Thread.sleep(10000);
        executorService.shutdownNow();
//        for (Future<PingResult> future : futures) {
//            future.cancel(true);
//        }
    }

I monitor the ping processes with ProcessExplorer and see 15 of them. When shutdownNow() (or future.cancel(true)) is executed, only 4-5, at most 8, processes are interrupted; the rest are left alive. I almost never see 15 messages saying "The callable thread was interrupted..", and the test does not finish until the processes end. Why is that?

dalvarezmartinez1
  • It might have something to do with JUnit not working correctly with threads, or maybe the threads that had not yet executed finished their work. – user2717954 Oct 22 '14 at 10:22
  • Is there any reason to have the ProcThread? Could it be removed and the code in the run() method put in your PingCallable? It would simplify matters. You already have a thread pool, so you shouldn't need to spawn more threads. – pillingworth Oct 22 '14 at 10:28
  • @pauli The only reason I use the ProcThread is because it is used by many of our apps. I tried that too, not spawning another Thread, but it still does not work. – dalvarezmartinez1 Oct 22 '14 at 10:55

2 Answers


I might not have a complete answer, but there are two things to note:

  • shutdownNow() only signals a shutdown; to see whether the threads have actually stopped, use awaitTermination().
  • process.destroy() also takes time to execute, so the callable should wait for it to complete after interrupting the process thread.
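
The first point can be sketched as a small helper. This is only an illustration, assuming Java 8+; ShutdownSketch and shutdownAndAwait are hypothetical names, and the sleeping task merely stands in for the long ping.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    /**
     * Interrupts all running tasks, then waits up to the given timeout
     * for the worker threads to actually finish.
     * Returns true if the executor terminated in time.
     */
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit)
            throws InterruptedException {
        // shutdownNow() only interrupts the workers and hands back the tasks that never started.
        List<Runnable> neverStarted = executor.shutdownNow();
        System.out.println("Tasks that never started: " + neverStarted.size());
        // awaitTermination() is what tells us whether the workers really stopped.
        return executor.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(60_000); // stands in for the long-running ping
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit
                }
            });
        }
        Thread.sleep(200); // give the pool time to pick up the first tasks
        System.out.println("Terminated: " + shutdownAndAwait(executor, 5, TimeUnit.SECONDS));
    }
}
```

If the helper returns false, some task is ignoring the interrupt or is still busy cleaning up, which is the situation described in the question.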

I modified the code a little and found that future.cancel(true) actually prevents the catch (InterruptedException) block of ProcThread from executing, unless you use executor.shutdown() instead of executor.shutdownNow(). The unit test does finish when "Executor terminated: true" is printed (using JUnit 4.11). It looks like using future.cancel(true) together with executor.shutdownNow() interrupts a thread twice, and that can cause the interrupt-handling blocks to be skipped.

Below is the code I used for testing. Uncomment for (Future<PingResult> f : futures) f.cancel(true); together with shutdown(Now) to see the difference in output.

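import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

public class TestRunInterrupt {
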
static long sleepTime = 1000L;
static long killTime = 2000L;

@Test
public void testInterrupts() throws Exception {

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    List<Future<PingResult>> futures = new ArrayList<Future<PingResult>>();
    for (int i= 0; i < 100; i++) {
        PingCallable pingTask = new PingCallable("10.1.1.142");
        futures.add(executorService.submit(pingTask));
    }
    Thread.sleep(sleepTime + sleepTime / 2);
    // for (Future<PingResult> f : futures) f.cancel(true);
    // executorService.shutdown();
    executorService.shutdownNow();
    int i = 0;
    while (!executorService.isTerminated()) {
        System.out.println("Awaiting executor termination " + i);
        executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS);
        i++;
        if (i > 5) {
            break;
        }
    }
    System.out.println("Executor terminated: " + executorService.isTerminated());
}

static class ProcThread extends Thread {

    static AtomicInteger tcount = new AtomicInteger();

    int id;
    volatile boolean slept;

    public ProcThread() {
        super();
        id = tcount.incrementAndGet();
    }

    @Override
    public void run() {

        try {
            Thread.sleep(sleepTime);
            slept = true;
        } catch (InterruptedException ie) {
            // Catching an interrupted-exception clears the interrupted flag.
            System.out.println(id + " procThread interrupted");
            try {
                Thread.sleep(killTime);
                System.out.println(id + " procThread kill time finished");
            } catch (InterruptedException ie2) {
                System.out.println(id + " procThread killing interrupted");
            }
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            System.out.println(id + " procThread stopped: " + t);
        }
    }
}

static class PingCallable implements Callable<PingResult> {

    ProcThread pthread;

    public PingCallable(String s) {
        pthread = new ProcThread();
    }

    @Override
    public PingResult call() throws Exception {

        System.out.println(pthread.id + " starting sleep");
        pthread.start();
        try {
            System.out.println(pthread.id + " awaiting sleep");
            pthread.join();
        } catch (InterruptedException ie) {
            System.out.println(pthread.id + " callable interrupted");
            pthread.interrupt();
            // wait for kill process to finish
            pthread.join();
            System.out.println(pthread.id + " callable interrupt done");
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            System.out.println(pthread.id + " callable stopped: " + t);
        }
        return new PingResult(pthread.id, pthread.slept);
    }
}

static class PingResult {

    int id;
    boolean slept;

    public PingResult(int id, boolean slept) {
        this.id = id;
        this.slept = slept;
        System.out.println(id + " slept " + slept);
    }
}

}

Output without future.cancel(true) or with future.cancel(true) and normal shutdown():

    1 starting sleep
    1 awaiting sleep
    2 starting sleep
    3 starting sleep
    2 awaiting sleep
    3 awaiting sleep
    1 slept true
    3 slept true
    2 slept true
    5 starting sleep
    4 starting sleep
    6 starting sleep
    5 awaiting sleep
    6 awaiting sleep
    4 awaiting sleep
    4 callable interrupted
    Awaiting executor termination 0
    6 callable interrupted
    4 procThread interrupted
    5 callable interrupted
    6 procThread interrupted
    5 procThread interrupted
    Awaiting executor termination 1
    6 procThread kill time finished
    5 procThread kill time finished
    4 procThread kill time finished
    5 callable interrupt done
    5 slept false
    6 callable interrupt done
    4 callable interrupt done
    6 slept false
    4 slept false
    Executor terminated: true

Output with future.cancel(true) and shutdownNow():

    1 starting sleep
    2 starting sleep
    1 awaiting sleep
    2 awaiting sleep
    3 starting sleep
    3 awaiting sleep
    3 slept true
    2 slept true
    1 slept true
    4 starting sleep
    6 starting sleep
    5 starting sleep
    4 awaiting sleep
    5 awaiting sleep
    6 awaiting sleep
    5 callable interrupted
    6 callable interrupted
    4 callable interrupted
    5 procThread interrupted
    6 procThread interrupted
    4 procThread interrupted
    Executor terminated: true
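
One plausible reading of the double-interrupt effect can be reproduced in isolation. The sketch below is an assumption-laden illustration, not the test above: DoubleInterruptSketch and cleanupCompletes are hypothetical names, and the sleeps stand in for the real work and for a slow cleanup such as killing a process. A first interrupt enters the handler; a second interrupt, arriving while the handler is itself blocked, aborts the cleanup.

```java
import java.util.concurrent.CountDownLatch;

public class DoubleInterruptSketch {

    /** Returns true if the cleanup step ran to completion, false if a second interrupt cut it short. */
    static boolean cleanupCompletes(boolean interruptTwice) throws InterruptedException {
        CountDownLatch inCleanup = new CountDownLatch(1);
        boolean[] cleanupDone = {false};

        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);          // the "real work"
            } catch (InterruptedException first) {
                // The flag is cleared when InterruptedException is thrown,
                // so blocking calls work again inside this handler.
                inCleanup.countDown();
                try {
                    Thread.sleep(500);         // stands in for slow cleanup
                    cleanupDone[0] = true;
                } catch (InterruptedException second) {
                    // A second interrupt aborts the cleanup itself.
                }
            }
        });

        worker.start();
        worker.interrupt();                    // first interrupt: enters the handler
        inCleanup.await();                     // wait until the handler is running
        if (interruptTwice) {
            worker.interrupt();                // second interrupt: lands during cleanup
        }
        worker.join();                         // join() makes the worker's write visible here
        return cleanupDone[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("one interrupt,  cleanup completed: " + cleanupCompletes(false));
        System.out.println("two interrupts, cleanup completed: " + cleanupCompletes(true));
    }
}
```

With a single interrupt the cleanup is expected to complete; with two it is expected to be cut short, which matches the missing "kill time finished" lines in the second output.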

vanOekel
  • Thanks a lot, sir. In my case, I also tried with awaitTermination. The test prints "Executor terminated..". The test is green and looks fine, but the button of the JVM that runs the test is still RED, which means the JVM that ran the test is still on; it goes off when I kill the pings manually in ProcessExplorer. Also, I don't use shutdownNow together with future.cancel(true); they are basically the same (as far as I know), so one of them is always commented out. – dalvarezmartinez1 Oct 22 '14 at 14:25
  • I've also noticed that the thread handlers of the process are never terminated; in this case the executor never finishes. I have to check how to do this. – dalvarezmartinez1 Oct 22 '14 at 14:49
  • @dalvarezmartinez1 It looks like the ping-process is not stopping despite the `process.destroy()`. I googled on `java process destroy does not kill ping` and got lots of interesting hits. Maybe you can test to see if this is the cause of the problem by running a process other than the long ping (e.g. a batch file with 'echo hello world' or something along the lines of [this answer](http://stackoverflow.com/a/4908520/3080094)). – vanOekel Oct 22 '14 at 19:49
  • Many thanks. You are right, the problem is somewhere there! I also think that I know how to solve it. I will test some more and post some stuff tomorrow! Thank you! – dalvarezmartinez1 Oct 22 '14 at 20:38

Yesterday I ran a series of tests. One of the most fruitful went as follows:

  1. I interrupted the threads that run the process and checked that they were interrupted, yet the process was still hanging on waitFor().
  2. I decided to investigate why the process was not detecting that the thread in which it was running had been interrupted.
  3. I found that it is crucial to handle the streams (output, input and error) correctly; otherwise the external process will block on the I/O buffer.
  4. I noticed that my error handler was also blocking on a read (there was no error output). I don't know if that is an issue, but I decided to follow the suggestion and redirect the error stream to the output stream.
  5. Finally, I discovered that there is a correct way to invoke and destroy processes in Java.

The new ProcThread (as @pauli suggests, it no longer extends Thread; it runs in a callable, and I kept the name so the difference can be noticed) looks like this:

    try {
        ProcessBuilder builder = new ProcessBuilder(cmd);
        builder.directory(new File(workDir));
        builder.redirectErrorStream(true);
        process = builder.start();
        // any output?
        sht = new StreamHandlerThread(process.getInputStream(), outBuff);
        sht.start();

        // waitFor() is sensitive to InterruptedException, so to stop the job, interrupt the thread.
        exitVal = process.waitFor();
        sht.join();
        postProcessing();
        log.info("exitValue: %d", exitVal);
    } catch (InterruptedException ex) {
        log.error("interrupted " + Thread.currentThread().getName(), ex);
        shutdownProcess();
    }

The shutdown process:

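private void shutdownProcess() {
    postProcessing();
    sht.interrupt();
    try {
        // join() can itself be interrupted, so it has to be guarded
        sht.join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}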

The postProcessing:

private void postProcessing() {
    if (process != null) {
        closeTheStream(process.getErrorStream());
        closeTheStream(process.getInputStream());
        closeTheStream(process.getOutputStream());
        process.destroy();
    }
}
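
For readers who want something they can run as-is, the fragments above can be approximated by the following self-contained sketch. It is not my production code: ProcessRunnerSketch and run are hypothetical names, it assumes Java 8+ (for waitFor with a timeout and destroyForcibly), and it uses the JVM's own launcher with -version as a harmless stand-in for the ping command.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ProcessRunnerSketch {

    /** Runs the command, collects merged stdout/stderr into 'output', returns the exit value. */
    static int run(List<String> cmd, StringBuffer output) throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(cmd);
        builder.redirectErrorStream(true);      // one stream to drain instead of two
        Process process = builder.start();

        // Drain the output on a separate thread so the child never blocks on a full pipe buffer.
        Thread drainer = new Thread(() -> {
            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    output.append(line).append('\n');
                }
            } catch (IOException e) {
                // Can happen when the stream closes because the process was destroyed.
            }
        });
        drainer.start();

        try {
            int exitVal = process.waitFor();    // throws InterruptedException on interrupt
            drainer.join();
            return exitVal;
        } catch (InterruptedException e) {
            process.destroy();
            // Give the process a moment to exit, then escalate.
            if (!process.waitFor(2, TimeUnit.SECONDS)) {
                process.destroyForcibly();
            }
            throw e;                            // let the caller see the cancellation
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the JVM launcher stands in for the real ping command.
        String javaBin = Paths.get(System.getProperty("java.home"), "bin", "java").toString();
        StringBuffer out = new StringBuffer();
        int exit = run(Arrays.asList(javaBin, "-version"), out);
        System.out.println("exit=" + exit);
        System.out.print(out);
    }
}
```

Rethrowing the InterruptedException instead of swallowing it lets a surrounding Callable decide how to report the cancellation.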
dalvarezmartinez1
  • You do not need a stream-handler thread: [ExecHelper](http://ostermiller.org/utils/src/ExecHelper.java.html) from OstermillerUtils has been around for a long time and has served me well (an adjusted version with an abort time-out is [here](https://svn.java.net/svn/fwutil~svn/trunk/fwio/src/net/java/dev/fwutil/stackio/OSProcess.java)). – vanOekel Oct 23 '14 at 17:17