
I am very pleased with the progressbar module, and I use it a lot with its stdout redirect functionality. Recently, I started using (pathos) multiprocessing, but I cannot get the two to work together.

I also had some problems with keyboard interrupts, and I read that this is caused by a bug in Python 2. I included the code I use to deal with it, in case it is relevant to this problem.

Furthermore, I noticed that switching between the different map functions can solve many problems. I am using imap because I want to write intermediate results to a CSV file and, of course, to display the progress bar.

I played around with StdOut myself, and tried some suggestions on the internet. However, I always end up in two undesirable situations.

Either:

  1. StdOut is not redirected and the progress bar is repeated after each print statement.
  2. StdOut is redirected but the output of the workers is not shown.

Here is some toy code demonstrating my problem:

import time, signal, multiprocessing
import progressbar


def do_work(number):
    if not number % 500:
        print 'Special log occasion ...'
    time.sleep(0.1)

def example(redirect_stdout):
    workers = multiprocessing.cpu_count()
    num_tasks = 1000
    pbar = progressbar.ProgressBar(widgets=[progressbar.Bar()], max_value=num_tasks, redirect_stdout=redirect_stdout)
    pbar.start()

    # Start the pool with SIGINT turned off, so that workers can be interrupted
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = multiprocessing.Pool(processes=workers)
    signal.signal(signal.SIGINT, original_sigint_handler)


    for i, _ in enumerate(pool.imap(do_work, xrange(num_tasks)), 1):
        pbar.update(i)

    pool.close()
    pool.join()
    pbar.finish()

print "Case1: Progressbar without redirecting output:"
example(False)
print "\nCase2: Progressbar with redirecting output:"
example(True)

Output:

Case1: Progressbar without redirecting output:
Special log occasion ...
|######################                       |
Special log occasion ...
|#############################################|


Case2: Progressbar with redirecting output:
|#############################################|
Tivaro

1 Answer


Writing to the same output stream using multiple processes is always prone to synchronisation issues, or worse, overwritten/missing data. Luckily it's not that difficult to work around this issue :)

# vim: set fileencoding=utf-8
import six
import sys
import time
import signal
import multiprocessing
import progressbar


def do_work(number):
    if not number % 50:
        print 'Special log occasion ...'
        sys.stdout.flush()
    time.sleep(0.1)


class IOQueue(six.StringIO):

    '''
    Very poor and simple IO wrapper which only functions for simple print
    statements
    '''

    def __init__(self, queue, *args, **kwargs):
        six.StringIO.__init__(self, *args, **kwargs)
        self.queue = queue

    def write(self, value):
        self.queue.put(value)


def example(redirect_stdout):
    workers = multiprocessing.cpu_count()
    num_tasks = 1000
    pbar = progressbar.ProgressBar(
        widgets=[progressbar.Bar()],
        max_value=num_tasks,
        redirect_stdout=redirect_stdout,
    )
    # Start the pool with SIGINT turned off, so that workers can be interrupted
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)

    stdout_queue = multiprocessing.Queue()

    def initializer(queue):
        sys.stdout = IOQueue(queue)

    pool = multiprocessing.Pool(
        processes=workers, initializer=initializer, initargs=[stdout_queue])
    signal.signal(signal.SIGINT, original_sigint_handler)

    for i, _ in enumerate(pool.imap(do_work, xrange(num_tasks)), 1):
        while not stdout_queue.empty():
            sys.stdout.write(stdout_queue.get())

        pbar.update(i)

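    pool.close()
    pool.join()

    # Forward any output that arrived after the last result was received
    while not stdout_queue.empty():
        sys.stdout.write(stdout_queue.get())

    pbar.finish()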

example(True)

The code above makes every worker write its stdout data to a multiprocessing queue. The main process then writes the queued data to the regular stdout before each progress bar update.
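For readers on Python 3, the same queue-forwarding idea might look roughly like the sketch below. It is only an illustration using the standard library: the names `QueueWriter`, `install_queue_stdout`, `drain` and `run_tasks` are made up for this example and are not part of progressbar or multiprocessing, and the progress bar call is left as a comment so the snippet has no third-party dependency.

```python
import io
import multiprocessing
import queue
import sys


class QueueWriter(io.TextIOBase):
    """File-like object that forwards every write() to a queue (hypothetical helper)."""

    def __init__(self, out_queue):
        super().__init__()
        self.out_queue = out_queue

    def writable(self):
        return True

    def write(self, text):
        self.out_queue.put(text)
        return len(text)


def install_queue_stdout(out_queue):
    """Pool initializer: runs once in each worker and replaces its sys.stdout."""
    sys.stdout = QueueWriter(out_queue)


def drain(out_queue, stream):
    """Copy everything currently in the queue to `stream` without blocking.

    Returns the number of chunks that were forwarded.
    """
    count = 0
    while True:
        try:
            text = out_queue.get_nowait()
        except queue.Empty:
            return count
        stream.write(text)
        count += 1


def run_tasks(func, items, processes=2):
    """Map `func` over `items`, forwarding worker output after each result."""
    out_queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(
        processes=processes,
        initializer=install_queue_stdout,
        initargs=(out_queue,),
    )
    results = []
    for result in pool.imap(func, items):
        results.append(result)
        drain(out_queue, sys.stdout)
        # A progress bar update such as pbar.update(len(results)) would go here.
    pool.close()
    pool.join()
    # Pick up anything the workers wrote after the last result came back.
    drain(out_queue, sys.stdout)
    return results
```

Calling `run_tasks(do_work, range(1000))` from the main process would then print the workers' messages through the parent's stdout, which is the stream the progress bar redirects. On platforms that start workers with spawn instead of fork, `do_work` has to be a top-level function of an importable module and the call should sit under `if __name__ == "__main__":`.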

Wolph