7

I've got a problem that can already be solved with multiprocessing.Pool, but the solution is far from optimal. I have a rather small set of inputs, each of which maps to a large dataset. While I can use imap_unordered with a function returning a list, this is inefficient, because each of the large datasets must be fully built and returned as a list before the consumer sees any of it.

My function could return them as a generator for lower latency instead, but I cannot return a generator from a subprocess.
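The underlying reason is that results travel between processes as pickles, and generator objects cannot be pickled. A minimal check, as a sketch only (the helper name `is_picklable` is made up for illustration and is not part of multiprocessing):

```python
import pickle


def generate(x):
    for j in range(x, x + 10):
        yield j


def is_picklable(obj):
    """Return True if `obj` survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


print(is_picklable([0, 1, 2]))    # True: a list can be sent between processes
print(is_picklable(generate(0)))  # False: a generator object cannot
```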

A dummy example:

import time
import multiprocessing


def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)

def wrapper(x):
    return list(generate(x))


with multiprocessing.Pool(10) as pool:
    for ready in pool.imap_unordered(wrapper, range(0, 100, 10)):
        for item in set(ready):  # to show that order does not matter:
            print(item)

The problem is that while the entire run now takes only a tenth of the time of running sequentially, I still need to wait 10 seconds for the very first result, which could be available right away with:

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)

for ready in map(generate, range(0, 100, 10)):
    for item in set(ready):
        print(item)

This prints the first item without delay, but takes 100 seconds to run.
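For reference, the sequential version can be flattened into a single lazy iterator with itertools.chain.from_iterable. This is only a sketch of the "flat map" shape being asked for: the helper name `flatmap_sequential` is hypothetical, and the `time.sleep(1)` is left out so the snippet runs instantly.

```python
from itertools import chain


def generate(x):
    for j in range(x, x + 10):
        yield j


def flatmap_sequential(func, iterable):
    """Lazily yield every item of func(x) for each x, one generator at a time."""
    return chain.from_iterable(map(func, iterable))


flat = flatmap_sequential(generate, range(0, 100, 10))
first = next(flat)  # available immediately, nothing else has been computed yet
print(first)        # 0
```

The goal of the question is the same interface, but with the generators running in parallel and the items arriving in whatever order they are produced.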

I cannot subdivide the problem further; the generators in the subprocesses need to be evaluated lazily by the consumer:

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)


with multiprocessing.Pool(10) as pool:
    for item in pool.??flatmap_unordered??(generate, range(0, 100, 10)):
        print(item)

which would print the first item right away, yet takes only ~10 seconds to run!

How could I achieve that?

Darkonaut
  • I will take a look at this, but IIRC, an ancient toy of mine, "lelo", worked fine with out-of-process generators. https://bitbucket.org/jsbueno/lelo/src/master/lelo/_lelo.py - (it does not do the Pool, though - each call to the decorated function takes place in a subprocess - https://bitbucket.org/jsbueno/lelo/src/master/ ) – jsbueno Feb 13 '22 at 23:50
  • Can you elaborate on why you want to use `Pool` for this? Your complete example just takes around 80 lines of code with `multiprocessing.Process` and `multiprocessing.Queue` and you'd have an overall simpler and robust setup. Does your real setup rely on any `Pool`-feature you don't show us in the example? – Darkonaut Feb 14 '22 at 01:11
  • @Darkonaut well, I was hoping to do this the "simple way" - a pattern that would rather take 8 than 80 lines using the standard library facilities. Considering that my use case is just a little more *complex* than those primitives available in `Pool`, it is counter-intuitive that the code that is *not* using it should be simpler and more robust... what's the sweet spot of `Pool` then anyway? – Antti Haapala -- Слава Україні Feb 14 '22 at 18:40
  • Well it's actually "just" 60 lines... Without using `Pool` you use the same primitives as `Pool` does under the hood, but you can spare all the code `Pool` will set up and _run_ whether you really need it or not, so that's how it is overall simpler. Your need here, where you already want _partial_ results, isn't the usual use case where you only need the end result of a _whole_ task. ... – Darkonaut Feb 14 '22 at 19:10
  • ...The plumbing necessary for setting up a generator and getting intermediate results yielded drop by drop just doesn't fit to the shared infrastructure set in place for all the provided pool-methods. I know this isn't exactly obvious if you haven't spent some time with the source, though. – Darkonaut Feb 14 '22 at 19:10

6 Answers

2

There seems to be no builtin way for a Pool to incrementally collect generated items. However, it is reasonably straightforward to write your own "flat map" helper.

The general idea is to have a wrapper in the pool processes that runs the iterator and pushes each individual item to a queue. In the main process, there just is a plain loop that gets and yields each item.

import functools
import multiprocessing


def flatmap(pool: multiprocessing.Pool, func, iterable, chunksize=None):
    """A flattening, unordered equivalent of Pool.map()"""
    # use a queue to stream individual results from processes
    queue = multiprocessing.Manager().Queue()
    # reuse task management and mapping of Pool
    pool.map_async(
        functools.partial(_flat_mappper, queue, func),
        iterable,
        chunksize,
        # callback: push a signal that everything is done
        lambda _: queue.put(None),
        lambda err: queue.put((None, err))
    )
    # yield each result as it becomes available
    while True:
        item = queue.get()
        if item is None:
            break
        result, err = item
        if err is None:
            yield result
        else:
            raise err


def _flat_mappper(queue: multiprocessing.Queue, func, *args):
    """Helper to run `(*args) -> iter` and stream results to a queue"""
    for item in func(*args):
        queue.put((item, None))

If desired, one could patch the Pool type itself to have flatmap as a method instead of a function.


The flatmap helper can be directly used to accumulate results across the generators. For the example case, it finishes in a bit more than 10 seconds.

import time

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)


if __name__ == "__main__":
    with multiprocessing.Pool(10) as pool:
        for item in flatmap(pool, generate, range(0, 100, 10)):
            print(item)
MisterMiyagi
  • After adding imports and replacing random.randbytes with secrets.token_bytes for Python 3.8, running it prints the *first* iteration and then halts. – Antti Haapala -- Слава Україні Feb 07 '22 at 13:54
  • @AnttiHaapala Thanks for the ``secrets`` suggestion. I've copy/pasted the code as for my local setup where it's working on Python 3.9.9. I'll have to try with some other versions... – MisterMiyagi Feb 07 '22 at 14:02
  • @AnttiHaapala I've now run the code with CPython 3.8.12, 3.9.9 and 3.10.1 and PyPy 3.7.10 (7.3.5) on my local system (MacOS 10.14.6) and some online interpreters, but cannot reproduce what you describe. What system are you running on? Can you try your setup on some online interpreter? – MisterMiyagi Feb 07 '22 at 14:45
  • Actually the `sleep` never returns, so something *very no no* happens with the queue – Antti Haapala -- Слава Україні Feb 07 '22 at 21:07
  • @Darkonaut I am not running an IDE. – Antti Haapala -- Слава Україні Feb 08 '22 at 05:49
  • "I've now run the code ... on MacOS" - the fundamental multiprocessing difference between Linux and MacOS is that the former uses "fork" by default, while MacOS uses "spawn" by default. The error on Antti's side is certainly related to that. There is probably a way to create the "Manager()" that would work with both methods. – jsbueno Feb 14 '22 at 00:03
  • @jsbueno I've also tried with "fork" and "forkserver" on my system (the code does not hit the cases that require spawn). I *guess* the online interpreters are Linux, but I don't know for sure and all Linux'es I have access to run an ancient 3.6. Would appreciate if someone could test it with a setup closer to Antti's. – MisterMiyagi Feb 14 '22 at 06:19
  • Fedora 35, with a Python 3.10 here - it hangs just like Antti describes it. – jsbueno Feb 14 '22 at 12:51
  • EEEK. I found out why it "hangs" - Antti: if you copied and pasted the snippets from here, just like I did, it was missing `import time`. You mentioned adding imports, but did you include it? With it in place, the example here worked on my setup as well. Otherwise, a nice debugging hint is to add an error callback to the map_async (it is there on my version of the snippet) – jsbueno Feb 14 '22 at 13:44
  • @jsbueno Thanks for spotting the missing import, looks like I didn't properly copy from my code file. – MisterMiyagi Feb 14 '22 at 20:07
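The debugging hint from the comments can be sketched as follows. Without an error callback, an exception inside a map_async task is stored silently and a consumer waiting on a side queue just hangs. To keep the snippet runnable without spawning processes, it uses multiprocessing.pool.ThreadPool as a stand-in (an assumption for illustration); `Pool.map_async` accepts the same `error_callback` parameter. The names `broken` and `missing_module` are made up to mimic the missing `import time`.

```python
from multiprocessing.pool import ThreadPool


def broken(x):
    # `missing_module` is deliberately undefined, mimicking a forgotten import
    return missing_module.work(x)


errors = []

with ThreadPool(2) as pool:
    result = pool.map_async(broken, range(3), error_callback=errors.append)
    result.wait()  # returns once all tasks have finished, even if they failed

# The callback receives the first exception that was raised in a task
print(type(errors[0]).__name__)  # NameError
```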
2

This is one instance where I believe the simplest approach would be to implement your own multiprocessing pool from daemon processes and multiprocessing.Queue instances (which are more performant than the managed queues returned by calls to multiprocessing.Manager().Queue()):

import time
import multiprocessing


# We just need something distinct from a value
# generated by generate. In this case nothing fancy is required:
SENTINEL = None

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)

def worker(in_q, out_q):
    while True:
        x = in_q.get()
        for j in generate(x):
            out_q.put(j)
        # put sentinel to queue to show task is complete:
        out_q.put(SENTINEL)


# In case we are running under Windows:
if __name__ == '__main__':
    POOL_SIZE = 10
    in_q = multiprocessing.Queue()
    out_q = multiprocessing.Queue()
    for _ in range(POOL_SIZE):
        multiprocessing.Process(target=worker, args=(in_q, out_q), daemon=True).start()
    t = time.time()
    # Submit the 10 tasks:
    for x in range(0, 100, 10):
        in_q.put(x)
    # We have submitted 10 tasks, so when we have seen 10
    # sentinels, we know we have processed all the results
    sentinels_seen = 0
    results = []
    while sentinels_seen < 10:
        return_value = out_q.get()
        if return_value is SENTINEL:
            sentinels_seen += 1
        else:
            # Process return value:
            results.append(return_value)
            if len(results) == 1:
                # First result:
                print('Elapsed time to first result:', time.time() - t)
    print('Total elapsed time:', time.time() - t)

Prints:

Elapsed time to first result: 0.004938602447509766
Total elapsed time: 10.025248765945435
Booboo
2

I dislike the idea of bypassing Pool's internal queues by overlaying it with external queues. IMO it leads to many more moving parts and unnecessary complexity, and you easily end up creating hard-to-detect race conditions. Pool alone is fairly complex under the hood, and bloating the amount of code that runs with even more piping on top is something I'd rather avoid (KISS). It's using Pool only for its side effect of managing worker processes and, if at all, I'd only consider it for one-off code, not for a system built for stability or possibly evolving needs.

To give you some comparison for the complexity argument: Pool employs three worker threads only to manage workers and funnel data back and forth. Together with the main thread, this makes four threads in the parent process. The non-Pool version provided below, on the other hand, is bi-threaded (multiprocessing.Queue starts a feeder thread upon first usage of .put()). The ~60 lines of the non-Pool solution compare to ~900 lines of multiprocessing/pool.py alone. A good portion of the latter will run anyway, only to shuffle around Nones instead of your actual results.

Pool is great for the frequent use case of processing function tasks as a whole. Retrieving sub-task results from generators just does not fit the bill here.


Using multiprocessing.Pool

Now if you are determined to go ahead with this approach anyway, at least don't use Manager for it. There's hardly any reason to reach for Manager on a single node, and the only case I can think of where you really need manager-queues is when you have to send a queue reference to an already running process. Since Pool's initializer allows you to pass arguments at worker-process startup, manager-queues are not necessary here either.

You have to be aware that every interaction between parent and child process through Manager proxies results in a detour through an extra manager process, increasing latency through more IPC, context switches and cache flushes. It also has the potential to considerably increase the memory footprint.

import time
import multiprocessing as mp

# def generate(x): ... # take from question

def init_queue(queue):
    globals()['queue'] = queue


def wrapper(x):
    q = queue
    for item in generate(x):
        q.put(item)


if __name__ == '__main__':


    POISON = 'POISON'

    queue = mp.SimpleQueue()

    with mp.Pool(processes=4, initializer=init_queue, initargs=(queue,)) as pool:
        pool.map_async(
            func=wrapper,
            iterable=range(0, 100, 10),
            chunksize=1,
            callback=lambda _: queue.put(POISON)
        )
        for res in iter(queue.get, POISON):
            print(res)

Using multiprocessing.Process

Now the alternative I would prefer over using Pool in this case, is building your own little specialized pool, with multiprocessing.Process and some multiprocessing-queue. Yes, it's a bit more code to write, but the amount of code that actually runs is considerably reduced compared to any solution involving multiprocessing.Pool. This leaves less room for subtle bugs, plus it's more open to changing conditions and uses less system resources.

import time
import multiprocessing as mp
from itertools import chain

DONE = 'DONE'
POISON = 'POISON'


def _worker(func, inqueue, outqueue):
    for chunk in iter(inqueue.get, POISON):
        for res in func(chunk):
            outqueue.put(res)
        outqueue.put(DONE)
    outqueue.put(POISON)


def _init_pool(n_workers, func):
    """Initialize worker-processes and queues."""
    inqueue, outqueue = mp.Queue(), mp.Queue()

    pool = [
        mp.Process(target=_worker, args=(func, inqueue, outqueue))
        for _ in range(n_workers)
    ]
    for p in pool:
        p.start()

    return pool, inqueue, outqueue


def iflatmap(n_workers, func, iterable):
    """Yield results from subprocesses unordered and immediately."""
    iterable = chain(iterable, [POISON] * n_workers)
    pool, inqueue, outqueue = _init_pool(n_workers, func)

    for _ in pool:
        inqueue.put(next(iterable))

    while n_workers:
        res = outqueue.get()
        if res == DONE:  # there's a free worker now
            inqueue.put(next(iterable))
        elif res == POISON:  # a worker has shut down
            n_workers -= 1
        else:
            yield res

    for p in pool:
        p.join()

The example here is using only four workers to show this solution (also) doesn't rely on having the same number of workers and tasks. Since it also doesn't need to know the length of the input-iterable for its control-flow, this allows for providing a generator of unknown length as iterable. If you prefer it classy, you can wrap the logic above in a "Pool"-class instead.
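The reason the input length never has to be known is the `chain(iterable, [POISON] * n_workers)` step: the shutdown markers are appended lazily behind whatever the input yields. A single-process sketch of just that step (the generator `tasks` and the helper `with_pills` are hypothetical names used for illustration):

```python
from itertools import chain

POISON = 'POISON'


def tasks():
    """A generator of unknown length, standing in for real input."""
    for start in range(0, 30, 10):
        yield start


def with_pills(iterable, n_workers):
    """Append one POISON per worker without materializing the input."""
    return chain(iterable, [POISON] * n_workers)


stream = with_pills(tasks(), n_workers=2)
print(list(stream))  # [0, 10, 20, 'POISON', 'POISON']
```

Each `next()` on the chained iterator hands out either a real task or, once the input is exhausted, one pill per worker.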

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)


if __name__ == '__main__':

    for res in iflatmap(n_workers=4, func=generate, iterable=range(0, 100, 10)):
        print(res)

For people unfamiliar with the usage of iter(object, sentinel): docs and some reasoning here
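As a quick illustration of the two-argument form, `iter(callable, sentinel)` calls the callable repeatedly and stops as soon as it returns a value equal to the sentinel. The sketch below uses a thread-level queue.Queue instead of a multiprocessing queue (an assumption made so that it runs in a single process); the pattern is the same as `iter(inqueue.get, POISON)` above.

```python
import queue

POISON = 'POISON'

q = queue.Queue()
for value in (1, 2, 3, POISON, 4):
    q.put(value)

# Calls q.get() until it returns POISON; the sentinel itself is not yielded
received = list(iter(q.get, POISON))
print(received)  # [1, 2, 3]
```

Anything queued after the sentinel (the `4` here) stays in the queue, which is why each worker needs exactly one pill.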

Darkonaut
1

MisterMiyagi's answer may be stalling on Linux due to the way the queue is serialized when it is embedded in a functools.partial object. (I can't figure out why.)

This is the same code, with a hack to pass the queue around to the mapper on the parameter side, and not embedded in the callable object.

Brain feeling in slow motion today, sorry I can't figure out exactly what is taking place - anyway, this variant of the code worked here:

import functools
import multiprocessing
import secrets
from itertools import repeat
import time


def flatmap(pool: multiprocessing.Pool, func, iterable, chunksize=None):
    """A flattening, unordered equivalent of Pool.map()"""
    # use a queue to stream individual results from processes
    queue = multiprocessing.Manager().Queue()
    sentinel = secrets.token_bytes()
    # reuse task management and mapping of Pool
    pool.map_async(
        functools.partial(_flat_mappper, func),
        zip(iterable,repeat(queue)),
        chunksize,
        # callback: push a signal that everything is done
        lambda _: queue.put(sentinel),
        lambda *e: print("argh!", e),
    )
    # yield each result as it becomes available
    while True:
        item = queue.get()
        if item == sentinel:
            break
        yield item


def _flat_mappper(func, arg):
    """Helper to run `(*args) -> iter` and stream results to a queue"""
    data, queue = arg
    for item in func(data):
        queue.put(item)

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)


if __name__ == "__main__":
    with multiprocessing.Pool(10) as pool:
        for item in flatmap(pool, generate, range(0, 100, 10)):
            print(item)
jsbueno
1

As the OP pointed out, objects passed between Processes must be pickleable. The stated goal is to process data objects as soon as they become available. Therefore the secondary Processes must pass the objects back to the main Process as soon as possible. This establishes constraints on which tools can be used.

The secondary Processes can use a multiprocessing.Queue to transmit the individual objects back to the main Process. The worker functions in the secondary Processes do not need to return anything. Some of the machinery in the standard library methods is designed to collect and manage returned values; it will not be useful. However, it is still necessary to initialize the secondary Processes and submit tasks to them. It is also handy to manage the program flow using the standard library context managers.

The methods of multiprocessing.Pool will work, but I prefer a solution using ProcessPoolExecutor. My impl1() below uses Pool, and impl2() uses the Executor. Both solutions are short and, I think, easy to understand and generalize. In both cases, a multiprocessing.Queue object must be initialized and passed to the secondary Process in its initialization function.

Since the order in which the data is returned to the main Process is not deterministic, it is necessary to have some way of sorting out which data came from which task. I use a simple integer indexing scheme and collect all the results into a single dictionary. This is managed in a secondary thread, while the main thread waits for all the submitted tasks to finish (the ProcessPoolExecutor context will not exit until all the submitted tasks are done). When that happens, I post a sentinel object to the queue to shut it down. I join the secondary thread to wait for all the objects to be processed. At this point the dictionary containing the collected data items is complete.

The code is multithreaded but the structure is pretty simple. The functions of the main Process are well separated into setup, submission of tasks, and data handling. There's no worry about race conditions.

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import threading
from collections import defaultdict

# Process code
_q = None
def initialize(q):
    global _q  # pylint: disable=global-statement
    _q = q

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)

def wrapper(x):
    for j in generate(x):
        _q.put((x, j))

# collection thread
def collect(q, results):
    for a, b in iter(q.get, (None, None)):
        print(a, b)
        results[a].append(b)
        
def impl1(q):
    with multiprocessing.Pool(10, initializer=initialize,
                              initargs=(q,)) as pool:
        for _ in pool.imap_unordered(wrapper, range(0, 100, 10)):
            pass
        
def impl2(q):
    with ProcessPoolExecutor(10, initializer=initialize, initargs=(q,)) as ex:
        for n in range(0, 100, 10):
            ex.submit(wrapper, n)
        
def main():
    results = defaultdict(list)
    q = multiprocessing.Queue()
    thr = threading.Thread(target=collect, args=(q, results))
    thr.start()

    # impl1(q)
    impl2(q)
    q.put((None, None))
    thr.join()
    print(results)

if __name__ == "__main__":
    main()
Paul Cornelius
0

You could probably use multiprocessing.Pipe and send the results over the connections to get the first item(s) immediately:

import time
import multiprocessing

def generate_and_send(x, conn):
    for j in range(x, x + 10):
        conn.send(j)
        time.sleep(1)
    conn.send("POISON")

print(f'program begins at {time.ctime()}')
pipes = [multiprocessing.Pipe() for _ in range(10)]
child_conns = [y for (x, y) in pipes]
parent_conns = [x for (x, y) in pipes]
processes = [multiprocessing.Process(target=generate_and_send, args=(*el,)) for
        el in zip(range(0, 100, 10), child_conns)]

for p in processes:
    p.start()

first_printed = False
while parent_conns:
    # iterate over a copy, because finished connections are removed in the loop
    for par_conn in list(parent_conns):
        recvd_val = par_conn.recv()
        if recvd_val == "POISON":
            par_conn.close()
            parent_conns.remove(par_conn)
        else:
            if not first_printed:
                print(f'first item printed at {time.ctime()}')
                first_printed = True
            print(recvd_val)
print(f'program ends at {time.ctime()}')
Mortz
  • While the example used even-sized sleeps, in reality the processing times are uneven and `par_conn.recv()` reads one process at a time – Antti Haapala -- Слава Україні Feb 20 '22 at 09:48
  • But this approach can still save time if the "slowest first item" is generated faster than the "fastest full list" - what I mean is if the first item on the first connection takes, say, 3 seconds and the last connection takes 4 seconds to generate the full list - you still save ~1 seconds, right? – Mortz Feb 20 '22 at 13:37
  • Of course, there is probably no way to know that beforehand, but if we know the times on average, then we can probably make each generator "smaller" or "larger" - as in make it generate, say, 15 items instead of 10 – Mortz Feb 20 '22 at 13:40
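One possible way to address the concern that `par_conn.recv()` reads one process at a time is `multiprocessing.connection.wait`, which blocks until at least one of the given connections is readable. The following is a sketch only, not part of the answer above: the helper name `iter_ready` is hypothetical, and it assumes each producer ends its stream with the same "POISON" marker.

```python
from multiprocessing import Pipe
from multiprocessing.connection import wait

POISON = "POISON"  # same end-of-stream marker as in the answer above


def iter_ready(parent_conns):
    """Yield items from whichever connection has data, until all sent POISON."""
    open_conns = list(parent_conns)
    while open_conns:
        # wait() returns the subset of connections that are ready to be read
        for conn in wait(open_conns):
            try:
                value = conn.recv()
            except EOFError:
                # the other end was closed without sending POISON
                value = POISON
            if value == POISON:
                conn.close()
                open_conns.remove(conn)
            else:
                yield value
```

With this, the receiving loop becomes `for item in iter_ready(parent_conns): print(item)`, and a slow producer no longer holds up items that are already available on the other connections.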