
I have three functions (two loops) defined in such a way that I want to process a chunk of files from command_1 through command_3, and once that chunk is finished, go back and process another chunk using the same workflow.

Pseudocode is shown here; the actual code is longer and working:

import os
import subprocess

def run(cmd):
    try:
        subprocess.Popen(cmd, shell=True)
    except:
        exit()

def run_chunk(chunk, command, flag=False):
    for file in chunk:
        cmd = eval(command + '("' + bam + '")')
        run(cmd)

def main():
    chunks = [[chunk1], [chunk2]...]
    for chunk in chunks:
        run_chunk(chunk, command_1, True)
        os.waitpid(-1, 0)   # intended: wait for all of command_1 to finish
        run_chunk(chunk, command_2, True)
        os.waitpid(-1, 0)
        run_chunk(chunk, command_3, True)
        os.waitpid(-1, 0)

Note: eval returns a string, which is the command passed to the run function.

My problem is that when I run command_1, os.waitpid() seems to work; once command_1 finishes, the program goes on to command_2. It seems to me that command_2 waits on itself before going to command_3, but the outer loop in the main function starts command_1 again immediately (which I do not want).

Can anyone spot any bug in the code? Many thanks!

iiiir.com
  • The current code you have posted does not work: `chunks` is undefined in the `main` scope. – IT Ninja Jul 01 '13 at 17:30
  • Thanks for catching that. I am not showing the whole code since it is much longer; I can certainly copy-paste the whole code here if that is better. chunks is a list of lists; each nested list contains 4 file names to be processed. – iiiir.com Jul 01 '13 at 17:34
  • Why not just wait for the specific process (i.e. `Popen.wait()`) instead of waiting for *all* processes? – Krumelur Jul 01 '13 at 17:40
  • I want to handle, let's say, 4 files simultaneously, and there are 3 steps (command_1 to command_3), each step based on the result of the previous step. Within each step I want all 4 files to be handled simultaneously; once all of those tasks are finished, start command_2 for all 4 files. – iiiir.com Jul 01 '13 at 17:55

2 Answers


Looking at the API, I think the problem may be related to the way you are waiting for the child processes. I would suggest waiting for the specific pid of each child, e.g. waitpid(child_pid, 0); you can get that pid from the Popen call.

If pid is greater than 0, waitpid() requests status information for that specific process. If pid is 0, the request is for the status of any child in the process group of the current process. If pid is -1, the request pertains to any child of the current process. If pid is less than -1, status is requested for any process in the process group -pid (the absolute value of pid).
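
For illustration, a minimal sketch of waiting on a specific child rather than on any child (the command string here is just a placeholder, not the question's actual command):

    import os
    import subprocess

    # Placeholder command; in the question this string comes from the eval call.
    proc = subprocess.Popen('some_command input.bam', shell=True)

    # Wait for this specific child (pid > 0) instead of any child (-1).
    os.waitpid(proc.pid, 0)   # equivalently: proc.wait()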

Daniel H.

Each call to run_chunk may be spawning many child subprocesses. os.waitpid(-1, 0) will wait for any child subprocess to end. If there are many files in chunk, then os.waitpid(-1, 0) will return before all the child subprocesses complete. Thus, subsequent calls to run_chunk may occur too early.
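
One way to picture the fix, sketched under the assumption that run returns its Popen object (and keeping the question's eval/bam construction as-is): collect every handle that run_chunk starts and wait on each of them, instead of waiting on whichever child exits first:

    import subprocess

    def run(cmd):
        # Return the Popen handle instead of discarding it.
        return subprocess.Popen(cmd, shell=True)

    def run_chunk(chunk, command, flag=False):
        procs = []
        for file in chunk:
            cmd = eval(command + '("' + bam + '")')
            procs.append(run(cmd))
        # Wait on every child that was started, not just the first one to finish.
        for proc in procs:
            proc.wait()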

If you want each call to run to happen sequentially, then add a call to proc.communicate() in run:

def run(cmd):
    try:
        proc = subprocess.Popen(cmd, shell=True)
        proc.communicate()   # blocks until the subprocess finishes
    except:
        exit()

If you want all the calls to run generated by run_chunk to occur concurrently, then perhaps the simplest way is to use a multiprocessing ThreadPool:

import multiprocessing.pool as mpool
import subprocess

def run(cmd):
    try:
        proc = subprocess.Popen(cmd, shell=True)
        proc.communicate()   # blocks until the subprocess finishes
    except:
        exit()

def run_chunk(chunk, command, flag=False):
    results = []
    for file in chunk:
        cmd = eval(command + '("' + bam + '")')
        results.append(pool.apply_async(run, args=(cmd,)))
    for result in results:
        result.wait()  # wait until all the calls to run have completed

def main():
    chunks = [[chunk1], [chunk2]...]
    for chunk in chunks:
        run_chunk(chunk, command_1, True)
        run_chunk(chunk, command_2, True)
        run_chunk(chunk, command_3, True)

if __name__ == '__main__':
    pool = mpool.ThreadPool()
    main()

I chose to use a ThreadPool instead of a regular multiprocessing Pool here because each worker in the pool merely calls subprocess.Popen, which in turn spawns a new subprocess. The worker in the pool simply waits for that subprocess to finish, so it seemed a waste to run the worker in its own process; a thread, which is lighter-weight, will do.

If you don't specify a number when instantiating mpool.ThreadPool, you will get a pool with as many worker threads as you have CPU cores. That sounds optimal to me, since each worker thread spawns a subprocess which will naturally require a core. So there is no point in having more worker threads (and thus more subprocesses) than cores; the surplus subprocesses would just have to wait for an available core anyway.
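
If you would rather pin the concurrency to the chunk size instead (the comments mention 4 files per chunk), you can pass the size explicitly; a small sketch:

    # Process at most 4 files at a time, matching the chunk size from the question.
    pool = mpool.ThreadPool(4)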

unutbu
  • Thanks! This seems to be the reason. Any suggestions on how to design the loops so that between steps it waits for the previous step to finish, but within a step it processes all the files in the chunk simultaneously? – iiiir.com Jul 01 '13 at 17:58