157

I have a script that's successfully running a set of multiprocessing Pool tasks with an imap_unordered() call:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

However, my num_tasks is around 250,000, and so the join() locks the main thread for 10 seconds or so, and I'd like to be able to echo out to the command line incrementally to show the main process isn't locked. Something like:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print("Waiting for", remaining, "tasks to complete...")
  time.sleep(2)

Is there a method for the result object or the pool itself that indicates the number of tasks remaining? I tried using a multiprocessing.Value object as a counter (do_work calls a counter.value += 1 action after doing its task), but the counter only reaches ~85% of the total before it stops incrementing.
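For reference, a minimal sketch of the counter idea (names hypothetical). Without the get_lock() call shown here, counter.value += 1 is a non-atomic read-modify-write across processes, which is presumably why the unlocked version stalls short of the total:

import multiprocessing

counter = multiprocessing.Value('i', 0)  # shared integer, inherited by forked workers

def do_work(task):
    # ... do the actual task ...
    with counter.get_lock():  # serializes the increment; a bare += is not atomic
        counter.value += 1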

Oren
MidnightLightning

12 Answers

156

My personal favorite -- gives you a nice little progress bar and a completion ETA while tasks run and complete in parallel.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass
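
If you also need the return values, a minimal variant (per the comment thread below) collects them while iterating; note that imap_unordered yields results in completion order, so use imap if input order matters:

results = []
for result in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    results.append(result)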
Tim
  • 106
    what if pool returns a value? – Nickpick Feb 06 '17 at 10:57
  • 14
    I created an empty list called result before the loop then inside the loop just do result.append(x). I tried this with 2 processes and used imap instead of map and everything worked as I wanted it to @nickpick – bs7280 Jul 12 '17 at 22:08
  • 2
    so my progress bar is iterating to new lines instead of progressing in-place, any idea why this might be? – Austin May 31 '18 at 15:26
  • 2
    don't forget to `pip install tqdm` – Mr. T Sep 07 '18 at 21:50
  • 1
    Did you run `pool.close()`, `pool.join()`? – Sean Sep 24 '18 at 07:52
  • 6
    Don't forget to wrap this code in `if __name__ == "__main__":`, or else it may mysteriously not work – kevinsa5 Sep 27 '18 at 17:05
  • 5
    @bs7280 By result.append(x) did you mean result.append(_) ? What is x? – jason Apr 05 '19 at 23:18
  • Late to the party, but will @bs7280's solution work if output order is relevant (i.e. `map`/`imap` rather than `imap_unordered`)? – hdkrgr Jul 24 '19 at 15:11
  • @jason I just tried it and it looks like it is results.append(_) – Ben Ogorek Aug 06 '19 at 17:38
  • I tried something similar with `results.append` but it seems like it is running everything in parallel before passing it to the loop and so `tqdm` does not give a progress bar until the work is over... – ojunk Oct 14 '19 at 14:03
  • 1
    Could we get an example where we need a return value? I tried adding to a list within the parallelized function, but it does not seem to work (empty list). – Pablo Dec 01 '21 at 09:26
  • 2
    @Nickpick easy, results = []; for result in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)): results.append(result) – John Jiang May 07 '22 at 23:51
97

There is no need to access private attributes of the result set:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
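
If the progress line only shows up after everything finishes (see the comments below), an explicit flush after each write is a safe addition; a minimal sketch:

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
    sys.stderr.flush()  # show the carriage-return-updated line immediately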
jfs
  • 7
    I see the printout only after the code exits (not every iteration). Do you have a suggestion? – Hanan Shteingart Nov 06 '14 at 10:47
  • @HananShteingart: It works fine on my system (Ubuntu) with both Python 2 and 3. I've used `def do_word(*a): time.sleep(.1)` as an example. If it doesn't work for you then create a [complete minimal code example](http://stackoverflow.com/help/mcve) which demonstrates your issue: describe in words what you expect to happen and what happens instead, mention how you run your Python script, your OS and Python version, and [post it as a new question](http://tinyurl.com/stack-hints). – jfs Dec 01 '14 at 17:13
  • 21
    I had the same problem as @HananShteingart: it's because I was trying to use `Pool.map()`. I didn't realise that _only_ `imap()` and `imap_unordered()` work in this way - the documentation just says "A lazier version of map()" but really means "the underlying iterator returns results as they come in". – simonmacmullen Mar 24 '15 at 16:01
  • @simonmacmullen: both the question and my answer use `imap_unordered()`. Hanan's issue is probably due to `sys.stderr.write('\r..')` (overwriting the same line to show the progress). – jfs Mar 24 '15 at 19:55
  • 3
    Also possible! I mainly wanted to document a stupid assumption I'd made - in case anyone else reading this made it too. – simonmacmullen Mar 25 '15 at 12:00
  • 1
    @HananShteingart you may need to flush the output stream: `sys.stderr.flush()` – ealfonso Mar 06 '19 at 05:50
  • @ealfonso as I said earlier: it works on both Python 2 (the future import hints at it) and Python 3 as is. It is not obvious whether the behavior is guaranteed on Python 3 and therefore the flush() call wouldn't hurt. Though it might not help Hanan (the map() call — unlike the imap_unordered() call — won't return until it ends). – jfs Mar 06 '19 at 15:31
  • @Tupolev._: `enumerate(iterator, start=1)` makes `i` start at `1` – jfs Jan 22 '20 at 18:33
  • To get the values after, and to show a constantly updating progress bar: `with mp.Pool() as pool: results_list = [] for i, value in enumerate(pool.imap_unordered(do_work, range(num_tasks)), start=1): sys.stderr.write(f'\rProcessed: {i}/{num_tasks} tasks. {i/num_tasks:2.2%} complete') sys.stderr.flush() results_list.append(value) print(results_list, flush=True)` – Brett Elliot Aug 10 '22 at 13:04
39

I found that the work was already done by the time I tried to check its progress. This is what worked for me using tqdm.

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

This should work with all flavors of multiprocessing, whether they block or not.

reubano
  • 10
    I think this creates a bunch of threads, and each thread is counting independently – nburn42 Apr 26 '19 at 21:44
  • 1
    I have functions within functions which results in a pickling error. – ojunk Oct 14 '19 at 13:59
  • This does not create a progress bar for me, but it kind of works. It counts iterations (and displays total expected iterations). Although the count goes up and down because of threading stuff (I guess) it is not hard to see more or less where it is at any time. So far this is what works best for me (I have to use a return value, which complicates other answers). – Pablo Dec 01 '21 at 09:43
  • Maybe I am missing something, but how can different processes access to the very same pbar instance which is created in the main process memory space? – g.pickardou Jun 01 '22 at 11:24
39

As suggested by Tim, you can use tqdm and imap to solve this issue. I've just stumbled upon this problem and tweaked the imap_unordered solution, so that I can access the results of the mapping. Here's how it works:

from multiprocessing import Pool
import tqdm

pool = Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=num_tasks))

In case you don't care about the values returned from your jobs, you don't need to assign the list to any variable.
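For example, a standard consume-the-iterator idiom (a sketch using collections.deque with maxlen=0) exhausts it without storing anything:

from collections import deque

deque(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=num_tasks), maxlen=0)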

mrapacz
  • This is the best answer. Shows progress while the tasks are completing and returns the results. – Justas Aug 13 '21 at 15:50
23

Found an answer myself with some more digging: taking a look at the __dict__ of the imap_unordered result object, I found it has an _index attribute that increments with each task completion. So this works for logging, wrapped in the while loop:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

However, I did find that swapping the imap_unordered for a map_async resulted in much faster execution, though the result object is a bit different: instead of _index, the result object from map_async has a _number_left attribute and a ready() method:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)
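
As the comments below note, _number_left counts remaining chunks rather than individual tasks, so a rough estimate of remaining items (still relying on private attributes; the last chunk may be partial) is:

remaining = rs._number_left * rs._chunksize  # approximate count of items left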
Cristian Ciupitu
MidnightLightning
  • 4
    I tested this for Python 2.7.6 and rs._number_left appears to be the number of chunks remaining. So if rs._chunksize isn't 1 then rs._number_left won't be the number of list items remaining. – Allen Aug 19 '14 at 21:14
  • Where should I put this code? I mean this is not executed until the content of `rs` is known, and isn't that a bit late? – Wakan Tanka Aug 23 '15 at 22:24
  • @WakanTanka: It goes in the main script after it spins off the extra threads. In my original example, it goes in the "while" loop, where `rs` has already launched the other threads. – MidnightLightning Aug 24 '15 at 11:58
  • 1
    Could you please edit your question and/or answer to show minimum working example. I do not see `rs` in any loop, I'm multiprocessing newbie and this would help. Thank you very much. – Wakan Tanka Aug 24 '15 at 12:07
  • @WakanTanka: Okay, I copied the loop headers from the example in the original question; make more sense? – MidnightLightning Aug 24 '15 at 21:16
  • 1
    At least in `python 3.5`, the solution using `_number_left` does not work. `_number_left` represents the chunks that remain to be processed. For example, if I want to have 50 elements passed to my function in parallel, then for a thread pool with 3 processes `_map_async()` creates 10 chunks with 5 elements each. `_number_left` then represents how many of these chunks have been completed. – mSSM Jan 16 '16 at 20:55
  • Why isn't the number remaining, `remaining = rs._number_left * chunksize`? – Colin Helms Aug 03 '19 at 19:46
13

I know that this is a rather old question, but here is what I'm doing when I want to track the progress of a pool of tasks in Python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

Basically, you use apply_async with a callback (in this case, one that appends the returned value to a list), so you don't have to wait before doing something else. Then, within a while loop, you check the progress of the work. In this case, I added a widget to make it look nicer.

The output:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

Hope it helps.

Julien Tourille
  • gotta change: `[pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]` for `(pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)` – David Przybilla Aug 28 '15 at 14:10
  • That's not true. A generator object will not work here. Checked. – swagatam Jul 13 '16 at 18:04
10

A simple solution with Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.2)
    return x**2


n = 10

with Pool(4) as p, tqdm(total=n) as pbar:
    res = [p.apply_async(
        work, args=(i,), callback=lambda _: pbar.update(1)) for i in range(n)]
    results = [r.get() for r in res]
zeawoas
8

Quick start

Using tqdm and multiprocessing.Pool

Install

pip install tqdm

Example

import time
import threading
from multiprocessing import Pool

from tqdm import tqdm


def do_work(x):
    time.sleep(x)
    return x


def progress():
    time.sleep(3)  # Check progress after 3 seconds
    print(f'total: {pbar.total} finish:{pbar.n}')


tasks = range(10)
pbar = tqdm(total=len(tasks))

if __name__ == '__main__':
    thread = threading.Thread(target=progress)
    thread.start()
    results = []
    with Pool(processes=5) as pool:
        for result in pool.imap_unordered(do_work, tasks):
            results.append(result)
            pbar.update(1)
    print(results)


Flask

Install

pip install flask

main.py

import time
from multiprocessing import Pool

from tqdm import tqdm
from flask import Flask, make_response, jsonify

app = Flask(__name__)


def do_work(x):
    time.sleep(x)
    return x


total = 5  # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))


@app.route('/run/')
def run():
    results = []
    with Pool(processes=2) as pool:
        for _result in pool.imap_unordered(do_work, tasks):
            results.append(_result)
            if pbar.n >= total:
                pbar.n = 0  # reset
            pbar.update(1)
    response = make_response(jsonify(dict(results=results)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response


@app.route('/progress/')
def progress():
    response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response

Run (on Windows, for example)

set FLASK_APP=main
flask run


test.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Progress Bar</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
    <div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
         style="width: 10%">0.00%
    </div>
</div>
</body>
<script>
    function set_progress_rate(n, total) {
        //Set the rate of progress bar
        var rate = (n / total * 100).toFixed(2);
        if (n > 0) {
            $(".progress-bar").attr("aria-valuenow", n);
            $(".progress-bar").attr("aria-valuemax", total);
            $(".progress-bar").text(rate + "%");
            $(".progress-bar").css("width", rate + "%");
        }
    }

    $("#run").click(function () {
        //Run the task
        $.ajax({
            url: "http://127.0.0.1:5000/run/",
            type: "GET",
            success: function (response) {
                set_progress_rate(100, 100);
                console.log('Results:' + response['results']);
            }
        });
    });
    setInterval(function () {
        //Show progress every 1 second
        $.ajax({
            url: "http://127.0.0.1:5000/progress/",
            type: "GET",
            success: function (response) {
                console.log(response);
                var n = response["n"];
                var total = response["total"];
                set_progress_rate(n, total);
            }
        });
    }, 1000);
</script>
</html>


XerCis
4

I created a custom class to print out progress. Maybe this helps:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results
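
A minimal usage sketch (my example, not part of the original answer; note that complete() calls results.extend(), so the worker function should return an iterable):

def square(x):
    return [x * x]  # return a list so complete() can extend the results

if __name__ == '__main__':
    sim = ParallelSim(processes=4)
    for i in range(10):
        sim.add(square, (i,))
    sim.run()
    print(sim.get_results())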
Aronstef
3

After doing some research, I wrote a small module called parallelbar. It lets you display both the overall progress of the pool and the progress of each core separately. It is easy to use and well documented.

For example:

from parallelbar import progress_map
from parallelbar.tools import cpu_bench


if __name__ == '__main__':
    # create a list of tasks
    tasks = [1_000_000 + i for i in range(100)]
    progress_map(cpu_bench, tasks)

padu
1

Try this simple Queue-based approach, which can also be used with pooling. Be mindful that printing anything after the progress bar starts will cause it to be moved, at least with this particular progress bar. (PyPI's progress 1.5)

import time
import multiprocessing
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []
    processes = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups):

        if i == 0:
            # start the status-bar process once, alongside the first worker
            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat, len(groups), len(combined)))
            processes.append(p)
            p.start()

        p = multiprocessing.Process(target = process_data,
            args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()
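
To actually run it (a sketch: the worker functions must be defined at module level, and the entry point should be guarded for platforms that spawn rather than fork):

if __name__ == '__main__':
    multiprocess()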
0

Some answers show a progress bar, but I could not get results out of the pool.

I used tqdm to create the progress bar; you can install it with pip install tqdm.

The simple code below works pretty well with a progress bar, and you can get the results as well:

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep

tasks = range(5)
result = []

def do_work(x):
    # do something with x and return the result
    sleep(2)
    return x + 2

if __name__ == '__main__':
    pbar = tqdm(total=len(tasks))

    with Pool(2) as p:
        for i in p.imap_unordered(do_work, tasks):
            result.append(i)
            pbar.update(1)  # advance by one task, not by the task's return value
    
    pbar.close()
    print(result)
mohammad H