47

I am doing some parallel processing, as follows:

with mp.Pool(8) as tmpPool:
        results = tmpPool.starmap(my_function, inputs)

where inputs look like: [(1,0.2312),(5,0.52) ...] i.e., tuples of an int and a float.

The code runs fine, but I can't work out how to wrap it in a progress bar (tqdm), the way it can be done with, e.g., the imap method:

tqdm.tqdm(tmpPool.imap(some_function, some_inputs))

Can this be done for starmap also?

Thanks!

Darkonaut
sdgaw erzswer
  • 1
    If possible, I would say change your `my_function` to receive one packed argument and unpack it inside the function and then use `imap` – Tomerikoo Aug 05 '19 at 08:26
  • Yes, that is the default solution currently. I am still wondering whether starmap supports this (or any variant of it) – sdgaw erzswer Aug 05 '19 at 08:47
  • Not that I'm aware of or can see in the docs. The only variant I know of is `starmap_async` which is simply non-blocking but still returns a result object. I believe you will have to adjust your function to work with `imap` as it is the only option that works as a generator and not returning all results at once. Will be happy to see if there is a better solution – Tomerikoo Aug 05 '19 at 09:39
  • 1
    Thanks, Currently, I've re-implemented it with imap. Would be nice to have the istarmap also! – sdgaw erzswer Aug 05 '19 at 11:21

4 Answers

47

It's not possible with starmap(), but you can patch in a Pool.istarmap() method based on the code for imap(). Create the file istarmap.py and import it to apply the patch before you make your regular multiprocessing imports.

Python <3.8

# istarmap.py for Python <3.8
import multiprocessing.pool as mpp


def istarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    if self._state != mpp.RUN:
        raise ValueError("Pool not running")

    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not {0:n}".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self._cache)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

Python 3.8+

# istarmap.py for Python 3.8+
import multiprocessing.pool as mpp


def istarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    self._check_running()
    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not {0:n}".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

Then in your script:

import istarmap  # import to apply patch
from multiprocessing import Pool
import tqdm    


def foo(a, b):
    for _ in range(int(50e6)):
        pass
    return a, b    


if __name__ == '__main__':

    with Pool(4) as pool:
        iterable = [(i, 'x') for i in range(10)]
        for _ in tqdm.tqdm(pool.istarmap(foo, iterable),
                           total=len(iterable)):
            pass
Darkonaut
  • 1
    Very nice, this is exactly what I was after! Thanks! – sdgaw erzswer Aug 06 '19 at 04:48
  • I get `AttributeError: '_PoolCache' object has no attribute '_cache'` - any ideas? It occurs at the line `result = mp.IMapIterator(self._cache)` – wfgeo Jul 09 '20 at 08:06
  • 1
    @wfgeo I'm using `mpp` as name for the module, your example uses `mp`. Do you get the error with exactly my example from the answer, too? – Darkonaut Jul 09 '20 at 08:40
  • Yes I just replaced `mpp` and `mp`, it's just a personal convention, sorry. I do get the error with the same code, but it was because I had not called the module `istarmap`. I am currently having trouble bundling it into my own module, however, I can't seem to figure out the import statement if I put istarmap as a submodule in my own module – wfgeo Jul 09 '20 at 08:46
  • @wfgeo That's okay, I just didn't know if you had some other module named `mp`. I'm afraid that's not really enough information to understand your problem, but you need to import `istarmap` _before_ you import anything else from multiprocessing. – Darkonaut Jul 09 '20 at 09:12
  • @wfgeo I got the same error you did. For me it had nothing to do with the abbreviation `mpp` vs `mp`; I had to alter the following line. Instead of `result = mpp.IMapIterator(self._cache)` I needed `result = mpp.IMapIterator(self)`. Inspecting it in the debugger showed that the constructor of `IMapIterator` needs a Pool instance, not the cache (cf. https://github.com/python/cpython/blob/3.8/Lib/multiprocessing/pool.py#L838) – physicus Sep 11 '20 at 12:00
  • @physicus Thanks, I looked into it and saw they did some breaking changes in Python 3.8. There's a bit more to tweak, than just replacing `self._cache`, I've updated my answer with a version for Python 3.8. – Darkonaut Sep 11 '20 at 15:10
  • @Darkonaut With your code how do you retrieve the outputs of the function? With starmap for example we just used result = pool.starmap(....) – Julien Drevon Nov 11 '22 at 18:13
  • 1
    @JulienDrevon It's already iterating over the results. In the example each result is assigned to `_`, within `for _ in tqdm.tqdm(...`, because the result doesn't get used, but that's just convention for this case. You could write `for result in tqdm.tqdm(...` and then `print(result)` on every iteration or whatever you want to do with it. – Darkonaut Nov 11 '22 at 18:36
47

The simplest way would probably be to apply tqdm() around the inputs, rather than the mapping function. For example:

inputs = zip(param1, param2, param3)
with mp.Pool(8) as pool:
    results = pool.starmap(my_function, tqdm.tqdm(inputs, total=len(param1)))

Note that the bar advances as inputs are consumed and handed to the pool, not when my_function returns, so it can reach 100% well before the work is finished. If that distinction matters, you can consider rewriting starmap as some other answers suggest. Otherwise, this is a simple and efficient alternative.

corey
  • 5
    Thanks a lot. This should be the accepted answer, I think. I had to pass the input length as `total` to `tqdm` to make it work. – SaTa Feb 05 '21 at 20:06
  • 1
    You're correct, you'll likely need the total arg for streaming/lazy iterables – corey Feb 05 '21 at 20:16
  • Quick update: this did provide the progress bar, but the updates were not as dynamic as I hoped. It froze too much. – SaTa Feb 05 '21 at 21:21
  • 2
    Did you use chunksize != 1? It's possible elements were being pulled from the input in chunks so the progress bar updated irregularly – corey Feb 05 '21 at 22:00
  • Thanks, having `chunksize != 1` helped and made it a smoother bar! – SaTa Feb 06 '21 at 02:11
  • res_1, res_2 = zip(*pool.starmap(order2seq_multiproc, tqdm(tasks, total=len(tasks)))). This is my code, but the progress bar is always full and never updates. Where should I add the chunksize != 1? I put it right after total and it doesn't work, saying "name 'chunksize' is not defined" – Rocco Feb 07 '21 at 00:57
  • chunksize is a parameter of starmap, not tqdm. So try pool.starmap(order2seq_multiproc, tqdm(input), chunksize=chunksize) – corey Feb 08 '21 at 08:07
  • 2
    Note: a zip object doesn't have a length. Instead, total=len(param1) would work – ZaxR Jul 26 '21 at 19:50
  • 14
    This seems to only track when the inputs are being sent, but not when the processing of `my_function` is completed. – gofvonx Nov 04 '21 at 16:57
  • 3
    I'm not sure if you're aware but gofvonx is right. This measures input- not output-progression. That's also why this appears to be faster as some people commented. Now imagine all but the last task taking up five seconds but the last one hour to complete. You could end up with getting displayed 100% progress for almost an hour before the actual finish... – Darkonaut Feb 17 '22 at 00:28
  • 3
    The progress bar advances very quickly and gets to 100%, and then the code still continues to run until it is done. – Volkan Yurtseven Apr 15 '22 at 09:39
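For anyone who wants the bar to track finished tasks rather than dispatched inputs without patching Pool, here is a minimal sketch built on apply_async and its callback argument. This is a different technique from the answer above, not a variant of starmap. The names starmap_with_callback, on_done and scale are made up for illustration and are not part of multiprocessing.

```python
import multiprocessing.pool as mpp


def starmap_with_callback(pool, func, iterable, on_done):
    """Run func(*args) for every args tuple and call on_done() per finished task.

    Hypothetical helper, not part of multiprocessing. Results are returned
    in input order, like Pool.starmap.
    """
    pending = [
        # The callback runs in the parent process as soon as a task succeeds.
        pool.apply_async(func, args, callback=lambda _result: on_done())
        for args in iterable
    ]
    # .get() blocks until the task is done and re-raises worker exceptions.
    return [task.get() for task in pending]


def scale(index, factor):
    """Stand-in for my_function: takes an int and a float."""
    return index * factor


if __name__ == "__main__":
    inputs = [(1, 0.5), (5, 0.25), (3, 2.0)]
    finished = []  # stand-in for a progress bar: one entry per finished task
    with mpp.Pool(2) as pool:
        results = starmap_with_callback(
            pool, scale, inputs, lambda: finished.append(1)
        )
    print(results, len(finished))
```

With tqdm, the idea would be to create the bar manually with `tqdm.tqdm(total=len(inputs))` and pass its `update` method as `on_done`. Since the callback fires whenever any task completes, the bar should keep moving even if an early task is slow, which an in-order iterator like imap cannot do.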
18

As Darkonaut mentioned, as of this writing there's no istarmap natively available. If you want to avoid patching, you can add a simple *_star function as a workaround. (This solution was inspired by this tutorial.)

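import multiprocessing

import tqdm


def my_function(arg1, arg2, arg3):
    return arg1 + arg2 + arg3


def my_function_star(args):
    # Unpack the argument tuple so imap() can pass it as a single item.
    return my_function(*args)


if __name__ == '__main__':
    jobs = 4
    with multiprocessing.Pool(jobs) as pool:
        args = [(i, i, i) for i in range(10000)]
        # list() drains the lazy imap() iterator; the bar advances per result.
        results = list(tqdm.tqdm(pool.imap(my_function_star, args), total=len(args)))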

Some notes:

I also really like corey's answer. It's cleaner, though its progress bar does not appear to update as smoothly as the one in my answer. Note that corey's answer is several orders of magnitude faster than the code I posted above when that code runs with chunksize=1 (the default). I'm guessing this is due to multiprocessing serialization overhead, because increasing chunksize (or having a more expensive my_function) makes the runtimes comparable.

I went with my answer for my application since my serialization/function cost ratio was very low.

cydonian
  • This is the best answer! Your notes about corey's answer are on point! – ethanjyx Oct 19 '21 at 21:43
  • This is great, thanks! I think as an extension to this, you can write a general function `f_star(f, args)` that returns `f(*args)`. Then you can write this as a utility function and use it anywhere you want to use `tqdm` with `starmap`. – Adam Oppenheimer Sep 10 '22 at 19:10
  • Sorry, I made a mistake in my suggestion, it should say `f_star(f_args)` takes a tuple of `(f, args)` and returns `f(*args)`. – Adam Oppenheimer Sep 10 '22 at 19:57
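The generic helper suggested in the last two comments could look roughly like the sketch below. The names call_star, imap_star and add3 are hypothetical. Both helpers need to live at module level so they can be pickled and sent to the worker processes, and chunksize is simply passed through to imap.

```python
import multiprocessing.pool as mpp


def call_star(func_and_args):
    """Unpack a (func, args) pair and call func(*args)."""
    func, args = func_and_args
    return func(*args)


def imap_star(pool, func, iterable, chunksize=1):
    """Lazily yield func(*args) for each args tuple, in input order.

    Hypothetical helper: it pairs every args tuple with func so that the
    single-argument Pool.imap can be used for multi-argument functions.
    """
    return pool.imap(call_star, ((func, args) for args in iterable), chunksize)


def add3(a, b, c):
    """Stand-in for my_function."""
    return a + b + c


if __name__ == "__main__":
    args = [(i, i, i) for i in range(10)]
    with mpp.Pool(4) as pool:
        # Consume the iterator inside the with-block, before the pool is torn down.
        results = list(imap_star(pool, add3, args, chunksize=2))
    print(results[:3])
```

Because imap_star returns an ordinary iterator, it can be wrapped like any imap call, e.g. `tqdm.tqdm(imap_star(pool, add3, args), total=len(args))`. A larger chunksize should cut serialization overhead at the cost of a bar that moves in bigger steps.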
-4

The temporary solution: rewriting the method to-be-parallelized with imap.

sdgaw erzswer