
I am brand new to the multiprocessing package in Python, and my confusion will probably be easy for someone who knows more to clear up. I've been reading about concurrency and have searched for other questions like this but have found nothing. (FYI, I do NOT want to use multithreading because the GIL would slow down my application a lot.)

I am thinking in the framework of events. I want to have multiple processes running, waiting for an event to happen. When an event happens, it gets assigned to a particular process, which handles it and then returns to its idle state. There may be a better way to do this, but my reasoning is that I should spawn all the processes once and keep them open indefinitely, rather than creating and then closing a process every time an event happens. Speed is an issue for me, and my events can occur many thousands of times per second.

I came up with the following toy example, which is meant to send even numbers to one process and odd numbers to another. Both processes do the same thing: they just append the number to a list.

from multiprocessing import Process, Queue, Pipe

slist=['even','odd']

Q={}
Q['even'] = Queue()
Q['odd'] = Queue()

ev,od = [],[]

Q['even'].put(ev)
Q['odd'].put(od)

P={}
P['even'] = Pipe()
P['odd'] = Pipe()



def add_num(s):
    """ The worker function, invoked in a process. The results are placed in
        a list that's pushed to a queue."""
#    while True :
    if not P[s][1].recv():
        print s,'- do nothing'

    else:            
        d = Q[s].get()
        print d
        d.append(P[s][1].recv())
        Q[s].put(d)
        print Q[s].get()
        P[s][0].send(False)
        print 'ya'




def piper(s,n):

    P[s][0].send(n)    
    for k in [S for S in slist if S != s]:
        P[k][0].send(False) 
    add_num(s)


procs = [ Process (
                   target=add_num,
                   args=(i,)
                   ) 
         for i in ['even','odd']]

for s in slist: 
    P[s][0].send(False)

for p in procs:
    p.start()  
    p.join()

for i in range(10):
    print i
    if i%2==0:
        s = 'even'
    else:
        s = 'odd'
    piper(s,i)


print 'results:', Q['odd'].get(),Q['even'].get()

This code produces the following:

even - do nothing

Any insight from the wise into this problem, where my code or reasoning falls short, etc., would be greatly appreciated.

Wapiti
  • if you want a single element tuple, you need a comma at the end, like this `args=(i,)` – 1.618 Apr 10 '15 at 22:42
  • Thank you. That indeed eliminated my error. But the code is still not working for some reason. I have edited the question above to reflect the new status and let it remain open while I continue to work on it. – Wapiti Apr 11 '15 at 00:47
  • That `print d` statement in the `add_num` function should throw an exception. – Himal Apr 11 '15 at 01:42
  • True. I've fixed that and updated again. I need to stop working on this now, and probably for the best as it is completely mystifying me. – Wapiti Apr 11 '15 at 02:33
  • @Wapiti Can you explain what you're actually trying to do here? The use of both a `Queue` and a `Pipe` for each is confusing. You're also calling `queue.get`, `queue.put` and then `queue.get` again, all inside the worker function. Why is that? The first `get` call will always cause a deadlock if you ever hit it, because you never `put` anything into the queue from the parent. Also, you're loading `False` into both Pipes, launching the children, which see the `False` when they call `if not P[s][1].recv():` and then immediately exit. Why do that? What's the expected behavior here? – dano Apr 11 '15 at 16:02
  • Thanks for your question. I'm trying to have a central process that delivers data (in this case just numbers but it generalizes). I want there to be two processes (generalizes to n) which do nothing unless data gets sent to them. The way I thought to do this was with pipes. There are two pipes which connect the main process with the two processes. The job of these processes is also generalizable, but in this case it is to append the number to a list. I use a queue here because I want the object operated on by the process to be accessible from the main process as well. I know it's wrong... – Wapiti Apr 11 '15 at 18:31
  • Your goal can probably be easily accomplished using multiprocessing.Pool – 1.618 Apr 11 '15 at 18:56
  • I thought `Pool` did not allow you to differentiate between processes, so one job is split between the number of procs. Can I explicitly send the data to the appropriate process? Or perhaps I can make the processes generic and pass the target object – Wapiti Apr 11 '15 at 19:28

1 Answer


Here is an approach I've used a couple of times with good success:

  1. Launch a multiprocessing pool.

  2. Use a multiprocessing SyncManager to create multiple queues (one for each type of data that needs to be handled differently).

  3. Use apply_async to launch the functions that process data. Just like the queues, there should be one function for each type of data that needs to be processed differently. Each function launched gets the queue that corresponds to its data as an input argument. The functions will do their work in an infinite loop that starts by getting data from the queue.

  4. Begin processing. During processing, the main process sorts the data and decides which function should be handling it. Once the decision is made, the data is placed on the queue that corresponds to that function.

  5. After all data has been handled, the main process puts a value called a "poison pill" into each queue. The poison pill is a value that the worker processes all recognize as a signal to exit. Since the queues are first-in, first-out (FIFO), the workers are guaranteed to pull the poison pill as the last item from their queues.

  6. Close and join the multiprocessing pool.

Code

Below is an example of this approach. The example code uses the algorithm described above to divide odd numbers by 2 and even numbers by -2. All results are placed in a shared list accessible by the main process.

import multiprocessing

POISON_PILL = "STOP"

def process_odds(in_queue, shared_list):

    while True:

        # block until something is placed on the queue
        new_value = in_queue.get() 

        # check to see if we just got the poison pill
        if new_value == POISON_PILL:
            break

        # we didn't, so do the processing and put the result in the
        # shared data structure
        shared_list.append(new_value/2)

    return

def process_evens(in_queue, shared_list):

    while True:    
        new_value = in_queue.get() 
        if new_value == POISON_PILL:
            break

        shared_list.append(new_value/-2)

    return

def main():

    # create a manager - it lets us share native Python object types like
    # lists and dictionaries without worrying about synchronization - 
    # the manager will take care of it
    manager = multiprocessing.Manager()

    # now using the manager, create our shared data structures
    odd_queue = manager.Queue()
    even_queue = manager.Queue()
    shared_list = manager.list()

    # lastly, create our pool of workers - this spawns the processes, 
    # but they don't start actually doing anything yet
    pool = multiprocessing.Pool()

    # now we'll assign two functions to the pool for them to run - 
    # one to handle even numbers, one to handle odd numbers
    odd_result = pool.apply_async(process_odds, (odd_queue, shared_list))
    even_result = pool.apply_async(process_evens, (even_queue, shared_list))
    # this code doesn't do anything with the odd_result and even_result
    # variables, but you have the flexibility to check exit codes
    # and other such things if you want - see docs for AsyncResult objects

    # now that the processes are running and waiting for their queues
    # to have something, lets give them some work to do by iterating
    # over our data, deciding who should process it, and putting it in
    # their queue
    for i in range(6):

        if (i % 2) == 0: # use mod operator to see if "i" is even
            even_queue.put(i)

        else:
            odd_queue.put(i)

    # now we've finished giving the processes their work, so send the 
    # poison pill to tell them to exit
    even_queue.put(POISON_PILL)
    odd_queue.put(POISON_PILL)

    # wait for them to exit
    pool.close()
    pool.join()

    # now we can check the results
    print(shared_list)

    # ...and exit!
    return


if __name__ == "__main__":
    main()

Output

This code produces this output:

[0.5, -0.0, 1.5, -1.0, 2.5, -2.0]

Notice that the order of the results is unpredictable, because we can't guarantee in what order the functions will be able to get items from their queues and put the results into the list. But you can certainly do whatever post-processing you need, which could include sorting.

Rationale

I think this would be a good solution to your issue because:

  1. You're correct that there is huge overhead in spawning processes. This single-producer/multiple-consumer approach eliminates that overhead by using a pool to keep the workers alive for the entire duration of the program.

  2. It addresses your concerns about being able to handle data differently depending on attributes of the data. In your comments, you expressed concerns about being able to send data to specific processes. In this approach, you can choose which processes to give data to, because you have to choose which queue to put it on. (By the way, I think you're thinking of the pool.map function, which, as you correctly believe, doesn't allow you to perform different operations in the same job. apply_async does.)

  3. I've found it to be very expandable and flexible. Need to add more types of data handling? Just write your handler function, add one more queue, and add logic to main to route the data to your new function. Is one queue getting backed up and becoming a bottleneck? You can call apply_async with the same target function and queue multiple times to get multiple workers working on the same queue, as sketched below. Just make sure you give the queue enough poison pills so that all of the workers get one.
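
To make that last point concrete, here is a minimal sketch (not from the original answer) of several workers draining the same queue. It reuses the POISON_PILL and process_odds names from the example above, and the num_workers count is just an illustrative choice; the key detail is putting one poison pill on the queue per worker.

import multiprocessing

POISON_PILL = "STOP"

def process_odds(in_queue, shared_list):
    # same worker as in the answer: loop until the poison pill arrives
    while True:
        new_value = in_queue.get()
        if new_value == POISON_PILL:
            break
        shared_list.append(new_value / 2)

def main():
    manager = multiprocessing.Manager()
    odd_queue = manager.Queue()
    shared_list = manager.list()

    pool = multiprocessing.Pool()

    # several workers, all consuming from the same queue
    num_workers = 3
    for _ in range(num_workers):
        pool.apply_async(process_odds, (odd_queue, shared_list))

    # hand out some work
    for i in range(1, 20, 2):
        odd_queue.put(i)

    # one poison pill per worker, so every worker gets the exit signal
    for _ in range(num_workers):
        odd_queue.put(POISON_PILL)

    pool.close()
    pool.join()
    print(sorted(shared_list))

if __name__ == "__main__":
    main()

The items are split among the workers in whatever order they happen to pull them, but since they all append to the same managed list, the results still end up in one place.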

Limitations

Any data you want to pass on a queue must be picklable (serializable) by the pickle module. See the pickle module's documentation for what can and can't be pickled.
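
As a quick illustration of the point (a small check of my own, not part of the original answer), module-level functions and built-in containers pickle fine, while lambdas do not:

import pickle

def double(x):
    return x * 2

# module-level functions and built-in types pickle fine
pickle.dumps(double)
pickle.dumps({"a": [1, 2]})

# lambdas (and nested functions, open file handles, etc.) do not
try:
    pickle.dumps(lambda x: x * 2)
except (pickle.PicklingError, AttributeError) as err:
    print("not picklable:", err)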

There are probably other limitations as well, but I can't think of any others off the top of my head.

skrrgwasme
  • This is a great answer and very helpful. I've made some progress since posting but not solved the problem totally. I have a few questions: what if the data never stops? Assuming the processing is fast enough for the queues not to get backed up, could this continuity pose a problem? Also, if I wanted to alter functionality later to make each function's input depend on another's output, would it be as simple as setting up the queues in the right structure? Would the asynchronous nature of the processes create any problems? – Wapiti Apr 26 '15 at 02:43
  • An "infinitely" running script isn't necessarily a problem by itself. Just make sure your infinite loops manage their variables properly so things go out of scope and can be garbage collected when you're done with them; otherwise you'll eventually run out of memory. Having data get passed from one function to the next shouldn't be an issue either; I've had data that needed several stages of processing, so I tied workers in a pool together with queues like a pipeline (see the sketch after these comments). It can get complicated, but it's not an issue as long as it's hooked up properly. – skrrgwasme Apr 26 '15 at 03:38
  • I appreciate this answer a lot. Thanks for the detail and reply to my questions. Having no experience with the package, it's taken a bit of time to get my head around the architecture. You provided exactly what I was looking for. Just a sound voice of experience. Thanks much. – Wapiti Apr 26 '15 at 04:19
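
As a rough sketch of the pipeline idea mentioned in the comments above (the stage_one/stage_two functions and their specific transformations are hypothetical, just reusing the poison-pill pattern from the answer), each stage reads from its input queue, writes to the next stage's queue, and forwards the poison pill so the stage after it also shuts down:

import multiprocessing

POISON_PILL = "STOP"

def stage_one(in_queue, out_queue):
    # first stage: halve each value and pass it downstream
    while True:
        value = in_queue.get()
        if value == POISON_PILL:
            # forward the pill so the next stage also shuts down
            out_queue.put(POISON_PILL)
            break
        out_queue.put(value / 2)

def stage_two(in_queue, shared_list):
    # second stage: negate the halved value and store the result
    while True:
        value = in_queue.get()
        if value == POISON_PILL:
            break
        shared_list.append(-value)

def main():
    manager = multiprocessing.Manager()
    raw_queue = manager.Queue()
    halved_queue = manager.Queue()
    shared_list = manager.list()

    pool = multiprocessing.Pool()
    pool.apply_async(stage_one, (raw_queue, halved_queue))
    pool.apply_async(stage_two, (halved_queue, shared_list))

    for i in range(6):
        raw_queue.put(i)

    # one pill into the head of the pipeline; stage_one forwards it
    raw_queue.put(POISON_PILL)

    pool.close()
    pool.join()
    print(list(shared_list))

if __name__ == "__main__":
    main()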