I'm testing, with a minimal example, how futures work when using ProcessPoolExecutor.
First I want to get the results of my processing functions; later I would like to add complexity to the scenario.
import time
import string
import random
import traceback
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor

def process(*args):
    was_ok = True
    try:
        name = args[0][0]
        tiempo = args[0][1]
        print(f"Task {name} - Sleeping {tiempo}s")
        time.sleep(tiempo)
    except:
        was_ok = False
    return name, was_ok

def program():
    amount = 10
    workers = 2
    data = [''.join(random.choices(string.ascii_letters, k=5)) for _ in range(amount)]
    print(f"Data: {len(data)}")
    tiempo = [random.randint(5, 15) for _ in range(amount)]
    print(f"Times: {len(tiempo)}")
    with ProcessPoolExecutor(max_workers=workers) as pool:
        try:
            index = 0
            futures = [pool.submit(process, zipped) for zipped in zip(data, tiempo)]
            for future in concurrent.futures.as_completed(futures):
                name, ok = future.result()
                print(f"Task {index} with code {name} finished: {ok}")
                index += 1
        except Exception as e:
            print(f'Future failed: {e}')

if __name__ == "__main__":
    program()
If I run this program, the output is as expected and I obtain all the future results. However, right at the end I also get a failure:
Data: 10
Times: 10
Task utebu - Sleeping 14s
Task klEVG - Sleeping 10s
Task ZAHIC - Sleeping 8s
Task 0 with code klEVG finished: True
Task RBEgG - Sleeping 9s
Task 1 with code utebu finished: True
Task VYCjw - Sleeping 14s
Task 2 with code ZAHIC finished: True
Task GDZmI - Sleeping 9s
Task 3 with code RBEgG finished: True
Task TPJKM - Sleeping 10s
Task 4 with code GDZmI finished: True
Task CggXZ - Sleeping 7s
Task 5 with code VYCjw finished: True
Task TUGJm - Sleeping 12s
Task 6 with code CggXZ finished: True
Task THlhj - Sleeping 11s
Task 7 with code TPJKM finished: True
Task 8 with code TUGJm finished: True
Task 9 with code THlhj finished: True
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 101, in _python_exit
    thread_wakeup.wakeup()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 89, in wakeup
    self._writer.send_bytes(b"")
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 183, in send_bytes
    self._check_closed()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
AFAIK the code itself doesn't have an error. I've been going through some really old questions and issues, such as the following, without finding a fix:
- this one related to the same error msg but for Python 2,
- this issue on GitHub, which seems to be exactly the one I'm having (and does not seem to be fixed yet...?),
- this other issue, where the comment I linked to seems to point to the actual problem but doesn't offer a solution,
- of course the official docs for Python 3.7,
- ...
And so forth. However, I still haven't found how to solve this behaviour. I even found an old question here on SO that suggested avoiding the as_completed function and using submit instead (that is where this test came from; before, I just had a map over my process function).
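For reference, a map-based variant of the same test would look roughly like the sketch below. It is a reconstruction rather than my exact earlier code: the helper name run_with_map, the reduced task count and the short sleep times are assumptions made only to keep the example quick to run.

```python
import random
import string
import time
from concurrent.futures import ProcessPoolExecutor


def process(item):
    """Sleep for the requested time and report whether the task succeeded."""
    name, seconds = item
    try:
        print(f"Task {name} - Sleeping {seconds}s")
        time.sleep(seconds)
    except Exception:
        return name, False
    return name, True


def run_with_map(data, times, workers=2):
    """Hypothetical helper: run process() over zip(data, times) with pool.map."""
    with ProcessPoolExecutor(max_workers=workers) as pool:
        # map() yields results in submission order, not completion order.
        return list(pool.map(process, zip(data, times)))


if __name__ == "__main__":
    amount = 4
    data = ["".join(random.choices(string.ascii_letters, k=5)) for _ in range(amount)]
    # Short sleeps are an assumption, used only to keep the run quick.
    times = [random.uniform(0.1, 0.3) for _ in range(amount)]
    for index, (name, ok) in enumerate(run_with_map(data, times)):
        print(f"Task {index} with code {name} finished: {ok}")
```

The main difference from the submit/as_completed version is that map returns the results in the order the tasks were submitted, so the index here matches the input position instead of the completion order.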
Any idea, fix, explanation or workaround is welcome. Thanks!