345

In the example code below, I'd like to get the return value of the function worker. How can I go about doing this? Where is this value stored?

Example Code:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(jobs)

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

I can't seem to find the relevant attribute in the objects stored in jobs.

Louis Thibault

13 Answers

339

Use a shared variable to communicate. For example, like this:

import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())
vartec
  • 74
    I would recommend using a [`multiprocessing.Queue`](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue), rather than a `Manager` here. Using a `Manager` requires spawning an entirely new process, which is overkill when a `Queue` would do. – dano Apr 19 '15 at 00:54
  • 6
    @dano: I wonder, if we use a Queue() object, how can we be sure of the order in which each process returns its value? I mean, if we need the results in a known order for the next step, how can we be sure exactly which output came from which process? – Chau Pham Sep 29 '16 at 11:08
  • 8
    @Catbuilts You could return a tuple from each process, where one value is the actual return value you care about, and the other is a unique identifier from the process. But I also wonder why you need to know which process is returning which value. Is that what you actually need to know about the process, or do you need to correlate between your list of inputs and the list of outputs? In that case, I would recommend using `multiprocessing.Pool.map` to process your list of work items. – dano Dec 01 '16 at 14:43
  • this works great to run multiple functions at the same time and a trick to be able to get information after. =) – pelos Jul 11 '17 at 17:01
  • 24
    **caveats for functions with only a single argument** : should use `args=(my_function_argument, )`. Note the `,` comma here! Or else Python will complain "missing positional arguments". Took me 10 minutes to figure out. Also check the [manual usage](https://docs.python.org/3/library/multiprocessing.html) (under the "process class" section). – yuqli Apr 29 '19 at 15:17
  • 1
    @yuqli: For those that stumble here and don't know. The reason for the `(a,)` syntax is because that is how to specify a tuple of len 1 (because just `(a)` is not a tuple). You could also use `tuple([a])` if that's less confusing. It "looks more clear" but it's kind of indirect and wasteful. – ThePopMachine Sep 18 '19 at 15:59
  • 6
    @vartec one drawback of using a multiprocessing.Manager() dictionary is that it pickles (serializes) the object it returns, so it has a bottleneck given by the pickle library of a maximum 2GiB size for the object to return. Is there any other way of doing this that avoids the serialization of the returned object? – hirschme Nov 13 '19 at 21:46
  • I was not able to use the multiprocessing Manager while stepping or running through it with a debugger. Therefore I resigned myself to using a multiprocessing Queue instead. – FormerAtariUser Mar 06 '20 at 18:20
  • 1
    this makes no sense. how is return_dict being handled outside __main__?? – Vaidøtas I. Jan 17 '21 at 13:35
  • I don't know why you'd use a queue; `manager.dict()` looks like Python, and I like it. – haofeng Apr 17 '21 at 10:21
  • `manager = multiprocessing.Manager(); return_dict = manager.dict(); return_dict.values()` just hangs for me with no output. Any suggestions? Also, my dict has pandas dataframes (parallel analysis of several datasets). Will I have to unpack this to a regular dict when running in parallel (vs when calling the function directly)? – illan May 31 '23 at 14:58
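
Following up on the comments above, here is a minimal sketch of the Queue-based alternative dano describes, tagging each result with its input so results can be matched up afterwards (the worker body is purely illustrative):

import multiprocessing


def worker(procnum, queue):
    """Put an (input, result) pair on the queue so results can be matched to inputs."""
    queue.put((procnum, procnum * 2))


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, queue))
        jobs.append(p)
        p.start()

    # Drain the queue before joining: a child that has put data on a Queue
    # may not exit until that data has been consumed.
    results = dict(queue.get() for _ in jobs)

    for proc in jobs:
        proc.join()
    print([results[i] for i in range(5)])  # [0, 2, 4, 6, 8], in input order
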
95

I think the approach suggested by @sega_sai is the better one. But it really needs a code example, so here goes:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

Which will print the return values:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

If you are familiar with the built-in map (pool.map behaves like Python 2's map and returns a list), this should not be too challenging. Otherwise have a look at sega_sai's link.

Note how little code is needed. (Also note how processes are re-used).

Mark
  • 2
    Any ideas why my `getpid()` return all the same value? I'm running Python3 – zelusp Oct 29 '16 at 17:39
  • I'm not sure how Pool distributes tasks over workers. Maybe they can all end up at the same worker if they're really fast? Does it happen consistently? Also if you add a delay? – Mark Oct 31 '16 at 15:30
  • I also thought it was a speed related thing but when I feed `pool.map` a range of 1,000,000 using more than 10 processes I see at most two different pids. – zelusp Oct 31 '16 at 19:00
  • 1
    Then I'm not sure. I think it'd be interesting to open a separate question for this. – Mark Nov 01 '16 at 11:27
  • 1
    If you want to send a different function to each process, use `pool.apply_async`: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult – Kyle Jun 05 '19 at 20:28
  • if you're using this method, make sure the return value is `Pickle` compatible. Otherwise you'll get "unserializable" errors: https://medium.com/@jwnx/multiprocessing-serialization-in-python-with-pickle-9844f6fa1812 – mrdaliri Apr 20 '20 at 05:44
  • @mrdaliri You're right, and the same goes for input. In general, objects need to be serialized in some way to communicate them to another process. – Mark Apr 20 '20 at 07:12
  • Out of scope. The question is about using Process (which is less easy than with Pool...) – Eric H. Aug 18 '21 at 12:22
  • @EricH. Alternative solutions are valid answer on stackoverflow. The fact that Pool is easier doesn't make it bad, rather it might be considered an advantage to many users. – Mark Aug 18 '21 at 19:36
  • I use the IPython REPL: AttributeError: Can't get attribute 'foo' on – CS QGB Feb 06 '22 at 03:59
  • @CSQGB There's no 'foo' in the example code, so you're running something else. Can't help with that, and I'm afraid this comment section isn't the right place for this type of question, sorry. – Mark Feb 06 '22 at 10:23
  • This is great until your function takes more than one parameter – Frobot Nov 20 '22 at 01:30
  • @Frobot You can still use this solution with lambdas or functools.partial, but that's a separate question – Mark Nov 27 '22 at 10:50
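
Following up on the comments about workers that take more than one parameter, here is a hedged sketch using pool.starmap and functools.partial (the add worker is just a placeholder):

import multiprocessing
from functools import partial

def add(a, b):
    return a + b

if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        # starmap unpacks each argument tuple for the worker, preserving input order
        print(pool.starmap(add, [(1, 2), (3, 4), (5, 6)]))  # [3, 7, 11]

        # functools.partial fixes one argument so plain map can supply the other
        # (a partial of a module-level function is picklable, unlike a lambda)
        print(pool.map(partial(add, 10), range(5)))  # [10, 11, 12, 13, 14]
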
65

For anyone else who is looking for how to get a value from a Process using a Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}

Note that on Windows or in a Jupyter Notebook, with multiprocessing you have to save this as a file and execute the file. If you run it in an interactive session (such as a notebook cell or the Python prompt), you will see an error like this:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
Matthew Moisen
  • 2
    When I put something in a queue in my worker process, my join is never reached. Any idea why this might happen? – Laurens Koppenol Oct 06 '16 at 12:30
  • @LaurensKoppenol do you mean that your main code hangs at p.join() permanently and never continues? Does your process have an infinite loop? – Matthew Moisen Oct 06 '16 at 17:44
  • 7
    Yes, it hangs there infinitely. My workers all finish (the loop within the worker function ends, the print statement afterwards is printed, for all workers). The join doesn't do anything. If I remove the `Queue` from my function it does let me pass the `join()` – Laurens Koppenol Oct 10 '16 at 08:11
  • @LaurensKoppenol Are you perhaps not calling `queue.put(ret)` prior to calling `p.start()` ? In that case, the worker thread will hang at `queue.get()` forever. You can replicate this by copying my snippet above while commenting out `queue.put(ret)`. – Matthew Moisen Aug 16 '17 at 02:47
  • I edited this answer, the `queue.get()` has to happen before the `p.join()`. It works now for me. – jfunk Nov 16 '17 at 19:24
  • The naming is a bit odd. The `ret` at the top should surely be called `send`, as you're sending it to the process. Although surely simpler just to pass in via the `Process` `args` parameter? Also, if you're only using `put` and `get`, use `SimpleQueue`. Although the code hung for me on Py 3.6. – Chris Dec 26 '19 at 09:25
  • @Chris This is a contrived example demonstrating how to get a value from a `process` using a `multiprocessing.queue`. – Matthew Moisen Dec 26 '19 at 16:48
  • @MatthewMoisen appreciate that and your answer was very useful thanks - I copied it into my own below. Just thought calling the outer variable `send` might make the code a tiny bit more readable, as it is being sent and `ret` shadows the inner variable `ret`, which is a different variable, and is actually returned. You could also add a variable to the `args=(queue,)` tuple rather than sending, unless you're moving values around during parallel execution. – Chris Dec 26 '19 at 17:02
  • It prints False for me actually – Bendemann Jul 28 '20 at 15:28
  • 1
    @Bendemann Someone edited the answer and made it incorrect by placing the `queue.get` before the `p.join`. I've fixed it now by placing `queue.get` after `p.join`. Please try again. – Matthew Moisen Jul 28 '20 at 16:58
  • It won't work on Jupyter notebooks btw. Maybe you can also add that in your answer. – Bendemann Jul 28 '20 at 17:02
  • Actually I am getting the same problem as @LaurensKoppenol above. It is hanging at `p.join()` indefinitely. It returns from the callback but it does not execute `queue.get()` – Bendemann Jul 28 '20 at 18:30
  • @Bendemann which version of Python are you using? Can you add a print statement before each line in the `worker` function and confirm its output? – Matthew Moisen Jul 30 '20 at 18:05
  • 3.7.7, yes I already did that and the worker is doing what it is supposed to be doing. Afterwards, it is not executing `queue.get()` after the `p.join()`. So it is stuck in p.join() basically. – Bendemann Jul 30 '20 at 19:28
45

For some reason, I couldn't find a general example of how to do this with Queue anywhere (even Python's doc examples don't spawn multiple processes), so here's what I got working after like 10 tries:

from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue is a blocking, thread- and process-safe queue that you can use to store the return values from the child processes, so you have to pass the queue to each process. Something less obvious here is that you have to get() from the queue before you join the Processes: a child that has put data on the queue won't exit until that data has been flushed to the underlying pipe, so joining first can deadlock once the pipe's buffer fills up.

Update for those who are object-oriented (tested in Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
sudo
40

This example shows how to use a list of multiprocessing.Pipe instances to return strings from an arbitrary number of processes:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print(result)
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print(result_list)

if __name__ == '__main__':
    main()

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

This solution uses fewer resources than a multiprocessing.Queue which uses

  • a Pipe
  • at least one Lock
  • a buffer
  • a thread

or a multiprocessing.SimpleQueue which uses

  • a Pipe
  • at least one Lock

It is very instructive to look at the source for each of these types.

  • What would be the best way to do that without making the pipes a global variable? – Nickpick Oct 25 '16 at 13:15
  • I put all the global data and code into a main function and it works the same. Does that answer your question? –  Oct 25 '16 at 13:43
  • does the pipe always have to be read before any new value can be added (sent) to it? – Nickpick Oct 25 '16 at 14:56
  • +1, good answer. But about the solution being more efficient, the tradeoff is that you're making one `Pipe` per process vs one `Queue` for all processes. I don't know if that ends up being more efficient in all cases. – sudo Sep 21 '17 at 20:41
  • 5
    This answer causes a deadlock if the returning object is large. Instead of doing the proc.join() first I would first try to recv() the return value and then do the join. – L. Pes Feb 12 '20 at 20:13
  • @L.Pes You will have to back that statement up with an example that proves it. –  Feb 12 '20 at 23:27
  • 1
    I am with @L.Pes on this. Could be OS-specific, but I adapted this example to my use case and workers trying to send_end.send(result) for large result would hang indefinitely. Joining after receiving fixed it. Happy to provide an example if N=2 is too anecdotal for you. – Vlad Apr 22 '20 at 02:10
  • @Vlad I think you misunderstand me. I am more than happy to change the answer, but if this is a real problem, then you should be able to post code that allows others to duplicate it, diagnose it, and provide a modified answer that may be even better than merely adding a `recv`. –  Apr 22 '20 at 04:50
  • Pipes rock! They're easy to use and understand :) – Vaidøtas I. Mar 22 '21 at 19:42
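
For reference, a sketch of the receive-before-join ordering that the last few comments recommend when the results are large (the worker here mirrors the one in the answer above):

import multiprocessing

def worker(procnum, send_end):
    send_end.send(str(procnum) + ' represent!')

if __name__ == '__main__':
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    # Receive first: a child blocked in send() on a full pipe buffer cannot
    # exit, so joining before receiving can deadlock for large results.
    result_list = [x.recv() for x in pipe_list]
    for proc in jobs:
        proc.join()
    print(result_list)
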
18

It seems that you should use the multiprocessing.Pool class instead, and use its methods .apply(), .apply_async(), or .map().

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult
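
For instance, a minimal sketch (the worker simply doubles its input, purely for illustration) showing both map and apply_async, the latter returning the AsyncResult described in the linked docs:

import multiprocessing

def worker(procnum):
    return procnum * 2

if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        # map blocks until all results are ready and preserves input order
        print(pool.map(worker, range(5)))  # [0, 2, 4, 6, 8]

        # apply_async returns an AsyncResult immediately; call .get() to fetch each value
        async_results = [pool.apply_async(worker, (i,)) for i in range(5)]
        print([r.get() for r in async_results])  # [0, 2, 4, 6, 8]
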

sega_sai
16

You can use the exit built-in to set the exit code of a process. It can be obtained from the exitcode attribute of the process:

import multiprocessing

def worker(procnum):
    print(str(procnum) + ' represent!')
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print(result)

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
  • 4
    Be warned that this approach could become confusing. Processes should generally exit with exit code 0 if they completed without error. If you have anything monitoring your system's process exit codes then you may see these reported as errors. – ferrouswheel May 23 '17 at 21:50
  • 2
    Perfect if you just want to raise an exception in the parent process on error. – crizCraig Jul 19 '18 at 17:45
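
A sketch of the pattern the last comment describes, letting the parent raise when a child reports a nonzero exit code (the failure condition is made up for illustration):

import multiprocessing

def worker(procnum):
    exit(1 if procnum == 3 else 0)  # pretend worker 3 fails

if __name__ == '__main__':
    jobs = [multiprocessing.Process(target=worker, args=(i,)) for i in range(5)]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()
        if p.exitcode != 0:
            raise RuntimeError('%s failed with exit code %d' % (p.name, p.exitcode))
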
11

The pebble package has a nice abstraction leveraging multiprocessing.Pipe which makes this quite straightforward:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

Example from: https://pythonhosted.org/Pebble/#concurrent-decorators

erikreed
11

Thought I'd simplify the simplest examples copied from above, working for me on Py3.6. Simplest is multiprocessing.Pool:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

You can set the number of processes in the pool with, e.g., Pool(processes=5). However, it defaults to the CPU count, so leave it blank for CPU-bound tasks. (I/O-bound tasks often suit threads anyway, as the threads are mostly waiting and can share a CPU core.) Pool also applies a chunking optimization. (On platforms that use the spawn start method, such as Windows, put the pool creation and the map call under an if __name__ == '__main__': guard.)

(Note that the worker method cannot be nested within a method. I initially defined my worker method inside the method that makes the call to pool.map, to keep it all self-contained, but then the processes couldn't import it, and threw "AttributeError: Can't pickle local object outer_method..inner_method". More here. It can be inside a class.)
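
For example, a small sketch (class and method names are made up) of a worker defined inside a class; being reachable at module level keeps it picklable:

import multiprocessing

class Tasks:
    @staticmethod
    def worker(x):
        # Importable as Tasks.worker, so the pool's pickling by reference works
        return x * x

if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        print(pool.map(Tasks.worker, range(10)))
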

(I appreciate that the original question specified printing 'represent!' rather than time.sleep(), but without the sleep I thought some code was running concurrently when it wasn't.)


Py3's ProcessPoolExecutor is also two lines (.map returns a generator so you need the list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

With plain Processes:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

Use SimpleQueue if all you need is put and get. The first loop starts all the processes, before the second makes the blocking queue.get calls. I don't think there's any reason to call p.join() too.

Chris
2

If you are using Python 3, you can use concurrent.futures.ProcessPoolExecutor as a convenient abstraction:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Aleph Aleph
2

You can use ProcessPoolExecutor to get a return value from a function as shown below:

from concurrent.futures import ProcessPoolExecutor

def test(num1, num2):
    return num1 + num2

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        future = executor.submit(test, 2, 3)
        print(future.result())  # 5
Super Kai - Kazuya Ito
1

A simple solution:

import multiprocessing

output = []
data = range(10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r = p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())
    print(output[0])

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
razimbres
0

I modified vartec's answer a bit since I needed to get the error codes from the functions. (Thanks vartec! It's an awesome trick.)

This can also be done with a manager.list, but I think it is better to have it in a dict and store a list within it. That way we keep the function and the results, since we can't be sure of the order in which the list will be populated.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print('func1: starting')
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print('func1: finishing')
    # return "func1"  # no need for a return, since Process doesn't hand it back =(

def func2(fn, m_list):
    print('func2: starting')
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print('func2: finishing')
    # return "func2"

def func3(fn, m_list):
    print('func3: starting')
    time.sleep(9)
    # If the function fails it never populates the dict, so it won't show up
    # in the results; use a try/except to record something instead.
    raise ValueError("failed here")
    # Unreachable because of the raise above; kept to show how the error
    # could be caught and stored in the manager dict:
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it failed horribly"
        # print('func3: finishing')
        # return "func3"


def runInParallel(*fns):  # * accepts any number of functions
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        p = Process(target=fn, name=fn.__name__, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # wait for each process to finish

    print(datetime.datetime.now() - start_time)
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)

    # here you can check which functions failed
    for i in proc:
        print(i.name, i.exitcode)  # name was set when the Process was created

    # this will only show the functions that worked and were able to populate
    # the manager dict
    for i, j in manager.items():
        print(dir(i))  # attributes of the function object used as the key
        print(i, j)
pelos