4

I've got a program using the multiprocessing library to compute some stuff. There are about 10K inputs to compute, each of them taking between 0.2 second and 10 seconds.

My current approach uses a Pool:

# Inputs
signals = [list(s) for s in itertools.combinations_with_replacement(possible_inputs, 3)]

# Compute
with mp.Pool(processes = N) as p:
    p.starmap(compute_solutions, [(s, t0, tf, folder) for s in signals])
    print ("    | Computation done.")

I've noticed that on the 300 / 400 last inputs to check, the program became a lot slower. My question is: how does the Pool and the starmap() behave?

Fro my observation, I believe that if I got 10K inputs and N = 4 (4 processes), then the 2 500 first inputs are assigned to the first process, the 2 500 next to the second, ... and each process treats its inputs in a serial fashion. Which means that if some processes have cleared the Queue before others, they do not get new tasks to perform.

Is this correct?

If this is correct, how can I have a smarter system which could be represented with this pseudo-code:

workers = Initialize N workers
tasks = A list of the tasks to perform

for task in tasks:
    if a worker is free:
        submit task to this worker
    else:
        wait

Thanks for the help :)

N.B: What is the difference between the different map function. I believe map(), imap_unordered(), imap, starmap exists.

What are the differences between them and when should we use one or the other?

Mathieu
  • 5,410
  • 6
  • 28
  • 55

1 Answers1

6

Which means that if some processes have cleared the Queue before others, they do not get new tasks to perform.

Is this correct?

No. The main purpose of multiprocess.Pool() is to spread the passed workload to the pool of its workers - that's why it comes with all those mapping options - the only difference between its various methods is on how the workload is actually distributed and how the resulting returns are collected.

In your case, the iterable you're generating with [(s, t0, tf, folder) for s in signals] will have each of its elements (which ultimately depends on the signals size) sent to the next free worker (invoked as compute_solutions(s, t0, tf, folder)) in the pool, one at a time (or more if chunksize parameter is passed), until the whole iterable is exhausted. You do not get to control which worker executes which part, tho.

The workload may also not be evenly spread - one worker may process more entries than another in dependence of resource usage, execution speed, various internal events...

However, using map, imap and starmap methods of multiprocessing.Pool you get the illusion of even and orderly spread as they internally synchronize the returns from each of the workers to match the source iterable (i.e. the first element of the result will contain the resulting return from the called function with the first element of the iterable). You can try the async/unordered versions of these methods if you want to see what actually happens underneath.

Therefore, you get the smarter system by default, but you can always use multiprocessing.Pool.apply_async() if you want a full control over your pool of workers.

As a side note, if you're looking on optimizing the access to your iterable itself (as the pool map options will consume a large part of it) you can check this answer.

Finally,

What are the differences between them and when should we use one or the other?

Instead of me quoting here, head over to the official docs as there is quite a good explanation of a difference between those.

Community
  • 1
  • 1
zwer
  • 24,943
  • 3
  • 48
  • 66
  • Thanks, I'll have a more detailed look to it tomorrow. – Mathieu May 16 '18 at 15:18
  • 1
    To add on this answer, I am not entirely sure it is correct. What I did observe is that if I use the code above with a Pool of e.g. 2 workers, and that if the jobs worker 1 receive are much shorter than the jobs worker 2 receive, then worker 1 will finish long time before worker 2, and thus even if the jobs not yet started by worker 2 could be transferred to worker 1. In my case, each job resulted in a file generated. Sometimes, I had to wait 10/12 more hours for the last workers to finish their jobs (and a job doesn't last more than 15/20 mns). I solved it with the maxtaskperchild argument. – Mathieu Apr 08 '20 at 11:06
  • 1
    @Mathieu, you are correct that it pre-assigns subsets of tasks to each processor. I've checked this a dozen different ways. A single task that takes a long time will delay other tasks that are behind it in the queue, even if all the tasks assigned to other processors have already completed. maxtasksperchild=1 seems to work. This spawns a new process for each task, rather than pre-assigning tasks and leaving processes open the entire time. – Tom Dec 08 '22 at 02:59
  • Exactly, but spawning a new process isn't a cheap operation, adapting this parameter to your tasks is important as well. – Mathieu Dec 08 '22 at 08:31