
Processes not cleaned for reuse

Hi there,

I stumbled upon a problem with ProcessPoolExecutor, where worker processes can access data they should not be able to see. Let me explain:

I have a situation similar to the example below: I have several runs to start, each with different arguments. They compute their stuff in parallel and have no reason to interact with each other. Now, as I understand it, when a process forks, it duplicates itself. The child process has the same (memory) data as its parent, but if it changes anything, it does so on its own copy. If I wanted the changes to survive the lifetime of the child process, I would call in queues, pipes and other IPC stuff.

But I actually don't! Each run manipulates data for itself, and that data should not carry over to any of the other runs. The example further below shows otherwise, though: subsequent runs (not the ones running in parallel) can access the data of their previous run, implying that the data has not been scrubbed from the process.
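
For contrast, here is a minimal sketch (not part of my actual code, and assuming a platform where "fork" is available) of the isolation I would have expected from a freshly started child process:

from multiprocessing import Process, set_start_method

value = 0  # module-level state in the parent

def child() -> None:
    global value
    value = 42                           # modifies only the child's copy
    print(f"child sees {value}")

if __name__ == "__main__":
    set_start_method("fork")             # assumes fork is supported
    p = Process(target=child)
    p.start()
    p.join()
    print(f"parent still sees {value}")  # prints 0: the change stayed in the child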

Code/Example

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process, set_start_method

class Static:
    integer: int = 0

def inprocess(run: int) -> None:
    cp = current_process()
    # Print current state
    print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static.integer}", flush=True)

    # Check value
    if Static.integer != 0:
        raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")

    # Update value
    Static.integer = run + 1

def pooling():
    cp = current_process()
    # Get master's pid
    print(f"[{cp.pid} {cp.name}] Start")
    with ProcessPoolExecutor(max_workers=2) as executor:
        for i, _ in enumerate(executor.map(inprocess, range(4))):
            print(f"run #{i} finished", flush=True)

if __name__ == '__main__':
    set_start_method("fork")    # enforce fork
    pooling()

Output

[1998 MainProcess] Start
[ 0 2020 Process-1] int: 0
[ 2 2020 Process-1] int: 1
[ 1 2021 Process-2] int: 0
[ 3 2021 Process-2] int: 2
run #0 finished
run #1 finished
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 175, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in _process_chunk
    return [fn(*args) for args in chunk]
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in <listcomp>
    return [fn(*args) for args in chunk]
  File "<stdin>", line 14, in inprocess
Exception: [ 2 2020 Process-1] Variable already set!
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 29, in <module>
  File "<stdin>", line 24, in pooling
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
Exception: [ 2 2020 Process-1] Variable already set!

This behaviour can also be reproduced with max_workers=1, since the single worker process is re-used. The start method has no influence on the error (though only "fork" seems to use more than one process).


So to summarise: I want each new run to start in a process that has all of the previously set-up (parent) data, but none of the data produced by any of the other runs. Is that possible? How would I achieve it? Why does the above not do exactly that?

I appreciate any help.


I found multiprocessing.pool.Pool, where one can set maxtasksperchild=1 so that a worker process is destroyed when its task is finished. But I dislike the multiprocessing interface; the ProcessPoolExecutor is more comfortable to use. Additionally, the whole idea of the pool is to save process setup time, which would be defeated by killing the hosting process after each run.
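
For completeness, this is roughly what that workaround looks like (a sketch only, reusing the Static example from above; it keeps runs isolated, but pays the process setup cost on every task):

from multiprocessing import Pool, current_process

class Static:
    integer: int = 0

def inprocess(run: int) -> int:
    cp = current_process()
    print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static.integer}", flush=True)
    Static.integer = run + 1
    return run

if __name__ == "__main__":
    # maxtasksperchild=1 retires each worker after a single task,
    # so leftover state such as Static.integer cannot reach the next run.
    with Pool(processes=2, maxtasksperchild=1) as pool:
        for i in pool.map(inprocess, range(4)):
            print(f"run #{i} finished", flush=True)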

Dave J

2 Answers


Brand new processes in Python do not share memory state. However, ProcessPoolExecutor reuses process instances; it's a pool of active processes, after all. I assume this is done to prevent the OS overhead of stopping and starting processes all the time.

You see the same behaviour in other distribution technologies like Celery, where, if you're not careful, you can bleed global state between executions.
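
You can see the reuse directly by reporting the worker PIDs; a small sketch (illustrative only) will typically show the same PID handling several tasks:

import os
from concurrent.futures import ProcessPoolExecutor

def which_pid(_: int) -> int:
    return os.getpid()  # report which worker process ran this task

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        pids = list(executor.map(which_pid, range(8)))
    print(pids)  # typically only two distinct PIDs, each appearing several times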

I recommend you manage your namespaces better to encapsulate your data. Using your example, you could encapsulate your code and data in a class which you instantiate inside inprocess(), instead of storing it in a shared namespace such as a static field on a class or directly in a module. That way the object will ultimately be cleaned up by the garbage collector:

from multiprocessing import current_process

class State:
    def __init__(self):
        self.integer: int = 0

    def do_stuff(self):
        self.integer += 42

def use_global_function(state):
    state.integer -= 1664
    state.do_stuff()

def inprocess(run: int) -> None:
    cp = current_process()
    state = State()  # fresh state per call; nothing persists in the worker
    print(f"[{run:2d} {cp.pid} {cp.name}] int: {state.integer}", flush=True)
    if state.integer != 0:
        raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")
    state.integer = run + 1
    state.do_stuff()
    use_global_function(state)
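
The driving code does not need to change; something along the lines of the original pooling() still applies (a sketch, assuming the State and inprocess definitions above):

from concurrent.futures import ProcessPoolExecutor

def pooling():
    # Same driver as before: the isolation now comes from State()
    # being created and garbage-collected inside each call.
    with ProcessPoolExecutor(max_workers=2) as executor:
        for i, _ in enumerate(executor.map(inprocess, range(4))):
            print(f"run #{i} finished", flush=True)

if __name__ == "__main__":
    pooling()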
André C. Andersen
  • Thanks. Regarding the process reuse: I assumed the same. Regarding your suggestion: unfortunately, that does not work for me. I had to find an example without all the actual overhead. I am relying on metaclasses to reduce my API and, as a result, I cannot instantiate those things in the `inprocess` call. – Dave J Jun 21 '18 at 15:13

I have been facing some potentially similar problems and saw some interesting posts, for example this one: High Memory Usage Using Python Multiprocessing, which points towards using gc.collect(); however, in your case it did not work. So I looked at how the Static class is initialized. Some points:

  1. Unfortunately, I cannot reproduce your minimal example; it raises ValueError: cannot find context for 'fork'
  2. Considering 1, I use set_start_method("spawn"). A quick fix then could be to instantiate the Static class every time, as below:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process, set_start_method

class Static:
    integer: int = 0
    def __init__(self):
        pass

def inprocess(run: int) -> None:
    cp = current_process()
    # Print current state
    print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static().integer}", flush=True)

    # Check value
    if Static().integer != 0:
        raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")

    # Update value
    Static().integer = run + 1


def pooling():
    cp = current_process()
    # Get master's pid
    print(f"[{cp.pid} {cp.name}] Start")
    with ProcessPoolExecutor(max_workers=2) as executor:
        for i, _ in enumerate(executor.map(inprocess, range(4))):
            print(f"run #{i} finished", flush=True)


if __name__ == "__main__":
    print("start")
    # set_start_method("fork")  # enforce fork, ValueError: cannot find context for 'fork'
    set_start_method("spawn")   # Alternative
    pooling()

This returns:

[ 0 1424 SpawnProcess-2] int: 0
[ 1 1424 SpawnProcess-2] int: 0
run #0 finished
[ 2 17956 SpawnProcess-1] int: 0
[ 3 1424 SpawnProcess-2] int: 0
run #1 finished
run #2 finished
run #3 finished
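
For what it is worth, the reason the integer always reads 0 here is that assigning through a fresh instance, as in Static().integer = run + 1, only sets an attribute on that throwaway instance; the class attribute itself is never changed. A quick illustration:

class Static:
    integer: int = 0

Static().integer = 5        # sets the attribute on a temporary instance only
print(Static.integer)       # 0: the class attribute is untouched
print(Static().integer)     # 0: every new instance still sees the class value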
DTK