7

I have the following function:

import itertools
from multiprocessing import Pool

def do_comparison(tupl):
    x, y = tupl # unpack arguments
    return compare_clusters(x, y)

def distance_matrix(clusters, condensed=False):
    pool = Pool()
    values = pool.map_async(do_comparison, itertools.combinations(clusters, 2)).get()
    # do stuff

Is it possible to print the progress of pool.map_async(do_comparison, itertools.combinations(clusters, 2)).get()? I tried adding a counter to do_comparison, like so:

count = 0
def do_comparison(tupl):
    global count
    count += 1
    if count % 1000 == 0:
        print count
    x, y = tupl # unpack arguments
    return compare_clusters(x, y)

But aside from it not looking like a good solution, the numbers don't print until the end of the script. Is there a good way to do this?

Niek de Klein

2 Answers

4

I track progress as follows:

import multiprocessing
import time

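class PoolProgress:
  def __init__(self,pool,update_interval=3):
    self.pool            = pool
    self.update_interval = update_interval
  def track(self, job):
    # Note: _cache, _job, _number_left and _chunksize are private attributes
    # of multiprocessing.pool and may change between Python versions.
    task = self.pool._cache[job._job]
    while task._number_left>0:
      # _number_left counts remaining chunks, so this is an approximate upper bound
      print("Tasks remaining = {0}".format(task._number_left*task._chunksize))
      time.sleep(self.update_interval)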


def hi(x): #This must be defined before `p` if we are to use in the interpreter
  time.sleep(x//2)
  return x

a = list(range(50))

p   = multiprocessing.Pool()
pp  = PoolProgress(p)

res = p.map_async(hi,a)

pp.track(res)
Daffy
Richard
    That one line `a = [x for x in range(50)]` can be made more readable like `a = list(range(50))` – Daffy Dec 03 '18 at 05:00
4

Richard's solution works well with a small number of jobs, but for some reason it seems to freeze with a very large number of jobs. I found it best to use:

import multiprocessing
import time

def track_job(job, update_interval=3):
    while job._number_left > 0:
        print("Tasks remaining = {0}".format(
            job._number_left * job._chunksize))
        time.sleep(update_interval)



def hi(x): #This must be defined before `p` if we are to use in the interpreter
  time.sleep(x//2)
  return x

a = list(range(50))

p   = multiprocessing.Pool()

res = p.map_async(hi,a)

track_job(res)
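
Both snippets above read private attributes (_number_left, _chunksize), which could change between Python versions. If that is a concern, here is a minimal sketch that sticks to the public API: it swaps map_async for pool.imap and counts results in the parent process as they arrive. The names map_with_progress, report_every and square are my own, made up for illustration; square just stands in for do_comparison.

```python
import multiprocessing


def map_with_progress(pool, func, items, report_every=1000, chunksize=1):
    """Map func over items on pool, printing a running count from the parent.

    Hypothetical helper: not part of multiprocessing itself.
    """
    items = list(items)  # materialise so the total is known up front
    total = len(items)
    results = []
    # imap yields results lazily and in input order, so the parent can
    # count them as they come back instead of blocking on .get().
    for done, value in enumerate(pool.imap(func, items, chunksize), 1):
        results.append(value)
        if done % report_every == 0 or done == total:
            # flush so the line shows up immediately rather than at exit
            print("{0}/{1} tasks done".format(done, total), flush=True)
    return results


def square(x):  # stand-in for do_comparison; must be defined at module level
    return x * x


if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        values = map_with_progress(pool, square, range(10), report_every=2)
    print(values)
```

Because imap keeps input order, the count can lag a little behind the real progress when an early task is slow. imap also defaults to a chunksize of 1, so for a very large number of cheap tasks it is probably worth passing a larger chunksize.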