
I am trying to use a queue to load up a bunch of tasks and then have a process pool work through them, with each process popping a task off the queue. I am running into problems in that the setup is not working: something is blocking the processes from getting started, and I need help figuring out the bug. The queue is filled correctly, but the individual processes never start processing the task subroutine.

# -*- coding: utf-8 -*-
"""
Created on Tue Aug 30 17:08:42 2022

@author: Rahul
"""

import threading
import queue
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import multiprocessing as mp
import time


q = queue.Queue()

# some worker task


def worker(id, q):

    print(f'{id}:: Worker running', flush=True)
    while q.unfinished_tasks > 0:
        item = q.get()
        print(f'{id}::Working on {item}', flush=True)
        print(f'{id}::Finished {item}', flush=True)
        q.task_done()

        print(f'{id}::Sleeping. Item: {item}', flush=True)

        time.sleep(0.1)
    print(
        f'We reached the end.  Queue size is {q.unfinished_tasks}', flush=True)


def main():

    print('running main')
    # Send thirty task requests to the worker.
    for item in range(30):
        q.put(item)

    # Confirm that queue is filled
    print(f'Size of queue {q.unfinished_tasks}')
    id = 0

    # start process pool
    with ProcessPoolExecutor(max_workers=4) as executor:
        executor.map(worker, [1, 2, 3, 4], [q, q, q, q])

    # Block until all tasks are done.
    q.join()
    print('All work completed')


if __name__ == "__main__":
    main()

This produces the following output and then hangs; the keyboard stops responding and I have to shut down the IDE and restart it.

running main
Size of queue 30

1 Answer

For multiprocessing, there are two ways to use a queue.

You have to either

  1. use the queue as a shared global via the initializer parameter (see the sketch below), or
  2. use a manager

See Python multiprocessing.Queue vs multiprocessing.manager().Queue() for examples of how to set it up.
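For reference, here is a minimal sketch of option 1: a multiprocessing.Queue is handed to each worker process through the initializer/initargs parameters of ProcessPoolExecutor. (The names init_worker, work, and task_queue are invented for illustration; passing an mp.Queue through initargs works because it is pickled at process-creation time, which multiprocessing allows.)

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

task_queue = None  # set once in each worker process by the initializer

def init_worker(q):
    # Runs once per child process and stores the queue in a global.
    global task_queue
    task_queue = q

def work(worker_id):
    while True:
        item = task_queue.get()
        if item == 'DONE':  # sentinel value, as discussed below
            break
        print(f'{worker_id}::Working on {item}', flush=True)

def main():
    q = mp.Queue()
    for item in range(8):
        q.put(item)
    for _ in range(2):  # one sentinel per worker
        q.put('DONE')
    with ProcessPoolExecutor(max_workers=2,
                             initializer=init_worker,
                             initargs=(q,)) as executor:
        executor.map(work, [1, 2])

if __name__ == '__main__':
    main()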

Below is an example using a manager for the OP's use case.

A few things to highlight:

  1. Uses manager.Queue(), which shares the queue across different processes.

  2. Typically, for a worker process, it's better to use a while True loop that terminates when it sees some SENTINEL value. This lets the worker keep waiting even after all current work is done, in case more work is coming. It's also more robust than a q.empty() check (or q.unfinished_tasks, which doesn't exist on the multiprocessing queues).

  3. Using the SENTINEL approach requires adding 4 SENTINEL values, 1 for each process, after all the tasks.

  4. The with ProcessPoolExecutor ... context manager is blocking, meaning it waits until all processes exit before continuing to the next lines. If you want non-blocking behavior, consider an explicit shutdown instead, e.g.

executor = ProcessPoolExecutor(max_workers=4)
executor.map(...)
executor.shutdown(wait=False)
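For instance, a possible non-blocking variant of main() in the solution below (same worker, q, and SENTINEL setup; q.join(), rather than the executor, is then what waits for the work to finish):

executor = ProcessPoolExecutor(max_workers=4)
# map() submits all four worker calls eagerly, so this returns right away
executor.map(worker, [1, 2, 3, 4], [q, q, q, q])
executor.shutdown(wait=False)  # don't block; the workers keep running
# ... main() is free to do other things here ...
q.join()  # blocks until task_done() has been called for every queued item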

Now finally, the example solution:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import multiprocessing as mp
import time



# some worker task
SENTINEL = 'DONE'

def worker(id, q):

    print(f'{id}:: Worker running', flush=True)
    # better to use the while True with SENTINEL
    # other methods such as checking 'q.empty()' may be unreliable.
    while True:  
        item = q.get()
        if item == SENTINEL:
            q.task_done()
            break
        print(f'{id}::Working on {item}', flush=True)
        print(f'{id}::Finished {item}', flush=True)
        q.task_done()

        print(f'{id}::Sleeping. Item: {item}', flush=True)

        time.sleep(0.1)
    print('We reached the end.', flush=True)


def main():

    print('running main')
    # Send thirty task requests to the worker.
    with mp.Manager() as manager:
        q = manager.Queue()

        for item in range(30):
            q.put(item)

        # adding 4 sentinel values at the end, 1 for each process.
        for _ in range(4):
            q.put(SENTINEL)

        # Confirm that queue is filled
        print(f'Approx queue size: {q.qsize()}')

        # start process pool
        with ProcessPoolExecutor(max_workers=4) as executor:
            executor.map(worker, [1, 2, 3, 4], [q, q, q, q])

        print('working')
        # Block until all tasks are done.
        q.join()
        print('All work completed')


if __name__ == "__main__":
    main()
  • In worker(), when the SENTINEL item is reached, queue.task_done() is not being called. Does that mean that the SENTINEL persists in the queue forever without getting fully popped out? – Buddy Li Sep 01 '22 at 07:10
  • Could you also please pinpoint why the while True forever-loop construct is needed in worker() beyond the completion of the main() thread? I saw the Python doc example with the forever while loop, and the worker process keeps running in the background long after the main thread finished and control returned to the IDE prompt. I confirmed it by adding an item to the queue by typing q.put(some_item) at the interpreter prompt, and it triggered worker() automatically. 1/2 – Buddy Li Sep 01 '22 at 07:20
  • Contd. 2/2 I only want the worker to start monitoring the queue when some event tells it that q is being used or about to be used. The rest of the time, I would like the worker to either stay asleep or be finished, so a new instance can be created. Otherwise, the worker keeps using CPU cycles in the background to continuously run the 'while True' loop and exit out of it. – Buddy Li Sep 01 '22 at 07:21
  • @BuddyLi you are correct, it does indeed need `q.task_done()` after reaching the `SENTINEL` as well. The answer is updated. This successfully terminates all workers after the task is done. The reason not to use a `q.empty()` check is that it may lead to race conditions. Suppose there is 1 item left, and 2 processes both check `q.empty()` before either one calls `q.get()`. Then both processes will enter the loop and try to get the 1 item, and one process will be left hanging (or wait until timeout). You may also want some error handling using `except queue.Empty` in that case. So the sentinel is cleaner. – Tim Sep 01 '22 at 14:41
  • does q.get() create a block until q.task_done() is called? – Buddy Li Sep 02 '22 at 04:00
  • no, `q.get()` creates a block until it gets the next item from the queue. `q.join()` creates a block until `q.task_done()` has been called for every item that was put into the queue. – Tim Sep 02 '22 at 14:31
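To illustrate that last point, a minimal single-process sketch (plain queue.Queue with one thread; the names are invented for illustration):

import queue
import threading

q = queue.Queue()
q.put('job')

def consumer():
    item = q.get()   # blocks until an item is available
    print(f'got {item}')
    q.task_done()    # mark this item as finished

threading.Thread(target=consumer).start()
q.join()             # blocks until every put() has a matching task_done()
print('all items processed')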