
I want to implement the producer-consumer pattern using multiprocessing.pool.Pool.

Since a JoinableQueue cannot be used with Pool (it raises RuntimeError: JoinableQueue objects should only be shared between processes through inheritance), I have to use multiprocessing.Manager(), inspired by this answer.

The problem is: the program may hang when there are more consumer jobs than producer jobs.

import queue
import random
from multiprocessing import Manager, Pool


def consumer(q):
    while True:
        try:
            res = q.get(block=False)
            if res is None:
                break
            print(f'Consume {res}')
        except queue.Empty:
            pass


def producer(q, food):
    for i in range(2):
        res = f'{food} {i}'
        print(f'Produce {res}')
        q.put(res)
    q.put(None) # sentinel


if __name__ == "__main__":
    with Pool() as pool:
        jobs = 2
        foods = ['apple', 'banana', 'melon', 'salad']
        q = Manager().Queue()
        [pool.apply_async(func=consumer, args=(q, )) for _ in range(jobs + 1)]  # would hang

        # only avoids hanging when the number of consumer jobs is less than
        # or equal to the number of producer jobs:
        # [pool.apply_async(func=consumer, args=(q, )) for _ in range(jobs)]

        [
            pool.apply_async(func=producer, args=(q, random.choice(foods)))
            for _ in range(jobs)
        ]

        pool.close()
        pool.join()

It seems those extra consumers never receive a sentinel and just wait there forever.

So what's the elegant way to implement the producer-consumer pattern in multiprocessing.pool.Pool?

Or is it only possible with multiprocessing.Process + JoinableQueue?

funkid
  • Your producer puts only one "sentinel" on the queue; how could multiple consumers get it when there is only a single producer? – NobbyNobbs May 03 '21 at 05:25
  • @NobbyNobbs Actually I don't know whether putting a sentinel into the queue is the **correct** way to implement the producer-consumer pattern in this `Pool` case. I could put *a lot of* sentinels to *notify* those consumers, but is that a good or elegant way? And it seems that I can't put the sentinel back into the queue when a consumer gets one, since it would mislead the other consumers? – funkid May 03 '21 at 05:30
  • Look at `multiprocessing.Event` for consumer cancellation – NobbyNobbs May 03 '21 at 05:54
  • @NobbyNobbs Thanks, but isn't `Event` more of a pause-restart mechanism? – funkid May 03 '21 at 06:27
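
For reference, a rough sketch of the Event-based cancellation NobbyNobbs suggests, adapted to the Manager()/Pool() code above (the stop name and the pool size of 5 are assumptions, not from the comments): the producers run to completion, the main process then sets the event, and each consumer exits once the event is set and the queue has been drained.

import queue
import random
from multiprocessing import Manager, Pool


def consumer(q, stop):
    while True:
        try:
            res = q.get(timeout=0.1)
            print(f'Consume {res}')
        except queue.Empty:
            # Exit only once the producers are done (event set) AND the
            # queue has been drained.
            if stop.is_set():
                break


def producer(q, food):
    for i in range(2):
        res = f'{food} {i}'
        print(f'Produce {res}')
        q.put(res)


if __name__ == "__main__":
    foods = ['apple', 'banana', 'melon', 'salad']
    # 5 workers so the 3 consumers cannot starve the 2 producers
    with Manager() as m, Pool(5) as pool:
        q = m.Queue()
        stop = m.Event()  # hypothetical name for the shutdown signal
        consumers = [pool.apply_async(consumer, (q, stop)) for _ in range(3)]
        producers = [
            pool.apply_async(producer, (q, random.choice(foods)))
            for _ in range(2)
        ]
        for p in producers:
            p.get()    # wait for every producer to finish
        stop.set()     # then tell the consumers they may stop
        for c in consumers:
            c.get()    # consumers drain the queue and return
        pool.close()
        pool.join()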

3 Answers


You can use a multiprocessing.JoinableQueue by having your process-pool workers access it as a global variable that gets initialized by a pool initializer:

import multiprocessing

def init_pool(input_q, output_q):
    # Runs once in each worker process; the queues are inherited and
    # exposed to the worker's tasks as globals.
    global in_q, out_q
    in_q = input_q
    out_q = output_q


def worker():
    print(type(in_q))


# required when the 'spawn' start method is used (e.g. on Windows)
if __name__ == '__main__':
    in_q = multiprocessing.JoinableQueue()
    out_q = multiprocessing.JoinableQueue()
    pool = multiprocessing.Pool(2, initializer=init_pool, initargs=(in_q, out_q))
    pool.apply(worker)

Prints:

<class 'multiprocessing.queues.JoinableQueue'>
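
The worker above only prints the queue's type; as a hedged sketch (an assumption, not part of this answer), the same initializer pattern can drive the original producer-consumer code by putting one None sentinel per consumer on the queue, so every consumer receives exactly one. Names such as NUM_CONSUMERS are made up for illustration.

import multiprocessing
import random


def init_pool(input_q):
    # Runs once per worker; the JoinableQueue is inherited and exposed
    # to tasks as a global.
    global in_q
    in_q = input_q


def consumer():
    while True:
        res = in_q.get()       # blocking get, no busy-waiting
        if res is None:        # one sentinel per consumer
            in_q.task_done()
            break
        print(f'Consume {res}')
        in_q.task_done()


def producer(food):
    for i in range(2):
        res = f'{food} {i}'
        print(f'Produce {res}')
        in_q.put(res)


if __name__ == '__main__':
    NUM_CONSUMERS, NUM_PRODUCERS = 3, 2
    foods = ['apple', 'banana', 'melon', 'salad']
    in_q = multiprocessing.JoinableQueue()
    # enough workers for all consumers and producers to run concurrently
    pool = multiprocessing.Pool(NUM_CONSUMERS + NUM_PRODUCERS,
                                initializer=init_pool, initargs=(in_q,))
    consumers = [pool.apply_async(consumer) for _ in range(NUM_CONSUMERS)]
    producers = [pool.apply_async(producer, (random.choice(foods),))
                 for _ in range(NUM_PRODUCERS)]
    for p in producers:
        p.get()                          # wait for the producers to finish
    for _ in range(NUM_CONSUMERS):
        in_q.put(None)                   # one sentinel per consumer
    in_q.join()                          # block until everything is consumed
    pool.close()
    pool.join()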
Booboo

It seems that using multiprocessing.Process + JoinableQueue is a more elegant way.

import queue
import random
from multiprocessing import JoinableQueue, Process


def consumer(q: JoinableQueue):
    while True:
        try:
            res = q.get(block=False)
            print(f'Consume {res}')
            q.task_done()
        except queue.Empty:
            pass  # busy-wait: retry immediately until an item arrives


def producer(q: JoinableQueue, food):
    for i in range(2):
        res = f'{food} {i}'
        print(f'Produce {res}')
        q.put(res)
    q.join()


if __name__ == "__main__":
    foods = ['apple', 'banana', 'melon', 'salad']
    jobs = 2
    q = JoinableQueue()

    producers = [
        Process(target=producer, args=(q, random.choice(foods)))
        for _ in range(jobs)
    ]

    # daemon=True is important here
    consumers = [
        Process(target=consumer, args=(q, ), daemon=True)
        for _ in range(jobs * 2)
    ]

    # the start order (consumers + producers) doesn't matter here
    for p in consumers + producers:
        p.start()

    for p in producers:
        p.join()
funkid

In funkid's self-answer, when using JoinableQueue you no longer have to send/receive the trailing None in producer/consumer. Each producer waits until all of the items in the queue have been consumed (i.e. after the final task_done), and the producers are then joined by the main process.

As pointed out by @Koby, daemon=True is important because it allows the consumers (blocked by the empty queue) to be killed when the main process terminates.

Modified program:

import random
from multiprocessing import JoinableQueue, Process


def consumer(q: JoinableQueue):
    while True:
        res = q.get()  # blocking get: no busy-waiting, no sentinel needed
        print(f'Consume {res}')
        q.task_done()


def producer(q: JoinableQueue, food):
    for i in range(2):
        res = f'{food} {i}'
        print(f'Produce {res}')
        q.put(res)
    q.join()


if __name__ == "__main__":
    foods = ['apple', 'banana', 'melon', 'salad']
    jobs = 2
    q = JoinableQueue()

    producers = [
        Process(target=producer, args=(q, random.choice(foods)))
        for _ in range(jobs)
    ]

    # daemon=True is important here
    consumers = [
        Process(target=consumer, args=(q, ), daemon=True)
        for _ in range(jobs * 2)
    ]

    # the start order (consumers + producers) doesn't matter here
    for p in consumers + producers:
        p.start()

    for p in producers:
        p.join()
ernestchu