5

So, I set up a short script which executes an external program (written in Fortran 77). I want to run multiple instances of the program and since I have 8 cores on my computer the simplest solution I found was:

import subprocess

i = 0  # index of the first run; n is the total number of runs

while i < n:
  # Start a batch of 8 runs, each in its own directory (cwd= replaces os.chdir)
  procs = [subprocess.Popen(['./mej'], cwd="dir/Run"+str(i+k)+"/")
           for k in range(8)]
  # Wait for the whole batch, so the slowest run delays the next batch
  exit_codes = [p.wait() for p in procs]
  i = i + 8

print("Job's done!")

This worked mostly fine at first. However, I just switched to a variable time step, and now the time each integration takes varies significantly. The problem is that the script waits for the slowest one to finish before starting a new set of integrations. How can I write it so that I always have 8 instances running?

Nathan Hughes

3 Answers

4

You could use a thread pool, to keep all CPUs busy:

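#!/usr/bin/env python
import subprocess
from multiprocessing.pool import ThreadPool

def run(i):
    working_dir = "dir/Run/" + str(i + 1)
    # On POSIX a relative executable path is looked up relative to cwd,
    # so './mej' is used here instead of repeating working_dir in the path.
    return i, subprocess.call(['./mej'], cwd=working_dir)

# n is the total number of runs; each pool thread only waits on one mej process
results = ThreadPool().map(run, range(n))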

As soon as one mej process finishes, the next one is started. No more than os.cpu_count() concurrent worker processes are running at a time.
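The same idea can also be written with `concurrent.futures`, which makes it easy to look at each exit code as soon as that run finishes. This is only a minimal sketch: the names `run_one` and `run_all`, the `(index, command, working_dir)` job tuples, and the default of 8 workers are assumptions made for illustration, not part of the answer above.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_one(index, command, working_dir):
    """Run one command in working_dir and return (index, exit_code)."""
    return index, subprocess.call(command, cwd=working_dir)


def run_all(jobs, max_workers=8):
    """Run (index, command, working_dir) jobs, at most max_workers at once.

    Returns a dict mapping each job index to its exit code.
    """
    exit_codes = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(run_one, *job) for job in jobs]
        # as_completed yields each future as soon as its run has finished
        for future in as_completed(futures):
            index, code = future.result()
            exit_codes[index] = code
            if code != 0:
                print("run %d failed with exit code %d" % (index, code),
                      file=sys.stderr)
    return exit_codes
```

For the layout used in this answer, the jobs could be built as `[(i, ['./mej'], "dir/Run/" + str(i + 1)) for i in range(n)]`.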

jfs
1

While the execution time for a given run may differ significantly, it is often safe to assume that the time taken by e.g. 10 sequential runs will have far less variance.

So the simple solution A is to start 8 processes each calling the external program 10 times, and then wait for these processes to finish. You'll still have to wait for the slowest process, but the overhead will be considerably smaller.
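A rough sketch of solution A, under a few assumptions: it uses threads rather than processes (each thread only waits for an external program), it splits the run directories round-robin, and the names `run_chunk` and `run_in_chunks` are made up for this example.

```python
import subprocess
from threading import Thread


def run_chunk(command, working_dirs, exit_codes):
    """Run command once per directory, one after another."""
    for working_dir in working_dirs:
        exit_codes[working_dir] = subprocess.call(command, cwd=working_dir)


def run_in_chunks(command, working_dirs, n_workers=8):
    """Split working_dirs into fixed chunks and run the chunks in parallel."""
    exit_codes = {}
    # Round-robin split: worker k gets directories k, k + n_workers, ...
    chunks = [working_dirs[k::n_workers] for k in range(n_workers)]
    threads = [Thread(target=run_chunk, args=(command, chunk, exit_codes))
               for chunk in chunks if chunk]
    for t in threads:
        t.start()
    # The only wait is at the very end, for the slowest chunk
    for t in threads:
        t.join()
    return exit_codes
```

Each worker's share is fixed up front, so a worker that finishes early stays idle until the others are done.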

Of course there is an obvious solution B: to create a pool of pending runs with 8 processes picking a new run from the pool once they've finished their current run. This will truly minimize the overhead, but you'll have to deal with synchronization primitives here.
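One way solution B might look, as a sketch only: the synchronization is left to the standard library's thread-safe `queue.Queue`, worker threads stand in for the 8 processes, and the names `worker` and `run_from_queue` are hypothetical.

```python
import queue
import subprocess
import threading


def worker(command, pending, exit_codes):
    """Keep taking directories from the shared queue until it is empty."""
    while True:
        try:
            working_dir = pending.get_nowait()
        except queue.Empty:
            return  # no pending runs left
        exit_codes[working_dir] = subprocess.call(command, cwd=working_dir)


def run_from_queue(command, working_dirs, n_workers=8):
    """Run command in every directory with n_workers sharing one queue."""
    pending = queue.Queue()
    # All runs are queued before any worker starts, so an empty queue
    # really does mean that nothing is left to start.
    for working_dir in working_dirs:
        pending.put(working_dir)
    exit_codes = {}
    threads = [threading.Thread(target=worker,
                                args=(command, pending, exit_codes))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return exit_codes
```

A worker that finishes a short run immediately picks up the next pending directory, so no core waits while work remains.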

Here's a small illustration of these 3 approaches (the one you use and the two I'm talking about):

[Image: illustration of the three approaches]

The small red squares show where the room for improvement is. Basically, Approach A avoids stalling every thread except the slowest one after each run. Approach B goes even further and enables a thread which has finished all its runs to take one from another thread.

Dmitry Grigoryev
    I got a good solution from another poster, but thanks for the ideas/illustration, I'll keep it in mind in the future. – Giorgi Kokaia Jul 22 '15 at 09:40
0

You can write something like the following. Define the total number of runs, the number of available cores, and the delay between checks for finished processes. For the delay, pick a number of seconds that is reasonable for your runs: if one process takes 10 minutes on average, a delay of 60 seconds or less is good enough.

import subprocess
import time

def runIt(i):
    # Launch run i in its own directory; cwd= avoids changing the
    # script's own working directory with os.chdir
    dire = "dir/Run/" + str(i + 1)
    return subprocess.Popen(['./mej'], cwd=dire)

n = 16     # total number of runs
nProc = 8  # number of cores
delay = 2  # delay in seconds between checks for finished processes

pList = [runIt(p) for p in range(min(nProc, n))]
i = len(pList)
while i < n:
    time.sleep(delay)  # wait before checking again
    for j in range(len(pList)):
        # poll() returns None while the process is still running
        if pList[j].poll() is not None and i < n:
            pList[j] = runIt(i)
            i = i + 1

# Wait for the runs that are still in progress before reporting completion
for p in pList:
    p.wait()
print("Job's done!")
innoSPG