
So, I have a problem which I assume must be common:

I'd like to parallelize a script with a multiprocessing.Pool: handing inputs to the pool, having it process them in parallel, and receiving the outputs in the parent process.

apply_async() looks like the best fit for what I want to do. But I can't just give it a callback function, since in the end I want to print all the results to a single file. I think handing it a callback which prints to a single filehandle will result in jumbled results (and I'm not even sure I can pass a filehandle between processes like that).

So what's the best way to submit inputs to the Pool, then receive the outputs and handle them in the main process? At the moment I'm just collecting the AsyncResult objects in a list and periodically iterating through it, calling .get() on each.
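
Roughly, a minimal, simplified sketch of what I'm doing now (process and the input file are stand-ins for the real thing):

import multiprocessing as mp

def process(line):
    # Stand-in for the real per-input work.
    return line.strip().upper()

if __name__ == '__main__':
    pool = mp.Pool()
    with open('input.txt') as f:  # Illustrative input file.
        pending = [pool.apply_async(process, (line,)) for line in f]
    pool.close()
    for res in pending:       # Iterate in submission order...
        print(res.get())      # ...so the output matches the input order.
    pool.join()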

Update

I'll clarify a few parameters of my problem in response to comments:

  1. @martineau and @Juggernaut: By "not jumbled" I mean I'd really like to preserve the order of the inputs, so that the outputs come out in the same order.

  2. @RolandSmith and @martineau: My main process just reads inputs from a file, hands them to the pool, and prints the results. I could just call .apply(), but then the main process would wait for each call to complete before proceeding. I'm using multiprocessing precisely to reap the benefits of parallelization and have many inputs processed simultaneously.
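
For contrast, a hypothetical sketch of the .apply() version, which serializes everything (reusing the pool and process from the sketch above):

for line in f:
    print(pool.apply(process, (line,)))  # Blocks until this input is done, so only one worker is ever busy.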

Nick S
  • What is your script doing while the `Pool` is working? – Roland Smith Oct 19 '17 at 16:44
  • In other words, if your script has nothing else to do while waiting for the results, then you might as well just use `apply()`, which blocks until the final result is ready. – martineau Oct 19 '17 at 16:57
  • See [**multiprocessing.Pool: When to use apply, apply_async or map?**](https://stackoverflow.com/questions/8533318/multiprocessing-pool-when-to-use-apply-apply-async-or-map). – martineau Oct 19 '17 at 17:02
  • What exactly do you mean by "jumbled results"? – martineau Oct 19 '17 at 17:17
  • @martineau and RolandSmith: thanks for your questions. I added some clarifications to the original question. – Nick S Oct 19 '17 at 17:33
  • It might be worth mentioning that `apply_async` can still do what the OP wants and keep the input order: the trick is to build a queue-based structure and put `(res, index)` into the queue. At the end, you only need to sort by `index` once to get the results in order, so you can still leverage the non-blocking advantage of `apply_async`. – lqi May 02 '22 at 18:50

3 Answers


To kind of answer your question, I don't think you can do what you want without a callback.

You want the results computed asynchronously, yet printed in the same order as the inputs. This implies not only having to wait until all of the inputs are processed before printing them, but also some way to know their relative position in the inputs so they can be sorted back into that order before outputting them.

So here's how to do it with one. As I previously said, the slightly tricky part is that each result must include something indicating the position of the corresponding input, so the results can be sorted back into that order before printing. Because of that requirement, the script must wait until all the inputs have been processed.

Note that despite this, you are getting the benefits of parallel processing in the sense that the individual results themselves are being created by concurrent processes.

import multiprocessing as mp
from random import randint
from time import sleep

def my_func(*args):
    print('my_func:', args)
    index, x = args
    sleep(randint(1, 3))  # Take a varying amount of time to finish.
    return index, x*x  # Return result index and value.

if __name__ == '__main__':

    result_list = []

    def errorhandler(exc):
        print('Exception:', exc)

    def log_result(result):
        # This is called whenever my_func() returns a result.
        # result_list is modified only by the main process, not the pool workers.
        result_list.append(result)

    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(my_func, args=(i, i*2), callback=log_result,
                         error_callback=errorhandler)
    pool.close()
    pool.join()  # Wait for all subprocesses to finish.

    print('result_list:', result_list)
    sorted_results = [x[1] for x in sorted(result_list)]
    print('sorted results:', sorted_results)

Output:

my_func: (5, 10)
my_func: (1, 2)
my_func: (4, 8)
my_func: (7, 14)
my_func: (3, 6)
my_func: (9, 18)
my_func: (0, 0)
my_func: (6, 12)
my_func: (2, 4)
my_func: (8, 16)
result_list: [(2, 16), (3, 36), (5, 100), (1, 4), (4, 64), (7, 196), (9, 324), (0, 0), (6, 144), (8, 256)]
sorted results: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
martineau
  • Thanks, I was wondering if using indices to indicate order was the best idea. As for the async + ordered requirements, I'm actually getting that without waiting until the end by chunking the inputs (what I meant by "periodically iterating through" the results list). One of the main things I was wondering is whether there's any better way? Or maybe a tweak to that idea? – Nick S Oct 19 '17 at 20:35
  • If you used `Pool.map()` it would keep the results in order, so the passing of indices, unpacking of arguments, and sorting of the results would no longer be necessary. It also has an optional `chunksize=` keyword argument that allows easy "chunking" of the inputs. This would give equivalent results. Besides allowing the concurrent processing you want, it also reduces the overhead needed to collect the results, so it ought to be slightly faster overall. – martineau Oct 19 '17 at 21:09
  • There are other approaches to doing this concurrently besides using a `Pool`—such as by using a `multiprocessing.Condition` variable. In their [documentation](https://docs.python.org/3/library/threading.html#condition-objects) they show an example of a generic producer-consumer situation. Something like that could also be used to do what you want. (No, the documentation link isn't a mistake—[`multiprocessing.Condition`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Condition) is just an alias for `threading.Condition`). – martineau Oct 19 '17 at 21:24
  • Yeah, after looking at `map_async()` again I think it's probably the best way to do this. The end result will be basically the same as what I'm currently doing, but with less management code. If you'd like to add a new answer using it, I'll accept that instead. Otherwise, maybe I'll write one. It'd be nice for someone with the same problem to find. – Nick S Oct 20 '17 at 19:56
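
For anyone who finds this later, a minimal sketch of the map_async() approach discussed in the comment above (the worker function and input data are illustrative):

import multiprocessing as mp

def my_func(x):
    return x * x

if __name__ == '__main__':
    with mp.Pool() as pool:
        async_result = pool.map_async(my_func, range(10))  # Returns immediately.
        # The main process is free to do other work here...
        result_list = async_result.get()  # Blocks until all results are ready.
    print(result_list)  # Same order as the inputs.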

As you requested in a comment, here's code showing how to do it with Pool.map() instead of Pool.apply_async(), which seems like a much better fit for this problem given the need to wait for all the results before further output processing can be done (i.e. the output needs to be in the same order as the input).

As you can see, it requires substantially less code and doesn't require sorting the results before outputting them (so it's probably faster, too).

import multiprocessing as mp
from random import randint
from time import sleep

def my_func(x):
    print('my_func:', x)
    sleep(randint(1, 3))  # Take a varying amount of time to finish.
    return x*x

if __name__ == '__main__':

    input_data = range(10)
    with mp.Pool(10) as pool:
        result_list = pool.map(my_func, input_data)  # Blocks until finished.

    print('result_list:', result_list)  # Will be in same order as input_data.

Output:

my_func: 3
my_func: 2
my_func: 1
my_func: 0
my_func: 8
my_func: 5
my_func: 7
my_func: 6
my_func: 4
my_func: 9
result_list: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
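
As noted in a comment above, Pool.map() also accepts an optional chunksize argument, which can cut inter-process overhead when there are many small inputs. A hypothetical tweak to the call above (the chunk size itself is arbitrary):

result_list = pool.map(my_func, input_data, chunksize=4)  # Ships inputs to workers in batches of 4.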
martineau

> I think handing it a callback which prints to a single filehandle will result in jumbled results

The solution is to have your callback put each result into a Queue, then fetch them later in the main process. Queues are thread-safe, and the pool invokes callbacks in a thread of the parent process, so you don't have to worry about the jumbled results you're talking about.

from queue import Queue

results = Queue()

def callback(result):
    results.put(result)

and later, in the main process:

item = results.get()
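
For completeness, a fuller, runnable sketch of this idea (process_item and the inputs are illustrative stand-ins):

import multiprocessing as mp
from queue import Queue  # Thread-safe; pool callbacks run in a thread of the parent process.

results = Queue()

def process_item(x):
    # Stand-in for the real per-input work.
    return x * x

def callback(result):
    # Runs in the parent process as each result arrives.
    results.put(result)

if __name__ == '__main__':
    pool = mp.Pool()
    for x in range(10):
        pool.apply_async(process_item, (x,), callback=callback)
    pool.close()
    pool.join()  # Wait for all workers (and their callbacks) to finish.

    while not results.empty():  # Drain in completion order,
        print(results.get())    # which may differ from the input order.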
Amin Etesamian
  • `Queue`s may be thread-safe, but the results can still be "jumbled" in the sense of being out of order, because different subprocesses finish at different times and add their results in an essentially random order. – martineau Oct 19 '17 at 16:51
  • @martineau Since we don't have the script, and the OP mentions "printing to a single file", the results from different processes are likely strings, and in that case you are right. What is your solution? I don't think an asynchronous operation is a solution here! – Amin Etesamian Oct 19 '17 at 16:58
  • I agree that the asynchronous processing here probably doesn't need to use `apply_async()` to do it—and have mentioned it in a comment under the question. – martineau Oct 19 '17 at 17:04
  • @martineau I just edited my answer. What about using a `Lock`? – Amin Etesamian Oct 19 '17 at 17:05
  • Sorry, I really don't understand how using a `Lock` here would do anything that `Queue` wouldn't do automatically—or how it would ensure the results were in the proper order (if that even matters; the OP isn't clear about what they mean by "jumbled"). – martineau Oct 19 '17 at 17:10
  • You are right. I'm confused by the meaning of `jumbled` too. – Amin Etesamian Oct 19 '17 at 17:12
  • I've tried using `multiprocessing.Pipe`s, and generally had a bad time. For some reason `.poll()` was unreliable, and the dealbreaker was exception handling, which was a nightmare. Is `Queue` likely to be different? – Nick S Oct 19 '17 at 17:45