When running a large number of tasks (with large parameters) using Pool.apply_async, the tasks are queued immediately and wait for a free worker, and there is no limit on the number of waiting tasks. This can end up eating all memory, as in the example below:

import multiprocessing
import numpy as np

def f(a,b):
    return np.linalg.solve(a,b)

def test():

    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000)))
    p.close()
    p.join()

if __name__ == '__main__':
    test()

I'm looking for a way to limit the waiting queue, so that only a limited number of tasks can be waiting at once and Pool.apply_async blocks while the queue is full.

André Panisson

4 Answers

multiprocessing.Pool has a _taskqueue member of type multiprocessing.Queue, which takes an optional maxsize parameter; unfortunately it constructs it without the maxsize parameter set.

I'd recommend subclassing multiprocessing.Pool with a copy-paste of multiprocessing.Pool.__init__ that passes maxsize to the _taskqueue constructor.

Monkey-patching the object (either the pool or the queue) would also work, but you'd have to patch both pool._taskqueue._maxsize and pool._taskqueue._sem, so it would be quite brittle:

from multiprocessing import BoundedSemaphore

# maxsize is the desired upper bound on queued tasks
pool._taskqueue._maxsize = maxsize
pool._taskqueue._sem = BoundedSemaphore(maxsize)
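
If relying on private attributes is too fragile, a different technique (not the one described above) is to throttle the submissions themselves with a semaphore: take a slot before each apply_async call and give it back in the completion callbacks. Below is a minimal sketch. The names submit_throttled, max_pending and square are hypothetical, and the only assumption about pool is that it offers the standard apply_async(func, args, callback=..., error_callback=...) signature from Python 3.

```python
import threading


def square(x):
    # Stand-in for the real, expensive task.
    return x * x


def submit_throttled(pool, func, arg_tuples, max_pending=4):
    """Submit func(*args) for each args tuple, blocking while
    max_pending submitted tasks are still unfinished."""
    slots = threading.BoundedSemaphore(max_pending)

    def free_slot(_unused):
        # Runs in the pool's result-handling thread of the parent process.
        slots.release()

    handles = []
    for args in arg_tuples:
        slots.acquire()  # blocks here once max_pending tasks are in flight
        handles.append(
            pool.apply_async(func, args,
                             callback=free_slot,
                             error_callback=free_slot)
        )
    # Collect results in submission order; re-raises any task exception.
    return [handle.get() for handle in handles]
```

Passing the arguments as a generator means each argument tuple is only built once a slot is about to be requested, so roughly max_pending inputs exist at any time. With a real process pool, call it on a multiprocessing.Pool() instance under the usual if __name__ == '__main__' guard.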
ecatmur
  • I'm using Python 2.7.3, and the _taskqueue is of type Queue.Queue. It means it is a simple Queue, and not a multiprocessing.Queue. Subclassing multiprocessing.Pool and overriding __init__ works fine, but monkey-patching the object is not working as expected. However, this is the hack that I was searching for, thanks. – André Panisson Jun 15 '12 at 22:43

Wait if pool._taskqueue is over the desired size:

import multiprocessing
import time

import numpy as np


def f(a,b):
    return np.linalg.solve(a,b)

def test(max_apply_size=100):
    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000)))

        while p._taskqueue.qsize() > max_apply_size:
            time.sleep(1)

    p.close()
    p.join()

if __name__ == '__main__':
    test()
TaylorMonacelli
Roger Dahl
  • Just want to add that I found this to be the easiest solution to my memory problems with multiprocessing. I used max_apply_size = 10 and that works fine for my problem, which is a slow file conversion. Using a semaphore as @ecatmur suggests seems like a more robust solution but might be overkill for simple scripts. – Nate Oct 02 '17 at 19:22
  • TaylorMonacelli That your edits were rejected exemplifies the problems with mods on SO. Your edit fixed a bug. @greg-449 is a "drive-by-mod", approves only 15% of edits and gave a nonsensical reason for the rejection. – Roger Dahl Jan 17 '20 at 20:27

Here is a monkey patching alternative to the top answer:

import queue
from multiprocessing.pool import ThreadPool as Pool


class PatchedQueue():
  """
  Wrap stdlib queue and return a Queue(maxsize=...)
  when queue.SimpleQueue is accessed
  """

  def __init__(self, simple_queue_max_size=5000):
    self.simple_max = simple_queue_max_size  

  def __getattr__(self, attr):
    if attr == "SimpleQueue":
      return lambda: queue.Queue(maxsize=self.simple_max)
    return getattr(queue, attr)


class BoundedPool(Pool):
  # Override queue in this scope to use the patcher above
  queue = PatchedQueue()

pool = BoundedPool()
pool.apply_async(print, ("something",))

This works as expected on Python 3.8, where multiprocessing's Pool uses queue.SimpleQueue to set up the task queue. The implementation of multiprocessing.Pool appears to have changed since 2.7.

nijave
  • I didn't test the ThreadPool one, but if I modify it to `from multiprocessing.pool import Pool`, it does not work (the limit is not changed, and it seems that the `SimpleQueue` is not being changed to `Queue`). Any idea how to solve this? – Filipe Jun 07 '21 at 17:07

You could add an explicit Queue with a maxsize parameter and use queue.put() instead of pool.apply_async() in this case. The worker processes could then do:

for a, b in iter(queue.get, sentinel):
    # process it
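
To make the loop above concrete, here is a rough end-to-end sketch of the explicit-queue approach. Everything beyond Queue(maxsize=...), put(), get() and the iter(queue.get, sentinel) idiom is an assumption made for illustration: the names SENTINEL, worker, run_bounded and ctx are hypothetical, a + b stands in for the real computation, and ctx is just whatever object supplies Queue and Process (the multiprocessing module by default).

```python
import multiprocessing

SENTINEL = None  # hypothetical "no more work" marker


def worker(task_queue, result_queue):
    # Pull (a, b) pairs until the sentinel arrives.
    for a, b in iter(task_queue.get, SENTINEL):
        result_queue.put(a + b)  # stand-in for the real computation


def run_bounded(pairs, n_workers=2, maxsize=4, ctx=multiprocessing):
    """Feed pairs to n_workers through a queue holding at most maxsize items."""
    task_queue = ctx.Queue(maxsize=maxsize)  # put() blocks while the queue is full
    result_queue = ctx.Queue()
    workers = [ctx.Process(target=worker, args=(task_queue, result_queue))
               for _ in range(n_workers)]
    for w in workers:
        w.start()

    count = 0
    for pair in pairs:
        task_queue.put(pair)  # the producer waits here when workers fall behind
        count += 1
    for _ in workers:
        task_queue.put(SENTINEL)  # one sentinel per worker

    # Drain the results before joining so no worker is left holding data.
    results = [result_queue.get() for _ in range(count)]
    for w in workers:
        w.join()
    return results
```

Results come back in completion order, not submission order. Note that only the input side is bounded in this sketch; if the results are large as well, the result queue would need its own limit or a consumer that reads while the producer is still feeding.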

If you want to limit the number of input arguments/results held in memory to approximately the number of active worker processes, you could use the pool.imap*() methods:

#!/usr/bin/env python
import multiprocessing
import numpy as np

def f(a_b):
    return np.linalg.solve(*a_b)

def main():
    args = ((np.random.rand(1000,1000), np.random.rand(1000))
            for _ in range(1000))
    p = multiprocessing.Pool()
    for result in p.imap_unordered(f, args, chunksize=1):
        pass
    p.close()
    p.join()

if __name__ == '__main__':
    main()
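
If the pool turns out to pull items from the input iterable faster than the workers finish them, one possible workaround (a different technique from plain imap, sketched here under assumptions) is to wrap the iterable in a generator that takes a semaphore slot per item and to release a slot each time a result is retrieved. The names throttled, bounded_imap_unordered and max_pending are hypothetical; the sketch only relies on pool.imap_unordered(func, iterable, chunksize=1) and on the iterable being consumed by a thread in the parent process.

```python
import threading


def throttled(iterable, slots):
    """Yield items from iterable, taking one semaphore slot per item."""
    for item in iterable:
        slots.acquire()  # blocks the pool's task-feeding thread when no slot is free
        yield item


def bounded_imap_unordered(pool, func, iterable, max_pending=8):
    """Like pool.imap_unordered, but with at most max_pending
    submitted tasks whose results have not been retrieved yet."""
    slots = threading.Semaphore(max_pending)
    tasks = throttled(iterable, slots)
    for result in pool.imap_unordered(func, tasks, chunksize=1):
        slots.release()  # a result was retrieved, so one more task may be fed
        yield result
```

Two caveats with this sketch: while the feeding thread is blocked, other work submitted to the same pool may be held up too, and the results should be consumed to the end, since abandoning the loop early could leave that thread waiting on the semaphore.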
jfs
  • Using `imap` makes no difference. The input queue is still unlimited and using this solution will end up eating all memory. – Radim Sep 11 '15 at 15:07
  • @Radim: the `imap` code in the answer works even if you give it an infinite generator. – jfs Sep 11 '15 at 15:19
  • Not in Python 2, unfortunately (haven't looked at the code in py3). For some workarounds, see [this SO answer](http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration). – Radim Sep 11 '15 at 15:27