
In Python, I have seen many examples where multiprocessing is used but the target function just prints something. I have a scenario where the target returns two variables, which I need to use later. For example:

def foo(some_args):
    a = someObject
    b = someObject
    return a, b

p1 = multiprocessing.Process(target=foo, args=(some_args,))
p2 = multiprocessing.Process(target=foo, args=(some_args,))
p3 = multiprocessing.Process(target=foo, args=(some_args,))

Now what? I can call .start() and .join(), but how do I retrieve the individual results? I need to capture the returned a, b from all the jobs I execute and then work on them.

Nishant

6 Answers


You are looking to do some embarrassingly parallel work using multiple processes, so why not use a Pool? A Pool will take care of starting the processes, collecting the results, and returning them to you.
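
If you want to stay in the standard library, multiprocessing.Pool covers this case too. Here is a minimal sketch, with placeholder computations standing in for the question's foo:

from multiprocessing import Pool

def foo(x):
    a = x * 2        # placeholder work
    b = x ** 2
    return a, b      # each task returns a tuple

if __name__ == '__main__':
    with Pool() as pool:
        results = pool.map(foo, [1, 2, 3])   # one (a, b) tuple per input
    print(results)   # [(2, 1), (4, 4), (6, 9)]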

I use pathos, which has a fork of multiprocessing, because it has much better serialization than the version the standard library provides.

Code (.py file)

from pathos.multiprocessing import ProcessingPool as Pool

def foo(obj1, obj2):
    a = obj1.x**2
    b = obj2.x**2
    return a, b

class Bar(object):
    def __init__(self, x):
        self.x = x

Pool().map(foo, [Bar(1), Bar(2), Bar(3)], [Bar(4), Bar(5), Bar(6)])

Result

[(1, 16), (4, 25), (9, 36)]

And you see that foo takes two arguments and returns a tuple of two objects. The map method of Pool submits foo to the underlying processes and returns the results as a list of tuples, one per call.

You can get pathos here: https://github.com/uqfoundation

Mike McKerns

Yes, sure: you can use a number of methods. One of the easiest is a shared Queue. See an example here: http://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
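
In case the link goes stale, here is a minimal sketch of the shared-Queue approach, adapted to a foo that returns two values (the computations are placeholders):

from multiprocessing import Process, Queue

def foo(x, q):
    a = x + 1            # placeholder work
    b = x * 2
    q.put((a, b))        # hand both results back to the parent

if __name__ == '__main__':
    q = Queue()
    procs = [Process(target=foo, args=(i, q)) for i in range(3)]
    for p in procs:
        p.start()
    results = [q.get() for _ in procs]   # one (a, b) tuple per process
    for p in procs:
        p.join()
    print(results)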

Eli Bendersky
  • Is there a restriction on the return value? What if it is a binary file like a PDF? My goal is to get n PDFs and then concatenate them. Order is not significant for us. – Nishant May 29 '12 at 11:18
  • @Nishant: it can be any data, really. For transferring actual files I would do a careful analysis taking the file size into account. It may be more convenient to just write the files to disk and pass around pointers to them (i.e. names), but care must be taken in terms of synchronization and atomicity. – Eli Bendersky May 29 '12 at 11:22

I'm copying this example straight from the docs because I can't give you a direct link to it. Note that it prints out the results from the done_queue, but you can do whatever you like with them.

#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same order as the corresponding tasks were
# put on the input queue.  If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()

It is originally from the multiprocessing module docs.
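
As the header comment says, if you need results back in the original submission order, a Pool takes care of that for you. Here is a short sketch that runs if appended to the script above, since it reuses its mul function (starmap unpacks each argument tuple):

from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(4) as pool:
        results = pool.starmap(mul, [(i, 7) for i in range(20)])  # order preserved
    print(results)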

cha0site

Why does nobody use the callback of multiprocessing.Pool?

Example:

import time
from multiprocessing import Pool
from contextlib import contextmanager

from pprint import pprint
from requests import get as get_page

WORKERS = 4                            # placeholder values
URLS = ['http://example.com'] * 4

@contextmanager
def _terminating(thing):
    try:
        yield thing
    finally:
        thing.terminate()

def _callback(*args, **kwargs):
    print("CALLBACK")
    pprint(args)
    pprint(kwargs)

print("Processing...")
with _terminating(Pool(processes=WORKERS)) as pool:
    results = pool.map_async(get_page, URLS, callback=_callback)

    start_time = time.time()
    results.wait()
    end_time = time.time()
    print("Time for processing: %s secs" % (end_time - start_time))

Here, I print both args and kwargs. But you can replace the callback with:

def _callback2(responses):
    for r in responses:
        print(r.status_code) # or do whatever with response...
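
Worth noting: on Python 3, map_async also accepts an error_callback, which fires with the exception if any task raises. A minimal sketch (the handler name is my own), used inside the same with block:

def _on_error(exc):
    print("a worker raised: %r" % exc)   # called once with the exception

results = pool.map_async(get_page, URLS,
                         callback=_callback,
                         error_callback=_on_error)
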
KenanBek

It won't work on Windows, but here is my multiprocessing decorator for functions. It returns a queue that you can poll and collect the returned data from.

import os
from multiprocessing import Process, Queue   # a multiprocessing Queue, so child processes can reach it

def returning_wrapper(func, *args, **kwargs):
    queue = kwargs.pop("multiprocess_returnable")
    queue.put(func(*args, **kwargs))

class Multiprocess(object):
    """Cute decorator to run a function in multiple processes."""
    def __init__(self, func):
        self.func = func
        self.processes = []

    def __call__(self, *args, **kwargs):
        num_processes = kwargs.pop("multiprocess_num_processes", 2)  # default to two processes
        return_obj = kwargs.get("multiprocess_returnable", Queue())  # default to a multiprocessing Queue
        kwargs["multiprocess_returnable"] = return_obj
        for i in range(num_processes):
            pro = Process(target=returning_wrapper, args=tuple([self.func] + list(args)), kwargs=kwargs)
            self.processes.append(pro)
            pro.start()
        return return_obj


@Multiprocess
def info():
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    return 4 * 22

data = info()
print(data.get())   # block until the first worker's result arrives
Jakob Bowyer

Here is an example of a multi-process search over huge files, sketched below.
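
A minimal sketch of the idea, assuming a plain-text file split into byte ranges that each worker scans independently (the file name and search pattern are placeholders):

import os
from multiprocessing import Pool

FILENAME = 'huge.log'    # placeholder path
PATTERN = b'ERROR'       # placeholder search string

def search_chunk(bounds):
    """Scan one byte range of the file and return the matching lines."""
    start, end = bounds
    matches = []
    with open(FILENAME, 'rb') as f:     # binary mode: seek/tell are plain byte offsets
        f.seek(start)
        if start:
            f.readline()                # skip the partial line at the chunk boundary
        while f.tell() <= end:
            line = f.readline()
            if not line:
                break
            if PATTERN in line:
                matches.append(line.rstrip())
    return matches

if __name__ == '__main__':
    size = os.path.getsize(FILENAME)
    workers = 4
    step = size // workers + 1
    bounds = [(i, min(i + step, size)) for i in range(0, size, step)]
    with Pool(workers) as pool:
        for chunk in pool.map(search_chunk, bounds):   # results come back per chunk
            for line in chunk:
                print(line.decode(errors='replace'))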

Farshid Ashouri