Below is code slightly simplified from an example in the Python docs, section 16.6, which shows how to put objects on and get objects from an MP queue.

What if my program puts, for example, some big number of objects in the MP queue and, after Y of them have been retrieved from the result queue, application logic decides that it really doesn't need to process the rest of the objects?

So, how do I delete/purge/flush/clear the contents of the queue? The same question appears to have been asked here, but I can't believe that there's no way to do that.

How to clear a multiprocessing queue in python

Thanks

import time
import random

from multiprocessing import Process, Queue, current_process

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

def calculate(func, args):
    result = func(*args)
    return current_process().name, func.__name__, args, result

def plus(a, b):
    time.sleep(5*random.random())
    return a, b, a + b

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS2 = [(plus, (i, 8)) for i in range(10000)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)
        print(task_queue.qsize())

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results (one result per submitted task)
    print('Unordered results:')
    for i in range(len(TASKS2)):
        s = done_queue.get()
        print('%s %s' % (s[0], s[3][2]))

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')

if __name__ == '__main__':
    test()
RonJohn

1 Answer

In the question you reference, How to clear a multiprocessing queue in python, there's an answer that demonstrates how to clear a Queue. Not the "accepted" answer, but the one with the majority of the up-votes…

If your jobs are light-weight, this question is a duplicate, and the referenced solution should work for your case too, since calling queue.get() repeatedly until the queue is empty clears the Queue.
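
A minimal sketch of that draining approach, assuming Python 3 (where the `Empty` exception lives in the `queue` module; Python 2 names that module `Queue`). The helper name `drain` is made up for illustration and is not part of the multiprocessing API. With a multiprocessing `Queue`, `Empty` only means nothing was retrievable at that instant; items still in flight from another process can show up later.

```python
import queue  # multiprocessing.Queue raises queue.Empty too


def drain(q):
    """Discard every item currently retrievable from q.

    Returns the number of items thrown away.
    """
    discarded = 0
    while True:
        try:
            q.get_nowait()  # fetch the item and deliberately ignore it
        except queue.Empty:
            return discarded
        discarded += 1
```

Calling `drain(task_queue)` would throw away pending tasks that no worker has picked up yet; a task that a worker is already executing is not affected.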

If your jobs take a while (as yours seem to, due to the sleep), then it's easier to delete the Queue, which would mean refactoring your code so that a new, empty Queue can replace the existing Queue that is to be deleted. To maintain pointer integrity, you can keep all your Queue objects in a list, queues, and then just replace the Queue at the appropriate index.
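
The list-based replacement could look roughly like the sketch below. `replace_queue` is a hypothetical helper, and it assumes the swap happens in the process that owns the list. Worker processes that were started with the old Queue keep their own handle to it, so they would have to be stopped and started again with the new one.

```python
from multiprocessing import Queue


def replace_queue(queues, index):
    """Put a fresh, empty Queue at queues[index] and retire the old one.

    Returns the new Queue.
    """
    old = queues[index]
    queues[index] = Queue()
    # Don't block at interpreter exit trying to flush items nobody will read.
    old.cancel_join_thread()
    # This process will put no more data on the old queue.
    old.close()
    return queues[index]
```

With `queues = [task_queue, done_queue]`, calling `replace_queue(queues, 0)` leaves `done_queue` alone and makes `queues[0]` an empty queue for whatever workers are started next.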

Mike McKerns
  • What's the essential difference between what I currently have `for i in range(big_num): s = done_queue.get()` and `while not done_queue.empty()`? They both get() in a loop... – RonJohn Jan 19 '15 at 21:08
  • The "empty" loop is doing a `queue.get()` and then throwing away each remaining result by not assigning it to a variable. You can keep getting results as you need them by calling `queue.get()` and saving the result each time… then when you want to empty the `Queue`, iterate over `queue.get()` and don't assign to a variable… and you can stop when an `Empty` exception is thrown (i.e. catch the exception). – Mike McKerns Jan 19 '15 at 22:38
  • But all the work is inside the "action" function (in this example it's `plus()`; in my code, it can take up to 30 seconds to complete). So... it's not the results that I care about, it's the hours of processing that are still going to happen when running all of those `queue.get()` statements. – RonJohn Jan 19 '15 at 22:52
  • Totally understood. Then it's easier to delete the `Queue`, which would mean refactoring code so a new empty `Queue` can replace the existing `Queue` that is deleted. To maintain pointer integrity, you can keep all your `Queue` objects in a list `queues`, and then just replace the `Queue` at the appropriate index. – Mike McKerns Jan 19 '15 at 23:14
  • Thanks. It ain't perfect, but it's better than setting event flags and having the worker function check the event status at the top of its code... – RonJohn Jan 19 '15 at 23:20
  • @MikeMcKerns Actually, it would be really helpful if you could elaborate here on how to use the "replace index" approach to keep the pointer integrity. To me, in multiprocessing, due to the "pickle" nature of the Queue implementation in Python, I can't really find a way to replace an existing reference to a Queue without recreating a started process. For example, for a process: `Process(target=execution_logic, args=(queue))`, after it starts, how can you pass another queue in to the process at runtime? I tried passing in `queue_list` and using `queue_list[0]` inside, but it doesn't work. – Rex Wang Jul 26 '18 at 18:53