7

I'm using the multiprocessing module to split up a very large task. It works for the most part, but I must be missing something obvious in my design, because with this setup it's very hard for me to reliably tell when all of the data has been processed.

I have two separate tasks that run; one that feeds the other. I guess this is a producer/consumer problem. I use a shared Queue between all processes, where the producers fill up the queue, and the consumers read from the queue and do the processing. The problem is that there is a finite amount of data, so at some point everyone needs to know that all of the data has been processed so the system can shut down gracefully.

It would seem to make sense to use the map_async() function, but since the producers are filling up the queue, I don't know all of the items up front, so I have to go into a while loop and use apply_async() and try to detect when everything is done with some sort of timeout...ugly.

I feel like I'm missing something obvious. How can this be better designed?

PRODUCER

import multiprocessing
import time

class ProducerProcess(multiprocessing.Process):
    def __init__(self, item, consumer_queue):
        self.item = item
        self.consumer_queue = consumer_queue
        multiprocessing.Process.__init__(self)

    def run(self):
        for record in get_records_for_item(self.item): # this takes time
            self.consumer_queue.put(record)

def start_producer_processes(producer_queue, consumer_queue, max_running):
    running = []

    while not producer_queue.empty():
        # drop any producers that have already finished
        running = [r for r in running if r.is_alive()]
        if len(running) < max_running:
            producer_item = producer_queue.get()
            p = ProducerProcess(producer_item, consumer_queue)
            p.start()
            running.append(p)
        time.sleep(1)

CONSUMER

import Queue  # Queue.Empty is the exception raised when get() times out

def process_consumer_chunk(queue, chunksize=10000):
    for i in xrange(0, chunksize):
        try:
            # don't wait too long for an item
            # if new records don't arrive in 10 seconds, process what you have
            # and let the next process pick up more items.

            record = queue.get(True, 10)
        except Queue.Empty:                
            break

        do_stuff_with_record(record)

MAIN

import multiprocessing
import time
import Queue

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024*1024)
    producer_queue = manager.Queue()

    producer_items = xrange(0,10)

    for item in producer_items:
        producer_queue.put(item)

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
    p.start()

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)

Here is where it gets cheesy. I can't use map, because the list to consume is being filled up at the same time. So I have to go into a while loop and try to detect a timeout. The consumer_queue can become empty while the producers are still trying to fill it up, so I can't just detect an empty queue and quit on that.

    chunksize = 10000
    timed_out = False
    timeout = 1800
    while 1:
        try:
            result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue, ), dict(chunksize=chunksize,))
            if timed_out:
                timed_out = False

        except Queue.Empty:
            if timed_out:
                break

            timed_out = True
            time.sleep(timeout)
        time.sleep(1)

    consumer_queue.join()
    consumer_pool.close()
    consumer_pool.join()

I thought that maybe I could get() the records in the main thread and pass those into the consumer instead of passing the queue in, but I think I'd end up with the same problem that way. I'd still have to run a while loop and use apply_async(). Thank you in advance for any advice!

Sean DiZazzo

2 Answers

2

You could use a manager.Event to signal the end of the work. This event can be shared between all of your processes, and when you signal it from your main process, the other workers can shut down gracefully.

while not event.is_set():
    ...rest of code...

So, your consumers would wait for the event to be set and handle the cleanup once it is set.

To determine when to set this flag, you can join() the producer processes, and when those have all completed you can then join() the consumer processes.
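A minimal sketch of that pattern, reusing ProducerProcess and do_stuff_with_record from the question; the done_event name and the worker counts are only illustrative:

import multiprocessing
import Queue

def consume(consumer_queue, done_event):
    # Keep draining until the producers are done AND the queue is empty.
    while not (done_event.is_set() and consumer_queue.empty()):
        try:
            record = consumer_queue.get(True, 1)
        except Queue.Empty:
            continue
        do_stuff_with_record(record)

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024*1024)
    done_event = manager.Event()

    producers = [ProducerProcess(item, consumer_queue) for item in xrange(10)]
    for p in producers:
        p.start()

    consumers = [multiprocessing.Process(target=consume,
                                         args=(consumer_queue, done_event))
                 for _ in xrange(16)]
    for c in consumers:
        c.start()

    # Join the producers first; once they have all finished, no more records
    # can arrive, so set the event and let the consumers drain what is left.
    for p in producers:
        p.join()
    done_event.set()

    for c in consumers:
        c.join()

Because the event is only set after every producer has been joined, an empty queue at that point really does mean all records have been handed out, so the consumers can exit safely.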

sean
  • I think this will work. Thank you! I'm not sure how the join() works from your description, but I think I found a way. I pass the event into the start_producer_process() process, and set() it after all of the producers have finished adding to the consumer_queue. At that point (back in the main thread) if the consumer_queue goes empty, it means that everything has been processed so I can break out of the while loop safely. – Sean DiZazzo Dec 20 '12 at 21:17
  • Sorry for the confusing part, the join would be in the main thread so that you would not just exit the program after your producers were finished and the consumers had just started to do their work. – sean Dec 20 '12 at 22:58
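A minimal sketch of the variant described in the comments above, where the event is set by the process that launches the producers. It reuses ProducerProcess and process_consumer_chunk from the question; done_event is again just an illustrative name:

import multiprocessing
import time

def start_producer_processes(producer_queue, consumer_queue, max_running, done_event):
    running = []
    while not producer_queue.empty():
        running = [r for r in running if r.is_alive()]
        if len(running) < max_running:
            p = ProducerProcess(producer_queue.get(), consumer_queue)
            p.start()
            running.append(p)
        time.sleep(1)
    # Every item has been handed to a producer; wait for the last ones to
    # finish, then signal that nothing more will land on the consumer_queue.
    for r in running:
        r.join()
    done_event.set()

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024*1024)
    producer_queue = manager.Queue()
    done_event = manager.Event()
    chunksize = 10000

    for item in xrange(0, 10):
        producer_queue.put(item)

    launcher = multiprocessing.Process(target=start_producer_processes,
                                       args=(producer_queue, consumer_queue, 8, done_event))
    launcher.start()

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)

    # Keep dispatching chunks until the producers are finished and the
    # queue has been drained, instead of guessing with timeouts.
    while not (done_event.is_set() and consumer_queue.empty()):
        consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,),
                                  dict(chunksize=chunksize))
        time.sleep(1)

    launcher.join()
    consumer_pool.close()
    consumer_pool.join()

Here the main loop no longer needs a timeout heuristic: it stops dispatching chunks once the producers are finished and the queue is empty, and the pool's close()/join() waits for any in-flight chunks to complete.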
-1

I would strongly recommend SimPy instead of multiprocessing/threading for doing discrete event simulation.

Timothy