0

I am using multiprocessing to process my records.

from concurrent.futures import ProcessPoolExecutor
from queue import Queue
import os

queue = Queue()

def produce(i, item):
    data = process(i, item)
    queue.put(data)

def process(i, item):
    data = do_processing(i, item)
    return data

if __name__ == '__main__':
    records = load_records()

    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        print('produce items')
        for i, item in enumerate(records.items()):
            executor.submit(produce, i, item)

    print('queue size:{}'.format(queue.qsize()))
    while not queue.empty():
        save(queue.get())

In produce I put the processed records in a queue, since that step is time-consuming. After processing the records, I save them. Since the consume step is not time-consuming, I don't bother running it in a separate thread.

But after I execute the code, the queue remains empty. What is going on here?

Exploring
  • Presumably the program ends before any thread has put anything in the queue. – mkrieger1 Sep 16 '21 at 22:30
  • But I am running the executor with a context manager, i.e. `with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor`, which should wait until everything is processed. That has been my understanding. – Exploring Sep 16 '21 at 22:32
  • Possibly related: https://stackoverflow.com/questions/68751929/why-does-pythons-threadpoolexecutor-work-queue-appear-to-accept-more-items-than – Jeremy Friesner Sep 16 '21 at 22:32
  • You're using multi-processing rather than multi-threading. Each process will create its own copy of queue. You need to use an explicitly shared data structure. Alternatively, just call process() directly, and use the fact that there are various map calls that will return all the results (see the sketch after these comments). – Frank Yellin Sep 16 '21 at 23:20
  • @FrankYellin is there shared data structure that is equivalent to a queue that I can swap out? – Exploring Sep 16 '21 at 23:23
  • Looks like I can simply swap queue with multiprocessing.Queue and that should do. – Exploring Sep 16 '21 at 23:36
  • To answer your question, yes, but they make your code much more complicated. Why not just write: `for result in executor.imap(function, values): handle(result)` – Frank Yellin Sep 16 '21 at 23:48
  • And to answer your question, look at `multiprocessing.Queue`. But again, I don't think it is the best way. – Frank Yellin Sep 16 '21 at 23:52
  • @FrankYellin `for result in executor.imap(function, values): handle(result)` would not work, as further down I am writing to the same file. – Exploring Sep 16 '21 at 23:54
  • Then make your code `for result in executor.imap(...): queue.put(result)`. And then handle what's in the queue once you have all your results. Multiprocessing queues are expensive and difficult. In general, you want your threads to do something and return a result. Handling the results should be done by your main thread. – Frank Yellin Sep 17 '21 at 00:00
  • @FrankYellin Can you please add an example, as I find what you said hard to interpret? Thanks in advance. – Exploring Sep 17 '21 at 00:07
  • @FrankYellin The current code is handling the result in the main thread. The main thread is writing the contents of the queue to a file via `save(queue.get())`. – Exploring Sep 17 '21 at 00:10
  • See the code snippet in the answer below. Answers let me do proper indentation. – Frank Yellin Sep 17 '21 at 00:13
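
Below is a minimal sketch of the approach Frank Yellin describes in these comments: let each worker return its result and collect the results in the main process, with no shared queue at all. It uses executor.map (ProcessPoolExecutor has no imap, as noted further down) and assumes process, save and load_records are defined as in the question.

from concurrent.futures import ProcessPoolExecutor
import os

if __name__ == '__main__':
    records = load_records()
    items = list(records.items())

    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        # map() accepts one iterable per argument of process(i, item)
        # and yields the results back in submission order.
        for result in executor.map(process, range(len(items)), items):
            save(result)  # saving happens only in the main process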

3 Answers

1

I think this is how to do what you want. As mentioned in a comment, each process runs in its own memory space, so global variables like the queue can't simply be shared, and you can't pass the queue as an argument to each process.

When using a ProcessPoolExecutor, you can still share the queue by defining an initializer function that is called at the start of each worker process: it receives the queue as an argument and stores it in a global variable of that process.

Here's something that closely resembles your code, is actually runnable, and illustrates what I mean:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue
import os


MAX_RECORDS = 10

def load_records():
    return dict.fromkeys(range(MAX_RECORDS), 0)

def do_processing(item):
    return item

def init_queue(queue):
    globals()['queue'] = queue  # Makes queue a global in each process.

def produce(i, item):
    data = process(i, item)
    queue.put(data)

def process(i, item):
    data = do_processing(item)
    return data


if __name__ == '__main__':
    records = load_records()

    queue = Queue()
    with ProcessPoolExecutor(max_workers=os.cpu_count(),
                             initializer=init_queue,  # runs once in each worker process
                             initargs=(queue,)) as executor:
        print('producing items')
        for i, item in enumerate(records.items()):
            future = executor.submit(produce, i, item)
        print('done producing items')

    print('queue size: {}'.format(queue.qsize()))
    while not queue.empty():
        print(queue.get())

Output:

producing items
done producing items
queue size: 10
(0, 0)
(1, 0)
(2, 0)
(3, 0)
(4, 0)
(5, 0)
(6, 0)
(7, 0)
(8, 0)
(9, 0)
martineau
  • Thanks for adding a detailed explanation. However, this code is exactly the same as what I submitted, though this version is more readable. Have I missed anything? – Exploring Sep 17 '21 at 01:43
  • This could cause deadlock with a full queue preventing a child from joining at the `__exit__` of the executor. https://stackoverflow.com/questions/31665328/python-3-multiprocessing-queue-deadlock-when-calling-join-before-the-queue-is-em – Aaron Sep 17 '21 at 04:28
  • @Exploring: Yes. In my version there is actually a single `Queue` being shared among the processes. In yours there is a different one in each process, including the main. – martineau Sep 17 '21 at 06:04
  • @martineau - in my solution, it is not creating multiple Queues. I am using `multiprocessing.Queue()`. So your comment is not correct. – Exploring Sep 17 '21 at 06:14
  • @Exploring: No, you are mistaken because you don't understand how multiprocessing works in Python. The `queue = multiprocessing.Queue()` will be executed each time the script is re-imported, so there will be a *different* module-level global variable in each one. – martineau Sep 17 '21 at 06:17
  • @martineau - I am not reimporting or doing anything. Running the script as is. – Exploring Sep 17 '21 at 06:20
  • @Exploring: That does not matter, it's done implicitly. As I said, you don't understand how multiprocessing works. I am not going to argue with you anymore about it. – martineau Sep 17 '21 at 06:22
  • @Aaron: The linked question does not apply because `join()` is not being called. The `__exit__` of a `concurrent.futures.Executor` context manager calls its [`shutdown()`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown) method, which by default waits until all pending futures are done executing. Besides that, it's very unlikely that the queue will ever get full, since no [`maxsize`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue) is specified when it's created. If it ever got full, there's no memory left anyway. – martineau Sep 17 '21 at 06:33
  • @martineau - what you said makes sense as I re-read the docs again. I appreciate you taking the time to respond. I've been working on this code constantly for the last 3 days and am still trying to get my head around Python multiprocessing. Will upvote. By the way, even this code gets very slow/stuck after a while. After processing 60K items, it started to progress very slowly. I have plenty of free memory (>20GB free), but the CPU gets exhausted. I have 16 cores and `htop` shows a load average of `17.89`. I don't understand what is causing all this busy waiting. – Exploring Sep 17 '21 at 07:40
  • I ran the code almost as is, but it is strange what is causing all this busy waiting: `produce` is just adding items to the queue and I don't even have a consumer. – Exploring Sep 17 '21 at 07:49
  • I am almost thinking of just using Redis, but I would still love to know what the issues are here. Any pointers welcome. – Exploring Sep 17 '21 at 07:51
  • @Exploring: I believe it's obvious my answer works. The reason you're getting a queue size of zero is that each process has its own queue, so they're not all using the one created in the main process. In my answer there's only one queue which is actually being shared among them all, as you can see from the results printed. I have no idea why whatever code you're using is getting slower, because the code in your question is incomplete and not the real code you're running; what's in mine is just the minimum necessary to make it runnable. – martineau Sep 17 '21 at 07:55
  • @Exploring: It's hard to give pointers without knowing what's actually going on. – martineau Sep 17 '21 at 07:56
  • @martineau - it's not giving a queue size of 0. Your code snippet and what I had set up wrongly, i.e. `queue = multiprocessing.Queue()`, both get stuck/slow after processing ~60K records. It's strange that both of them run very fast initially and then get slow as the CPU reaches a high load average. – Exploring Sep 17 '21 at 08:02
  • I have read lots of issues on SO where people mentioned experiencing slow responses from `multiprocessing.Queue()`. – Exploring Sep 17 '21 at 08:10
  • @Exploring: What do you mean? The title of your question is "Why does the queue size remain zero?", so now you're saying it's not? Regardless, I have no idea why it would get slower unless some resource is being consumed and not recycled, or something just keeps getting bigger and bigger. It's important to note that using the `ProcessPoolExecutor` as a context manager via `with` causes things to block at the end until all the futures have executed, so the queue could get very large before anything is removed from it. This code is not concurrently removing items from the queue (a sketch of doing that follows these comments). – martineau Sep 17 '21 at 08:28
  • @martineau - I initially made that mistake, which I fixed quickly once I understood I was using `multiprocessing.Queue()` incorrectly. Probably I should create a new question for this slow behaviour. I have plenty of free memory in the machine (>20GB free), so even if the queue gets large, it is OK. But I see a very high CPU load average in htop. – Exploring Sep 17 '21 at 08:33
  • @Exploring: Yes, you would need to ask a new question for that because it's a different topic. The fact that you were initially confusing threads and processes until I edited and fixed it only confused matters. Regardless, I believe I correctly answered the titular question that was asked and think you should accept my answer. – martineau Sep 17 '21 at 09:50
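
To make that last point concrete, here is a minimal sketch (my own illustration, not code from either participant) of draining the queue while the workers are still producing, so it never grows unboundedly. It builds on the initializer-based code above (init_queue, produce, load_records, Queue, ProcessPoolExecutor, os) and assumes save is the function from the question; a None sentinel marks the end of production.

import threading

def consume(queue):
    # Runs in a plain thread of the main process and drains the shared queue.
    while True:
        data = queue.get()
        if data is None:   # sentinel: all producers are finished
            break
        save(data)

if __name__ == '__main__':
    records = load_records()
    queue = Queue()

    consumer = threading.Thread(target=consume, args=(queue,))
    consumer.start()

    with ProcessPoolExecutor(max_workers=os.cpu_count(),
                             initializer=init_queue, initargs=(queue,)) as executor:
        for i, item in enumerate(records.items()):
            executor.submit(produce, i, item)

    # The with block waits for all futures, so production is done here.
    queue.put(None)
    consumer.join()
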
0

Use multiprocessing.Queue() when you need a queue that is shared across processes.

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import os

queue = multiprocessing.Queue()

def produce(item):
    data = process(item)
    queue.put(data)

def process(item):
    data = do_processing(item)
    return data

if __name__ == '__main__':
    records = load_records()

    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        print('produce items')
        for item in records.items():
            executor.submit(produce, item)

    print('queue size:{}'.format(queue.qsize()))
    while not queue.empty():
        save(queue.get())
Exploring
-2
from queue import Queue
import multiprocessing
import os

def process(item):
    data = do_processing(item)
    return data

queue = Queue() # not a multiprocessing queue

with multiprocessing.Pool(processes=os.cpu_count()) as pool:
    for result in pool.imap(process, records):
        queue.put(result)

while not queue.empty():
    save(queue.get())
Frank Yellin
  • Thank you for this code snippet, which might provide some limited, immediate help. A [proper explanation](https://meta.stackexchange.com/q/114762) would greatly improve its long-term value by showing why this is a good solution to the problem and would make it more useful to future readers with other, similar questions. Please edit your answer to add some explanation, including the assumptions you've made. – martineau Sep 17 '21 at 00:17
  • @martineau: I didn't feel like repeating everything that I had already written above. I presumed the OP knew what was going on. – Frank Yellin Sep 17 '21 at 01:01
  • Exploring: The code above will call process on each element of records. It does exactly what the original code did. Do you need something additional? – Frank Yellin Sep 17 '21 at 01:02
  • @Exploring: I am purposely not using multiprocessing.Queue(). It is expensive. Instead, there is just one queue on the main thread. As each result is returned, it is put on that single queue by the main thread. – Frank Yellin Sep 17 '21 at 01:05
  • My point was that the OP is not the only audience (assuming they understand everything). Folks shouldn't have to wade through a bunch of comments under the question picking out yours to understand the code either. – martineau Sep 17 '21 at 01:10
  • @FrankYellin 'ProcessPoolExecutor' object has no attribute 'imap' – Exploring Sep 17 '21 at 01:32
  • Oops. Sorry. That's what I get for typing from memory. I meant to create a pool. Fixed. – Frank Yellin Sep 17 '21 at 01:41
  • @FrankYellin can you please post a slightly larger code snippet? `process` calls `produce`, which calls `queue.put(data)`, so I can't make sense of what you pasted here. – Exploring Sep 17 '21 at 01:50
  • @FrankYellin how do I pass a counter to imap (as in `executor.submit(produce, i, item)`)? – Exploring Sep 17 '21 at 02:31
  • #1) I added some code to be a little bit clearer. I'm not calling `produce()` at all. I've gotten rid of it. All queue handling should be done by your local process. Trying to do queue handling using a global multiprocessor queue is going to complicate your code and slow it down. – Frank Yellin Sep 17 '21 at 19:56
  • #2) If you want to include a counter, then do something like `pool.imap(process, enumerate(records))`. Then rewrite `process()` so that it knows its single argument is a tuple of the record number and the record (a sketch of that follows below). – Frank Yellin Sep 17 '21 at 19:58
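
For completeness, here is a minimal sketch of that enumerate/imap variant (my own illustration, not Frank Yellin's exact code), assuming do_processing, save and load_records exist as in the question.

import multiprocessing
import os

def process(indexed_record):
    # imap passes one argument per record, so unpack the (index, record) pair here.
    i, item = indexed_record
    return i, do_processing(i, item)

if __name__ == '__main__':
    records = load_records()

    with multiprocessing.Pool(processes=os.cpu_count()) as pool:
        # Results come back to the main process in order; no shared queue is needed.
        for result in pool.imap(process, enumerate(records.items())):
            save(result)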