53

I wrote a script that has multiple threads (created with threading.Thread) fetching URLs from a Queue using queue.get_nowait(), and then processing the HTML. I am new to multi-threaded programming, and am having trouble understanding the purpose of the queue.task_done() function.

When the Queue is empty, queue.get_nowait() raises the queue.Empty exception. So I don't understand the need for each thread to call the task_done() function. We know that we're done with the queue when it's empty, so why do we need to notify it that the worker threads have finished their work (which has nothing to do with the queue once they've gotten the URL from it)?

Could someone provide me with a code example (ideally using urllib, file I/O, or something other than fibonacci numbers and printing "Hello") that shows me how this function would be used in practical applications?

J. Taylor
  • 4,567
  • 3
  • 35
  • 55

4 Answers

79

Queue.task_done is not there for the workers' benefit. It is there to support Queue.join.


If I give you a box of work assignments, do I care about when you've taken everything out of the box?

No. I care about when the work is done. Looking at an empty box doesn't tell me that. You and 5 other guys might still be working on stuff you took out of the box.

Queue.task_done lets workers say when a task is done. Someone waiting for all the work to be done with Queue.join will wait until enough task_done calls have been made, not until the queue is empty.
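
Here's a minimal sketch of that difference (the worker function, the sleep, and the item count are purely illustrative, not from the question's script):

import queue
import threading
import time

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        time.sleep(0.2)    # the queue may already be empty while this still runs
        q.task_done()      # only now does join() count this item as finished

threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)

q.join()   # returns after five task_done() calls, not when the queue becomes empty
print('all work finished')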


eigenfield points out in the comments that it seems really weird for a queue to have task_done/join methods. That's true, but it's really a naming problem. The queue module has bad name choices that make it sound like a general-purpose queue library, when it's really a thread communication library.

It'd be weird for a general-purpose queue to have task_done/join methods, but it's entirely reasonable for an inter-thread message channel to have a way to indicate that messages have been processed. If the class was called thread_communication.MessageChannel instead of queue.Queue and task_done was called message_processed, the intent would be a lot clearer.

(If you need a general-purpose queue rather than an inter-thread message channel, use collections.deque.)
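
For contrast, a deque is just a container; it has no notion of work being finished (trivial sketch):

from collections import deque

d = deque()
d.append('first')    # enqueue
d.append('second')
print(d.popleft())   # dequeue -> 'first'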

user2357112
  • 260,549
  • 28
  • 431
  • 505
  • 2
    Yes, but what I'm asking is why the queue needs to know when the task (i.e. processing the HTML, inserting the extracted data into the DB, etc) is done at all? When it's empty I'm done with it. Why can't I just know that I'm done when the `queue.Empty` exception is thrown? Why does a queue of URLs need to know that I successfully entered something into the database, or processed some HTML? All the queue should care about is whether all of the URLs have been dispatched to some thread. – J. Taylor Apr 03 '18 at 19:05
  • 2
    @J.Taylor: Whoever's calling `Queue.join` needs to know. – user2357112 Apr 03 '18 at 19:06
  • For the use case I'm describing (trying to fetch and process pages in parallel), would I even need to call `Queue.join()` though? Why could I not just keep calling `Queue.get_nowait()` until it throws the `Queue.Empty` exception and know that I'm done at that point? Is `task_done()`/`Queue.join()` only useful when I need to ensure that every item is processed before the queue is terminated? For cases where I just need to ensure I've gotten all the items, can I do away with `queue.join()` altogether? – J. Taylor Apr 03 '18 at 19:49
  • 2
    @J.Taylor: Depends on who needs to know the work is done, and whether work is added to the queue while work is being done. The *workers* don't need to call `task_done` to know whether they're done. `task_done` is a way to communicate to any threads waiting for work to finish. – user2357112 Apr 03 '18 at 19:55
  • 13
    To me, the concept of a Queue that implies task_is_done / join semantics is unusual. A queue should only be concerned with put and get and whether it is empty or full. Not task_done() stuff. Am I in the twilight zone here? – daparic Jul 16 '18 at 03:25
  • 1
    @ifelsemonkey: It's about the joining in the end. The main process has to know when the workers are _done_, not when the queue is empty. – JEM_Mosig Nov 08 '18 at 21:37
  • 1
    Since when did the `queue` concept suddenly get entangled with whether workers are done or not? – daparic Nov 09 '18 at 01:51
  • 3
    @ifelsemonkey It's just convenient and useful in a very common usage scenario, distributing tasks among workers. And it only applies if you need it. If you never plan to call `join()` on the queue, you don't need to know or care about `task_done()` either. – user4815162342 Dec 13 '18 at 21:53
  • The docs are a little silent on what to do when a worker fails to complete a task (e.g. an exception is raised). Presumably, it should call task_done in a finally block? (Also not what the Python docs recommend.) – user48956 Dec 16 '21 at 00:45
21

.task_done() is used to signal to .join() that the processing of an item is done.

If you use .join() and don't call .task_done() for every processed item, your script will hang forever.


Ain't nothin' like a short example:

import logging
import queue
import threading
import time

items_queue = queue.Queue()
running = False


def items_queue_worker():
    while running:
        try:
            item = items_queue.get(timeout=0.01)
            if item is None:
                continue

            try:
                process_item(item)
            finally:
                items_queue.task_done()

        except queue.Empty:
            pass
        except Exception:
            logging.exception('error while processing item')


def process_item(item):
    print('processing {} started...'.format(item))
    time.sleep(0.5)
    print('processing {} done'.format(item))


if __name__ == '__main__':
    running = True

    # Create 10 items_queue_worker threads
    worker_threads = 10
    for _ in range(worker_threads):
        threading.Thread(target=items_queue_worker).start()

    # Populate your queue with data
    for i in range(100):
        items_queue.put(i)

    # Wait for all items to finish processing
    items_queue.join()

    running = False
Jossef Harush Kadouri
  • 32,361
  • 10
  • 130
  • 129
  • 2
    Why do you need the additional test of "if item is None:" when you will never reach it (unless you actually store None values in the items_queue, which you don't do right now)? As far as I see, the Empty exception is thrown if the queue is empty, so the execution will "jump" into the catch clause. – Alex Jan 21 '21 at 07:57
  • Is your " If you use .join() and don't call .task_done() for every processed item, your script will hang forever." an actual quote from somewhere else? If so, please link to the source. Otherwise it would be nice to have some info that this is "just" the gist of your (great) answer. – bugmenot123 Oct 05 '22 at 09:29
18

"Read the source, Luke!" -- Obi-one Codobi

The source for asyncio.queues is pretty short.

  • the number of unfinished tasks goes up by one when you put to the queue.
  • it goes down by one when you call task_done
  • join() awaits there being no unfinished tasks.

This makes join useful if and only if you are calling task_done(). Using the classic bank analogy:

  • people come in the doors and get in line; door is a producer doing a q.put()
  • when a teller is idle and a person is in line, they go to the teller window. teller does a q.get().
  • When the teller has finished helping the person, they are ready for the next one. teller does a q.task_done()
  • at 5 p.m., the doors are locked; the door (producer) task finishes
  • you wait until both the line is empty and each teller has finished helping the person in front of them. await q.join()
  • then you send the tellers home, who are now all idling with an empty queue. for teller in tellers: teller.cancel()

Without the task_done(), you cannot know every teller is done with people. You cannot send a teller home while they still have a person at their window.
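
A rough asyncio sketch of the analogy (the teller/customer names, the sleep, and the counts are illustrative, not from any library):

import asyncio

async def teller(name, line):
    while True:
        customer = await line.get()   # the next person steps up to the window
        await asyncio.sleep(0.1)      # help them
        line.task_done()              # ready for the next person

async def main():
    line = asyncio.Queue()
    tellers = [asyncio.create_task(teller(f'teller-{i}', line)) for i in range(3)]

    for customer in range(10):        # doors open: people get in line
        line.put_nowait(customer)

    await line.join()                 # wait until every customer has been helped
    for t in tellers:                 # 5 p.m.: send the now-idle tellers home
        t.cancel()

asyncio.run(main())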

Charles Merriam
  • 19,908
  • 6
  • 73
  • 83
5

Could someone provide me with a code example (ideally using urllib, file I/O, or something other than fibonacci numbers and printing "Hello") that shows me how this function would be used in practical applications?

@user2357112's answer nicely explains the purpose of task_done, but lacks the requested example. Here is a function that calculates checksums of an arbitrary number of files and returns a dict mapping each file name to the corresponding checksum. Internally, the work is divided among several threads.

The function uses Queue.join to wait until the workers have finished their assigned tasks, so it is safe to return the dictionary to the caller. It is a convenient way to wait for all files to be processed, as opposed to merely dequeued.

import threading, queue, hashlib

def _work(q, checksums):
    while True:
        filename = q.get()
        if filename is None:
            q.put(None)
            break
        try:
            sha = hashlib.sha256()
            with open(filename, 'rb') as f:
                for chunk in iter(lambda: f.read(65536), b''):
                    sha.update(chunk)
            checksums[filename] = sha.digest()
        finally:
            q.task_done()

def calc_checksums(files):
    q = queue.Queue()
    checksums = {}
    for _ in range(4):  # start several worker threads
        threading.Thread(target=_work, args=(q, checksums)).start()
    for f in files:
        q.put(f)
    q.join()
    q.put(None)  # tell workers to exit
    return checksums
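
Hypothetical usage (the file names come from the command line here; any iterable of paths would work):

import sys

if __name__ == '__main__':
    for name, digest in calc_checksums(sys.argv[1:]).items():
        print(name, digest.hex())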

A note on the GIL: since the code in hashlib internally releases the GIL while calculating the checksum, using multiple threads yields a measurable (1.75x-2x depending on Python version) speedup compared to the single-threaded variant.

user4815162342
  • 141,790
  • 18
  • 296
  • 355