
I have a ThreadPoolExecutor in my program that submit()s a task. However, when I end my program, the script "freezes"; it seems the thread is not being shut down correctly.

Is there a solution for this?

example:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def task():
    for i in range(3):
        print(i)
        sleep(1)


with ThreadPoolExecutor() as executor:
    future = executor.submit(task)

    future.cancel()  # Waits for loop of blocking task to complete
    executor.shutdown(wait=False)  # Still waits for loop in blocking task to complete

sys.exit() does not work either; it still waits for the future to complete.

DasSaffe

1 Answer


This program does not hang for me, and I do not see anything in your code that violates any published restrictions on the use of the concurrent.futures package. So reading the rest of this may be a waste of your time. That said, you do have some statements that accomplish nothing, which you may not be aware of, and your issue may be related to one of those statements combined with the version of Python you are using or the platform on which you are executing (although I am not very confident that your problem doesn't lie elsewhere). Anyway, I have modified your code below to point out a few things.

First, I have set the pool size to 1 and I now call submit twice. Consequently, the first submitted task executes immediately, but the second task will not start executing until the first one completes. Second, when you use a ThreadPoolExecutor instance as a context manager, as you are doing, an implicit call to ThreadPoolExecutor.shutdown(wait=True) is made when the block terminates. I have therefore rewritten your code to make this implicit call explicit. The resulting modified code is:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def task(task_no):
    for i in range(3):
        print(f'task no. = {task_no}, i = {i}')
        sleep(1)


executor = ThreadPoolExecutor(1)
future1 = executor.submit(task, 1)
future2 = executor.submit(task, 2)

print('future 1 canceled = ', future1.cancel())
print('future 2 canceled = ', future2.cancel())
executor.shutdown(wait=False)
executor.shutdown(wait=True)
print('I am done!')

And its output in my environment is:

task no. = 1, i = 0
future 1 canceled =  False
future 2 canceled =  True
task no. = 1, i = 1
task no. = 1, i = 2
I am done!

Discussion

The first thing to observe is that when future1.cancel() is called, the first task is already running, so canceling it has no effect. But since the second task has not started executing when future2.cancel() is called, that task can be canceled. The point is that in your original code the call to future.cancel() has no effect.

The second point is that because you are using the variable executor as a context manager, your explicit call to executor.shutdown(wait=False) is immediately followed by an implicit call to executor.shutdown(wait=True). You therefore end up waiting for all pending tasks to complete before continuing, which renders the first call to executor.shutdown(wait=False) rather useless.
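To see why the question's program waits at the end of the with block, it may help to spell out what the context manager does. The try/finally below is a rough sketch of the equivalent code, not the actual CPython implementation:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def task():
    for i in range(3):
        sleep(0.1)


# Roughly what the question's `with ThreadPoolExecutor() as executor:`
# block expands to:
executor = ThreadPoolExecutor()
try:
    future = executor.submit(task)
    future.cancel()                # usually no effect: the task is already running
    executor.shutdown(wait=False)  # returns immediately...
finally:
    # ...but __exit__ then makes this blocking call, so execution
    # does not get past this line until task() has finished:
    executor.shutdown(wait=True)
print('reached only after the task completed')
```

This is why cancel() and shutdown(wait=False) inside the with block change nothing: the implicit shutdown(wait=True) at the end of the block still waits for the running task.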

Question

Is it possible that for some reason in your environment these two consecutive calls to shutdown are the cause of your hang? Try the following code to see if it makes any difference:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def task():
    for i in range(3):
        print(i)
        sleep(1)


executor = ThreadPoolExecutor()
future = executor.submit(task)
executor.shutdown(wait=False)

If this runs to completion, then you can try first adding the call future.cancel() and if the program still completes, then add back the additional call to executor.shutdown(wait=True).

Update

Based on your comment, if you are looking to terminate a thread that never ends on its own, you cannot do this if you are using the concurrent.futures package or a threading.Thread instance. However, you can do this with the multiprocessing.pool.ThreadPool multithreading pool:

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0
    while True:
        print(i)
        i += 1
        sleep(.5)


pool = ThreadPool(1)
async_result = pool.apply_async(task)
sleep(2) # Let the task run for a while
pool.terminate() # Now terminate the pool

Prints:

0
1
2
3

Or you can use the pool as a context manager, which implicitly calls terminate() on the pool when the context manager block exits:

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0
    while True:
        print(i)
        i += 1
        sleep(.5)


with ThreadPool(1) as pool:
    async_result = pool.apply_async(task)
    sleep(2) # Let the task run for a while
    # Now we exit the block:
# There is an implicit call to pool.terminate() when
# the context manager block exits.

Finally, since the pool threads are daemon threads, when the process that created the pool terminates, then the pool's threads automatically terminate and you do not even have to call terminate:

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0
    while True:
        print(i)
        i += 1
        sleep(.5)


pool = ThreadPool(1)
async_result = pool.apply_async(task)
sleep(2) # Let the task run for a while
# Now the main process implicitly terminates since no more statements
# are being executed and the pool's threads are destroyed.

But if you do not want the running task to terminate prematurely just because the program has no more statements to execute:

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    for i in range(10):
        print(i)
        sleep(.5)


pool = ThreadPool(1)
async_result = pool.apply_async(task)
# Explicitly wait for the task to complete by calling `get` on the 
# AsyncResult instance or call pool.close() followed by pool.join() 
# to wait for all submitted tasks to complete:
async_result.get()
# Program terminates here:

You may also wish to look at this for a comparison of the two pool packages.
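Although you cannot force-kill a thread managed by concurrent.futures, the usual alternative is cooperative cancellation: the task periodically checks a shared flag and exits on its own when asked to. A minimal sketch (the stop_event name and the timings here are mine, not from the question):

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event
from time import sleep

stop_event = Event()  # hypothetical flag the task polls cooperatively


def task():
    i = 0
    while not stop_event.is_set():  # exits promptly once asked to stop
        i += 1
        sleep(0.1)
    return i


with ThreadPoolExecutor(1) as executor:
    future = executor.submit(task)
    sleep(0.5)        # let the task run for a while
    stop_event.set()  # request shutdown; the implicit shutdown(wait=True)
                      # at the end of this block now returns quickly
print('iterations before stop:', future.result())
```

This only works for loops you control; it does not help with a task blocked indefinitely inside third-party code, which is where terminating a ThreadPool (or letting daemon threads die with the process) comes in.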

Booboo
  • That is definitely not unimportant. But maybe I phrased it a bit unclear. What I'm really looking for is a way to force-kill an already spawned thread, which would never exit on its own – DasSaffe Jun 26 '23 at 07:08
  • See my update to the answer. – Booboo Jun 26 '23 at 10:27