This program does not hang for me, and I do not see anything in your code that violates any published restrictions on the use of the concurrent.futures package, so reading the rest of this may be a waste of your time. That said, you have some statements that accomplish nothing, which you may not be aware of, and the issue you are having may be related to one of those statements combined with the version of Python you are using or the platform on which you are running (although I am not very confident that your problem doesn't lie elsewhere). Anyway, I have modified your code below to point out a few things.
First, I have set the pool size to 1 and now call method submit twice. Consequently, the first submitted task executes immediately, but the second task will not start executing until the first one completes. Second, when you use a ThreadPoolExecutor instance as a context manager, as you are doing, there is an implicit call to ThreadPoolExecutor.shutdown(wait=True) when the block terminates. I have therefore rewritten your code to make this implicit call explicit. The resulting modified code is:
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def task(task_no):
    for i in range(3):
        print(f'task no. = {task_no}, i = {i}')
        sleep(1)

executor = ThreadPoolExecutor(1)
future1 = executor.submit(task, 1)
future2 = executor.submit(task, 2)
print('future 1 canceled = ', future1.cancel())
print('future 2 canceled = ', future2.cancel())
executor.shutdown(wait=False)  # your explicit call
executor.shutdown(wait=True)   # the context manager's implicit call
print('I am done!')
And its output in my environment is:
task no. = 1, i = 0
future 1 canceled = False
future 2 canceled = True
task no. = 1, i = 1
task no. = 1, i = 2
I am done!
Discussion
The first thing to observe is that when future1.cancel() is called, the first task is already running, so the call has no effect. But because the second task has not started executing when future2.cancel() is called, that task can be canceled. The point is that in your original code the call to future.cancel() has no effect.
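To see this timing dependence in isolation, here is a small sketch (the threading.Event is my addition, used only to make the race deterministic): cancel fails for a future whose task has started and succeeds for one still sitting in the queue.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

started = threading.Event()

def task():
    started.set()      # signal that this task is now running
    time.sleep(0.5)

executor = ThreadPoolExecutor(max_workers=1)
running_future = executor.submit(task)
pending_future = executor.submit(task)  # queued behind the first task

started.wait()                  # wait until the first task is definitely running
print(running_future.cancel())  # False: already executing
print(pending_future.cancel())  # True: still queued, so it can be canceled
executor.shutdown(wait=True)
```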
The second point is that because you are using executor as a context manager, the call you explicitly make to executor.shutdown(wait=False) is immediately followed by an implicit call to executor.shutdown(wait=True), so you end up waiting for all pending tasks to complete before continuing, which renders the first call to executor.shutdown(wait=False) rather useless.
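As an aside, if the intent of the wait=False call was to abandon queued work, Python 3.9 added a cancel_futures parameter to shutdown that cancels pending (but not already-running) futures. A minimal sketch (again using a threading.Event of my own to make the timing deterministic):

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

started = threading.Event()

def task(n):
    started.set()
    time.sleep(0.5)
    return n

executor = ThreadPoolExecutor(max_workers=1)
futures = [executor.submit(task, n) for n in range(3)]
started.wait()  # the first task is now running
# Python 3.9+: cancel the still-queued futures, then wait for the running one
executor.shutdown(wait=True, cancel_futures=True)
print([f.cancelled() for f in futures])  # [False, True, True]
```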
Question
Is it possible that, for some reason in your environment, these two consecutive calls to shutdown are the cause of your hang? Try the following code to see if it makes any difference:
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def task():
    for i in range(3):
        print(i)
        sleep(1)

executor = ThreadPoolExecutor()
future = executor.submit(task)
executor.shutdown(wait=False)
If this runs to completion, then try first adding the call to future.cancel(); if the program still completes, add back the additional call to executor.shutdown(wait=True).
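Putting those incremental steps together, the fully assembled version of the experiment would look like this (the final print is just to confirm the program ran to completion):

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def task():
    for i in range(3):
        print(i)
        sleep(1)

executor = ThreadPoolExecutor()
future = executor.submit(task)
future.cancel()                # step 2: very likely a no-op, as the task has probably started
executor.shutdown(wait=False)  # the original call
executor.shutdown(wait=True)   # step 3: the context manager's implicit call
print('I am done!')
```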
Update
Based on your comment, if you are looking to terminate a thread that never ends on its own, you cannot do this with the concurrent.futures package or a threading.Thread instance. However, you can do it with the multiprocessing.pool.ThreadPool multithreading pool:
from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0
    while True:
        print(i)
        i += 1
        sleep(.5)

pool = ThreadPool(1)
async_result = pool.apply_async(task)
sleep(2)          # Let the task run for a while
pool.terminate()  # Now terminate the pool
Prints:
0
1
2
3
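For completeness, the usual workaround when you are restricted to threading.Thread is cooperative cancellation: the loop checks a threading.Event and exits when asked. Unlike pool.terminate(), this only works if the task is written to check the flag:

```python
import threading
from time import sleep

stop = threading.Event()

def task():
    i = 0
    while not stop.is_set():  # check the stop flag instead of looping forever
        print(i)
        i += 1
        sleep(.1)

t = threading.Thread(target=task)
t.start()
sleep(.35)  # let the task run for a while
stop.set()  # ask the thread to stop itself
t.join()    # the thread exits at its next flag check
```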
Or you can use the pool as a context manager, which implicitly calls terminate() on the pool when the context manager block exits:
from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0
    while True:
        print(i)
        i += 1
        sleep(.5)

with ThreadPool(1) as pool:
    async_result = pool.apply_async(task)
    sleep(2)  # Let the task run for a while
# Now we exit the block:
# There is an implicit call to pool.terminate() when
# the context manager block exits.
Finally, since the pool's threads are daemon threads, they terminate automatically when the process that created the pool terminates, and you do not even have to call terminate:
from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0
    while True:
        print(i)
        i += 1
        sleep(.5)

pool = ThreadPool(1)
async_result = pool.apply_async(task)
sleep(2)  # Let the task run for a while
# Now the main process implicitly terminates since there are no more
# statements to execute, and the pool's daemon threads are destroyed.
But if you do not want the running task to terminate prematurely just because the program has no more statements to execute:
from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    for i in range(10):
        print(i)
        sleep(.5)

pool = ThreadPool(1)
async_result = pool.apply_async(task)
# Explicitly wait for the task to complete by calling `get` on the
# AsyncResult instance, or call pool.close() followed by pool.join()
# to wait for all submitted tasks to complete:
async_result.get()
# The program terminates here.
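The close()/join() alternative mentioned in the comment looks like this; it waits for every submitted task to finish rather than for a single result:

```python
from multiprocessing.pool import ThreadPool
from time import sleep

def task(n):
    sleep(.1)
    return n * n

pool = ThreadPool(2)
results = [pool.apply_async(task, (n,)) for n in range(4)]
pool.close()  # no further submissions are allowed
pool.join()   # wait for all submitted tasks to complete
print([r.get() for r in results])  # [0, 1, 4, 9]
```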
You may also wish to look at this for a comparison of the two pool packages.