
I am working on segmenting large ctypes arrays and processing them in parallel. I am receiving the error below and believe it is because one segment of the array is finishing processing before another. I tried using process.join() to have the first set of processes wait, but that is not working. Ideas?

Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored

Using:

    ....

        with closing(multiprocessing.Pool(initializer=init(array))) as p:
            del array #Since the array is now stored in a shared array destroy the array ref for memory reasons

            step = y // cores
            if step != 0:
                jobs =[]
                for i in range (0, y, step):
                    process = p.Process(target=stretch, args= (shared_arr,slice(i, i+step)),kwargs=options)
                    jobs.append(process)
                    process.start()

                for j in jobs:
                    j.join()

    del jobs
    del process

Update:

        #Create a ctypes array
        array = ArrayConvert.SharedMemArray(array)
        #Create a global of options
        init_options(options) #options is a dict
        with closing(multiprocessing.Pool(initializer=init(array))) as p:
            del array #Since the array is now stored in a shared array, destroy the array ref for memory reasons


            step = y // cores
            if step != 0:
                for i in range (0, y, step):
                    #Package all the options into a global dictionary

                    p.map_async(stretch,[slice(i, i+step)])

                    #p.apply_async(stretch,args=(shared_arr,slice(i, i+step)),kwargs=options)

        p.join()        

def init_options(options_):
    global kwoptions
    kwoptions = options_

The function that I am passing to map_async is stored in a different module, so I am struggling to get the global kwoptions passed to that function. It does not seem right to be passing globals around between modules like this (unpythonic). Is this the way to pass kwargs through map_async?

Should I be reworking the multiprocessing using something different (apply or Process)?
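
For what it's worth, one way to sidestep hand-carrying the global between modules is to let the Pool initializer set the module-level global inside the module that owns the worker function; the initializer runs once in each worker process, so the worker sees the global directly. A rough sketch of that layout (the module name `stretch_module`, the function names, and the option keys are made up for illustration):

    # --- stretch_module.py (hypothetical module that owns the worker) ---
    kwoptions = {}      # filled in by the Pool initializer in each worker process
    shared_arr = None

    def init_worker(shared_arr_, options_):
        # Runs once per worker process; assigns the globals in *this* module,
        # so stretch() below can see them without touching __main__.
        global shared_arr, kwoptions
        shared_arr = shared_arr_
        kwoptions = options_

    def stretch(chunk_slice):
        # Clamp each value into [low, high] as a stand-in for the real work.
        lo, hi = kwoptions.get('low', 0), kwoptions.get('high', 255)
        shared_arr[chunk_slice] = [min(max(v, lo), hi) for v in shared_arr[chunk_slice]]

    # --- main script ---
    import multiprocessing
    from contextlib import closing
    import stretch_module

    if __name__ == '__main__':
        y = 1000
        shared = multiprocessing.RawArray('i', range(y))
        options = {'low': 10, 'high': 200}   # the kwargs stretch() should see
        step = y // multiprocessing.cpu_count() or y
        with closing(multiprocessing.Pool(initializer=stretch_module.init_worker,
                                          initargs=(shared, options))) as p:
            p.map(stretch_module.stretch, [slice(i, i + step) for i in range(0, y, step)])
        p.join()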

Jzl5325
  • you shouldn't create new processes if you use Pool. See [an example](http://stackoverflow.com/a/7908612/4279) – jfs Aug 07 '12 at 18:01
  • @J.F.Sebastian Thanks for the link and for posting that answer - it has been helpful. Is it possible to pass map_async a series of kwargs though? I thought that map_async was single-arg only (the reason I went with Process). I guess I could wrap them using a lambda and pass the function as the arg? – Jzl5325 Aug 07 '12 at 18:21
  • Create a wrapper as a named global function to pass keyword args to map_async. – jfs Aug 07 '12 at 18:44
  • You may find that http://bugs.python.org/issue15101 is related to this, and that it is fixed in what will soon be 2.7.4. – gps Dec 01 '12 at 02:17

2 Answers


So I got this working by reworking the code and removing pool (as per J.F. Sebastian's comment).

In pseudocode:

initialize the shared array
determine step size
create an empty list of jobs
create the process, pass it the kwargs, and append it to the job list
start the jobs
join the jobs

Here is the code if that helps any googler:

    #Initialize the ctypes array
    init(array)
    #Remove the reference to the array (to preserve memory on multiple iterations).
    del array

    step = y // cores
    jobs = []
    if step != 0:
        for i in range(0, y, step):
            p = multiprocessing.Process(target=stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)
            jobs.append(p)

        for job in jobs:
            job.start()
        for job in jobs:
            job.join()
Jzl5325
  • There is no point in having more processes than CPUs available (for CPU-bound jobs). `mp.Pool` allows you to separate the number of worker processes from the number of jobs, i.e., the same worker process in a pool can execute multiple jobs in sequence. – jfs Aug 09 '12 at 10:52
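
To illustrate the point in the comment above: a pool sized to the CPU count can be handed far more jobs than it has workers, and each worker simply executes its share of jobs one after another. A minimal, self-contained sketch (the worker function and the job count are invented for illustration):

    import multiprocessing as mp

    def square(x):
        # each worker process calls this repeatedly, one job at a time
        return x * x

    if __name__ == '__main__':
        pool = mp.Pool(processes=mp.cpu_count())   # a handful of workers
        results = pool.map(square, range(100))     # 100 jobs shared among them
        pool.close()
        pool.join()
        print(results[:5])   # [0, 1, 4, 9, 16]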

The `initializer` argument for `Pool()` accepts a function; replace `initializer=init(array)` with `initializer=init, initargs=(array,)`.

To pass keyword arguments to a function `f()` used with the `pool.*map*` family, you could create a wrapper `mp_f()`:

#!/usr/bin/env python
import logging
import multiprocessing as mp
from contextlib import closing

def init(shared_array_):
    # globals that should be available in worker processes should be
    # initialized here
    global shared_array
    shared_array = shared_array_

def f(interval, a=None, b=None):
    mp.get_logger().info("interval=%r, a=%r, b=%r" % (interval, a, b))
    shared_array[interval] = [a + interval.start]*b # fake computations

def mp_f(arg_kwargs):
    try:
        arg, kwargs = arg_kwargs
        return f(arg, **kwargs) # pass keyword args to f()
    except Exception:
        mp.get_logger().error("f%r failed" % (arg_kwargs,))

def main():
    mp.log_to_stderr().setLevel(logging.INFO)

    N = 10**6
    array = mp.RawArray('i', N) # create shared array

    # create workers pool; use all available CPU cores
    with closing(mp.Pool(initializer=init, initargs=(array,))) as p:
        options = dict(a=5, b=N//4) # dummy options
        step = options['b']
        args = ((slice(i, i+step), options) for i in range(0, N, step))
        for _ in p.imap_unordered(mp_f, args): # submit jobs
            pass
    p.join()
    mp.get_logger().info(array[::step])

if __name__=="__main__":
    mp.freeze_support() # for py2exe and the-like on Windows
    main()
jfs