I am trying to load a batch of tasks into a queue and then have a process pool work through them, with each process popping a task off the queue and working on it. The setup is not working: something is blocking the processes from getting started, and I need help figuring out the bug. The queue is filled correctly, but when the individual processes run, they never start executing the worker subroutine.
# -*- coding: utf-8 -*-
"""
Created on Tue Aug 30 17:08:42 2022
@author: Rahul
"""
import threading
import queue
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import multiprocessing as mp
import time
q = queue.Queue()
# some worker task
def worker(id, q):
    print(f'{id}:: Worker running', flush=True)
    while q.unfinished_tasks > 0:
        item = q.get()
        print(f'{id}::Working on {item}', flush=True)
        print(f'{id}::Finished {item}', flush=True)
        q.task_done()
        print(f'{id}::Sleeping. Item: {item}', flush=True)
        time.sleep(0.1)
    print(
        f'We reached the end. Queue size is {q.unfinished_tasks}', flush=True)


def main():
    print('running main')
    # Send thirty task requests to the worker.
    for item in range(30):
        q.put(item)
    # Confirm that queue is filled
    print(f'Size of queue {q.unfinished_tasks}')
    id = 0
    # start process pool
    with ProcessPoolExecutor(max_workers=4) as executor:
        executor.map(worker, [1, 2, 3, 4], [q, q, q, q])
    # Block until all tasks are done.
    q.join()
    print('All work completed')


if __name__ == "__main__":
    main()
This produces the following output and then hangs. The keyboard becomes unresponsive, and I have to shut down the IDE and restart it.
running main
Size of queue 30
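
A possible explanation, offered as an assumption rather than a confirmed diagnosis: `queue.Queue` is a thread-only queue that holds locks and cannot be pickled, so it cannot be sent to worker processes. `executor.map` returns its results lazily, so a pickling error would stay hidden until the results are iterated, and `q.join()` would then wait forever because no worker ever calls `task_done()` on the parent's queue. The sketch below shows one way the same workload might be arranged with a `multiprocessing.Manager` queue, whose proxy can be passed to other processes. The names `run`, `worker_id`, `num_items`, and `num_workers` are illustrative and not part of the original script.

```python
import queue
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def worker(worker_id, q):
    """Drain items from the shared queue until it is empty."""
    processed = []
    while True:
        try:
            # get_nowait() avoids blocking forever once the queue is drained
            item = q.get_nowait()
        except queue.Empty:
            break
        print(f'{worker_id}::Working on {item}', flush=True)
        processed.append(item)
        q.task_done()
    return processed


def run(num_items=30, num_workers=4):
    # Assumption: a Manager-backed queue replaces the module-level queue.Queue
    with mp.Manager() as manager:
        q = manager.Queue()
        for item in range(num_items):
            q.put(item)
        with ProcessPoolExecutor(max_workers=num_workers) as executor:
            # list() consumes the results, so any worker exception is raised here
            results = list(executor.map(
                worker, range(1, num_workers + 1), [q] * num_workers))
        # Returns once every item has been marked done by a worker
        q.join()
    return results


if __name__ == "__main__":
    results = run()
    print(sorted(item for chunk in results for item in chunk))
```

Wrapping `executor.map` in `list()` is the part most likely to help with debugging, since it forces any exception raised while submitting or running a task to appear in the main process instead of being silently dropped.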