
I'm having a few issues crop up when using processes and queues.

When I run the following code, the target function simply gets an item from a master queue and adds it to another queue specific to that process.

import sys
import multiprocessing
from Queue import Empty

# This is just taking a number from the queue
# and adding it to another queue
def my_callable(from_queue, to_queue):
    while True:
        try:
            tmp = from_queue.get(0)
            to_queue.put(tmp)
            print to_queue
        except Empty:
            break

# Create a master queue and fill it with numbers
main_queue = multiprocessing.Queue()
for i in xrange(100):
    main_queue.put(i)

all_queues = []
processes = []
# Create processes
for i in xrange(5):
    # Each process gets a queue that it will put numbers into
    queue = multiprocessing.Queue()
    # Keep up with the queue we are creating so we can get it later
    all_queues.append(queue)
    # Pass in our master queue and the queue we are transferring data to
    process = multiprocessing.Process(target=my_callable,
                                      args=(main_queue, queue))
    # Keep up with the processes
    processes.append(process)

for process in processes:
    process.start()

for process in processes:
    process.join()

When the target function prints the queue it is putting into, you'll notice that one queue is used almost exclusively.

If you then drain each per-process queue and print its contents, you'll see that most of the numbers end up in a single queue.

def queue_get_all(q):
    items = []
    maxItemsToRetrieve = 100
    # Drain up to maxItemsToRetrieve items without blocking
    for _ in range(maxItemsToRetrieve):
        try:
            items.append(q.get_nowait())
        except Empty:
            break
    return items

for tmp in all_queues:
    print queue_get_all(tmp)

What is causing this? Is there something I should be doing in my code to even out the work these processes are doing?

OUTPUT

[0, 2, 3, 4, 5, 6, 7, 8]
[1, 9, 10]
[11, 14, 15, 16]
[12, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
[13]

1 Answer


I think you have two problems here:

def my_callable(from_queue, to_queue):
    while True:
        try:
            tmp = from_queue.get(0)
            to_queue.put(tmp)
            print to_queue
        except Empty:
            break

From the docs for get:

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the Queue.Empty exception (timeout is ignored in that case).

Since you are passing 0 as the first parameter, it's equivalent to get(False). This makes it non-blocking, which means if it can't get a value out immediately it will raise an Empty exception, which will end your worker process. Since all your 'work' functions are identical and try to pull from the main queue at the same time, some might not be able to get a value right away and will die.

Giving the .get() a small timeout should fix this problem.

The second problem is that your 'work' function takes basically zero time to complete. Give it a little pause with sleep(.2) to simulate some non-trivial work and it will distribute across the workers:

from time import sleep

def my_callable(from_queue, to_queue):
    while True:
        try:
            # Block for up to 0.1 seconds waiting for an item
            tmp = from_queue.get(True, .1)
            # Simulate some non-trivial work
            sleep(0.2)
            to_queue.put(tmp)
        except Empty:
            break

EDIT:

I forgot to say: for this type of problem it is generally better not to rely on the timeout of .get() to signal the end of the queue. You get more control if you put some kind of "end of queue" marker object into the queue that tells the workers it is time to quit. That way they can all block, waiting for either new input or an exit "command".
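
For illustration, here is a minimal sketch of that sentinel approach, adapted from the code in the question. The SENTINEL name and the worker function are hypothetical, introduced just for this example; None works as the marker here because the real items are integers.

import multiprocessing

# Hypothetical "end of queue" marker; any object the workers can
# recognize (e.g. a specific string) would also work.
SENTINEL = None

def worker(from_queue, to_queue):
    while True:
        # Block until something arrives -- no timeout needed
        tmp = from_queue.get()
        if tmp is SENTINEL:
            # Marker received: this worker is done
            break
        to_queue.put(tmp)

main_queue = multiprocessing.Queue()
for i in xrange(100):
    main_queue.put(i)

all_queues = []
processes = []
for i in xrange(5):
    queue = multiprocessing.Queue()
    all_queues.append(queue)
    process = multiprocessing.Process(target=worker,
                                      args=(main_queue, queue))
    processes.append(process)
    process.start()

# One sentinel per worker so every process eventually sees the marker
for process in processes:
    main_queue.put(SENTINEL)

for process in processes:
    process.join()

The per-process queues can then be drained with queue_get_all exactly as before.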

bj0
Ok, so this indeed evens out the work (setting a timeout of 1 and a sleep of .01). Any idea what kind of object should be at the end? Maybe a specific string? – user3123576 Apr 16 '15 at 18:07