5

I am trying to update one common dictionary through multiple processes. Could you please help me find out what is the problem with this code? I get the following output:

inside function
{1: 1, 2: -1}
comes here
inside function
{1: 0, 2: 2}
comes here
{1: 0, 2: -1}

Thanks.

from multiprocessing import Lock, Process, Manager

l= Lock()


def computeCopyNum(test,val):
    l.acquire()
    test[val]=val
    print "inside function"
    print test
    l.release()
    return

a=dict({1: 0, 2: -1})

procs=list()

for i in range(1,3):
    p = Process(target=computeCopyNum, args=(a,i))
    procs.append(p)
    p.start()

for p in procs:
    p.join()
    print "comes here"

print a
user1050325

3 Answers

10

The answer is actually quite simple. You're using the multiprocessing module, which starts several separate Python processes. Different processes have different address spaces and do not share memory, so each of your processes writes to its own local copy of the dictionary.
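To see this in isolation, here is a minimal sketch (the `set_key`/`demo` names are just for illustration) showing that a plain dict passed to a child process is copied, not shared:

```python
from multiprocessing import Process

def set_key(d, val):
    # runs in the child process: this mutates the child's private copy only
    d[val] = val

def demo():
    shared = {1: 0, 2: -1}
    procs = [Process(target=set_key, args=(shared, i)) for i in (1, 2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # the parent's dict is untouched, no matter what the children did
    return shared

if __name__ == "__main__":
    print(demo())  # {1: 0, 2: -1}
```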

The easiest way to do inter-process communication when using the multiprocessing module is to use a queue to communicate between the slave processes and the master process.

from multiprocessing import Process, Queue

def computeCopyNum(queue, val):
    queue.put(val) # could also put a tuple of (process id, value) if we wanted to

procs=list()

queue = Queue()
for i in range(1,3):
    p = Process(target=computeCopyNum, args=(queue, i))
    procs.append(p)
    p.start()

for _ in procs:
    val = queue.get()
    # do whatever with val

for p in procs:
    p.join()

If each slave process can generate multiple output values, it might be prudent to let each one write a sentinel value to the queue to signal to the master that it's done. Then the code might look something like:

def slave(queue):
    for i in range(128): # just for example
        val = i * i  # some calculated result, for example
        queue.put(val)

    queue.put(None) # add a sentinel value to tell the master we're done

queue = Queue()

# spawn 32 slave processes
num_procs = 32
procs = [Process(target=slave, args=(queue, )) for _ in range(num_procs)]
for proc in procs: 
    proc.start()

finished = 0
while finished < num_procs:
    item = queue.get()
    if item is None:
        finished += 1
    else:
        pass  # do something with item

for proc in procs: 
    proc.join()

You can also use a Manager, as shown in another answer. The problem with that approach is that a lot of implicit memory copying between process address spaces can occur, and that can be hard to reason about. I always prefer using explicit queues.
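One consequence of that implicit copying, as a sketch (this assumes a plain list is stored in the managed dict, not a `manager.list()` proxy): fetching a value from a managed dict gives you a local copy, so mutating it in place silently does nothing.

```python
from multiprocessing import Manager

def nested_copy_demo():
    with Manager() as manager:
        d = manager.dict()
        d["vals"] = []            # stores a plain list inside the managed dict
        d["vals"].append(1)       # mutates a *local copy* fetched from the proxy
        before = list(d["vals"])  # still [] -- the append was lost

        local = d["vals"]
        local.append(1)
        d["vals"] = local         # reassigning pushes the change back
        after = list(d["vals"])   # now [1]
        return before, after

if __name__ == "__main__":
    print(nested_copy_demo())  # ([], [1])
```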

dnaq
  • +1, good point about memory copying. Still, for pedagogical reasons, I think `Manager` is a better starting point for beginners. – senderle Sep 21 '12 at 22:05
  • When I use the Queue, I get the following error:Break on __THE_PROCESS_HAS_FORKED_AND_YOU_CANNOT_USE_THIS_COREFOUNDATION_FUNCTIONALITY___YOU_MUST_EXEC__() to debug. The process has forked and you cannot use this CoreFoundation functionality safely. You MUST exec(). – user1050325 Sep 21 '12 at 23:30
  • Could you post your code? I verified that the code i posted worked, if you just copy the first sample verbatim it should work. – dnaq Sep 22 '12 at 09:50
  • I don't know how to post the code in comment section, the formatting isn't showing up... But I am putting a hashtable each time in the queue instead of a single value. Is that what is causing the problem? – user1050325 Sep 25 '12 at 20:48
  • better use multiprocessing.JoinableQueue – Bob Mar 04 '17 at 12:50
6

You import Manager, but you don't do anything with it. As a first approach, do this instead:

a = Manager().dict({1: 0, 2: -1})

Global variables won't work the way you expect when using multiprocessing. Subprocesses only have access to a copy, and the changes they make are forgotten when they exit, unless you're using a specially designed object capable of propagating information between processes.

There are a number of different alternatives for passing data between processes, but using a Manager object as above is usually the simplest. You can also use the Manager object to create multiple shared objects:

manager = Manager()
a = manager.dict({1: 0, 2: -1})
b = manager.list((1, 2, 3))

See the Manager docs for more.

Also, the lock you use is unnecessary. The Manager takes care of that for you. As the docs say,

Generally synchronization primitives are not as necessary in a multiprocess program as they are in a multithreaded program.
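Putting this together, a runnable sketch of the original code using a `Manager` dict (with Python 3 print calls, and without the now-redundant lock) might look like:

```python
from multiprocessing import Process, Manager

def computeCopyNum(test, val):
    # writes go through the manager process, so all processes see them
    test[val] = val

def run():
    with Manager() as manager:
        a = manager.dict({1: 0, 2: -1})
        procs = [Process(target=computeCopyNum, args=(a, i)) for i in range(1, 3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return dict(a)  # copy out before the manager shuts down

if __name__ == "__main__":
    print(run())  # {1: 1, 2: 2}
```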

senderle
  • @user1050325, you said you got an error using a `Queue` -- did you try the `Manager` approach? For basic inter-process communication, `Managers` are totally fine. Let me know if it works for you. – senderle Sep 22 '12 at 03:54
  • Amazing and simple solution. – Ronak Jain Jul 21 '16 at 10:54
0

Processes don't share memory the way that threads do. Each process ends up with its own independent copy of a. If you want to do work in different processes, you need to use a pipe or some other form of interprocess communication to get the data back to a central process.
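A minimal sketch of the pipe approach (the `worker`/`collect` names are just for illustration): each child sends a (key, value) pair back over its pipe, and the parent merges the results into the dictionary.

```python
from multiprocessing import Process, Pipe

def worker(conn, val):
    conn.send((val, val))  # send a (key, value) pair back to the parent
    conn.close()

def collect():
    result = {1: 0, 2: -1}
    pipes, procs = [], []
    for i in range(1, 3):
        parent_end, child_end = Pipe()
        p = Process(target=worker, args=(child_end, i))
        p.start()
        pipes.append(parent_end)
        procs.append(p)
    for parent_end in pipes:
        key, value = parent_end.recv()  # blocks until the child sends
        result[key] = value
    for p in procs:
        p.join()
    return result

if __name__ == "__main__":
    print(collect())  # {1: 1, 2: 2}
```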

BostonJohn