23

I am doing some calculations on large collections of bytes, processing them in chunks. To speed this up I am trying to parallelize the work with multiprocessing. I first tried pool.map, but it passes only a single argument to the function. I then found pool.starmap, but it returns results only once all the tasks have finished. I want results as they arrive (more or less). pool.imap does yield results as tasks finish, but it does not allow multiple arguments (my function requires 2 arguments). Also, the order of the results is important.

Some sample code below:

pool = mp.Pool(processes=4)
y = []
for x in pool.starmap(f, zip(da, repeat(db))):
    y.append(x)

The above code works, but it only gives results once all the processes have completed, so I cannot see any progress. This is why I tried pool.imap, which works well but only with a single argument:

pool = mp.Pool(processes=4)
y = []
for x in pool.imap(f, da):
    y.append(x)

With multiple arguments, it raises the following exception:

TypeError: f() missing 1 required positional argument: 'd'

I am looking for a simple way to achieve all 3 requirements:

  1. parallel processing using multiple parameters/arguments
  2. manage to see progress while the processes are running
  3. ordered results.

Thanks!

Abdul Qadir

3 Answers

12

You can simulate starmap using imap via the functools.partial() function:

import functools
import multiprocessing as mp

def my_function(constant, my_list, optional_param=None):
    print(locals())

with mp.Pool() as pool:
    list(
        pool.imap(
            functools.partial(
                my_function, 2, optional_param=3
            ),
            [1,2,3,4,5]
        )
    )

Outputs:

$ python3 foo.py
{'optional_param': 3, 'my_list': 1, 'constant': 2}
{'optional_param': 3, 'my_list': 3, 'constant': 2}
{'optional_param': 3, 'my_list': 2, 'constant': 2}
{'optional_param': 3, 'my_list': 4, 'constant': 2}
{'optional_param': 3, 'my_list': 5, 'constant': 2}
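Applied to the question's two-argument function, the same idea might look like the sketch below. The names f, da and db come from the question; the body of f, the run helper and its ctx parameter are illustrative assumptions, not part of the original code. Because the constant is the second parameter here, it is bound by keyword rather than by position.

```python
import functools
import multiprocessing as mp


def f(chunk, d):
    # Placeholder body (assumption): the question does not show the real calculation.
    return sum(chunk) + len(d)


def run(da, db, processes=2, ctx=None):
    """Apply f(chunk, db) to every chunk in da, printing progress as results arrive."""
    # ctx is a hypothetical knob for picking the start method; default is the platform's.
    ctx = ctx or mp.get_context()
    # Bind the constant second argument by keyword so imap only supplies the chunk.
    worker = functools.partial(f, d=db)
    results = []
    with ctx.Pool(processes=processes) as pool:
        # imap yields results in input order, each one as soon as it is available.
        for done, result in enumerate(pool.imap(worker, da), start=1):
            print(f"{done}/{len(da)} chunks done")
            results.append(result)
    return results
```

Call run(da, db) from under an if __name__ == '__main__': guard. Note that a slow early chunk holds back the later ones, since imap preserves order; imap_unordered would report sooner but loses the ordering the question asks for.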
bugmenot123
confused00
5

I can answer the first two questions pretty quickly. I think you should be able to handle the third after understanding the first two.

1. Parallel Processing with Multiple Arguments

I'm not sure about the whole "starmap" equivalent, but here's an alternative. What I've done in the past is condense my arguments into a single data object such as a list. For example, if you want to pass three arguments to your map_function, you could put those arguments into a list and then pass that list as one work item to the .map() or .imap() function.

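from multiprocessing import Pool

def map_function(combo):
    # Unpack the single packed argument into its three parts.
    a, b, c = combo
    return a + b + c

if __name__ == '__main__':
    # arg_1, arg_2 and arg_3 stand in for your own values.
    combo = [arg_1, arg_2, arg_3]

    pool = Pool(processes=4)
    # map() calls map_function once per element, so pass a list of combos.
    pool.map(map_function, [combo])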
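For the two-argument case in the question, the packing idea can be combined with imap roughly as follows. This is only a sketch: f, da and db are the question's names, while the body of f, the f_packed wrapper, run_packed and its ctx parameter are hypothetical.

```python
import multiprocessing as mp
from itertools import repeat


def f(chunk, d):
    # Placeholder body (assumption): stands in for the real two-argument function.
    return len(chunk) * d


def f_packed(args):
    # imap delivers one object per call, so unpack the (chunk, d) tuple here.
    return f(*args)


def run_packed(da, db, processes=2, ctx=None):
    """Call f(chunk, db) for each chunk in da and return the results in input order."""
    # ctx is a hypothetical knob for picking the start method; default is the platform's.
    ctx = ctx or mp.get_context()
    with ctx.Pool(processes=processes) as pool:
        # zip(da, repeat(db)) builds one (chunk, db) tuple per work item.
        return list(pool.imap(f_packed, zip(da, repeat(db))))
```

Replace list(...) with a for loop over pool.imap(...) to act on each result as it arrives. Keep in mind that db travels with every work item, which may be noticeable if it is large.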

2. Tracking Progress

A good way to do this is to use multiprocessing's shared value. I asked almost exactly this question about a month ago. A shared value lets you update the same variable from the different processes created by your map function. For the sake of learning, I'm going to let you read up on the shared state solution and figure it out on your own. If you're still having trouble after a few attempts, I'll be more than happy to help, but I believe that teaching yourself how to understand something is much more valuable than me giving you the answer.
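For readers who want a starting point, one possible shape of the shared-value approach is sketched below. It is not the code from the linked question: work, _init, run_with_counter and the ctx parameter are all hypothetical names, and the counter is handed to the workers through the pool's initializer.

```python
import multiprocessing as mp

_counter = None  # set in each worker process by _init


def _init(counter):
    # Runs once per worker: keep a reference to the shared counter.
    global _counter
    _counter = counter


def work(x):
    result = x * x  # placeholder calculation (assumption)
    # Increment under the value's lock so concurrent updates are not lost.
    with _counter.get_lock():
        _counter.value += 1
    return result


def run_with_counter(items, processes=2, ctx=None):
    """Map work over items, polling a shared counter to report progress."""
    # ctx is a hypothetical knob for picking the start method; default is the platform's.
    ctx = ctx or mp.get_context()
    counter = ctx.Value("i", 0)  # shared C int, starts at 0
    with ctx.Pool(processes, initializer=_init, initargs=(counter,)) as pool:
        pending = pool.map_async(work, items)
        while not pending.ready():
            pending.wait(timeout=0.1)
            print(f"{counter.value}/{len(items)} done")
        # map_async keeps the results in input order.
        return pending.get(), counter.value
```

The parent process only reads the counter, so a progress bar could be updated inside the polling loop instead of the print call.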

Hope this helps!!

Austin A
  • While I will go through your link, I just wanted to mention that with pool.starmap I get items 1 & 3 but not 2, and with pool.imap I get all 3 except the multiple-argument part. My 2nd argument is a 3-d list (which remains constant for all processes). Would you still recommend joining them into a combination? Thanks! – Abdul Qadir Sep 11 '15 at 04:48
  • The solution at the link works and now I can achieve all 3. Many thanks for that. I wanted to show the updates using progressbar. I have been trying to make it work, but starting the progressbar in main and updating it in add_print does not seem to work. I even tried marking progressbar as global, or passing it as a parameter, but no luck. Any thoughts, please? – Abdul Qadir Sep 11 '15 at 06:45
  • The three lines to unpack `combo` can be condensed to `a, b, c = combo`. – Terry Jan Reedy Jul 28 '23 at 17:44
1

I think this solution exactly meets your 3 requirements: https://stackoverflow.com/a/28382913/2379433

In short, p = Pool(); p.imap will enable you to see progress and maintain order. If you want map functions with multiple arguments, you can use a fork of multiprocessing that provides better serialization and multiple arguments. See the link for an example.

Mike McKerns