3

I'm converting some serially processed Python jobs to multiprocessing with dask or joblib. Sadly, I need to work on Windows.
When running from within IPython, or from the command line by invoking the py-file with python, everything runs fine.
When compiling an executable with cython, it no longer runs fine: step by step, more and more processes (unlimited, and far more than the number of requested processes) get started and block my system.
It somehow feels like a Multiprocessing Bomb - but of course I used if __name__ == "__main__": as the control block - which is confirmed to work by the fine run when calling python at the command line.
My cython call is cython --embed --verbose --annotate THECODE.PY and I'm compiling with gcc -time -municode -DMS_WIN64 -mthreads -Wall -O -I"PATH_TO_\include" -L"PATH_TO_\libs" THECODE.c -lpython36 -o THECODE, resulting in a Windows executable THECODE.exe.
With other (single-processing) code this runs fine.
The problem seems to be the same for dask and joblib (which might mean that dask works like, or is based on, joblib).
Any suggestions?

For those interested in an MCVE: just taking the first code sample from Multiprocessing Bomb and compiling it with my cython commands above results in an executable that blows up your system. (I just tried :-) )

I just found something interesting by adding one line to the code sample to show the __name__:

import multiprocessing

def worker():
    """worker function"""
    print('Worker')
    return

print("-->" + __name__ + "<--")
if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

When running that piece of code with python it shows

-->__main__<--
-->__mp_main__<--
-->__mp_main__<--
-->__mp_main__<--
-->__mp_main__<--
-->__mp_main__<--

(other output suppressed), which explains why the if guard works. When running the executable after cython and compilation, it shows

-->__main__<--
-->__main__<--
-->__main__<--
-->__main__<--
-->__main__<--
-->__main__<--

and more and more. Thus the workers' calls back into the module are no longer masqueraded as an import, and each worker tries to start five new ones in a recursive manner.
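
A small extension of the same sample (only the print of sys.argv is new, added here purely for diagnosis) also shows what the spawned processes receive on their command line:

import multiprocessing
import sys

def worker():
    """worker function"""
    print('Worker')
    return

# diagnostic: show how this module was entered and with which arguments
print("-->" + __name__ + "<--")
print(sys.argv)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

Run with python, the children print the same short argv as the parent; run as the compiled executable, they print the full spawn bootstrap arguments (compare the answers below).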

Bastian Ebeling

4 Answers

7

When starting a new Python process, the multiprocessing module uses the spawn method on Windows (this behavior can also be triggered on Linux by using mp.set_start_method('spawn')).

Command-line arguments are passed to the interpreter in the new process, so that communication with the parent process can be established, for example:

 python -c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork

The problem with embedded Cython modules (or with frozen modules in general, i.e. those created with cx_Freeze, py2exe and similar) is that passing command-line arguments to them corresponds more to

python my_script.py <arguments>

i.e. the command line isn't automatically processed by the interpreter, but needs to be handled in the script.

multiprocessing provides a function called multiprocessing.freeze_support(), which handles the command line arguments correctly and which can be used as shown in Bastian's answer:

import sys
import multiprocessing

if __name__ == '__main__':
    # needed for Cython, as it doesn't set the `frozen` attribute
    setattr(sys, 'frozen', True)
    # parse the command line options and execute them if needed
    multiprocessing.freeze_support()

However, this solution works only on Windows, as can be seen in the code:

def freeze_support(self):
    '''Check whether this is a fake forked process in a frozen executable.
    If so then run code specified by commandline and exit.
    '''
    if sys.platform == 'win32' and getattr(sys, 'frozen', False):
        from .spawn import freeze_support
        freeze_support()

There is a bug report, multiprocessing freeze_support needed outside win32, which might or might not be fixed soon.

As explained in the above bug report, it is not enough to set the frozen attribute to True and to call freeze_support directly from multiprocessing.spawn, because then the semaphore tracker isn't handled correctly.

There are two options as I see it: either patch your installation with the yet unreleased patch from the above bug report, or use the do-it-yourself approach presented below.


Here is an earlier version of this answer, which is more "experimental" but offers more insights/details and proposes a solution in a somewhat do-it-yourself style.

I'm on Linux, so I use mp.set_start_method('spawn') to simulate the behavior of Windows.

What happens in the spawn-mode? Let's add some sleeps, so we can investigate the processes:

# bomb.py
import multiprocessing as mp
import sys
import time

def worker():
    time.sleep(50)
    print('Worker')
    return

if __name__ == '__main__':
    print("Starting...")
    time.sleep(20)
    mp.set_start_method('spawn')  ## use spawn!
    jobs = []
    for i in range(5):
        p = mp.Process(target=worker)
        jobs.append(p)
        p.start()

By using pgrep python we can see that at first there is only one Python process, and then 7(!) different pids. We can see the command-line arguments via cat /proc/<pid>/cmdline. 5 of the new processes have the command line

-c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork

and one:

-c "from multiprocessing.semaphore_tracker import main;main(4)"

That means the parent process starts 6 new Python interpreter instances, and every newly started interpreter executes code sent from the parent via the command-line options; further information is shared via pipes. One of these 6 Python instances is a tracker, which observes the whole thing.
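
If pgrep and /proc are not available (e.g. on Windows), the same inspection could be done from within the parent process; this is just a sketch and assumes the third-party psutil package is installed:

# diagnostic sketch (assumes: pip install psutil)
import os
import psutil

def dump_child_cmdlines():
    """Print pid and command line of every child of the current process."""
    parent = psutil.Process(os.getpid())
    for child in parent.children(recursive=True):
        print(child.pid, child.cmdline())

Calling dump_child_cmdlines() in the parent right after the p.start() calls shows the same spawn_main/semaphore_tracker command lines as the /proc inspection above.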

OK, what happens if cythonized + embedded? The same as with normal Python; the only difference is that the bomb executable is started instead of python. But unlike the Python interpreter, it doesn't execute/isn't aware of the command-line arguments, so the main function runs over and over and over again.

There is an easy fix: let the bomb-exe start the Python interpreter:

...
if __name__ == '__main__':
    mp.set_executable(<PATH TO PYTHON>)
...

Now the bomb is no longer a multiprocessing bomb!
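
For reference, a minimal self-contained sketch of this fix; the interpreter path is an assumption and has to be adjusted (in a non-frozen script sys.executable would be the natural choice):

# bomb_fixed.py - sketch of the set_executable workaround
import multiprocessing as mp

def worker():
    print('Worker')

if __name__ == '__main__':
    # let the spawned children be started by a real Python interpreter
    # instead of re-running this (frozen) executable;
    # "/usr/bin/python3" is an assumed path - adjust it for your system
    mp.set_executable("/usr/bin/python3")
    mp.set_start_method('spawn')
    jobs = [mp.Process(target=worker) for _ in range(5)]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()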

However, the goal is probably not to have a Python interpreter around, so we need to make our program aware of the possible command lines:

import re
......
if __name__ == '__main__':
    if len(sys.argv)==3:  # should start in semaphore_tracker mode
        nr=list(map(int, re.findall(r'\d+',sys.argv[2])))          
        sys.argv[1]='--multiprocessing-fork'   # this canary is needed for multiprocessing module to work   
        from multiprocessing.semaphore_tracker import main;main(nr[0])

    elif len(sys.argv)>3: # should start in slave mode
        fd, pipe=map(int, re.findall(r'\d+',sys.argv[2]))
        print("I'm a slave!, fd=%d, pipe=%d"%(fd,pipe)) 
        sys.argv[1]='--multiprocessing-fork'   # this canary is needed for multiprocessing module to work  
        from multiprocessing.spawn import spawn_main; 
        spawn_main(tracker_fd=fd, pipe_handle=pipe)

    else: #main mode
        print("Starting...")
        mp.set_start_method('spawn')
        jobs = []
        for i in range(5):
            p = mp.Process(target=worker)
            jobs.append(p)
            p.start()

Now, our bomb doesn't need a stand-alone python-interpreter and stops after the workers are done. Please note the following:

  1. The way it is decided in which mode the bomb should be started is not very robust, but I hope you get the gist.
  2. --multiprocessing-fork is just a canary; it doesn't do anything, it only must be there, see here.

NB: The changed code can also be used with python, because after executing "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork python changes sys.argv, so the code no longer sees the original command line and len(sys.argv) is 1.

ead
  • Found a [solution](https://stackoverflow.com/questions/53497078/joblib-parallel-cython-hanging-forever) that works for me: I had to use `with multiprocessing.get_context("spawn").Pool() as pool: ...` in my app to force Loky-backend of the dependency to use `n_jobs=1`. No mp-bombs in my frozen app anymore :thumbsup: – Alex Jan 27 '20 at 07:24
3

Based on the details from the submitted bug report, I think I can offer what is maybe the most elegant solution here:

import sys
import multiprocessing

if __name__ == '__main__':
    if sys.argv[0][-4:] == '.exe':
        setattr(sys, 'frozen', True)
    multiprocessing.freeze_support()
    YOURMAINROUTINE()

The freeze_support()-call is needed on Windows - see the Python multiprocessing documentation.
When running within Python, that line alone is already enough.
But somehow cython is obviously not aware of some of those things (the docs state that it is tested with py2exe, PyInstaller and cx_Freeze). It can be alleviated by the setattr call, which should only be applied when compiling, hence the decision based on the file extension.
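
A complete, self-contained sketch of how this pattern could look in practice; worker and YOURMAINROUTINE are placeholder names used only for illustration:

import sys
import multiprocessing

def worker():
    print('Worker')

def YOURMAINROUTINE():
    jobs = [multiprocessing.Process(target=worker) for _ in range(5)]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()

if __name__ == '__main__':
    # the Cython-embedded executable doesn't set sys.frozen itself,
    # so set it manually when running as a compiled .exe
    if sys.argv[0][-4:] == '.exe':
        setattr(sys, 'frozen', True)
    multiprocessing.freeze_support()
    YOURMAINROUTINE()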

Bastian Ebeling
  • 1
    Tried this and, unfortunately, I still get multiple spawns of my frozen command line app using multiprocessing.Pool on Python 3.7 with joblib dependency. I verified, and `setattr(sys, 'frozen', True)` is executed (as a bomb, however). – Alex Jan 25 '20 at 10:32
  • This works only for Windows, see https://bugs.python.org/issue32146 – ead Feb 16 '20 at 19:17
2

Inspired by the answer from ead (or the ideas given there), I found a very simple solution - or let's better call it a workaround.
For me, just changing the if clause to

if __name__ == '__main__':
    if len(sys.argv) == 1:
        main()
    else:
        sys.argv[1] = sys.argv[3]
        exec(sys.argv[2])

did it.
The reason why that works (in my case): when calling the original .py-file, the workers' __name__ is set to __mp_main__ (but all processes are just the plain .py-file).
When running the (cython-)compiled version, the workers' __name__ is not usable, but the workers get called differently, and thus we can identify them by having more than one argument in argv. In my case a worker's argv reads

['MYPROGRAMM.exe',
 '-c',
 'from multiprocessing.spawn import spawn_main; spawn_main(parent_pid=9316, pipe_handle=392)',
 '--multiprocessing-fork']

Thus argv[2] contains the code that activates the worker, and it gets executed by the commands above.
Of course, if you need arguments for your compiled file, you need a bigger effort, maybe parsing for the parent_pid in the call. But in my case that would simply be overkill.
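
If the compiled program does need its own arguments, one possible variant (just a sketch along the same lines; worker and main are placeholder names) is to test for the --multiprocessing-fork canary instead of counting arguments:

import sys
import multiprocessing

def worker():
    print('Worker')

def main():
    # place to parse your own command-line arguments
    jobs = [multiprocessing.Process(target=worker) for _ in range(5)]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()

if __name__ == '__main__':
    if '--multiprocessing-fork' in sys.argv:
        # started as a spawned child: argv[2] holds the bootstrap code
        # and argv[1] must be the canary expected by multiprocessing
        sys.argv[1] = '--multiprocessing-fork'
        exec(sys.argv[2])
    else:
        main()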

Bastian Ebeling
  • Your solution will only work for your minimal/similar examples. As soon as data must be shared between the master and the slaves your work around will break down... – ead Nov 20 '17 at 06:54
  • @ead oh sad - you are right. I just tested. Do you know why? What can I do further? (Okay, it is no longer a process bomb, but non-functional. :-( ) – Bastian Ebeling Nov 20 '17 at 07:46
  • 1
    What about my solution, which mimics the right behavior of the python interpreter? You will need to adjust it for Windows (`parent_pid` instead of `fd` and `pipe_handle` instead of `pipe`), but I think that should set up the communication correctly – ead Nov 20 '17 at 08:29
  • @ead your solution works - I just needed to understand it. I'm fine with that - but I think it could be done (working for me also with big worker functions) in a more readable (or understandable) fashion - I'll update my answer. – Bastian Ebeling Nov 20 '17 at 08:59
0

Since the solutions proposed didn't work for me, I am providing an additional answer with a workaround.

My frozen app also resulted in a multiprocessing bomb. I could solve it by

  1. using thread-based parallelism instead of process-based multiprocessing, and
  2. within joblib Parallel execution, using Parallel(n_jobs=4, prefer="threads"), as suggested by this answer, instead of the default process-based backend (see the short sketch right after this list).
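
A short sketch of such a thread-preferring joblib call (joblib is assumed to be installed; compute_chunk is a made-up stand-in for the real work):

from joblib import Parallel, delayed

def compute_chunk(i):
    # stand-in for the real, more expensive work
    return i * i

# the thread-based backend avoids spawning new processes in the frozen app
results = Parallel(n_jobs=4, prefer="threads")(
    delayed(compute_chunk)(i) for i in range(16))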

I couldn't get multiprocessing.Pool to work in the frozen app (neither with prefer="threads" nor with prefer="multiprocessing"), but one can switch to thread-based parallelism via multiprocessing.dummy (see the docs):

# a dependency with joblib
from dep_with_joblib import BigJob
# multiprocessing wrapper around threading.Thread
from multiprocessing.dummy import Pool as ThreadPool
# instead of
# from multiprocessing import Pool

# thread based parallelism,
# works if `Parallel(n_jobs=4, prefer="threads")` is used
# in joblib (e.g. inside big_job())
POOL = ThreadPool(processes=1)

# as far as I can tell,
# the following Process based Parallelism 
# does _not_ work with frozen app/joblib atm
# POOL = Pool(processes=1)

class MainClass():
    def __init__(self):
        """Init ClusterGen"""
        return

    @staticmethod
    def run_big_job(big_job, data):
        """Run big_job on parallel thread"""
        big_job()
        return big_job

    def big_job_exec(self):
        """Big job execution"""
        bigjob = BigJob()
        big_job_input_data = ...
        # Start big_job on different thread
        async_result = POOL.apply_async(
            MainClass.run_big_job, (bigjob, big_job_input_data))
        # get results from clusterer
        bigjob_results = async_result.get()

A more explicit example with Queue and threading.Thread:

import threading
import queue
# a dependency with joblib
from dep_with_joblib import BigJob

job_queue = queue.Queue()

def store_in_queue(f):
    def wrapper(*args):
        job_queue.put(f(*args))
    return wrapper

class MainClass():
    def __init__(self):
        """Init ClusterGen"""
        return

    @staticmethod
    @store_in_queue
    def run_big_job(big_job, data):
        """Run big_job on parallel thread"""
        big_job()
        return big_job

    def big_job_exec(self):
        """Big job execution"""

        bigjob = BigJob()
        big_job_input_data = ...
        # Start big_job on different thread
        t = threading.Thread(
            target=MainClass.run_big_job,
            args=(bigjob, big_job_input_data),
            group=None,
            name="example-bigjob",
        )
        t.start()
        # get results from big_job
        bigjob_results = job_queue.get()

In both of the examples above, bigjob() is run asynchronously on a different thread. The examples can easily be modified to use multiple threads.

Why async? In my case, BigJob() comes from a dependency that uses joblib.Parallel to improve speed, which wouldn't work when my app was frozen, and I needed bigjob() to run asynchronously to prevent my GUI from crashing.

Alex