61

I have written a little script that distributes a workload across 4 worker processes and tests whether the results stay ordered with respect to the order of the input:

from multiprocessing import Pool
import numpy as np
import time
import random


rows = 16
columns = 1000000

vals = np.arange(rows * columns, dtype=np.int32).reshape(rows, columns)

def worker(arr):
    time.sleep(random.random())        # let the process sleep a random
    for idx in np.ndindex(arr.shape):  # amount of time to ensure that
        arr[idx] += 1                  # the processes finish at different
                                       # time steps
    return arr

# create the pool of 4 worker processes
with Pool(4) as p:
    # schedule one map/worker for each row in the original data
    q = p.map(worker, [row for row in vals])

for idx, row in enumerate(q):
    print("[{:0>2}]: {: >8} - {: >8}".format(idx, row[0], row[-1]))

For me this always results in:

[00]:        1 -  1000000
[01]:  1000001 -  2000000
[02]:  2000001 -  3000000
[03]:  3000001 -  4000000
[04]:  4000001 -  5000000
[05]:  5000001 -  6000000
[06]:  6000001 -  7000000
[07]:  7000001 -  8000000
[08]:  8000001 -  9000000
[09]:  9000001 - 10000000
[10]: 10000001 - 11000000
[11]: 11000001 - 12000000
[12]: 12000001 - 13000000
[13]: 13000001 - 14000000
[14]: 14000001 - 15000000
[15]: 15000001 - 16000000

Question: So, does Pool.map really preserve the original input order when it collects the workers' results into q?

Sidenote: I am asking because I need an easy way to parallelize work over several workers. In some cases the ordering is irrelevant, but in other cases the results (like in q) have to be returned in the original order, because I use an additional reduce function that relies on ordered data.

Performance: On my machine this operation is about 4 times faster (as expected, since I have 4 cores) than normal execution on a single process. Additionally, all 4 cores are at 100% usage during the runtime.

daniel451

3 Answers

92

Pool.map results are ordered. If you need order, great; if you don't, Pool.imap_unordered may be a useful optimization.

Note that while the order in which you receive the results from Pool.map is fixed, the order in which they are computed is arbitrary.

user2357112
  • Why is `imap_unordered` 'an optimization'? Is it faster? – daniel451 Dec 22 '16 at 00:20
  • @ascenator: If later outputs are ready first, you can use them without waiting for earlier outputs to finish. – user2357112 Dec 22 '16 at 00:22
  • Is starmap also ordered then? – Victor 'Chris' Cabral Sep 18 '19 at 21:14
  • @Victor'Chris'Cabral: Yes, `Pool.starmap` results are ordered. – user2357112 Sep 18 '19 at 21:27
  • @users235... where can I read this up? I couldn't see this in the multiprocessing documentation for starmap. Thanks – whiletrue Sep 16 '20 at 14:56
  • @Fábio: The starmap docs say "Like map() except..." and don't say anything about the order handling being different. – user2357112 Sep 16 '20 at 14:59
  • @user2357112supportsMonica I don't quite get the explanation. Aren't the computations of the individual elements of map independent of each other? Why would they need to wait or synchronize in such a case? – spurra Apr 05 '22 at 14:56
  • @spurra: Regardless of what order the results are *computed* in, `Pool.map` will *give* you the results in input order. This does not impose any synchronization constraints on the computation itself. – user2357112 Apr 05 '22 at 15:22
  • @user2357112supportsMonica I get that. But why is `imap_unordered` then an optimization? The explanation you provided: "If later outputs are ready first, you can use them without waiting for earlier outputs to finish." does not make sense to me if the results are computed asynchronously with Pool.map. – spurra Apr 05 '22 at 16:07
  • @spurra: With `Pool.map`, if computation for a later input finishes early, you can't *do anything* with the result until all earlier inputs finish computation. `Pool.imap_unordered` removes that constraint, at the expense of losing input order. – user2357112 Apr 05 '22 at 16:42
  • @user2357112supportsMonica I still don't understand. If `task[i+1]` finishes before `task[i]` why do I have to wait? They do not depend on each other. I can just store the result of `task[i+1]` into `result[i+1]` where `result` is a pre-initialized list. – spurra Apr 05 '22 at 17:10
  • @user2357112supportsMonica ah wait I think I get it. Are you saying that if `task[i]` is a lot slower than `task[i+1]`, I would have access to `task[i+1]` a lot earlier for `imap_unordered` than `map`, like when iterating through the results? I was thinking before in terms of a blocking operation, i.e the time it takes until all tasks are done. – spurra Apr 05 '22 at 17:15
20

The documentation bills Pool.map as a "parallel equivalent of the map() built-in function". Since the built-in map is guaranteed to preserve order, multiprocessing.Pool.map makes that guarantee too.

mgilson
  • does keeping the order cost reasonable amounts of computation time? – daniel451 Dec 22 '16 at 00:19
  • I would doubt it. The advantage of `imap_unordered` (as noted by @user2357112) is that you can start processing results in the main process as soon as they become available. With `Pool.map`, you need to wait until the first n-1 items have results before you can start processing the n'th item. – mgilson Dec 22 '16 at 00:22
2

Note that while the results are ordered, the execution isn't necessarily ordered.

From the documentation:

map(func, iterable[, chunksize])

A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

In my experience, it often chunks the list into pairs, so that items #1 & #2 go to the first process/thread, #3 & #4 to the second, and so on. In this example, the order would be [#1, #3, #2, #4] -- but this can vary depending on the number and duration of each process/thread (for example, if #1 is a very long process, #2 could be delayed enough to be the very last process to run).

Obviously, if the order of execution matters to you (like it does for us -- more on this below), then this is highly undesirable.

Fortunately, there is a fairly simple solution: just set the chunksize to 1!

pool.map(func, my_list, chunksize=1)

The documentation states this parameter specifies an approximate chunksize, but in my experience, setting it to 1 works: it feeds the items to the pool one by one, rather than in chunks.


Edit: Our use case may not be very standard, so let me provide some details:

  • We have to process large numbers of fast & slow jobs in parallel (the degree of parallelism depends on the number of nodes, and cores per nodes).
  • We use a multiprocessing thread pool to kick off those jobs (in a separate process), and wait for them to complete (using ThreadPool.map), before doing other things.
  • These jobs can take minutes or hours to complete (it's not just some basic calculations).
  • These workflows happen all the time (usually daily or hourly).
  • The order of execution matters mostly in terms of compute time efficiency (which equals money, in the cloud). We want the slowest jobs to run first, while the faster jobs complete with the leftover parallelism at the end. It's like filling a suitcase -- if you start with all the small items, you're going to have a bad time.

Here's an example: let's say we have 20 jobs to run on 4 threads/processes -- the first two each take ~2 hours to run, and the other ones take a few minutes. Here are the two alternative scenarios:

With chunking (default behavior):

#1 & #2 will be chunked into the same thread/process (and hence run sequentially), while the other ones will be executed in similarly chunked order. All the other threads/processes will be idle while #2 completes. Total runtime: ~4 hours.

Without chunking (setting chunksize = 1):

#1 & #2 will not be chunked into the same thread/process, and hence run in parallel. The other ones will be executed in order as threads/processes become available. Total runtime: ~2 hours.

When you're paying for compute in the cloud, this makes a huge difference -- especially as the hourly & daily runs add up to monthly & yearly bills.

Marco Roy
  • If you need sequential execution, don't use a pool at all. Just apply your function sequentially in a single process and save yourself a bunch of bugs and overhead. – user2357112 May 17 '22 at 17:07
  • Setting `chunksize` to 1 doesn't guarantee execution order. – user2357112 May 17 '22 at 17:08
  • Not all use cases are "parallel execution in any order" or "strictly sequential execution". Sometimes you want limited parallelism with a little bit of control -- for example, running jobs/tasks in separate threads/processes, but in a certain order, to improve efficiency (i.e. slowest ones first, fastest ones last). If the order is random and the slowest one goes last, then you're wasting compute/money on idle threads (if you're in the cloud). It's similar to the bin packing problem. `chunksize = 1` may not guarantee execution order, but it definitely helps a lot. – Marco Roy May 18 '22 at 01:28
  • @MarcoRoy Thank you! Same problem, applied solution, same results. :) – Rex Apr 05 '23 at 02:52