
I have to run a program on every pairwise combination of 200 files (a round robin).

Right now I have them running like this:

import itertools as it
import subprocess

for combo in it.combinations(files, 2):
    cmd = ["command", combo[0], combo[1]]
    subprocess.Popen(cmd)

I would like to run only, say, 60 at a time so as not to overwhelm the computer; the command is pretty processor-intensive. What's the best way to pause the loop once 60 processes are running, and then start again once one has finished, so that there are always 60 processes running?

helicase
  • Check out threading.Thread; you could create 60 threads and then have them draw their commands from a list until it's empty (then have them exit). – max k. Mar 11 '13 at 15:11
  • @maxk. Using Python's threads will not improve performance in the general case. OP has already included code for using the subprocess module, which is recommended for parallel execution in Python. – Wilduck Mar 11 '13 at 15:15
  • It won't improve performance, but it's a lot more customisable, and the ability to write your own thread subclass gives you a lot more options for problems like this. Just a thought, though. – max k. Mar 11 '13 at 15:35
  • @Wilduck - threads do improve performance in this case: something has to wait for the process to complete before dispatching new processes. A thread per process doing p.communicate() or p.wait() is a perfectly good solution. I think max k. is right, this is a great use for a thread pool (see the sketch below). – tdelaney Mar 11 '13 at 15:40
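
A minimal sketch of the thread-per-process idea from these comments, for illustration only: each thread launches one command and waits for it with subprocess.call(), and a semaphore (an added detail, not something the commenters specified) caps the count at 60. Here files and "command" are the question's placeholders:

import itertools as it
import subprocess
import threading

slots = threading.Semaphore(60)  # at most 60 commands in flight

def run_one(cmd):
    try:
        subprocess.call(cmd)  # call() waits for the process to finish
    finally:
        slots.release()       # free the slot for the next command

for combo in it.combinations(files, 2):
    slots.acquire()           # blocks while 60 processes are running
    cmd = ["command", combo[0], combo[1]]
    threading.Thread(target=run_one, args=(cmd,)).start()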

4 Answers

#!/usr/bin/env python
import itertools
import subprocess
from multiprocessing.dummy import Pool # use threads

def run(combo):
    cmd = ["command", combo[0], combo[1]]
    return combo, subprocess.call(cmd)

def main():
    p = Pool(60) # 60 subprocesses at a time
    for combo, rc in p.imap_unordered(run, itertools.combinations(files, 2)):
        print("%s exited with %s" % (combo, rc))
    p.close()
    p.join()

if __name__ == "__main__":
    main()

This answer demonstrates various techniques for limiting the number of concurrent subprocesses: it shows multiprocessing.Pool-, concurrent.futures-, and threading + Queue-based solutions.
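
For example, a minimal concurrent.futures sketch along the same lines (Python 3.2+), assuming as above that files holds the file names and "command" is the program to run:

import itertools
import subprocess
from concurrent.futures import ThreadPoolExecutor

def run(combo):
    return combo, subprocess.call(["command", combo[0], combo[1]])

# 60 worker threads means at most 60 subprocesses at a time
with ThreadPoolExecutor(max_workers=60) as pool:
    for combo, rc in pool.map(run, itertools.combinations(files, 2)):
        print("%s exited with %s" % (combo, rc))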

jfs

This might help:

import itertools as it
import time
import subprocess

files = range(5)
max_load = 3
sleep_interval = 0.5

procs = []
for combo in it.combinations(files, 2):
    # Stand-in command that takes time
    cmd = ['sleep', str(combo[0] + combo[1])]

    # Launch and record this command
    print("Launching:", cmd)
    proc = subprocess.Popen(cmd)
    procs.append(proc)

    # Deal with the condition of exceeding maximum load;
    # poll() is None while a process is still running
    while sum(1 for p in procs if p.poll() is None) >= max_load:
        time.sleep(sleep_interval)
Robᵩ

You want something like this:

import threading
import queue
import subprocess

class IPThread(threading.Thread):
    def __init__(self, work_queue, num):
        super().__init__()
        self.queue = work_queue
        self.num = num
    def run(self):
        while True:
            try:
                args = self.queue.get_nowait()
                cmd = ["echo"] + [str(i) for i in args]
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                # communicate() waits for the process and drains its pipes,
                # so a chatty command cannot stall on a full pipe buffer
                out, err = p.communicate()
                print(out.decode())
            except queue.Empty:
                # Nothing left in the queue -- we are done
                print("Thread %d done" % self.num)
                break
            except Exception as err:
                # Handle exception
                print(err)
            self.queue.task_done()

def create_threads(q, size):
    for i in range(size):
        thread = IPThread(q, i)
        thread.daemon = True
        thread.start()
    q.join()

def fill_queue(q):
    # Call q.put(args) in a loop to populate the queue with arguments;
    # integers stand in for the real file names
    from itertools import permutations
    x = list(range(20))
    for arg1, arg2 in permutations(x, 2):
        q.put([arg1, arg2])
    print(q.qsize())

def main():
    q = queue.Queue()
    fill_queue(q)
    create_threads(q, 60)
    print("Done")

if __name__ == '__main__':
    main()

Create a queue of things to work on. Specialize your Thread-derived class. Spin up your threads. Wait for them to be done.

You can tell that the tasks are running concurrently because their output interferes with each other. It's a feature!

hughdbrown
  • This example doesn't wait for the process to complete and doesn't deal with the possibility that the process will generate output and stall because its pipe fills. As written, it will launch all of the processes at once. That's not what the poster wants. – tdelaney Mar 11 '13 at 15:37
  • This runs at most 60 processes at a time, one for each thread created. It does not launch all the processes at once. You're right about filling the output buffer. This takes the poster most of the way. For example, I don't know what is supposed to go in the queue, so I am just using integers. I don't have any idea how big a buffer the OP's output might fill up, so I could only guess at the `bufsize` to use. But this solves most of the problem, right -- running multiple processes concurrently? – hughdbrown Mar 11 '13 at 16:01
  • It's been edited to include p.communicate(), so now it works fine. – tdelaney Mar 11 '13 at 16:03

You could do something really simple like:

from time import sleep

running = []
for combo in it.combinations(files, 2):
    # Block until fewer than 60 processes are still running
    while len(running) >= 60:
        running = [p for p in running if not subprocess_is_done(p)]
        sleep(5)
    cmd = ["command", combo[0], combo[1]]
    running.append(subprocess.Popen(cmd))

Obviously you'd need to figure out how to implement subprocess_is_done for your command.

This works for trivial cases as far as I can tell, but I have no clue what you're trying to run...

tkone
  • I was thinking along these lines but I'm not sure how to tell when subprocess is done. Maybe wait() or poll(), not sure how they would work in this loop though. – helicase Mar 11 '13 at 15:25
  • You'd need to record all of currently running processes, and use `.poll()` to determine their state. – Robᵩ Mar 11 '13 at 15:27
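
For instance, a hypothetical subprocess_is_done helper built on Popen.poll(), along the lines Robᵩ suggests:

def subprocess_is_done(p):
    # poll() returns None while the process is still running,
    # and its exit code once it has terminated
    return p.poll() is not None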