
I'm working on code where I frequently have to use Python's multiprocessing Pool class. This results in a ton of code that looks like this:

import time
from multiprocessing import Pool
from functools import partial

def test_func(x):
    time.sleep(1)
    return x

def test_func_parallel(iterable, processes):
    p = Pool(processes=processes)
    output = p.map(test_func, iterable)
    p.close()
    return output

This can be made more general:

def parallel(func, iterable, **kwargs):
    func = partial(func, **kwargs)
    p = Pool(processes=6)
    out = p.map(func, iterable)
    p.close()
    return out

This works, but adding a parallel wrapper to every other function complicates the code. What I'd really like is to get this working as a decorator. Something like this:

def parallel(num_processes):
    def parallel_decorator(func, num_processes=num_processes):
        def parallel_wrapper(iterable, **kwargs):
            bound_func = partial(func, **kwargs)
            p = Pool(processes=num_processes)
            output = p.map(bound_func, iterable)
            p.close()
            return output

        return parallel_wrapper
    return parallel_decorator

Which could be used as follows:

@parallel(6)
def test_func(x):
    time.sleep(1)
    return x

This fails with a pickle error:

Can't pickle <function test1 at 0x117473268>: it's not the same object as __main__.test1

I've read a few posts on related issues, but they all implement a solution where the multiprocessing is executed outside the decorator. Does anyone know a way to make this work?

Karl
  • BTW, this is a much better question than most I see about multiprocessing -- well-thought-through, has a solid reproducer, etc. – Charles Duffy Apr 16 '21 at 21:09
  • See [What can multiprocessing and dill do together?](https://stackoverflow.com/questions/19984152/what-can-multiprocessing-and-dill-do-together) -- switch from `multiprocessing` to the 3rd-party `pathos.multiprocessing` and you're there. – Charles Duffy Apr 16 '21 at 21:52
  • can you try this with "fork" or are you on Windows? – Aaron Apr 17 '21 at 03:08
  • Have you tried leveraging `copyreg`, at all? https://docs.python.org/3.7/library/copyreg.html There's also `partialmethod` in functools, though I don't know if that would be a solution. https://docs.python.org/3.7/library/functools.html – Mark Moretto Apr 17 '21 at 15:10
  • "they all implement a solution where the multiprocessing is executed outside the decorator." >> would you mind sharing the link for these.. (for my/public learning..).. tq.. – p._phidot_ May 12 '21 at 21:40

2 Answers


If you don't mind to not use the syntactic sugar for decorators (@ symbol), something like this should work:

import functools
import time

from multiprocessing import Pool


def parallel(func=None, **options):
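    # Called with only options, e.g. parallel(processes=6): func is None, so
    # return a partial of this decorator that waits for the function.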
    if func is None:
        return functools.partial(parallel, **options)

    def wrapper(iterable, **kwargs):
        processes = options["processes"]

        # bind any extra keyword arguments to the target function
        # before handing it to the pool
        bound_func = functools.partial(func, **kwargs)

        with Pool(processes) as pool:
            result = pool.map(bound_func, iterable)

        return result

    return wrapper


def test(i):
    time.sleep(1)
    print(f"{i}: {i * i}")

test_parallel = parallel(test, processes=6)


def main():
    test_parallel(range(10))


if __name__ == "__main__":
    main()
HTF
  • Why not just `parallel(test, processes=6)(range(10))`? I also don't think that `@functools.wraps(func)` is accomplishing anything because you are not wrapping `func`. – Booboo Apr 17 '21 at 15:43
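
A minimal usage sketch of the one-off call suggested in that comment, reusing `parallel` and `test` from the answer above:

if __name__ == "__main__":
    # same as defining test_parallel first and then calling it
    parallel(test, processes=6)(range(10))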

I had the same problem. It comes down to how Pool() pickles the function it is given: it works fine with a plain wrapper but not with a decorator. The workaround is to define your own Pool()-like implementation using Process().

This can be tricky to optimize, but if you are a decorator enthusiast, here is a (dirty) example:

# something to do
args = range(10)


def parallel(function):
    """ An alternative implementation to
    multiprocessing.Pool().map() using
    multiprocessing.Process(). """

    def interfacer(args):
        """ The wrapper function. """

        # required libraries
        from multiprocessing import (Queue, Process)
        from os import cpu_count

        # process control
        ## maximum number of processes required
        max_processes = len(args)

        ## maximum number of processes running at once
        max_threads = max(1, cpu_count() - 1)

        """ Since there is no Pool() around,
        we need to take care of the processes
        ourselves. If there is nothing for a
        process to do, it will sit waiting for
        input; if there are too many of them,
        the CPU will suffer. """

        # communications
        ## things to do
        inbasket = Queue()

        ## things done
        outbasket = Queue()

        """ I am thinking asynchronously;
        there is probably a better way of
        doing this. """

        # populate inputs
        for each in args:

            ## put arguments into the basket
            inbasket.put(each)

        def doer():
            """ Feeds the targeted/decorated
            'function' with data from the baskets and
            collects the results.

            This blind function helps the
            implementation generalize over any
            iterable. """

            outbasket.put(function(inbasket.get()))
            return True

        def run(processes=max_threads):
            """ Creates a certain number of
            Process()es and runs them concurrently.
            There is room for improvement here. """

            # the process pool
            factory = list()

            # populate the process pool
            for each in range(processes):
                factory.append(Process(target=doer))

            # start the whole batch before joining,
            # otherwise the processes run one at a time
            for each in factory:
                each.start()

            # wait for the batch to finish, then release resources
            for each in factory:
                each.join()
                each.close()

            return True

        """ Now we need to manage the processes
        and prevent them from overwhelming the CPU.
        That is the tricky part that Pool() does
        so well. """

        while max_processes:
        # as long as there is something to do

            if (max_processes - max_threads) >= 0:

                run(max_threads)
                max_processes -= max_threads

            else:
            # play it safe
                run(1)
                max_processes -= 1

        # drain the queue and give back the list of 'dones'
        return [outbasket.get() for each in range(outbasket.qsize())]

    return interfacer

@parallel
def test(x):
    return x**2

print(test(args))

This code is probably inefficient, but it gives the idea.

Bruno