20

I am writing a Python program using its multiprocessing module. The program calls a number of worker functions, each yielding a random number. I need to terminate the program once one of the workers has produced a number larger than 0.7.

Below is my program where the "how to do this" part is not yet filled out. Any idea? Thanks.

import time
import numpy as np
import multiprocessing as mp

def f(i):
    np.random.seed(int(time.time()+i))

    time.sleep(3)
    res=np.random.rand()
    print("From i = ",i, "       res = ",res)
    if res>0.7:
        print("find it")
        # terminate  ???? Question: How to do this???


if __name__=='__main__':
    num_workers=mp.cpu_count()
    for i in range(num_workers):
        p=mp.Process(target=f,args=(i,))
        p.start()
zell
  • You're looking for a way to facilitate communication between parent and child processes. This is answered at [Communication between parent child processes](http://stackoverflow.com/questions/6099981/communication-between-parent-child-processes). – Akshat Mahajan May 01 '16 at 02:56

4 Answers

37

No process can stop another short of brute force os.kill()-like sledgehammers. Don't go there.

To do this sanely, you need to rework your basic approach: the main process and the worker processes need to communicate with each other.

I'd flesh it out, but the example so far is too bare-bones to make it useful. For example, as written, no more than num_workers calls to rand() are ever made, so there's no reason to believe any of them must be > 0.7.

Once the worker function grows a loop, then it becomes more obvious. For example, the worker could check to see if an mp.Event is set at the top of the loop, and just exit if it is. The main process would set the Event when it wants the workers to stop.

And a worker could set a different mp.Event when it found a value > 0.7. The main process would wait for that Event, then set the "time to stop" Event for workers to see, then do the usual loop .join()-ing the workers for a clean shutdown.

EDIT

Here's a fleshed-out, portable, clean solution, assuming the workers are going to keep going until at least one finds a value > 0.7. Note that I removed numpy from this, because it's irrelevant to this code. The code here should work fine under any stock Python on any platform supporting multiprocessing:

import random
from time import sleep

def worker(i, quit, foundit):
    print("%d started" % i)
    while not quit.is_set():
        x = random.random()
        if x > 0.7:
            print('%d found %g' % (i, x))
            foundit.set()
            break
        sleep(0.1)
    print("%d is done" % i)

if __name__ == "__main__":
    import multiprocessing as mp
    quit = mp.Event()
    foundit = mp.Event()
    procs = []
    for i in range(mp.cpu_count()):
        p = mp.Process(target=worker, args=(i, quit, foundit))
        p.start()
        procs.append(p)
    foundit.wait()   # block until some worker reports a hit
    quit.set()       # tell the remaining workers to stop
    for p in procs:
        p.join()     # wait for every worker to exit cleanly

And some sample output:

0 started
1 started
2 started
2 found 0.922803
2 is done
3 started
3 is done
4 started
4 is done
5 started
5 is done
6 started
6 is done
7 started
7 is done
0 is done
1 is done

Everything shuts down cleanly: no tracebacks, no abnormal terminations, no zombie processes left behind ... clean as a whistle.
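If the main process also needs the value that was found, one option is to pass it back through shared memory. The following is only a sketch under assumptions that are not in the original answer: it uses Python 3, the names `search`, `result` and `cutoff` are made up for illustration, and a `multiprocessing.Value` guarded by its own lock holds the first hit.

```python
import multiprocessing as mp
import random
from time import sleep


def worker(i, quit, foundit, result, cutoff):
    # Keep drawing until told to stop or until a draw beats the cutoff.
    while not quit.is_set():
        x = random.random()
        if x > cutoff:
            with result.get_lock():
                # Only the first hit is recorded; later hits are ignored.
                if not foundit.is_set():
                    result.value = x
                    foundit.set()
            break
        sleep(0.01)


def search(cutoff=0.7, nworkers=4):
    quit = mp.Event()
    foundit = mp.Event()
    result = mp.Value('d', 0.0)   # shared double, created with a lock
    procs = [mp.Process(target=worker,
                        args=(i, quit, foundit, result, cutoff))
             for i in range(nworkers)]
    for p in procs:
        p.start()
    foundit.wait()   # block until some worker records a hit
    quit.set()       # ask the remaining workers to stop
    for p in procs:
        p.join()     # wait for every worker to exit
    return result.value


if __name__ == "__main__":
    print("found %g" % search())
```

The shutdown is still cooperative: nothing is killed, and the main process joins every worker before it reads the value.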

KILLING IT

As @noxdafox pointed out, there's a Pool.terminate() method that does the best it can, across platforms, to kill worker processes no matter what they're doing (e.g., on Windows it calls the platform TerminateProcess()). I don't recommend it for production code, because killing a process abruptly can leave various shared resources in inconsistent states, or let them leak. There are various warnings about that in the multiprocessing docs, to which you should add your OS docs.

Still, it can be expedient! Here's a full program using this approach. Note that I bumped the cutoff to 0.95, to make this more likely to take longer than an eyeblink to run:

import random
from time import sleep

def worker(i):
    print("%d started" % i)
    while True:
        x = random.random()
        print('%d found %g' % (i, x))
        if x > 0.95:
            return x # triggers callback
        sleep(0.5)

# callback running only in __main__
def quit(arg):
    print("quitting with %g" % arg)
    # note: p is visible because it's global in __main__
    p.terminate()  # kill all pool workers

if __name__ == "__main__":
    import multiprocessing as mp
    ncpu = mp.cpu_count()
    p = mp.Pool(ncpu)
    for i in range(ncpu):
        p.apply_async(worker, args=(i,), callback=quit)
    p.close()
    p.join()

And some sample output:

$ python mptest.py
0 started
0 found 0.391351
1 started
1 found 0.767374
2 started
2 found 0.110969
3 started
3 found 0.611442
4 started
4 found 0.790782
5 started
5 found 0.554611
6 started
6 found 0.0483844
7 started
7 found 0.862496
0 found 0.27175
1 found 0.0398836
2 found 0.884015
3 found 0.988702
quitting with 0.988702
4 found 0.909178
5 found 0.336805
6 found 0.961192
7 found 0.912875
$ [the program ended]
Tim Peters
  • For the second (`Pool`) method, what is the purpose of `p.close()` on the second line from the bottom? – WoooHaaaa Aug 10 '20 at 06:20
  • Always best practice to follow intended shutdown procedures. https://stackoverflow.com/questions/35708371/purpose-of-pool-join-pool-close-in-multiprocessing – Tim Peters Aug 10 '20 at 14:15
  • @TimPeters Thanks for your solution. How can I get the returned value (not only print it out)? – diegus Apr 26 '21 at 07:33
  • @diegus, via any interprocess communication mechanism you like. The second approach in this answer already showed one such: using a callback to deliver the result back to the main process. But you could use anything else you like (e.g., a queue, a pipe, shared memory, ...). It wasn't in the scope of the _original_ question, so I won't say more about that here. – Tim Peters Apr 26 '21 at 18:10
  • How does it work with map and imap? There I can't give any arguments. – Amir May 23 '22 at 12:39
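On the last comment about map and imap: those methods take no callback, but the main process can iterate over the results itself and stop at the first acceptable one. This is a rough sketch and not part of the original answer. It assumes Python 3, the names `draw` and `first_above` are invented for illustration, and each task makes a single draw.

```python
import multiprocessing as mp
import random


def draw(i):
    # One task is one draw; return the task index along with the value.
    return i, random.random()


def first_above(cutoff=0.95, nworkers=4, ntasks=1000):
    with mp.Pool(nworkers) as pool:
        # imap_unordered yields each result as soon as it is ready.
        for i, x in pool.imap_unordered(draw, range(ntasks)):
            if x > cutoff:
                # Leaving the with-block calls pool.terminate(),
                # which discards the tasks that are still pending.
                return i, x
    return None   # no draw exceeded the cutoff


if __name__ == "__main__":
    print(first_above())
```

Since leaving the `with` block calls `Pool.terminate()`, the warnings about abrupt termination in the answer above apply here too.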
5

There is a much cleaner and more Pythonic way to do what you want: use the callback functions offered by multiprocessing.Pool.

You can check this question to see an implementation example.
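As a rough illustration of the callback idea (not the code from the linked question), the sketch below assumes Python 3 and uses made-up names `search`, `on_result` and `done`. The callback only records the result and sets a `threading.Event`; the main thread then shuts the pool down.

```python
import multiprocessing as mp
import random
import threading
from time import sleep


def worker(i, cutoff):
    # Loop until a draw beats the cutoff, then return it.
    while True:
        x = random.random()
        if x > cutoff:
            return i, x   # the return value is handed to the callback
        sleep(0.01)


def search(cutoff=0.95, nworkers=4):
    done = threading.Event()
    found = []

    def on_result(result):
        # Runs in a helper thread of the main process.
        found.append(result)
        done.set()

    pool = mp.Pool(nworkers)
    for i in range(nworkers):
        pool.apply_async(worker, args=(i, cutoff), callback=on_result)
    pool.close()
    done.wait()        # block until the first callback fires
    pool.terminate()   # stop the workers that are still looping
    pool.join()
    return found[0]


if __name__ == "__main__":
    print("worker %d found %g" % search())
```

Keeping the callback tiny and calling `terminate()` from the main thread is a design choice of this sketch, not a requirement of `Pool`.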

noxdafox
2

As one of the other users mentioned, you need the processes to communicate with each other in order to get them to terminate their peers. While you can use os.kill to terminate the peer processes, it is more graceful to signal a termination.

The solution I used is a pretty simple one:

1. Find the process ID (pid) of the main process, which spawns all the other worker processes. This connection information is available from the OS, which keeps track of which child process was spawned from which parent process.
2. When one of the worker processes reaches your end condition, it uses the parent process ID to find all the child processes of the main process (including itself), then goes through the list and signals them to end (making sure it is not signaling itself).

The code below contains the working solution.

import time
import numpy as np
import multiprocessing as mp
import os
import psutil
import signal

def f(i):
    np.random.seed(int(time.time()+i))

    time.sleep(3)
    res=np.random.rand()
    current_process = os.getpid()
    print("From i = ",i, "       res = ",res, " with process ID (pid) = ", current_process)
    if res>0.7:
        print("find it")
        # solution: use the parent child connection between processes
        # (main_process is a global set in __main__ and inherited by forked workers)
        parent = psutil.Process(main_process)
        children = parent.children(recursive=True)
        for process in children:
            if not (process.pid == current_process):
                print("Process: ",current_process,  " killed process: ", process.pid)
                process.send_signal(signal.SIGTERM)


if __name__=='__main__':
    num_workers=mp.cpu_count()
    main_process = os.getpid()
    print("Main process: ", main_process)
    for i in range(num_workers):
        p=mp.Process(target=f,args=(i,))
        p.start()

The output gives a clear idea of what is happening:

Main process:  30249
From i =  0        res =  0.224609517693  with process ID (pid) =  30259
From i =  1        res =  0.470935062176  with process ID (pid) =  30260
From i =  2        res =  0.493680214732  with process ID (pid) =  30261
From i =  3        res =  0.342349294134  with process ID (pid) =  30262
From i =  4        res =  0.149124648092  with process ID (pid) =  30263
From i =  5        res =  0.0134122107375  with process ID (pid) =  30264
From i =  6        res =  0.719062852901  with process ID (pid) =  30265
find it
From i =  7        res =  0.663682945388  with process ID (pid) =  30266
Process:  30265  killed process:  30259
Process:  30265  killed process:  30260
Process:  30265  killed process:  30261
Process:  30265  killed process:  30262
Process:  30265  killed process:  30263
Process:  30265  killed process:  30264
Process:  30265  killed process:  30266
Muschel
  • Just noting that this only works on platforms (like Linux) where a new process is started via `fork()` - otherwise it will raise `NameError` when a worker tries to access `main_process`. – Tim Peters May 01 '16 at 04:02
  • What is psutil? Is it necessary? – zell May 01 '16 at 04:03
  • `psutil` is yet another 3rd-party package you'll have to download. Remember I already said "don't go there" in my answer ;-) – Tim Peters May 01 '16 at 04:03
  • I prefer not to download psutil. It makes this simple program depend on something you have to download. Is it necessary? – zell May 01 '16 at 04:05
  • Then you'll have to accept sticking to what `multiprocessing` offers (which is thoroughly adequate, but requires playing along with its portable and relatively clean view of the world). – Tim Peters May 01 '16 at 04:09
  • I'm not answering the "is it necessary" question because - necessary or not - this is an ugly, brute force, non-portable approach to what _should_ be a very simple application of the clean & portable interprocess communication facilities `multiprocessing` offers. If you're unwilling to learn how to use those, your career in multiprocessing applications will be painful but mercifully brief ;-) – Tim Peters May 01 '16 at 04:20
  • @TimPeters I think you have an elegant answer in mind. Would you elaborate? – zell May 01 '16 at 05:01
  • @zell, I already gave an answer here. Yes, it's just a sketch, but the sample program does so little I can't guess at what you _really_ need. – Tim Peters May 01 '16 at 05:04
  • @zell, see edit to my answer, giving a full program. – Tim Peters May 01 '16 at 05:23
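For readers who, like the asker, would rather not depend on psutil, here is a hedged sketch of a similar "stop the others" idea using only the standard library. It is not the code from this answer. It assumes Python 3, that each worker loops until it gets a hit and then simply returns, and that the parent does the terminating because it already holds the `Process` objects. The name `run` is invented for illustration.

```python
import multiprocessing as mp
from multiprocessing.connection import wait
import random
from time import sleep


def f(i, cutoff):
    # Loop until this worker draws a value above the cutoff, then
    # return, which ends the process normally.
    while True:
        if random.random() > cutoff:
            return
        sleep(0.01)


def run(cutoff=0.7, nworkers=4):
    procs = [mp.Process(target=f, args=(i, cutoff))
             for i in range(nworkers)]
    for p in procs:
        p.start()
    # A process's sentinel becomes ready when that process ends.
    wait([p.sentinel for p in procs])
    for p in procs:
        if p.is_alive():
            p.terminate()   # abrupt stop, same caveats as os.kill()
    for p in procs:
        p.join()
    return [p.exitcode for p in procs]


if __name__ == "__main__":
    print(run())
```

This is still the brute-force route that the accepted answer warns against. It only removes the third-party dependency and the reliance on `fork()`.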
-5

You can terminate your program simply by importing sys and calling sys.exit():

import sys 

sys.exit()
Akshat Mahajan
KD Parmar
  • Thanks. But your answer is wrong. sys.exit() only terminates the process which executes it. – zell May 01 '16 at 02:50
  • From the docs: "Since exit() ultimately “only” raises an exception, it will only exit the process **when called from the main thread**, and the exception is not intercepted." It doesn't help facilitate the termination of multiple processes, nor does it solve the actual problem: communicating to the main process that a termination condition has been met. – Akshat Mahajan May 01 '16 at 02:52
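The point made in these comments can be checked with a small experiment. This is a minimal sketch assuming Python 3; the names `child` and `demo` and the exit code 3 are arbitrary choices for illustration.

```python
import multiprocessing as mp
import sys


def child():
    sys.exit(3)   # raises SystemExit inside this child process only


def demo():
    p = mp.Process(target=child)
    p.start()
    p.join()
    # The parent is still running here and can inspect the exit code.
    return p.exitcode


if __name__ == "__main__":
    print("child exit code:", demo())
```

The parent keeps running after the child calls `sys.exit()`, which is why that call alone cannot stop the other workers.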