
Good day!

I have a Python script that builds a file list and processes it with multiprocessing.Pool.map and a worker function. The worker function calls an outside executable via subprocess.check_call, and this external executable prints some information to stdout.

My problem is reading that output: sometimes it is garbled and I can't get any useful information from it. I've read about printing and multithreading in Python, but I don't think that's exactly my problem, because I never explicitly call the print function in my script.

How can I solve this problem? Thank you.

Also, I have noticed that if I redirect the output of my script to a file, the output isn't garbled at all.

[UPDATE]:

This works fine if I run the script as: python mp.py > mp.log

import time, argparse, threading, sys
from os import getenv
from multiprocessing import Pool

def f(x):
    cube = x*x*x
    print '|Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut %d|'%(cube)
    return cube

if __name__ == '__main__':

    #file = open('log.txt', 'w+')
    parser = argparse.ArgumentParser(description='cube', usage='%(prog)s [options] -n')
    parser.add_argument('-n', action='store', help='number', dest='n', default='10000', metavar = '')
    args = parser.parse_args()

    pool = Pool()

    start = time.time()
    result = pool.map(f, range(int(args.n)))
    end = time.time()
    print (end - start)
    #file.close()
kvv
  • The reason is that different processes print to the same terminal, so you get one line from one process, then a line from a second process, then another line from the first (or at least that's what I think your problem looks like, concerning the "messy output"). – usethedeathstar Jul 19 '13 at 09:51
  • How can I solve this problem? A lock is useless. I also tried to replace every print expression with sys.stdout.write, but it didn't help either. – kvv Jul 22 '13 at 13:01
  • In that case, I guess the solution is to let your external executable print its output to a log.txt for each thread; that way it will work. – usethedeathstar Jul 23 '13 at 10:27
  • Do tell if that solves the problem. – usethedeathstar Jul 23 '13 at 13:56
  • @usethedeathstar I've posted some code; I hope it's OK. – kvv Jul 23 '13 at 15:02

2 Answers


To avoid a mixed output from multiple concurrent subprocesses, you could redirect the output of each subprocess to a different file:

from multiprocessing.dummy import Pool # use threads
from subprocess import call

def run(i):
    with open('log%d.txt' % i, 'wb') as file:
        return call(["cmd", str(i)], stdout=file)

return_codes = Pool(4).map(run, range(10)) # run 10 subprocesses, 4 at a time
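If you need a single combined transcript afterwards, the per-subprocess files can be concatenated in task order once the pool has finished. A minimal sketch (the `merge_logs` helper and the `combined.log` name are hypothetical, matching the `log%d.txt` names above):

```python
# Concatenate the per-subprocess logs, in task order, into one clean file.
# Run this only after Pool(...).map(run, range(n)) has returned.
def merge_logs(n, dest='combined.log'):
    with open(dest, 'wb') as out:
        for i in range(n):
            with open('log%d.txt' % i, 'rb') as f:
                out.write(f.read())
```

Because each subprocess wrote to its own file, the merged result is deterministic no matter how the subprocesses were scheduled.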

Or collect the output and print it from a single thread in your code:

from functools import partial
from multiprocessing.dummy import Pool, Queue, Process # use threads
from subprocess import Popen, PIPE

def run(i, output):
    p = Popen(["cmd", str(i)], stdout=PIPE, bufsize=1)
    for line in iter(p.stdout.readline, b''):
        output((p.pid, line)) # collect the output 
    p.stdout.close()
    return p.wait()

def print_output(q):
    for pid, line in iter(q.get, None):
        print pid, line.rstrip()

q = Queue()
Process(target=print_output, args=[q]).start() # start printing thread
return_codes = Pool(4).map(partial(run, output=q.put_nowait),
                           range(10)) # run 10 subprocesses, 4 at a time
q.put(None) # exit printing thread

Or you could use a lock:

from __future__ import print_function
from multiprocessing.dummy import Pool, Lock # use threads
from subprocess import Popen, PIPE

def run(i, lock=Lock()):
    p = Popen(["cmd", str(i)], stdout=PIPE, bufsize=1)
    for line in iter(p.stdout.readline, b''):
        with lock:
            print(p.pid, line.rstrip())
    p.stdout.close()
    return p.wait()

return_codes = Pool(4).map(run, range(10)) # run 10 subprocesses, 4 at a time
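Note that `lock=Lock()` as a default argument is evaluated once, when `run` is defined, so every thread in the pool shares the same lock object. A minimal sketch demonstrating the shared default (the `worker` function and `results` list are hypothetical):

```python
from multiprocessing.dummy import Pool, Lock  # use threads, as above

def worker(i, lock=Lock(), results=[]):
    # Both defaults are created once, at definition time, and are therefore
    # shared by every call from every thread in the pool.
    with lock:
        results.append(i)    # append + len happen atomically under the lock
        return len(results)

counts = Pool(4).map(worker, range(8))
# with the lock held, each call observes a distinct length: 1 through 8
```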

Note: the print() function is used to work around the issue from the question Why a script that uses threads prints extra lines occasionally?

To avoid mixing lines from different subprocesses, you could collect units larger than a single line at a time, depending on what the actual output is.
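For example, if each run of the executable produces a modest amount of output, you could capture all of it with `Popen.communicate()` and only print it from the main thread after the process has exited. A sketch along those lines (the Python `-c` command is a stand-in for the real executable):

```python
import sys
from multiprocessing.dummy import Pool  # use threads
from subprocess import Popen, PIPE

# Stand-in for the external executable; substitute your real command here.
CMD = [sys.executable, '-c',
       'import sys; print("output for task %s" % sys.argv[1])']

def run(i):
    # communicate() buffers the subprocess's complete stdout in memory;
    # nothing is printed while the process is still running.
    p = Popen(CMD + [str(i)], stdout=PIPE)
    out, _ = p.communicate()
    return i, out.decode()

if __name__ == '__main__':
    # Print each subprocess's output as a single unit from the main thread,
    # so output from different subprocesses cannot interleave.
    for i, out in Pool(4).map(run, range(4)):
        print(out, end='')
```

This trades memory for ordering: the whole output of each subprocess is held until it finishes, which is fine for short logs but not for very verbose commands.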

jfs
  • Many thanks, the lock solution works nicely! I didn't know a thread function could take a lock as a parameter! – kvv Jul 24 '13 at 06:27

Another reasonably general solution, also using unique files:

import time, argparse
from os import getcwd, getpid
from os.path import join
from multiprocessing import Pool, cpu_count

logger = None  # set by init() to a log file unique to each process in the pool
def init(*initargs):
    global logger
    lpath = getcwd() if len(initargs) == 0 else initargs[0]  # optional log directory
    name = 'log{!s}'.format(getpid())
    logger = open(join(lpath, name), mode='wt')  # open a log with a per-process name


def f(x):
    global logger
    cube = x*x*x
    logger.write('|Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut {}|\n'.format(cube))
    logger.flush()
    return cube

if __name__ == '__main__':

    #file = open('log.txt', 'w+')
    parser = argparse.ArgumentParser(description='cube', usage='%(prog)s [options] -n')
    parser.add_argument('-n', action='store', help='number', dest='n', default='10000', metavar = '')
    args = parser.parse_args()

    pool = Pool(cpu_count(), init)

    start = time.time()
    result = pool.map(f, range(int(args.n)))
    end = time.time()
    print (end - start)
    #file.close()
Jonathan