
First look at the following code:

import multiprocessing

pool = multiprocessing.Pool(processes=N)
batch = []
for item in generator():
    batch.append(item)
    if len(batch) == 10:
        # hand the full batch off to a worker process
        pool.apply_async(my_fun, args=(batch,))
        batch = []
# leftovers
if batch:
    pool.apply_async(my_fun, args=(batch,))

Essentially I'm retrieving data from a generator, collecting it into a list, and then spawning a process that consumes the batch of data.

This may look fine, but when the consumers (i.e. the pool processes) are slower than the producer (i.e. the generator), the memory usage of the main process grows until the generator stops or... the system runs out of memory.

How can I avoid this problem?

Manuel
  • Have you tried to build a list of lists and use `pool.map_async()`? or maybe `starmap_async`?? – wwii Jun 27 '18 at 20:09
  • see similar question https://stackoverflow.com/questions/17241663/filling-a-queue-and-managing-multiprocessing-in-python on how to use a queue with a process pool. – Michael Doubez Jun 27 '18 at 21:12
  • `apply_async` returns an `AsyncResult` object, I don't see that you are using it anywhere. – wwii Jun 28 '18 at 03:04

2 Answers


You might want to use a limited-size queue in this case.

q = multiprocessing.Queue(maxSize)

When created with a maximum size, the queue does the counting for you and blocks the thread calling q.put() when it is full, so you can never have more than maxSize work items pending, which bounds the memory needed to store them.
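
A minimal sketch of that pattern, adapted to the batching loop from the question (the worker count, queue bound, batch size and the range(1000) data source are all placeholder assumptions):

import multiprocessing

def worker(q):
    # drain batches from the queue until a None sentinel arrives
    while True:
        batch = q.get()
        if batch is None:
            break
        print(sum(batch))  # stand-in for the real my_fun(batch)

if __name__ == '__main__':
    N = 4                                      # assumed number of consumer processes
    q = multiprocessing.Queue(maxsize=2 * N)   # bounded queue caps the pending batches
    workers = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(N)]
    for w in workers:
        w.start()

    batch = []
    for item in range(1000):                   # stand-in for generator()
        batch.append(item)
        if len(batch) == 10:
            q.put(batch)                       # blocks here while the queue is full
            batch = []
    if batch:
        q.put(batch)                           # leftovers

    for _ in workers:
        q.put(None)                            # one sentinel per worker
    for w in workers:
        w.join()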

Alternatively, you could use a counting semaphore (e.g., multiprocessing.BoundedSemaphore(maxSize)). Acquire it each time you get a work item from the generator and release it in your work function (my_fun) once the item is processed. This way, the maximum number of work items waiting to be processed will never exceed the initial value of the semaphore.
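
A sketch of the semaphore variant under the same placeholder assumptions; here the semaphore is released in a completion callback in the main process instead of inside my_fun, which avoids having to share it with the pool workers but gives the same cap on in-flight batches:

import multiprocessing

def my_fun(batch):
    # placeholder work function: pretend to process one batch
    return sum(batch)

if __name__ == '__main__':
    max_pending = 8                            # assumed cap on batches in flight
    sem = multiprocessing.BoundedSemaphore(max_pending)
    pool = multiprocessing.Pool(processes=4)

    def release_slot(_result):
        # free one slot whether the batch succeeded or failed
        sem.release()

    batch = []
    for item in range(1000):                   # stand-in for generator()
        batch.append(item)
        if len(batch) == 10:
            sem.acquire()                      # blocks once max_pending batches are queued
            pool.apply_async(my_fun, args=(batch,),
                             callback=release_slot, error_callback=release_slot)
            batch = []
    if batch:
        sem.acquire()
        pool.apply_async(my_fun, args=(batch,),
                         callback=release_slot, error_callback=release_slot)

    pool.close()
    pool.join()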

Leo K

Use the grouper itertools recipe to chunk the data from your generator.

Use the infrastructure in concurrent.futures to handle task submission and result retrieval with the processes.

You could

  • submit a group of tasks; wait for them to finish; then submit another group, or
  • keep the pipeline full by submitting a new task each time one completes.

Setup (attempt to simulate your process):

import concurrent.futures
import itertools, time, collections, random
from pprint import pprint

# from itertools recipes
def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

# generator/iterator facsimile
class G:
    '''Long-winded range(n)'''
    def __init__(self, n=108):
        self.n = n
        self.a = []
    def __iter__(self):
        return self
    def __next__(self):
        #self.a.append(time.perf_counter())
        if self.n < 0:
            raise StopIteration
        x = self.n
        self.n -= 1
        return x

def my_func(*args):
    time.sleep(random.randint(1,10))
    return sum(*args)

Wait for groups of tasks to complete:

if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory) 
            if len(executor._pending_work_items) == nworkers:
                # wait till all complete and get the results
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.ALL_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)

    pprint(results)

Keep the process pool full:

if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory) 
            if len(executor._pending_work_items) == nworkers:
                # wait till one completes and get the result
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.FIRST_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)

    pprint(results)
wwii
    Well, the main problem is that the data does not fit into memory (hence the use of a generator) so I can't see how this avoids that. – Manuel Jun 28 '18 at 07:37
  • 1
    I agree with @Manuel, this does not solve the memory issue (at least theoretically). – diningphil Jun 28 '18 at 08:21