I wrote the following code, which doesn't seem to distribute tasks in the way I would expect:

import multiprocessing
from time import sleep
import os

def multi_test(time):
    print(f'{os.getpid()}: {time}s')
    sleep(time)

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    pool.map(multi_test, [1,1,1,1,4,1,1,1,1])
    pool.close()
    pool.join()

The output shows the process ID and the sleep time:

68858: 1s
68857: 1s
68858: 1s
68857: 1s
68857: 4s
68858: 1s
68858: 1s
68858: 1s # 2 second delay after this line, because the other process is doing the next task (WHY?)
68857: 1s

The code takes 7 seconds to run. I would have expected it to take 6 seconds, because the last four 1s tasks could run at the same time as the one 4s task, but Python has given the last task to the same process that is running the 4s task.

       <1><2><3><4><5><6><7>
68858: 1s 1s 1s 1s 1s 
68857: 1s 1s <----4s---> 1s

Why is this happening and what can I do to change it?

If I take away two 1s tasks from the beginning that happen in parallel (i.e., [1,1,4,1,1,1,1]), then the result is as I would expect:

       <1><2><3><4><5>
68858: 1s 1s 1s 1s 1s 
68857: 1s <----4s--->

1 Answer

It seems that map splits the input data into chunks, and in your case the chunksize is 2. Each chunk is handed to one worker as a unit, so your entries are not distributed the way you would expect: the 4s task shares a chunk with the 1s task after it, and both run in the same process. The small rewrite of your code below helps to visualize this, because enumerate passes each entry's index to the worker, which shows which entries are processed where. The pid alone does not help, because the processes are reused.

The chunksize also changes the timing in your case, as you mentioned. With chunksize=1 (see code) the run takes 6 seconds; without setting chunksize (it is then calculated internally as 2) it takes 7 seconds.

Unfortunately, I cannot explain why that logic is there.

from multiprocessing import Pool
from time import sleep
from datetime import datetime
import os

def worker(data):
    pid = os.getpid()
    idx, time = data
    print(f'started  :: {datetime.now()} idx={idx} time={time} pid={pid}')
    sleep(time)
    print(f'finished :: {datetime.now()} idx={idx} time={time} pid={pid}')

if __name__ == '__main__':
    with Pool(2) as pool:
        pool.map(worker, enumerate([1,1,1,1,4,1,1,1,1]), chunksize=1)
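
To see where the extra second comes from, here is a rough simulation of the scheduling. It is a simplified model of my own, not the actual pool.py logic: the list is cut into consecutive chunks and each chunk goes to whichever worker becomes free first. The names chunked and simulate are made up for this sketch, and dispatch overhead is ignored.

```python
import heapq

def chunked(items, chunksize):
    """Split items into consecutive chunks of at most chunksize elements."""
    return [items[i:i + chunksize] for i in range(0, len(items), chunksize)]

def simulate(durations, workers, chunksize):
    """Return the total run time if each idle worker grabs the next chunk.

    Simplified model (an assumption, not the real pool internals): a worker
    runs all tasks of its chunk back to back, with no dispatch overhead.
    """
    # Min-heap holding the time at which each worker becomes free.
    free_at = [0] * workers
    heapq.heapify(free_at)
    for chunk in chunked(durations, chunksize):
        start = heapq.heappop(free_at)  # earliest available worker
        heapq.heappush(free_at, start + sum(chunk))
    return max(free_at)

tasks = [1, 1, 1, 1, 4, 1, 1, 1, 1]
print(chunked(tasks, 2))                        # [[1, 1], [1, 1], [4, 1], [1, 1], [1]]
print(simulate(tasks, workers=2, chunksize=2))  # 7
print(simulate(tasks, workers=2, chunksize=1))  # 6
```

With chunksize=2 the 4s task and the following 1s task end up in the same chunk, so one worker is busy for 5 seconds in a row. The same model gives 5 seconds for [1,1,4,1,1,1,1] with chunksize=1, which matches the second diagram in the question.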

For completeness, here is the part of pool.py that I analyzed:

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
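
The arithmetic of that excerpt can be checked in isolation. The helper below is only a sketch that copies the divmod logic into a standalone function; default_chunksize is a name I made up, and len(self._pool) is assumed to equal the number of worker processes.

```python
def default_chunksize(n_items, n_workers):
    """Mirror the fallback from the pool.py excerpt for chunksize=None."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1  # round up so that every item ends up in a chunk
    return chunksize

print(default_chunksize(9, 2))  # 2
print(default_chunksize(7, 2))  # 1
```

For the 9 tasks in the question and a pool of 2 this gives 2, while for the shortened 7-task list it gives 1, which would explain why the shorter list was scheduled as expected.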

See also: "chunksize" parameter in multiprocessing.Pool.map

  • Thank you very much. I've accepted your answer, but unfortunately I can't upvote it, because I don't have enough reputation. – Matthias Schmitz May 09 '23 at 18:31
  • The chunking is there because callers often have a relatively cheap function to call over a very large number of items. Dispatching them in groups amortises the IO costs. E.g. try `pool.map(int, range(10_000), chunksize=1)` and with the default `chunksize=None`. – Sam Mason May 09 '23 at 20:39
  • @MatthiasSchmitz instead of `map`, it may be better to just use `.apply_async` here – juanpa.arrivillaga May 09 '23 at 21:57
  • @SamMason Okay, so a higher chunksize is good for many cheap functions, but in this case I have a small number of more expensive functions, so a chunksize of 1 is better. Thanks! – Matthias Schmitz May 11 '23 at 14:10
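
A minimal sketch of the `.apply_async` suggestion from the comments, assuming each task is submitted on its own so that no two tasks share a chunk. The names worker and run_tasks are illustrative, and the durations are the question's task list scaled down to tenths of a second so the demo finishes quickly.

```python
from multiprocessing import Pool
from time import sleep
import os

def worker(idx, seconds):
    """Sleep for the given time and report which process handled the task."""
    sleep(seconds)
    return idx, os.getpid()

def run_tasks(durations, processes=2):
    """Submit every task individually and collect the results in order."""
    with Pool(processes) as pool:
        pending = [pool.apply_async(worker, (idx, seconds))
                   for idx, seconds in enumerate(durations)]
        # get() blocks until the corresponding task has finished.
        return [result.get() for result in pending]

if __name__ == '__main__':
    # Same shape as the question's list, in tenths of a second.
    durations = [0.1, 0.1, 0.1, 0.1, 0.4, 0.1, 0.1, 0.1, 0.1]
    for idx, pid in run_tasks(durations):
        print(f'idx={idx} pid={pid}')
```

Because every call to apply_async submits a single task, this should behave like map with chunksize=1 for a short list like this one; for many cheap tasks the chunked map is likely the better fit, as noted above.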