
I'm using a minimal example to test how futures work with ProcessPoolExecutor.

First, I want to get the results of my processing function; after that, I would like to add complexity to the scenario.


import time
import string
import random
import traceback
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor

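def process(*args):

    was_ok = True
    name = None  # defined up front so the return below never hits an unbound name

    try:
        name = args[0][0]
        tiempo = args[0][1]

        print(f"Task {name} - Sleeping {tiempo}s")
        time.sleep(tiempo)

    except Exception:  # avoid a bare except, which also swallows KeyboardInterrupt
        was_ok = False

    return name, was_ok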


def program():

    amount = 10
    workers = 2

    data = [''.join(random.choices(string.ascii_letters, k=5)) for _ in range(amount)]
    print(f"Data: {len(data)}")
    tiempo = [random.randint(5, 15) for _ in range(amount)]
    print(f"Times: {len(tiempo)}")

    with ProcessPoolExecutor(max_workers=workers) as pool:
        try:
            index = 0
            futures = [pool.submit(process, zipped) for zipped in zip(data, tiempo)]
            for future in concurrent.futures.as_completed(futures):
                name, ok = future.result()
                print(f"Task {index} with code {name} finished: {ok}")
                index += 1

        except Exception as e:
            print(f'Future failed: {e}')

if __name__ == "__main__":
    program()
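
As an aside on retrieving results: an exception raised inside a worker process is stored in its future and re-raised in the parent by future.result(), so failures can also be detected on the parent side instead of inside the worker. A minimal sketch, assuming a hypothetical work function that takes a (name, seconds) tuple and deliberately fails for negative times; work and collect are illustrative names, not part of the original code:

```python
import concurrent.futures
import time
from concurrent.futures import ProcessPoolExecutor


def work(item):
    """Hypothetical task: item is a (name, seconds) tuple."""
    name, seconds = item
    if seconds < 0:
        # Deliberate failure to show how worker exceptions reach the parent.
        raise ValueError(f"negative sleep for {name}")
    time.sleep(seconds)
    return name


def collect(items, workers=2):
    """Return {name: True/False} depending on whether each task succeeded."""
    status = {}
    with ProcessPoolExecutor(max_workers=workers) as pool:
        # Map each future back to its input name so failures can still be named.
        futures = {pool.submit(work, item): item[0] for item in items}
        for future in concurrent.futures.as_completed(futures):
            name = futures[future]
            try:
                future.result()  # re-raises the worker's exception, if any
                status[name] = True
            except Exception as exc:
                print(f"Task {name} failed: {exc}")
                status[name] = False
    return status


# The guard matters: with the "spawn" start method the main module is re-imported in workers.
if __name__ == "__main__":
    print(collect([("a", 0.1), ("b", -1), ("c", 0.2)]))
```

Keying the dict by future keeps the task name available even when result() raises, which the original tuple-returning approach cannot do if the failure happens before name is assigned.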

When I run my program, the output is as expected and all the future results are retrieved. However, right at the end I also get an error:

Data: 10
Times: 10
Task utebu - Sleeping 14s
Task klEVG - Sleeping 10s
Task ZAHIC - Sleeping 8s
Task 0 with code klEVG finished: True
Task RBEgG - Sleeping 9s
Task 1 with code utebu finished: True
Task VYCjw - Sleeping 14s
Task 2 with code ZAHIC finished: True
Task GDZmI - Sleeping 9s
Task 3 with code RBEgG finished: True
Task TPJKM - Sleeping 10s
Task 4 with code GDZmI finished: True
Task CggXZ - Sleeping 7s
Task 5 with code VYCjw finished: True
Task TUGJm - Sleeping 12s
Task 6 with code CggXZ finished: True
Task THlhj - Sleeping 11s
Task 7 with code TPJKM finished: True
Task 8 with code TUGJm finished: True
Task 9 with code THlhj finished: True
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 101, in _python_exit
    thread_wakeup.wakeup()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 89, in wakeup
    self._writer.send_bytes(b"")
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 183, in send_bytes
    self._check_closed()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed

As far as I know, the code itself has no error. I've been looking through some fairly old questions and issues like the following, without finding a fix:

  • this one, about the same error message but for Python 2,
  • this GitHub issue, which seems to be exactly what I'm seeing (and apparently is not fixed yet?),
  • this other issue, where the comment I linked to seems to point to the actual problem but offers no solution,
  • of course the official docs for Python 3.7,
  • ...

And so forth, but I still haven't found how to solve this behaviour. I even found an old question here on SO suggesting submit (with as_completed) instead of map; that is where this test came from, since before I simply used map with my process function.
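
For comparison, here is a sketch of the two styles mentioned above: Executor.map yields results in input order, while submit plus as_completed yields futures as they finish. The nap function and the sleep times are made up for illustration:

```python
import concurrent.futures
import time
from concurrent.futures import ProcessPoolExecutor


def nap(item):
    """Hypothetical task: sleep for item[1] seconds, then return item[0]."""
    name, seconds = item
    time.sleep(seconds)
    return name


def with_map(items, workers=2):
    # map() returns results in the same order as the inputs,
    # even if a later item finishes first.
    with ProcessPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(nap, items))


def with_as_completed(items, workers=2):
    # submit() returns one Future per item; as_completed() yields each
    # Future as soon as it finishes, so the order follows completion time.
    with ProcessPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(nap, item) for item in items]
        return [f.result() for f in concurrent.futures.as_completed(futures)]


if __name__ == "__main__":
    items = [("slow", 0.5), ("fast", 0.1)]
    print(with_map(items))           # ['slow', 'fast'] (input order)
    print(with_as_completed(items))  # usually completion order, but timing-dependent
```

Both variants shut the pool down through the with-block, so the choice between them only affects the order in which results are consumed.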

Any ideas, fixes, explanations or workarounds are welcome. Thanks!
