
I am implementing a multithreaded producer-consumer pattern using Queue.Queue in Python 2.7, and I am trying to figure out how to make the consumers, i.e. the worker threads, stop once all required work is done.

See the second comment by Martin James to this answer: https://stackoverflow.com/a/19369877/1175080

Send an 'I am finished' task, instructing the pool threads to terminate. Any thread that gets such a task requeues it and then commits suicide.

But this does not work for me. See the following code for example.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

This program hangs after all three workers have read the exit indicator, i.e. -1 from the queue, because each worker requeues -1 before exiting, so the queue never becomes empty and q.join() never returns.

I came up with the following working but inelegant solution, where I send a -1 exit indicator for each worker via the queue so that each worker can see it and commit suicide. But having to send a separate exit indicator for each worker feels a little ugly.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

I have two questions.

  1. Can the method of sending a single exit indicator for all threads (as explained in the second comment of https://stackoverflow.com/a/19369877/1175080 by Martin James) even work?
  2. If the answer to the previous question is "No", is there a way to solve the problem in a way that I don't have to send a separate exit indicator for each worker thread?
Lone Learner
  • Sending a kill signal for each worker looks like a good solution to me; I wouldn't say it is so ugly. You can also just join the threads instead of joining the queue – Netwave Aug 02 '17 at 11:10
    Note that there is a [`ThreadPool`](https://stackoverflow.com/a/3386632/3767239) class available which takes care of "manually" distributing tasks between multiple threads for you. You can `join` such a pool (instead of the queue), and then sending the *"stop"* signal will eventually terminate all threads. Actually I don't see why you want to `join` the queue instead of the threads here. With Python 3 you have even more (and better documented) functionality through the [concurrent](https://docs.python.org/dev/library/concurrent.futures.html#threadpoolexecutor) module. – a_guest Aug 02 '17 at 11:19
  • Some additional remarks. From your example code it is not clear why you would use such a *"stop"* command in the first place (you could just leave that part out and wait for the queue to `join`). Then - if you use such a command - it is not guaranteed that each thread will be shut down "properly": `q.join()` might resume before all threads received the `-1` because you call `q.task_done()` before re-putting the `-1` to the queue (which means the task count may reach zero before `-1` is re-put (which increases the count) and thus `q.join()` may resume). – a_guest Aug 02 '17 at 13:15
  • @a_guest Have you actually taken my code, edited it as you suggest, and seen the outcome? If I leave that part ("stop" command) out and only wait for the queue to join, then the program hangs forever after the queue joins, i.e. after `q.join()` returns because the worker threads do not exit. That's why the "stop" command is there to ensure that the worker threads commit suicide and the main program can terminate. – Lone Learner Aug 02 '17 at 14:49
  • @a_guest In this specific example, there is no possibility of `q.join()` resuming prematurely because at least one `q.task_done()` occurs after a `q.put(-1)` due to the artificial 1-second delay in the worker threads. However, what you mention is true for a real-world program like this, where there would be no such artificial delay. – Lone Learner Aug 02 '17 at 14:55
  • @LoneLearner If that is your concern, just add `os._exit(0)` at the end - not a "clean" exit but your example code doesn't seem to require one anyway (no resources to be released, ...). About the `q.join`: A worker needs 1 second to complete a task, there are 3 workers and 0.5 seconds between two subsequently queued tasks. This means each task will be immediately consumed by a worker (also the `-1`). Because you `sleep(1)` also when `-1` is received, by the moment this call returns all other workers have finished their tasks already 0.5 seconds ago and `q.task_done()` brings the count to zero. – a_guest Aug 03 '17 at 08:04
  • @a_guest Did you actually run my code and observe the behavior you mention, or are you just theorizing? What you mention about `q.join()` does not happen if I run this code on my system. Here's why: After the last `q.put()` call in the `for`-loop in the master, i.e. the `q.put(9)` call in the master, some worker gets this `9` from the queue by calling `q.get()`. After this, that worker sleeps for 1 second but the master sleeps only for 0.5 second and then immediately calls `q.put(-1)` three times. So the queue is never really empty until all the three workers get these `-1`s. – Lone Learner Aug 03 '17 at 13:20
  • @a_guest In fact, in my second example code it is guaranteed that the following sequence of operations occur: a worker gets 0, another gets 1, the worker that got 0 calls `q.task_done()`, ..., some worker gets 8, some worker that got 7 calls `q.task_done()`, another worker gets 9, the worker that got 8 calls `q.task_done()`, the three `-1`s are sent, two free workers (that had 7 and 8 earlier) get the `-1`s, the worker that got 9 now calls `q.task_done()` and then it gets the last `-1`. So the `q` is never empty until the worker that once had 9 gets the last `-1` and calls `q.task_done()`. – Lone Learner Aug 03 '17 at 13:30
  • @LoneLearner I didn't refer to your second example but to your first (where you put only a single `-1`). My point is that the queue's task count drops to zero *before* all threads received (and processed) the `-1`. Admittedly this was "theoretical" in a sense that I didn't observe that behavior (which is probably due to `-1` being re-put "immediately" after `q.task_done()`). However if you add (for example) `time.sleep(0.001)` in the `if`-branch before `q.put(-1)` then you will observe `q.join()` to resume *before* all threads have received the `-1`. – a_guest Aug 03 '17 at 15:11
  • @a_guest I agree that you are right about what you mention about the first code example. – Lone Learner Aug 04 '17 at 06:55
  • Why structure the multithreading this way in the first place? I think this approach attempts to use the `Queue()` in an unintended way, specifically overloading it as a means of thread communication (sending `-1` "kill" signals). Not queueing these signals makes the application behave as expected (when the queue is empty, the parent thread unblocks and exits killing the child threads). – tdube Aug 07 '17 at 19:34
  • @tdube I have already explained that in my question. See second paragraph where I explain that I got this idea from the second comment by Martin James to this answer: https://stackoverflow.com/a/19369877/1175080 . Do you have a suggestion on how to better structure multithreading here? Also, Travis' solution ( https://stackoverflow.com/a/45471622/1175080 ) below seems to solve the problem without the need for any overloading of `Queue.Queue` with sending kill signals. – Lone Learner Aug 08 '17 at 02:42
  • @LoneLearner My recommendation would change based on the actual problem you're trying to solve. Yes, Travis' solution looks good to me. – tdube Aug 08 '17 at 13:37

4 Answers


Don't treat the exit indicator as a special kind of task.

Use an Event instead, together with a timed `get` so your workers can periodically check whether they should stop.

import Queue
import threading

stopping = threading.Event()

def worker(n, q, timeout=1):
    # run until the master thread indicates we're done
    while not stopping.is_set():
        try:
            # don't block indefinitely so we can return to the top
            # of the loop and check the stopping event
            data = q.get(True, timeout)
        # raised by q.get if we reach the timeout on an empty queue
        except Queue.Empty:
            continue
        # ... process data here ...
        q.task_done()

def master():
    ...

    print 'waiting for workers to finish'
    q.join()
    stopping.set()
    print 'done'
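For reference, a complete runnable version of this approach might look as follows. This is a sketch assuming the OP's three-worker setup; the `try`/`except` around the import and the `print_function` import are only there so the same code runs under both Python 2 and 3, and `results.append` stands in for real processing:

```python
from __future__ import print_function
import threading

try:
    import queue           # Python 3
except ImportError:
    import Queue as queue  # Python 2

stopping = threading.Event()
results = []

def worker(n, q, timeout=0.1):
    # Run until the master thread indicates we're done.
    while not stopping.is_set():
        try:
            # Block for at most `timeout` seconds so we regularly
            # return to the top of the loop and re-check the event.
            data = q.get(True, timeout)
        except queue.Empty:
            continue
        results.append(data)  # stand-in for real processing
        q.task_done()

def master():
    q = queue.Queue()
    threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)]
    for t in threads:
        t.start()
    for i in range(10):
        q.put(i)
    q.join()        # all queued items have been processed
    stopping.set()  # now tell the workers to exit their loops
    for t in threads:
        t.join()
    print('done')

master()
```

Note that `stopping.set()` is only called after `q.join()` returns, so no queued item can be lost: the event merely releases workers that are idling on an empty queue.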
    I believe you meant `q.get(True, timeout)`, i.e. you meant the `block` parameter of `q.get()` to be `True` instead of `False`. If you set it as `False`, then the `q.get()` does not block at all and returns immediately which causes the `while` loop to spin very fast. If you set it to `True`, then it blocks for `timeout` seconds before unblocking. – Lone Learner Aug 07 '17 at 09:28

Can the method of sending a single exit indicator for all threads (as explained in the second comment of https://stackoverflow.com/a/19369877/1175080 by Martin James) even work?

As you have noticed, it can't work: passing the message along makes the last thread put one more item back on the queue, so the queue you are waiting on will never become empty, not with the code you have.

If the answer to the previous question is "No", is there a way to solve the problem in a way that I don't have to send a separate exit indicator for each worker thread?

You can join the threads instead of the queue:

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)]
    for t in threads:
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    for t in threads:
        t.join()
    print 'done'

master()

As the Queue documentation explains, the `get` method raises an exception once the queue is empty, so if you already know all the data to process, you can fill the queue first and then spawn the threads:

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        try:
            data = q.get(block=False)  # raises Queue.Empty at once when the queue is empty (timeout is ignored when block=False)
            print 'worker', n, 'got', data
            time.sleep(1)  # Simulate noticeable data processing time
            q.task_done()
        except Queue.Empty:
            break


def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

Here you have a live example
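As a_guest's comment above suggests, `multiprocessing.pool.ThreadPool` can also take over distributing the work and shutting the threads down for you. A minimal sketch, where the doubling in `process` is just a stand-in for real work:

```python
from multiprocessing.pool import ThreadPool

def process(data):
    return data * 2  # stand-in for real processing

pool = ThreadPool(3)                    # 3 worker threads
results = pool.map(process, range(10))  # blocks until every item is done
pool.close()                            # no more tasks will be submitted
pool.join()                             # wait for the workers to terminate
print(results)  # prints [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Here no sentinel is needed at all: `close()` plus `join()` performs the orderly shutdown.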

Netwave
  • Could you please answer my first question too? – Lone Learner Aug 02 '17 at 11:13
  • @LoneLearner, yes, give me a sec :) – Netwave Aug 02 '17 at 11:17
  • @DanielSanchez Your answer is good, but your second example changed the question a bit. The OP instantiated his worker threads first and then populated the queue, while you swap those tasks. If you didn't swap those tasks, you might have some worker threads exiting early (due to timeout) if they see the empty queue before work loads are added. Your approach of removing the additional thread communication signals is appropriate. Personally, I think having threads block for work and the main thread controlling exit is the cleanest approach for most applications. – tdube Aug 07 '17 at 19:49
  • @tdube, yes, that is just about changing the timeout. I specified that if he knows the exact number of tasks (if he doesn't want to stream them) it's better to fill the queue first, so I changed the example for the reason I explained. If it is not clear enough I'm open to expanding it. Any suggestion? Thanks – Netwave Aug 07 '17 at 21:05
  • Sorry, I guess I missed your reference to that. :) – tdube Aug 07 '17 at 21:45

Just for completeness' sake: you could also enqueue a single stop signal whose value is -(thread count). Each thread then increments it by one and requeues it only if the result is still != 0.

    if data < 0: # negative numbers are used to indicate that the worker should stop
        if data < -1:
            q.put(data + 1)
        # Commit suicide.
        print 'worker', n, 'is exiting'
        break
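A complete runnable sketch of this counting stop signal, assuming the OP's three workers, and joining the threads rather than the queue so the requeued signal cannot upset the task count (`processed.append` stands in for real work):

```python
from __future__ import print_function
import threading

try:
    import queue           # Python 3
except ImportError:
    import Queue as queue  # Python 2

WORKER_COUNT = 3
processed = []

def worker(n, q):
    while True:
        data = q.get()
        if data < 0:  # negative values carry the remaining stop count
            if data < -1:
                q.put(data + 1)  # pass the incremented signal on
            q.task_done()
            print('worker', n, 'is exiting')
            break
        processed.append(data)  # stand-in for real processing
        q.task_done()

def master():
    q = queue.Queue()
    threads = [threading.Thread(target=worker, args=(i, q))
               for i in range(WORKER_COUNT)]
    for t in threads:
        t.start()
    for i in range(10):
        q.put(i)
    q.put(-WORKER_COUNT)  # one stop signal, counted up to zero by the workers
    for t in threads:
        t.join()
    print('done')

master()
```

Each worker consumes exactly one negative value before exiting, so the single `q.put(-WORKER_COUNT)` eventually stops them all.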

But I'd personally go with Travis Mehlinger's or Daniel Sanchez's answer.

SleepProgger

In addition to @DanielSanchez's excellent answer, I propose relying on a mechanism similar to a Java CountDownLatch.

The gist being:

  • you create a latch that will open only after a certain counter has counted down to zero,
  • when the latch is opened, the thread(s) waiting on it are allowed to proceed with their execution.

I made an overly simple example; check here for a class-like example of such a latch:

    import threading
    import Queue
    import time
    
    WORKER_COUNT = 3
    latch = threading.Condition()
    count = WORKER_COUNT
    
    def wait():
        latch.acquire()
        while count > 0:
            latch.wait()
        latch.release()
    
    def count_down():
        global count
        latch.acquire()
        count -= 1
        if count <= 0:
            latch.notify_all()
        latch.release()
    
    def worker(n, q):
        # n - Worker ID
        # q - Queue from which to receive data
        while True:
            data = q.get()
            print 'worker', n, 'got', data
            time.sleep(1)  # Simulate noticeable data processing time
            q.task_done()
            if data == -1: # -1 is used to indicate that the worker should stop
                # Requeue the exit indicator.
                q.put(-1)
                # Commit suicide.
                count_down()
                print 'worker', n, 'is exiting'
                break
    
    # master() sends data to worker() via q.  
    
    def master():
        q = Queue.Queue()
    
        # Create 3 workers.
        for i in range(WORKER_COUNT):
            t = threading.Thread(target=worker, args=(i, q))
            t.start()
    
        # Send 10 items to work on.
        for i in range(10):
            q.put(i)
            time.sleep(0.5)
    
        # Send an exit indicator for all threads to consume.
        q.put(-1)
        wait()
        print 'done'
    
    master()
    
Adonis