8

How can I write a Python multiprocessing script that uses two Queues like these?

  1. a working queue that starts with some data and that, depending on conditions inside the functions to be parallelized, receives further tasks on the fly,
  2. a second queue that gathers the results and is used to write them out once processing finishes.

I basically need to put some more tasks in the working queue depending on what I find in its initial items. The example I post below is silly (I could transform the item as I like and put it directly in the output queue), but its mechanics are clear and reflect part of the concept I need to develop.

Here is my attempt:

import multiprocessing as mp

def worker(working_queue, output_queue):
    item = working_queue.get() #I take an item from the working queue
    if item % 2 == 0:
        output_queue.put(item**2) # If I like it, I do something with it and conserve the result.
    else:
        working_queue.put(item+1) # If there is something missing, I do something with it and leave the result in the working queue 

if __name__ == '__main__':
    static_input = range(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] #I am running as many processes as CPU my machine has (is this wise?).
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    for result in iter(output_q.get, None):
        print result #alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible.

This neither ends nor prints any result.

At the end of the whole process I would like to ensure that the working queue is empty, and that all the parallel functions have finished writing to the output queue before the latter is iterated to take out the results. Do you have suggestions on how to make it work?

Jaqo

2 Answers

5

The following code achieves the expected results. It follows the suggestions made by @tawmas.

This code uses multiple cores in a setup where the queue that feeds data to the workers can be updated by those same workers during processing:

import multiprocessing as mp
def worker(working_queue, output_queue):
    while True:
        if working_queue.empty():
            break  # exit when the queue looks empty (used here in place of a true poison pill)
        else:
            picked = working_queue.get()
            if picked % 2 == 0:
                output_queue.put(picked)
            else:
                working_queue.put(picked + 1)
    return

if __name__ == '__main__':
    static_input = xrange(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
        if output_q.empty():
            break
        results_bank.append(output_q.get_nowait())
    print len(results_bank) # this should equal the length of static_input: it tells whether every item placed in the input queue was actually processed
    results_bank.sort()
    print results_bank
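
The question also asked whether the collected results can be pickled. Once they are drained into an ordinary list such as results_bank above, a plain pickle.dump works; a minimal sketch, with 'results.pkl' as a hypothetical output path:

import pickle  # cPickle offers the same interface on Python 2

with open('results.pkl', 'wb') as handle:  # hypothetical output file
    pickle.dump(results_bank, handle)

# The results can be loaded back later with:
# with open('results.pkl', 'rb') as handle:
#     results_bank = pickle.load(handle)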
Jaqo
  • Your print loop gets stuck waiting forever once the result queue is empty. You should use get_nowait and explicitly catch the Empty exception to quit cleanly. – tawmas Feb 06 '14 at 22:45
  • Thanks again for your help. I am doing a `try: \n print result \n except Empty: \n break \n` This is printing the total results expected, but the console output is still complaining about the exception. I think I am not properly handling it. – Jaqo Feb 07 '14 at 11:29
  • You need to get from the queue inside your try. – tawmas Feb 07 '14 at 12:50
  • Dear @tawmas your help has been extremely useful. Thanks again. I could not manage to specify the exception. Instead, I used a while True loop, and checked when the queue was empty. This should be reliable as only one cpu-core does such task. I edited the code in the answer above with the solution you helped to find. – Jaqo Feb 07 '14 at 13:45
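
For reference, the drain loop tawmas describes in these comments (calling get_nowait inside the try block and catching the Empty exception) could look like the following sketch, assuming the output_q from the answer above:

try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2

results_bank = []
while True:
    try:
        result = output_q.get_nowait()  # the get itself sits inside the try
    except Empty:
        break                           # the queue is fully drained, exit cleanly
    results_bank.append(result)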
3

You have a typo in the line that creates the processes. It should be mp.Process, not mp.process. This is what is causing the exception you get.

Also, you are not looping in your workers, so they actually only consume a single item each from the queue and then exit. Without knowing more about the required logic, it's not easy to give specific advice, but you will probably want to enclose the body of your worker function inside a while True loop and add a condition in the body to exit when the work is done.

Please note that, if you do not add a condition to explicitly exit from the loop, your workers will simply stall forever when the queue is empty. You might consider using the so-called poison pill technique to signal the workers they may exit. You will find an example and some useful discussion in the PyMOTW article on Communication Between processes.
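
As a rough illustration of that pattern (a generic sketch, not the asker's final code), the main process appends one sentinel per worker after the real tasks, and each worker exits when it sees the sentinel. Note that when workers also put new tasks back on the queue, as in the question, the pill can be reached before the re-queued work, which is exactly the complication discussed in the comments below.

import multiprocessing as mp

def worker(working_queue, output_queue):
    while True:
        item = working_queue.get()   # blocks until an item or a pill arrives
        if item is None:             # the poison pill: this worker is done
            break
        output_queue.put(item ** 2)  # placeholder for the real per-item work

if __name__ == '__main__':
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in range(100):
        working_q.put(i)
    n_workers = mp.cpu_count()
    for _ in range(n_workers):
        working_q.put(None)          # one pill per worker, placed after the real tasks
    workers = [mp.Process(target=worker, args=(working_q, output_q)) for _ in range(n_workers)]
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
    results = []
    while not output_q.empty():
        results.append(output_q.get())
    print(len(results))              # 100 results expected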

As for the number of processes to use, you will need to benchmark a bit to find what works for you, but, in general, one process per core is a good starting point when your workload is CPU bound. If your workload is IO bound, you might have better results with a higher number of workers.

tawmas
  • You're welcome! Please note that while you were doing that I edited my reply to start addressing the remainder of your question. – tawmas Feb 06 '14 at 17:57
  • I just read the remaining part of your answer. I will try to apply a while True loop. I wonder if the process finishes if there are no more items in the queue to work on. I would like to use something like a Queue length, but the documentation states this is not reliable. – Jaqo Feb 06 '14 at 18:04
  • Some more details for you! – tawmas Feb 06 '14 at 18:14
  • I am trying to place the so-called 'poison pill' at the end of the initial tasks of a JoinableQueue. But as this queue keeps getting more tasks during the multiprocessing, the pill ends up ahead of some tasks that must still be processed. Should I rather check for a 'lock', even if this can affect performance? – Jaqo Feb 06 '14 at 18:58
  • You seem to be adding more work from the workers themselves. If that is the case, it is the worker who should post one poison pill when it finds that it has no more real work to post. This way, your workers will die one by one. – tawmas Feb 06 '14 at 19:04
  • Indeed, my workers are queuing more tasks depending on what they find along their work. I tried to implement your suggestion about making each worker post the poison pill when no more work is available. I just used the unreliable .empty() method; in the worst-case scenario I would get a single core working at the end on the tasks that were not recognized as pending by the rest of the workers. I got my results printed, but the "In" line in the console does not appear anymore. Do you know why? I updated the code in the other answer and set yours as the right answer. Thanks so much for your help. – Jaqo Feb 06 '14 at 20:23
  • Some code to copy and paste would be nice, along with the explanations. – Walrus the Cat Nov 20 '14 at 00:42
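
The comment above about a JoinableQueue suggests another way to handle the pill ordering when the workers themselves add work. The sketch below (my own illustration, not code from either answer) has the main process wait on working_q.join(), which only returns once every queued item, including re-queued ones, has been marked done with task_done; only then does it send one poison pill per worker:

import multiprocessing as mp

def worker(working_queue, output_queue):
    while True:
        item = working_queue.get()
        if item is None:                 # poison pill, sent only after all real work is done
            working_queue.task_done()
            break
        if item % 2 == 0:
            output_queue.put(item ** 2)  # keep the processed result
        else:
            working_queue.put(item + 1)  # re-queue derived work before marking this item done
        working_queue.task_done()

if __name__ == '__main__':
    working_q = mp.JoinableQueue()
    output_q = mp.Queue()
    for i in range(100):
        working_q.put(i)
    workers = [mp.Process(target=worker, args=(working_q, output_q)) for _ in range(mp.cpu_count())]
    for proc in workers:
        proc.start()
    working_q.join()                     # returns only when every put has a matching task_done
    for _ in workers:
        working_q.put(None)              # now it is safe to send the pills
    for proc in workers:
        proc.join()
    results_bank = []
    while not output_q.empty():
        results_bank.append(output_q.get())
    print(len(results_bank))             # should equal the number of initial tasks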