218

To make my code more "pythonic" and faster, I use multiprocessing and a map function to send it (a) the function and (b) the range of iterations.

The straightforward solution (i.e., calling tqdm directly on the range: `tqdm.tqdm(range(0, 30))`) does not work with multiprocessing (as formulated in the code below).

The progress bar runs from 0 to 100% instantly (presumably when Python consumes the range), but it does not indicate the actual progress of the map function.

How can one display a progress bar that indicates at which step the `map` function currently is?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    p = Pool(2)
    r = p.map(_foo, tqdm.tqdm(range(0, 30)))
    p.close()
    p.join()

Any help or suggestions are welcome...

Adam Smooch
SciPy
  • Can you post the code snippet of the progress bar? – Alex Jan 29 '17 at 12:14
  • For people in search of a solution with `.starmap()`: [Here](https://stackoverflow.com/a/57364423/9059420) is a patch for `Pool` adding `.istarmap()`, which will also work with `tqdm`. – Darkonaut Aug 06 '19 at 04:58

10 Answers

236

Use `imap` instead of `map`; `imap` returns an iterator over the processed values, so tqdm can advance as each result arrives.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(2) as p:
        r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
one
hkyi
  • I tried this solution. It worked, but for some reason, the call to `list()` is necessary, as well as passing the size of the list in the `total=` argument for `tqdm()`. Why is that? – Douglas De Rizzo Meneghetti Aug 08 '17 at 14:04
  • An enclosing list() statement waits for the iterator to end. total= is also required since tqdm does not know how long the iteration will be. – hkyi Aug 14 '17 at 03:27
  • Is there a similar solution for `starmap()`? – mr.tarsa Apr 20 '18 at 08:56
  • `for i in tqdm.tqdm(...): pass` may be more straightforward than `list(tqdm.tqdm(...))` – savfod Aug 02 '18 at 10:54
  • This works but has anyone else had it continuously print the progress bar on a newline for each iteration? – Moo Nov 16 '18 at 06:06
  • If you encounter locking issues while trying this solution, try removing the `tqdm.write()` statements from your code. – Pragy Agarwal Mar 13 '19 at 21:33
  • The behaviour is weird when a specific `chunk_size` is passed to `p.imap`. Can `tqdm` update every iteration instead of every chunk? – huangbiubiu Mar 26 '19 at 10:41
  • None of the answers worked for multithreading, though. Has anyone found a concise solution? – Eduardo Pignatelli Apr 13 '19 at 09:13
  • The method works; however, each bar is updated on the same line (overlapping progress bars for different processes). Does anyone know how to solve this? – Timbus Calin Sep 28 '19 at 06:38
  • @hkyi please can you elaborate more on why we need to call list(). thanks – rohanmehto2 Jan 30 '20 at 10:26
  • The "patch" solution for starmap can be found here: https://stackoverflow.com/questions/57354700/starmap-combined-with-tqdm – akozlu Mar 15 '20 at 09:27
  • I don't think this solution works properly. Stays at 0% for almost all the time, and suddenly goes to 100%. – Carlos Souza Sep 06 '20 at 02:00
  • @CarlosSouza That's because `imap` keeps order; the iterator won't skip unfinished items. You can use `imap_unordered`. – Ziming Song Aug 03 '23 at 11:20
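Regarding the `starmap()` question raised in the comments above: a minimal sketch of a common workaround (my addition, not part of the original answer) is to pack the arguments into tuples and unpack them in a small wrapper, so plain `imap` can still drive the bar. The wrapper name `_foo_star` is illustrative:

from multiprocessing import Pool
import tqdm

def _foo(a, b):
    return a * b

def _foo_star(packed_args):  # hypothetical wrapper: unpacks one tuple into _foo's arguments
    return _foo(*packed_args)

if __name__ == '__main__':
    pairs = [(i, i + 1) for i in range(30)]
    with Pool(2) as p:
        r = list(tqdm.tqdm(p.imap(_foo_star, pairs), total=len(pairs)))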
211

Sorry for being late, but if all you need is a concurrent map, I added this functionality in `tqdm>=4.42.0`:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    r = process_map(_foo, range(0, 30), max_workers=2)

References: https://tqdm.github.io/docs/contrib.concurrent/ and https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

It supports `max_workers` and `chunksize`, and you can also easily switch from `process_map` to `thread_map`.
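For example, a minimal sketch combining both options (the parameter values are illustrative; `chunksize` batches several items per task to reduce inter-process overhead):

from tqdm.contrib.concurrent import process_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    # the identical call with thread_map would run in threads instead of processes
    r = process_map(_foo, range(0, 30), max_workers=2, chunksize=3)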

casper.dcl
  • Cool (+1), but throws `HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))` in Jupyter – Ébe Isaac Jun 26 '20 at 08:24
  • @Ébe-Isaac see https://github.com/tqdm/tqdm/issues/937 – casper.dcl Jun 26 '20 at 08:27
  • I see the issue with a discussion on hacking tqdm_notebook; however, I can't work out a solution for tqdm.contrib.concurrent. – Ébe Isaac Jun 26 '20 at 09:32
  • How do you close the processes with `process_map`? Something like `p.close()` and `p.join()`? – Xudong May 05 '21 at 12:53
  • @Xudong `process_map` creates, runs, closes/joins and returns a list. – casper.dcl May 05 '21 at 15:23
  • This is great! So glad I found it. One question remains: when I use this in a jupyter notebook, it doesn't work very well. I know there is a `tqdm.notebook`, is there some way to merge the two? – jlconlin May 17 '21 at 19:02
  • I report the same issues when using this in a jupyter notebook. Especially it crashes for `thread_map` – Vladimir Vargas Jun 06 '21 at 01:00
  • This makes unconditional copies of the iterated arguments, while the others seem to do copy-on-write. – Passer By Jun 28 '21 at 09:38
  • Hmm.. finishes while the progress bar is stuck at zero. – Sterling Aug 14 '21 at 04:54
  • If I pass in some kwargs (for instance, initargs and initializer, which are kwargs for multiprocessing.Pool), does the wrapper pass them on to the Pool instance created? I can see it does pass through max_workers and chunksize. – P S Solanki Sep 12 '21 at 16:14
  • @jlconlin @Vladimir Vargas I don't have any issues if I do something like e.g. `thread_map(fn, *iterables, tqdm_class=tqdm.notebook.tqdm, max_workers=12)` in a Jupyter Notebook today. – snooze92 Oct 13 '21 at 07:14
  • Using this with requests gives me the wrong number of finished iterations. I prefer @SciPy's solution. – gruvw Jul 26 '22 at 23:23
  • When I try this, my progress bar is stuck at zero and never updates. – ifly6 Aug 23 '22 at 20:51
  • Hi, I'm using thread_map and the output looks weird, with unrecognized characters ("?" in a box), and the bar shows 0%. I'm using it in the default Windows command line. – Crear Oct 23 '22 at 19:33
  • Works perfectly here! Thanks – Glauberiano Feb 03 '23 at 19:15
  • bizarre to include this functionality in tqdm. talk about scope creep – Nimitz14 Jun 21 '23 at 22:50
96

Solution found. Be careful! Due to multiprocessing, the time estimates (iterations per second, total time, etc.) can be unstable, but the progress bar works perfectly.

Note: Context manager for Pool is only available in Python 3.3+.

from multiprocessing import Pool
import time
from tqdm import tqdm

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()
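If the unstable time estimates are a problem, one option (my suggestion, not part of the original answer) is tqdm's `smoothing` parameter: `smoothing=0` reports the average speed over the whole run instead of weighting recent iterations. The inner block above would become:

        # smoothing=0: average rate over the entire run, steadier under multiprocessing
        with tqdm(total=max_, smoothing=0) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()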
SciPy
36

You can use p_tqdm instead.

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    r = p_map(_foo, list(range(0, 30)))
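If you want to limit the number of workers (to mirror `Pool(2)` from the question), `p_map` also takes a `num_cpus` argument:

    r = p_map(_foo, list(range(0, 30)), num_cpus=2)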
Victor Quach
9

Based on the answer of Xavi Martínez, I wrote the function `imap_unordered_bar`. It can be used in the same way as `imap_unordered`, with the only difference that a progress bar is shown.

from multiprocessing import Pool
import time
from tqdm import tqdm

def imap_unordered_bar(func, args, n_processes=2):
    p = Pool(n_processes)
    res_list = []
    # a single bar, updated once per completed task; wrapping the loop in a
    # second tqdm (as originally posted) prints a spurious nested bar
    with tqdm(total=len(args)) as pbar:
        for res in p.imap_unordered(func, args):
            pbar.update()
            res_list.append(res)
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
Oliver Wilken
7
import multiprocessing as mp
import tqdm


iterable = ...  # your collection of work items (must support len())
num_cpu = mp.cpu_count() - 2  # don't use all CPUs


def func(item):  # must accept one item of the iterable
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(num_cpu) as p:
        list(tqdm.tqdm(p.imap(func, iterable), total=len(iterable)))
Dariusz Krynicki
6

For a progress bar with `apply_async`, we can use the following code, as suggested in:

https://github.com/tqdm/tqdm/issues/484

import time
import random
from multiprocessing import Pool
from tqdm import tqdm

def myfunc(a):
    time.sleep(random.random())
    return a ** 2

if __name__ == '__main__':  # guard added so the pool also works with the spawn start method
    pool = Pool(2)
    pbar = tqdm(total=100)

    def update(*a):  # runs in the main process once per finished task
        pbar.update()

    for i in range(pbar.total):
        pool.apply_async(myfunc, args=(i,), callback=update)
    pool.close()
    pool.join()
    pbar.close()
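One caveat (my note, not from the linked issue): if a task raises an exception, `callback` never fires and the bar stalls. `apply_async` also accepts an `error_callback`, so failures can still advance the bar; the submission loop above could become:

    def on_error(e):  # hypothetical handler: count failed tasks as finished, too
        pbar.update()

    for i in range(pbar.total):
        pool.apply_async(myfunc, args=(i,), callback=update, error_callback=on_error)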
Humza Naveed
1

Here is my take for when you need to get results back from your parallel executing functions. This function does a few things (there is another post of mine that explains it further), but the key point is that there is a tasks-pending queue and a tasks-completed queue. As workers finish each task from the pending queue, they add the results to the completed queue. You can wrap the check on the tasks-completed queue with the tqdm progress bar.

I am not putting the implementation of the do_work() function here; it is not relevant, as the message here is to monitor the completed-tasks queue and update the progress bar every time a result comes in.

import pickle
import multiprocessing as mp

import psutil
from tqdm import tqdm

# NOTE: do_work() and SENTINEL are expected to be defined elsewhere
# (a sketch follows below)


def par_proc(job_list, num_cpus=None, verbose=False):

    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Set the number of workers here
    num_workers = min(num_cpus, num_tasks)

    # We need as many sentinels as there are worker processes so that ALL
    # processes exit when there is no more work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0

    with tqdm(total=num_tasks) as bar:
        while completed_tasks_counter < num_tasks:
            results.append(tasks_completed.get())
            completed_tasks_counter = completed_tasks_counter + 1
            bar.update()  # advance by one; update(n) would advance by n

    for p in processes:
        p.join()

    return results
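For completeness, a minimal sketch of what a matching `do_work()` and `SENTINEL` could look like. This is my assumption, not the author's implementation (which was deliberately omitted): each worker pulls tasks until it sees the sentinel, restores the pickled function, and pushes results to the completed queue.

SENTINEL = None  # assumed sentinel; must be the exact value par_proc puts on the queue

def do_work(tasks_pending, tasks_completed, verbose):
    while True:
        job = tasks_pending.get()
        if job is SENTINEL:  # no more work left: exit so the process can be joined
            break
        func = pickle.loads(job['func'])  # restore the function shipped by par_proc
        tasks_completed.put(func(job['task']))  # hand the result back to the main process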
Nick B.
1

Based on user17242583's answer, I created the following function. It should be as fast as `Pool.map`, and the results are always ordered. Plus, you can pass as many parameters to your function as you want, not just a single iterable.

from multiprocessing import Pool
from functools import partial
from tqdm import tqdm


def imap_tqdm(function, iterable, processes, chunksize=1, desc=None, disable=False, **kwargs):
    """
    Run a function in parallel with a tqdm progress bar and an arbitrary number of arguments.
    Results are always ordered and the performance should be the same as of Pool.map.
    :param function: The function that should be parallelized.
    :param iterable: The iterable passed to the function.
    :param processes: The number of processes used for the parallelization.
    :param chunksize: The iterable is chopped into chunks of this size, and each chunk is submitted to the process pool as a separate task.
    :param desc: The description displayed by tqdm in the progress bar.
    :param disable: Disables the tqdm progress bar.
    :param kwargs: Any additional arguments that should be passed to the function.
    """
    if kwargs:
        function_wrapper = partial(_wrapper, function=function, **kwargs)
    else:
        function_wrapper = partial(_wrapper, function=function)

    results = [None] * len(iterable)
    with Pool(processes=processes) as p:
        with tqdm(desc=desc, total=len(iterable), disable=disable) as pbar:
            for i, result in p.imap_unordered(function_wrapper, enumerate(iterable), chunksize=chunksize):
                results[i] = result
                pbar.update()
    return results


def _wrapper(enum_iterable, function, **kwargs):
    i = enum_iterable[0]
    result = function(enum_iterable[1], **kwargs)
    return i, result
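A quick usage sketch (the function and the values are illustrative): extra keyword arguments are forwarded to the parallelized function through the wrapper.

def scale(x, factor=1):
    return x * factor

if __name__ == '__main__':
    # each item of range(100) is passed as `x`; factor=3 is forwarded via **kwargs
    results = imap_tqdm(scale, range(100), processes=4, desc='scaling', factor=3)
    # results[i] == scale(i, factor=3), in the original order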
Cookiereg
-3

This approach is simple and it works.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()  # each worker thread updates the shared global bar itself;
                   # this relies on threads sharing memory, so it would not
                   # work with a process Pool

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()
Vijayabhaskar J