I've written a small program to practise with Executors and threads. It does the following:

  1. Creates a fixed thread pool of size 3 with an unbounded queue.
  2. Submits 3 tasks that each run an infinite loop (while(true)) to the pool, so all threads are occupied.
  3. Submits a 4th task, which waits in the queue.
  4. Calls executor.shutdown() and prints the active count and task count.
  5. Sets the flag to false to stop the infinite while loops, then prints the active count and task count again.
  6. Cancels all futures with mayInterruptIfRunning=true, then prints the active count and task count once more.

This is the code:

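import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {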


private static ThreadPoolExecutor fixexThreadPool;

public static void main(String[] args) throws InterruptedException {
    System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
    fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    final Boolean[] flag = new Boolean[1];
    flag[0] = true;
    List<Future> futures = new ArrayList<>();

    System.out.println("Submiting 3 threads");
    for (int i = 0; i < 3; i++) {
        futures.add(fixexThreadPool.submit(() -> {
            int a = 1;
            while (flag[0]) {
                a++;
            }
            System.out.println("Finishing thread execution.");
        }));
    }
    System.out.println("Done submiting 3 threads.");
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    Thread.sleep(3000L);
    System.out.println("Submitting a 4th thread.");

    futures.add(fixexThreadPool.submit(() -> {
        int a = 1;
        while (flag[0]) {
            a++;
        }
        System.out.println("Finishing thread execution");
    }));

    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));

    System.out.println("Executor shutdown");
    fixexThreadPool.shutdown();
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    Thread.sleep(2000L);
    System.out.println("Setting flag to false.");
    flag[0] = false;
    Thread.sleep(2000L);
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    System.out.println("Cancelling all futures.");
    futures.forEach(f -> f.cancel(true));
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
}

}

This is the output of the execution:

  • Creating fixed thread pool of size 3 and infinite queue.
  • Submiting 3 threads
  • Done submiting 3 threads.
  • Active count: 3 | Completed task count: 0 | task count: 3
  • Submitting a 4th thread.
  • Active count: 3 | Completed task count: 0 | task count: 4
  • Executor shutdown
  • Active count: 3 | Completed task count: 0 | task count: 4
  • Setting flag to false.
  • Active count: 3 | Completed task count: 0 | task count: 4
  • Cancelling all futures.
  • Active count: 3 | Completed task count: 0 | task count: 4

There are a couple of things I don't understand.

  1. Why are there still active threads after shutting down the executor?
  2. Why doesn't the infinite while loop break after I set the flag to false?
  3. Why are there still active threads after cancelling every future?
  4. No matter whether I set the flag to false, shut down the executor, or even cancel all futures, my program doesn't stop running. Why is that?

Thanks in advance!!

mariano
  • Try reading the javadocs for `shutdown` and `shutdownNow` and understand the differences. Also, as the threads are looping wildly they will be consuming max cpu. Other instructions may not get a chance to execute. – Scary Wombat May 20 '20 at 00:32
  • Hi @ScaryWombat, I've changed to shutdownNow() and the program keeps running. The only thing that changed was that the 4th queued task was removed. – mariano May 20 '20 at 01:10

2 Answers

1

This issue can be solved by making use of the volatile keyword. This thread provides a wealth of answers explaining what volatile is in detail and there are plenty of tutorials/sources/blogs out there that can provide further detail. Another super detailed thread about volatile here.

Honestly, there are many, many people on the internet who can explain it better and more accurately than I ever could, but in short: volatile is a Java modifier that should be used when a field is shared by multiple threads. It tells the JVM that a write to the field must become visible to other threads that read it, rather than letting each thread keep working with its own cached copy.

Without it, the worker threads are allowed to keep using a stale value of the flag, so they never observe the write made by the main thread. A small change to your implementation can fix this:

Make the flag a static volatile field of the class

private volatile static Boolean[] flag = new Boolean[1];

See here for why I did this.

So to give the slightly bigger picture:

private static ThreadPoolExecutor fixexThreadPool;
private volatile static Boolean[] flag = new Boolean[1];

public static void main(String[] args) throws InterruptedException {
    System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
    fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    flag[0] = true;
    List<Future> futures = new ArrayList<>();

    System.out.println("Submiting 3 threads");
    ...

The code now happily stops without any issues, hope this helps :)

(on a side note, curious as to why you used Boolean[] and not just a Boolean? I kept it as is for consistency in my answer but it also works obviously with flag just being a Boolean rather than an array)
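As a sketch of that side note, here is a minimal version with a plain volatile boolean instead of the array. One caveat worth stating: volatile on an array field applies to the array reference, not to its elements, so a volatile primitive expresses the intent more directly. The class name VolatileFlagDemo, the field running and the method runAndStop are made up for this illustration and are not part of the original code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original question.
class VolatileFlagDemo {

    // volatile primitive: a write by one thread is visible to later reads by others
    private static volatile boolean running = true;

    /** Starts three spinning tasks, clears the flag and reports whether the pool terminated. */
    static boolean runAndStop() {
        running = true;
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> {
                long a = 0;
                while (running) {   // volatile read on every iteration
                    a++;
                }
            });
        }
        pool.shutdown();            // no new tasks accepted; submitted tasks keep running
        running = false;            // volatile write that lets the loops exit
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Pool terminated: " + runAndStop());
    }
}
```

Because the pool was shut down and every task returns once the flag is cleared, the worker threads exit and the JVM can stop.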

-- Edit --

To answer your recent comment: I'm afraid my understanding pretty much stops with what I have already written, but I can share my thoughts on it. The reason your app doesn't "exit" when you call fixexThreadPool.shutdown(); appears to be in the documentation for ThreadPoolExecutor. Namely:

public void shutdown()

Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.

The while loop has already been submitted, and so it will happily carry on executing.
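The documented behaviour can be checked with a small sketch. It assumes a single-thread pool with the default rejection policy; the class ShutdownDemo and its two methods are hypothetical names used only here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class illustrating the shutdown() contract quoted above.
class ShutdownDemo {

    /** Returns true if a task submitted after shutdown() is rejected. */
    static boolean rejectsAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.shutdown();
        try {
            pool.submit(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;            // the default policy rejects new work
        }
    }

    /** Returns true if a task that was queued before shutdown() still runs. */
    static boolean queuedTaskStillRuns() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean queuedRan = new AtomicBoolean(false);

        // First task occupies the only worker until it is released.
        pool.submit(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Second task has to wait in the queue.
        pool.submit(() -> queuedRan.set(true));

        pool.shutdown();            // does not remove the queued task
        release.countDown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queuedRan.get();
    }

    public static void main(String[] args) {
        System.out.println("Rejected after shutdown: " + rejectsAfterShutdown());
        System.out.println("Queued task still ran: " + queuedTaskStillRuns());
    }
}
```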

I explored this a bit to see what was happening.

Firstly, I didn't enjoy that long status log line, so I created a separate method for it! I also noticed a few interesting boolean states in the ThreadPoolExecutor and the Futures, so I decided to log them too:

// Assumes futures has also been promoted to a static field so this method can see it.
private static void Log() {
     System.out.println(String.format("\nActive count: %s | Completed task count: %s | task count: %s", 
             fixexThreadPool.getActiveCount(), 
             fixexThreadPool.getCompletedTaskCount(), 
             fixexThreadPool.getTaskCount()));
     System.out.println(String.format("isShutdown : %s | isTerminated : %s | isTerminating : %s ", 
             fixexThreadPool.isShutdown(), 
             fixexThreadPool.isTerminated(), 
             fixexThreadPool.isTerminating())); 
     System.out.println(String.format("Futures size = %s", futures.size()));
     futures.forEach(f -> System.out.println(String.format("Future isCancelled : %s | isDone : %s", f.isCancelled(), f.isDone())));
     System.out.println("");
}

Placing this into your code, we get:

    Log();
    System.out.println("Executor shutdown");
    fixexThreadPool.shutdown();
    Log();
    Thread.sleep(2000L);
    System.out.println("Setting flag to false.");
    flag[0] = false;
    Thread.sleep(2000L);
    Log();
    System.out.println("Cancelling all futures.");
    futures.forEach(f -> System.out.println(String.format("Future cancelled - %s", f.cancel(true))));
    Log();

I also wanted to add a quick heartbeat to the app, printing a every now and then so I could see what was and wasn't still running behind the scenes:

private static void Heartbeat(int a) {
    int amod = a % 1000000000;
    if(amod == 0) {
        System.out.println(a);
    }
}

In use:

while (flag[0]) {
    a++;
    Heartbeat(a);
}

I then ran a few test runs, trying some different things, in different combinations:

  • comment out fixexThreadPool.shutdown();
  • comment out futures.forEach(f -> f.cancel(true));
  • comment out flag[0] = false;
  • try with/without the new volatile fix.

To keep an already very long answer a little bit shorter, feel free to explore combinations of these yourself, but I found that:

  • Without the volatile keyword, the pool stayed stuck in the terminating state. I can only assume that shutdown() does not stop the while loops from doing what they're doing, as these tasks have already been submitted, and since none of the threads see flag[0] == false, they carry on incrementing a.

    As far as I can see, this matches the behavior outlined in the documentation. So shutdown isn't stopping your while loops; it just stops any new tasks being submitted to the thread pool (and added to the blocking queue), then waits patiently for the while loops to complete (which, without the volatile modifier on the flag, they never will).

  • Using volatile but commenting out the shutdown, each task terminates (the console logs "Finishing thread execution." 4 times), but the thread pool stays alive and the program itself does not terminate. The pool is patiently waiting for new tasks, which it never gets.
  • The future.cancel logic is a bit beyond my understanding currently. I ran a test calling just future.cancel and did some deeper logging and investigation, but I don't really have a concrete answer.

    My running theory is that the futures have already been handed to the thread pool, so calling cancel doesn't do anything to stop the loops that are already running, and calling future.cancel on its own therefore does nothing visible to fixexThreadPool.
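One way to probe that theory is the sketch below. It assumes a pool with a single worker so that one task is running and one is queued when cancel is called; CancelDemo, observe and the field running are hypothetical names for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: what cancel(true) does to a running and to a queued task.
class CancelDemo {

    private static volatile boolean running = true;

    static String observe() {
        running = true;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean queuedRan = new AtomicBoolean(false);

        Future<?> spinning = pool.submit(() -> {
            started.countDown();
            while (running) {
                // busy loop that never checks the interrupt status
            }
        });
        Future<?> queued = pool.submit(() -> queuedRan.set(true));

        try {
            started.await();                    // the first task is now running
            spinning.cancel(true);              // marks the future cancelled, interrupts the worker
            queued.cancel(true);                // marks the queued future cancelled
            int active = pool.getActiveCount(); // sampled while the loop is still spinning

            running = false;                    // only this lets the loop exit
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return "cancelled=" + spinning.isCancelled()
                    + " done=" + spinning.isDone()
                    + " active=" + active
                    + " queuedRan=" + queuedRan.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

In this sketch the cancelled future reports itself as cancelled and done while the worker is still counted as active, and the cancelled queued task never runs its body. That suggests cancel changes the state of the Future and sends an interrupt, but cannot force a loop that ignores interrupts to stop.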

Hopefully this provides plenty of food for thought :)

Ben
  • I used Boolean[] because, AFAIK, local variables used from a lambda must be effectively final. If I used a boolean directly, the lambda would work with a copy of it, and if I changed the value from the outside, the lambda wouldn't notice the change. Is that correct? – mariano May 20 '20 at 01:20
  • Thanks @Ben for the clear answer. I've changed the flag to volatile and now the threads are finishing :) . However, I still don't understand why the program doesn't finish when I call executor.shutdown() and when I cancel all futures... – mariano May 20 '20 at 01:24
  • I am guessing that if there was a small sleep in the wild loops, then some other Thread may have the chance to execute. – Scary Wombat May 20 '20 at 01:39
  • thank you @Ben for the explanation. As you said there are some things which are unclear yet. – mariano May 20 '20 at 15:42
0

Why, after shutting down executor, there still are active threads?

By invoking shutdown, we request a normal shutdown. The thread pool stops accepting new tasks but waits for the already submitted tasks to complete - running or queued. So the shutdown method doesn't try to stop the threads. And conceptually, the ExecutorService implementations separate the task submission from the task execution. In other words, we own the tasks but not the threads.
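Since the pool owns the threads, the usual way to ask it to stop is a two-phase shutdown: request a normal shutdown, wait for a while, then fall back to shutdownNow(), which interrupts the workers. The sketch below follows that pattern; GracefulShutdown, shutdownAndAwait and demo are hypothetical names, and the timeouts are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class sketching a two-phase shutdown.
class GracefulShutdown {

    /** Returns true if the pool terminated within the given waits. */
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                        // phase 1: stop accepting tasks
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow();                 // phase 2: interrupt running tasks, drop queued ones
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Runs the helper against a task that only ends when interrupted. */
    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> {
            try {
                Thread.sleep(60_000);           // sleep responds to interruption
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return shutdownAndAwait(pool, 200, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println("Terminated: " + demo());
    }
}
```

Phase 2 only helps when the tasks respond to interruption. The busy loops in the question do not, which is why shutdownNow() alone did not stop the program.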

Why, after changing the flag to false in order to break the infinite loop, the infinite while doesn't break?

We're using a cancellation flag here. But according to the Java memory model, for multiple threads to see the changes made to the shared data - flag - we must either employ synchronization or use the volatile keyword. So while synchronization provides both mutual exclusion and memory visibility, the volatile keyword just provides the memory visibility. Since only the main thread modifies the flag variable here, defining flag as a static volatile variable will make it work as expected.

public class Main {


    private static ThreadPoolExecutor fixexThreadPool;
    private static volatile Boolean[] flag = new Boolean[1];
    // the main method
}
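An alternative that keeps the flag as a local variable, and so also addresses the effectively-final concern with lambdas, is java.util.concurrent.atomic.AtomicBoolean. This is a swapped-in technique rather than what the answer above uses; the names AtomicFlagDemo and runAndStop are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: a cancellation flag held in an AtomicBoolean.
class AtomicFlagDemo {

    static boolean runAndStop() {
        // The reference is effectively final; the value inside can change
        // and get()/set() have volatile read/write semantics.
        AtomicBoolean running = new AtomicBoolean(true);
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> {
                long a = 0;
                while (running.get()) {
                    a++;
                }
            });
        }
        pool.shutdown();
        running.set(false);         // visible to the workers' next get()
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Pool terminated: " + runAndStop());
    }
}
```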

Why, after canceling every future, are there still active threads?

ThreadPoolExecutor uses the FutureTask class as the Future implementation by default. When cancel is called with mayInterruptIfRunning=true, FutureTask interrupts the worker thread that is running the task. So one can expect thread interruption to stop the task and even terminate the thread. But thread interruption is a cooperative mechanism. Firstly, the task must be responsive to interrupts. It must check the interruption flag with Thread.currentThread().isInterrupted() and exit if possible. Secondly, the thread owner, in our case the thread pool, must check the interruption status and act accordingly. In this example, the task isn't responsive to interrupts. It uses a flag to cancel its operation. So let's continue with the thread owner. ThreadPoolExecutor doesn't act on the interruption status, so it doesn't terminate the thread.

No matter if I change the flag to false, shut down the executor, or even cancel all futures, my program doesn't stop running. Why is that?

As mentioned above, the current task uses the cancellation flag approach, so making the flag volatile should solve the problem: the tasks will then stop as intended. Cancellation, on the other hand, won't have any effect on the current task, because it isn't checking the interruption status. We can make it responsive like this:

while (flag[0] && !Thread.currentThread().isInterrupted())

This way, the task also supports the cancellation by a thread interrupt.
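A complete sketch of a task that supports cancellation by interrupt is shown below. To keep it short it relies on the interrupt status alone and leaves out the flag; InterruptibleTaskDemo and cancelStopsTask are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a busy loop that exits when its thread is interrupted.
class InterruptibleTaskDemo {

    /** Returns true if cancel(true) ends the task and the pool then terminates. */
    static boolean cancelStopsTask() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);

        Future<?> future = pool.submit(() -> {
            started.countDown();
            long a = 0;
            // isInterrupted() checks the status without clearing it
            while (!Thread.currentThread().isInterrupted()) {
                a++;
            }
            finished.countDown();
        });

        try {
            started.await();        // make sure the task is actually running
            future.cancel(true);    // interrupts the worker thread
            pool.shutdown();
            return finished.await(5, TimeUnit.SECONDS)
                    && pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Task stopped by cancel: " + cancelStopsTask());
    }
}
```

With a loop like this, both future.cancel(true) and shutdownNow() can stop the task, because both deliver an interrupt to the worker thread.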

isaolmez
  • thanks for the answer. super clear. Do you know any book or course where i can learn these deep and "under the hood" concepts on java threads and executors ? – mariano Jun 03 '20 at 16:41
  • I'm glad it helped. These topics are covered in detail in "Java Concurrency in Practice" by Brian Goetz and friends. – isaolmez Jun 04 '20 at 11:57