
My question is hopefully particular enough not to duplicate any of the others I've read. I want to use subprocess and multiprocessing to spawn a bunch of jobs serially and return the return code to me. The problem is that I don't want to wait() so I can spawn the jobs all at once, but I do want to know when each one finishes so I can get its return code. I'm having this weird problem where if I poll() the process it won't run; it just sits in the Activity Monitor without running (I'm on a Mac). I thought I could use a watcher thread, but I'm hanging on the out_q.get(), which leads me to believe that maybe I'm filling up the buffer and deadlocking. I'm not sure how to get around this. This is basically what my code looks like. If anyone has any better ideas on how to do this, I would be happy to completely change my approach.

import threading
from subprocess import Popen
from multiprocessing import Process, Queue

def watchJob(p1, out_q):
    while p1.poll() == None:
        pass
    print "Job is done"
    out_q.put(p1.returncode)

def runJob(out_q):
    LOGFILE = open('job_to_run.log','w')
    p1 = Popen(['../../bin/jobexe','job_to_run'], stdout = LOGFILE)
    t = threading.Thread(target=watchJob, args=(p1,out_q))
    t.start()

out_q = Queue()
outlst=[]
for i in range(len(nprocs)):
    proc = Process(target=runJob, args=(out_q,))
    proc.start()
    outlst.append(out_q.get()) # This hangs indefinitely
    proc.join()
fatalaccidents
  • Any particular reason to have both threading and multiprocessing? –  Sep 10 '14 at 19:15
  • why are you piping the jobs' stdout if you have no intention of reading the contents? depending on whether the jobs produce a lot of output or not, the jobs may simply block on writing to stdout. – isedev Sep 10 '14 at 19:15
  • I would look into [Multiprocessing Pools](https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool). They have several different ways to launch processes synchronously or asynchronously, and retrieving the return code (either by waiting or checking later) is pretty flexible. Also, there are a few ways to attach callbacks that execute when your subprocess completes (a minimal sketch of this approach follows these comments). – skrrgwasme Sep 10 '14 at 19:45
  • I will definitely look into Pools. That may be the approach that I end up going with. I still would like to know where I'm going wrong here just for my understanding. Thank you to everyone who has answered so far. – fatalaccidents Sep 10 '14 at 20:38
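
A minimal sketch of what the Pool-with-callback approach suggested above might look like (the executable path comes from the question; the job names and the on_done callback are placeholders, not part of the original posts):

from multiprocessing import Pool
from subprocess import call

def run_job(job_name):
    # placeholder command; replace with the real executable and arguments
    with open(job_name + '.log', 'w') as logfile:
        return call(['../../bin/jobexe', job_name], stdout=logfile)

def on_done(returncode):
    # called in the parent process as each job completes
    print('job finished with return code %s' % returncode)

if __name__ == '__main__':
    pool = Pool(4)  # run at most 4 jobs at a time
    for name in ['job1', 'job2', 'job3']:
        pool.apply_async(run_job, (name,), callback=on_done)
    pool.close()
    pool.join()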

2 Answers


You need neither multiprocessing nor threading here. You can run multiple child processes in parallel and collect their statuses all in a single thread:

#!/usr/bin/env python3
from subprocess import Popen
from time import sleep

def run(cmd, log_filename):
    with open(log_filename, 'wb', 0) as logfile:
        return Popen(cmd, stdout=logfile)

# start several subprocesses
processes = {run(['echo', c], 'subprocess.%s.log' % c) for c in 'abc'}
# now they all run in parallel
# report as soon as a child process exits
while processes:
    for p in processes:
        if p.poll() is not None:  # this child has finished
            processes.remove(p)   # safe: we break out of the loop right after
            print('{} done, status {}'.format(p.args, p.returncode))
            break
    else:  # no child has exited yet
        sleep(.1)  # avoid busy-waiting

p.args stores cmd in Python 3.3+; on earlier Python versions, keep track of cmd yourself.
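
For example, a minimal sketch (not part of the original answer) that keeps the mapping in a dict keyed by the Popen object, using the same placeholder echo commands:

#!/usr/bin/env python
from subprocess import Popen
from time import sleep

def run(cmd, log_filename):
    with open(log_filename, 'wb', 0) as logfile:
        return Popen(cmd, stdout=logfile)

# map each Popen object back to the command that started it
commands = [['echo', c] for c in 'abc']
processes = {run(cmd, 'subprocess.%s.log' % i): cmd
             for i, cmd in enumerate(commands)}

while processes:
    done = [p for p in processes if p.poll() is not None]
    for p in done:
        cmd = processes.pop(p)
        print('{} done, status {}'.format(cmd, p.returncode))
    if not done:
        sleep(.1)  # avoid busy-waiting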

See also:

To limit the number of parallel jobs, a ThreadPool could be used (as shown in the first link):

#!/usr/bin/env python3
from multiprocessing.dummy import Pool # use threads
from subprocess import Popen

def run_until_done(args):
    cmd, log_filename = args
    try:
        with open(log_filename, 'wb', 0) as logfile:
            p = Popen(cmd, stdout=logfile)
        return cmd, p.wait(), None
    except Exception as e:
        return cmd, None, str(e)

commands = ((('echo', str(d)), 'subprocess.%03d.log' % d) for d in range(500))
pool = Pool(128) # 128 concurrent commands at a time
for cmd, status, error in pool.imap_unordered(run_until_done, commands):
    if error is None:
        fmt = '{cmd} done, status {status}'
    else:
        fmt = 'failed to run {cmd}, reason: {error}'
    print(fmt.format_map(vars()))  # or fmt.format(**vars()) on older versions

The thread pool in the example has exactly 128 threads, so it can't execute more than 128 jobs concurrently. As soon as a thread frees up (finishes a job), it takes another, and so on; the total number of jobs running at once is limited by the number of threads. A new job doesn't wait for all 128 previous jobs to finish; it is started as soon as any of the old jobs is done.

jfs
  • This will raise an exception because the size of `processes` changes while you're iterating over it. You'd have to iterate over a copy of `processes`. – dano Sep 12 '14 at 00:55
  • No problem. Out of curiosity, why'd you also switch from a set to a list? The `remove` is more efficient with the `set`, and I'd assume a `set.copy()` is O(n), the same as a `list[:]`. Granted, for iterables this small it doesn't really make much difference anyway... – dano Sep 12 '14 at 01:27
  • @dano: no reason. The main task is to "print the exit statuses in a near completion order". `[:]` is less visually distracting than `.copy()`. Performance doesn't matter here (n is small). I've only mentioned it, in case the code is copy-pasted without understanding. – jfs Sep 12 '14 at 01:58
  • @dano: ok. I've made it O(n). – jfs Sep 12 '14 at 02:01
  • @J.F.Sebastian This seems to do the trick for a set amount of processes. The thing that is kind of throwing me for a loop is that I'm trying to run for example 128 jobs... and then as one finishes pull another one from a database so I'm constantly keeping 128 jobs running. So once the set is iterated over and the jobs start, I'm unsure of how to monitor it to add another job into the mix. Thanks for your answer, it is very close and most likely only lacking because of my poor description. – fatalaccidents Sep 12 '14 at 15:20
  • @fatalaccidents read the links in the answer; they describe how to maintain a fixed number of parallel jobs e.g., using a simple Pool – jfs Sep 12 '14 at 20:13
  • @fatalaccidents: I've added the explicit code example on how to limit number of parallel jobs using a Pool. – jfs Sep 13 '14 at 00:58
  • @J.F.Sebastian I thought seeing a small example would help me figure out my larger problem. This is a very elegant and nice code, but it doesn't solve one problem I have. Basically I have a database of 100,000's of jobs and I want to run as many as possible in the time I have. Since they take different times, I want all (for example) 128 processors running the jobs. I could wait and reiterate with new jobs and a pool of 128, but I would rather add in new jobs as old jobs terminate. Is this possible with pool or will I need to use a different function inside multiprocessing? Thanks again. – fatalaccidents Sep 16 '14 at 16:33
  • Also, since this might be a large and complicated question, would it be proper SO etiquette to ask the question again but with specifics and a code example of my attempt? – fatalaccidents Sep 16 '14 at 16:34
  • @fatalaccidents: look at the last code example that I've mentioned in my previous comment. *It solves your exact problem*. Do you see *"128 concurrent commands at a time"* comment? I've added an explanation at the end of the answer. Is it more clear now? – jfs Sep 16 '14 at 19:26
  • @J.F.Sebastian Sorry, you're absolutely right. I misunderstood the way Pool worked, but I get it now. Thanks a lot! – fatalaccidents Sep 16 '14 at 20:17

If you're going to run watchJob in a thread, there's no reason to busy-loop with p1.poll; just call p1.wait() to block until the process finishes. Using the busy loop requires the GIL to constantly be released/re-acquired, which slows down the main thread, and also pegs the CPU, which hurts performance even more.

Also, if you're not using the stdout of the child process, you shouldn't send it to PIPE, because that could cause a deadlock if the process writes enough data to the stdout buffer to fill it up (which may actually be what's happening in your case). There's also no need to use multiprocessing here; just call Popen in the main thread, and then have the watchJob thread wait on the process to finish.

import threading
from subprocess import Popen
from Queue import Queue

def watchJob(p1, out_q):
    p1.wait()
    out_q.put(p1.returncode)

out_q = Queue()
outlst=[]
p1 = Popen(['../../bin/jobexe','job_to_run'])
t = threading.Thread(target=watchJob, args=(p1,out_q))
t.start()
outlst.append(out_q.get())
t.join()

Edit:

Here's how to run multiple jobs concurrently this way:

out_q = Queue()
outlst = []
threads = []
num_jobs = 3
for _ in range(num_jobs):
    p = Popen(['../../bin/jobexe', 'job_to_run'])
    t = threading.Thread(target=watchJob, args=(p, out_q))
    t.start()
    threads.append(t)
    # Don't consume from the queue yet.

# All jobs are running, so now we can start
# consuming results from the queue.
for _ in range(num_jobs):
    outlst.append(out_q.get())
for t in threads:
    t.join()
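
If, as discussed in the comments below, you want to keep a fixed number of jobs running and start a replacement whenever one finishes, here is a minimal sketch of how the same threads-plus-Queue pattern could do that (the pending list is just a stand-in for whatever your database provides; the executable path is the one from the question):

import threading
from subprocess import Popen
from Queue import Queue

def watchJob(p, out_q):
    p.wait()
    out_q.put(p.returncode)

def start_job(cmd, out_q):
    p = Popen(cmd)
    threading.Thread(target=watchJob, args=(p, out_q)).start()

# stand-in for jobs pulled from a database
pending = [['../../bin/jobexe', 'job_%d' % i] for i in range(1000)]
max_running = 128

out_q = Queue()
running = 0
while pending and running < max_running:  # fill up to the limit
    start_job(pending.pop(), out_q)
    running += 1

results = []
while running:
    results.append(out_q.get())  # blocks until some job finishes
    running -= 1
    if pending:                  # immediately replace the finished job
        start_job(pending.pop(), out_q)
        running += 1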
dano
  • Sorry to not have the complete code here. It needs multiprocessing because when it is finished it will be spawning multiple jobs across multiple processors on an HPC. I tried the code with p1.wait() and with stdout going to a logfile and not directing it at all. It still just hangs after the program is done on the outlst.append(out_q.get()). So your solution technically works if I didn't need to spawn multiple jobs, but just looping around Popen will cause the program to wait. That's why I had the watcher thread, because I was hoping it wouldn't cause the program to wait. – fatalaccidents Sep 10 '14 at 20:32
  • @fatalaccidents Did it hang with my code above, or with your code, which is using a `multiprocessing.Process`? What kind of `Queue` are you using in the code you're running? – dano Sep 11 '14 at 03:30
  • Your code does not hang. If you add Process (and Queue from multiprocessing) in the mix you can get it to not hang, but if you put it in a loop it just runs one job and then the next. So it isn't running them in parallel. Would you be able to write an example that runs the jobs at the same time? Thanks for your help on this. – fatalaccidents Sep 11 '14 at 14:14
  • @fatalaccidents It's running one job at a time because you're consuming from the queue in each iteration. I've edited my answer to show how to avoid doing that. I'm also still not sure what you're using `multiprocessing.Process` for...using `Popen` to run each job will allow them to run concurrently across multiple processors; you don't need `multiprocessing` for that. – dano Sep 11 '14 at 14:23
  • Thank you, you are right indeed. It worked that way just as expected. The only problem is that I'm wanting to use this queue information to know when jobs have completed and if they completed properly to spawn new jobs. Reading the queue afterwards wouldn't help me, as jobs might finish at different times. Maybe I'm approaching this the wrong way. I basically need to consume the results of some kind of queue and feed it back to a spawning process. I'm actually reading from a database, but I was trying to get a simple example to work first. You have been very helpful. – fatalaccidents Sep 11 '14 at 14:41
  • @fatalaccidents Do the jobs you want to spawn need to know *which* job has finished successfully, or just need to know that some job has finished? What do you want to happen if a job fails? – dano Sep 11 '14 at 17:26
  • [you don't need neither multiprocessing nor threading here](http://stackoverflow.com/a/25798906/4279) – jfs Sep 12 '14 at 00:38