1

I'm trying to use multiprocessing to spawn 4 processes that brute force some calculations and each has a very small chance of manipulating a single list object on each iteration that I want to share between them. Not best practice in the guidelines but I need "many hands".

The code works fine for relatively small numbers of iterations but when increasing the number to a certain threshold, all four processes will go into zombie state. They fail silently.

I attempt to track the modifications to the shared list by using multiprocessing.Queue(). It appears from this SO post, this closed Python issue – "not a bug", and several posts referring to these, that the underlying pipe can become overloaded and the processes just hang. The accepted answer in the SO post is extremely difficult to decipher because of so much excess code.

Edited for clarity:
The examples in the documentation do very lightweight things, almost always single function calls. Therefore, I don't know whether I'm misunderstanding and abusing features.

The guidelines say:

It is probably best to stick to using queues or pipes for communication between processes rather than using the lower level synchronization primitives from the threading module.

Does "communicate" here mean something other than what I'm actually doing in my example?
Or
Does this mean that I should be sharing my_list in the queue rather than with a manager? Wouldn't this mean queue.get and queue.put on every iteration of every process?

If maxsize is less than or equal to zero, the queue size is infinite.

Doing this does not fix the error in my failing example. Until the point that I do queue.put() all of the data is stored within a normal Python list: my_return_list so is this actually failing due to the links I provided?

Is there a better way of doing this compared to my current workaround? I can't seem to find others taking an approach that looks similar, I feel I'm missing something. I need this to work for both Windows and Linux.

Failing example (depending on iterations under __main__):

import multiprocessing as mp
import random
import sys

def mutate_list(my_list, proc_num, iterations, queue, lock):
    my_return_list = []

    if iterations < 1001:
        # Works fine
        for x in xrange(iterations):
            if random.random() < 0.01:
                lock.acquire()

                print "Process {} changed list from:".format(proc_num)
                print my_list
                print "to"
                random.shuffle(my_list)
                print my_list
                print "........"
                sys.stdout.flush()

                lock.release()

                my_return_list.append([x, list(my_list)])
    else:
        for x in xrange(iterations):
            # Enters zombie state
            if random.random() < 0.01:
                lock.acquire()
                random.shuffle(my_list)
                my_return_list.append([x, list(my_list)])
                lock.release()
            if x % 1000 == 0:
                print "Completed iterations:", x
                sys.stdout.flush()

    queue.put(my_return_list)


def multi_proc_handler(iterations):

    manager = mp.Manager()
    ns = manager.list()
    ns.extend([x for x in range(10)])
    queue = mp.Queue()
    lock = manager.Lock()

    print "Starting list to share", ns
    print ns
    sys.stdout.flush()

    p = [mp.Process(target=mutate_list, args=(ns,x,iterations,queue,lock)) for x in range(4)]

    for process in p: process.start()
    for process in p: process.join()

    output = [queue.get() for process in p]
    return output

if __name__ == '__main__':
    # 1000 iterations is fine, 100000 iterations will create zombies
    multi_caller = multi_proc_handler(100000) 

Workaround using multiprocessing.Manager.list():

import multiprocessing as mp
import random
import sys

def mutate_list(my_list, proc_num, iterations, my_final_list, lock):

    for x in xrange(iterations):
        if random.random() < 0.01:
            lock.acquire()
            random.shuffle(my_list)
            my_final_list.append([x, list(my_list)])
            lock.release()
        if x % 10000 == 0:
            print "Completed iterations:", x
            sys.stdout.flush()


def multi_proc_handler(iterations):

    manager = mp.Manager()
    ns = manager.list([x for x in range(10)])
    lock = manager.Lock()

    my_final_list = manager.list() # My Queue substitute

    print "Starting list to share", ns
    print ns
    sys.stdout.flush()

    p = [mp.Process(target=mutate_list, args=(ns,x,iterations,my_final_list,
        lock)) for x in range(4)]

    for process in p: process.start()
    for process in p: process.join()

    return list(my_final_list)

if __name__ == '__main__':

    multi_caller = multi_proc_handler(100000) 
Community
  • 1
  • 1
roganjosh
  • 12,594
  • 4
  • 29
  • 46
  • I think the real difference is that you `put` a large amount of data into the `Queue` in the first version and you incrementally `append` the data into the `list` in the second version. Both versions are sending and receiving data between two processes. So the first version is apparently trying to send too much data. –  Aug 16 '16 at 21:06
  • @DavidCullen but I can make the queue's storage infinite (according to documentation) so it should surely work like `manager.list()` or `manager.dict()`? Nothing is added as-I-go so to speak. – roganjosh Aug 16 '16 at 21:17
  • @rojanjosh: It's not the size of the `Queue` that is the problem, but the size of the buffer of the underlying IPC mechanism. If you carefully read [this bug report](http://bugs.python.org/issue8237), it is possible to fill the buffer of the underlying pipe, which then causes the `put` call to block. –  Aug 16 '16 at 21:24
  • @DavidCullen you've just linked to a similar problem that I did in my question after "this SO post". I cannot find a way online around this. If you have a way of applying this to my problem, I will up-vote at the least. – roganjosh Aug 16 '16 at 21:32
  • @DavidCullen there's a bit of a black hole I think for people trying to use multiprocessing from an intermediate stance. I gave a link to a more recent bug report in my question, but it's not clear to me how it gels with what I want to do. – roganjosh Aug 16 '16 at 21:46
  • @rogandjosh: Does your workaround work? If so, it is an acceptable solution. Just because the documentation recommends avoiding the transfer of large amounts of data between processes does not mean that your code is wrong. A recommendation is not a law. Working code trumps everything. –  Aug 16 '16 at 21:46
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/121107/discussion-between-david-cullen-and-roganjosh). –  Aug 16 '16 at 21:50
  • @DavidCullen it works, but I don't know why. This raises two things: a) a skipped iteration in the list the processes share is nothing, but what if it takes 5 iterations on average? b) is all the CPU work I see firing up actually productive or me designing something convoluted that it has to fight? – roganjosh Aug 16 '16 at 21:51
  • It works because you are pushing smaller amounts of data through a socket (on everything but Windows) or through a named pipe (on Windows). Something on the other end of the socket is reading the data as you write it, so the socket buffer (on everything but Windows) or the pipe buffer (on Windows) never gets full. –  Aug 16 '16 at 21:55
  • @DavidCullen can you formulate this into an answer? I'm still trying to learn what's going on and the Stack Overflow post I linked has left me lost. Is my overall approach to the list for manipulation correct, or is there a better way? – roganjosh Aug 16 '16 at 22:09

1 Answers1

1

Queue versus list

Underneath the hood, a multiprocessing.Queue and a manager.list() are both writing to and reading from a buffer.

Queue

shared_queue = multiprocessing.Queue()

When you call put with N or more bytes (where N is dependent on a lot of variables), it is more than the buffer can handle and the put blocks. You might be able to get the put to unblock by calling get in another process. This is an experiment that should be easy to perform using the first version of your code. I highly recommend that you try this experiment.

list

manager = multiprocessing.Manager()
shared_list = manager.list()

When you call append, you are passing much less than N bytes, and the write to the buffer succeeds. There is another process that reads the data from the buffer and appends it to the actual list. This process is created by the manager. Even if you call append with N or more bytes, everything should keep working, because there is another process reading from the buffer. You can pass an arbitrary number of bytes to another process this way.

Summary

Hopefully, this clarifies why your "workaround" works. You are breaking up the writes to the buffer into smaller pieces and you have a helper process that is reading from the buffer to put the pieces into the managed list.

  • Ok, this makes a lot of sense. "I highly recommend that you try this experiment." So, if I understand correctly, this means all 4 processes doing a `get` and `put` on the queue on every iteration to get the most up-to-date list (or, a `get` on every iteration, a `put` when things change)? All the examples say that queues are the way to share info but then in examples it seems like it's just a mechanism to pull everything together at the end. The pickle/unpickle cycle made me think of additional overhead doing this constantly. – roganjosh Aug 17 '16 at 07:22