
I have some Python code (on Windows) that uses the multiprocessing module to run a pool of worker processes. Each worker process needs to do some cleanup at the end of the map_async method.

Does anyone know how to do that?

Dave
  • I'm super late to the party here, but see [this answer](http://stackoverflow.com/a/24724452/2073595) for a way you can do this. – dano Apr 21 '15 at 13:43

2 Answers


Do you really want to run a cleanup function once for each worker process rather than once for every task created by the map_async call?

multiprocessing.pool.Pool creates a pool of, say, 8 worker processes. map_async might submit 40 tasks to be distributed among the 8 workers. I can imagine why you might want to run cleanup code at the end of each task, but I'm having trouble imagining why you would want to run cleanup code just before each of the 8 worker processes is finalized.
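
If per-task cleanup is enough, a plain try/finally inside the task function covers it without any monkeypatching. A minimal sketch, where release_resources is a hypothetical stand-in for the real cleanup:

import multiprocessing as mp

def release_resources():
    # Hypothetical stand-in for whatever per-task cleanup you need.
    print('{0} cleaned up a task'.format(mp.current_process().name))

def task(i):
    try:
        return i * i
    finally:
        # Runs at the end of every task, even if the body raises.
        release_resources()

if __name__ == '__main__':
    pool = mp.Pool(4)
    print(pool.map_async(task, range(8)).get())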

Nevertheless, if per-process cleanup is what you want, you could do it by monkeypatching multiprocessing.pool.worker:

import multiprocessing as mp
import multiprocessing.pool as mpool
from multiprocessing.util import debug

def cleanup():
    print('{n} CLEANUP'.format(n=mp.current_process().name))

# This code comes from /usr/lib/python2.6/multiprocessing/pool.py,
# except for the single line at the end which calls cleanup().
def myworker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            result = (False, e)
        put((job, i, result))
    cleanup()

# Here we monkeypatch mpool.worker
mpool.worker = myworker

def foo(i):
    return i*i

def main():
    pool = mp.Pool(8)
    results = pool.map_async(foo, range(40)).get()
    print(results)

if __name__ == '__main__':
    main()

yields:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
PoolWorker-8 CLEANUP
PoolWorker-3 CLEANUP
PoolWorker-7 CLEANUP
PoolWorker-1 CLEANUP
PoolWorker-6 CLEANUP
PoolWorker-2 CLEANUP
PoolWorker-4 CLEANUP
PoolWorker-5 CLEANUP
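
Note that myworker above is a copy of internals from one particular Python version's pool.py, so the monkeypatch may need to be refreshed whenever the standard library's implementation changes.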
unutbu
  • Hi, it looks like this is what I need. I can't see why we have an initializer method but no finalizer when we create a pool. As for why I need to finalize: I start worker processes that each open an Excel session (via win32com) and process all the queue items in their own sessions. My problem is that when the worker processes finish, they don't close the Excel session. – Dave May 23 '11 at 09:37
  • @unutbu: Sorry, but I don't get it; map doesn't provide any way to initialize or finalize a process. The func passed as an argument is just a piece of work. One thing that could be done is to add a sentinel (like None) that will properly shut down the Excel session. – Dave May 23 '11 at 17:31
  • @unutbu: Thanks for the great answers. I think this is more than a workaround and should solve my issues. I also think that if I want more control over the worker processes, I may need to spend more time with the raw Process instead of the very convenient Pool. Anyway, with your last suggestions I can still have the comfort of the Pool ;) – Dave May 24 '11 at 07:33
  • @unutbu: would `pool.map` ensure that each worker runs `shutdown_sessions` or `start_sessions`? – vin May 04 '17 at 14:46
  • 2
    @vin: Thanks for pointing that out. You are correct -- there is no assurance that `shutdown_sessions` would be run by every worker if `pool.map(shutdown_sessions, ...)` is called. Instead, it seems the best way to run a finalizer is to [use dano's answer](http://stackoverflow.com/a/24724452/2073595). – unutbu May 04 '17 at 18:16
  • @unutbu: thanks for pointing out the answer, it is exactly what I was looking for!! – vin May 05 '17 at 02:15
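
For reference, here is a minimal sketch of the finalizer approach from dano's linked answer as I read it: the pool initializer registers a multiprocessing.util.Finalize with an exitpriority, which multiprocessing runs as each worker process exits (shutdown_session is a hypothetical stand-in for closing the Excel session):

import multiprocessing as mp
from multiprocessing.util import Finalize

def shutdown_session():
    # Hypothetical stand-in for closing the per-worker Excel/win32com session.
    print('{0} shutting down'.format(mp.current_process().name))

def init_worker():
    # A Finalize registered with an exitpriority runs when the
    # worker process exits, so each worker is finalized exactly once.
    Finalize(None, shutdown_session, exitpriority=16)

def work(i):
    return i * i

if __name__ == '__main__':
    pool = mp.Pool(4, initializer=init_worker)
    print(pool.map(work, range(8)))
    pool.close()
    pool.join()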

Your only real option here is to run cleanup at the end of the function you pass to map_async.

If this cleanup is honestly intended to run at process death, you cannot use the concept of a pool; the two are orthogonal. A pool does not dictate process lifetime unless you use maxtasksperchild, which is new in Python 2.7. Even then, you do not gain the ability to run code at process death. However, maxtasksperchild might suit you, because any resources that the process opens will definitely go away when the process is terminated.
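
For instance, here is a minimal sketch of the maxtasksperchild route (assuming Python 2.7 or later), where each worker is retired after a single task so anything it opened is released with the process:

import multiprocessing

def work(i):
    return i * i

if __name__ == '__main__':
    # Each worker process is replaced after completing one task, so
    # any resource it opened goes away when that process terminates.
    pool = multiprocessing.Pool(processes=4, maxtasksperchild=1)
    print(pool.map(work, range(8)))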

That being said, if you have a bunch of functions that you need to run cleanup on, you can save duplication of effort by designing a decorator. Here's an example of what I mean:

import functools
import multiprocessing

def cleanup(f):
    """Decorator for shared cleanup mechanism"""
    @functools.wraps(f)
    def wrapped(arg):
        result = f(arg)
        print("Cleaning up after f({0})".format(arg))
        return result
    return wrapped

@cleanup
def task1(arg):
    print("Hello from task1({0})".format(arg))
    return arg * 2

@cleanup
def task2(arg):
    print("Bonjour from task2({0})".format(arg))
    return arg ** 2

def main():
    p = multiprocessing.Pool(processes=3)
    print(p.map(task1, [1, 2, 3]))
    print(p.map(task2, [1, 2, 3]))

if __name__ == "__main__":
    main()

When you execute this (barring jumbled stdout, since I'm not locking it here for brevity), the order of the output should show that your cleanup task is running at the end of each task:

Hello from task1(1)
Cleaning up after f(1)
Hello from task1(2)
Cleaning up after f(2)
Hello from task1(3)
Cleaning up after f(3)
[2, 4, 6]

Bonjour from task2(1)
Cleaning up after f(1)
Bonjour from task2(2)
Cleaning up after f(2)
Bonjour from task2(3)
Cleaning up after f(3)
[1, 4, 9]
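
One caveat on the design: as written, the cleanup line in `wrapped` is skipped if `f` raises, so if the cleanup must run even on failure, wrap the call to `f` in a `try`/`finally`.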
Jed Smith