
I'm working on a script that needs to process a lot of data. I realized that the parallel component of the script doesn't actually help for instances with lots of individual data points. Instead, I'm going to create temporary files and run them in parallel. I'm running this through qsub, so I will allocate a certain number of threads via -pe threaded $N_JOBS (in this case 4 for this small example).

My ultimate goal is to start each process using one of the threads I've allocated and then wait for ALL of the jobs to finish before proceeding.

However, I've only ever used process = subprocess.Popen and process.communicate() to run shell jobs. I've had some trouble in the past using process.wait() because of zombie processes.

How can I modify my run function to start the job, not wait to finish, then start the next job, then once all jobs are running, wait for ALL jobs to finish?

Please let me know if this is not clear and I can explain better. In the example below (maybe a terrible example?), I would want to use 4 separate threads (I'm not sure how to set this up because I've only ever used joblib.Parallel for simple parallelization), where each thread runs the command echo '$THREAD' && sleep 1. In the end it should take a little over 1 second instead of ~4 seconds.

I've found this post: Python threading multiple bash subprocesses? but I wasn't sure how I could adapt it for my situation with my run script.
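Conceptually, the rough (untested) sketch below is what I'm imagining: call Popen for every command up front without waiting on any of them, keep the handles, and only then communicate() with each one. I'm just not sure whether splitting Popen and communicate() like this is safe, or whether I should be using threads/multiprocessing instead.

import subprocess, time

cmds = ["echo '{}' && sleep 1".format(i) for i in range(1, 5)]

start_time = time.time()

# Start every job first, without waiting on any of them
processes = {thread: subprocess.Popen(cmd, shell=True,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)
             for thread, cmd in enumerate(cmds, start=1)}

# ...then block until ALL of them have finished
results = {thread: p.communicate() for thread, p in processes.items()}

print("Took {} seconds".format(time.time() - start_time))  # ~1 second instead of ~4

For reference, here is my current script, where each command runs to completion before the next one starts: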

import sys, subprocess, time 

# Number of jobs
N_JOBS=4

# Run command
def run(
    cmd,
    popen_kws=dict(),
    ):

    # Run
    f_stdout = subprocess.PIPE
    f_stderr = subprocess.PIPE

    # Execute the process
    process_ = subprocess.Popen(cmd, shell=True, stdout=f_stdout, stderr=f_stderr, **popen_kws) 
    # Wait until process is complete and return stdout/stderr
    stdout_, stderr_ = process_.communicate() # Use this .communicate instead of .wait to avoid zombie process that hangs due to defunct. Removed timeout b/c it's not available in Python 2

    # Return code
    returncode_ = process_.returncode

    return {"process":process_, "stdout":stdout_, "stderr":stderr_, "returncode":returncode_}

# Commands
cmds = list(map(lambda x:"echo '{}' && sleep 1".format(x), range(1, N_JOBS+1)))
# ["echo '1'", "echo '2'", "echo '3'", "echo '4'"]

# Start time 
start_time = time.time()
results = dict()
for thread, cmd in enumerate(cmds, start=1):
    # Run command but don't wait for it to finish (Currently, it's waiting to finish)
    results[thread] = run(cmd)

# Now wait until they are all finished
print("These jobs took {} seconds\n".format(time.time() - start_time))
print("Here's the results:", *results.items(), sep="\n")
print("\nContinue with script. .. ...")

# These jobs took 4.067937850952148 seconds

# Here's the results:
# (1, {'process': <subprocess.Popen object at 0x1320766d8>, 'stdout': b'1\n', 'stderr': b'', 'returncode': 0})
# (2, {'process': <subprocess.Popen object at 0x1320547b8>, 'stdout': b'2\n', 'stderr': b'', 'returncode': 0})
# (3, {'process': <subprocess.Popen object at 0x132076ba8>, 'stdout': b'3\n', 'stderr': b'', 'returncode': 0})
# (4, {'process': <subprocess.Popen object at 0x132076780>, 'stdout': b'4\n', 'stderr': b'', 'returncode': 0})

# Continue with script. .. ...

I've tried following the documentation on multiprocessing https://docs.python.org/3/library/multiprocessing.html but it's really confusing to adapt this to my situation:

import multiprocessing

# Run command
def run(
    cmd,
    errors_ok=False,
    popen_kws=dict(),
    ):

    # Run
    f_stdout = subprocess.PIPE
    f_stderr = subprocess.PIPE

    # Execute the process
    process_ = subprocess.Popen(cmd, shell=True, stdout=f_stdout, stderr=f_stderr, **popen_kws) 

    return process_

# Commands
cmds = list(map(lambda x:"echo '{}' && sleep 0.5".format(x), range(1, N_JOBS+1)))
# ["echo '1'", "echo '2'", "echo '3'", "echo '4'"]

# Start time 
start_time = time.time()
results = dict()
for thread, cmd in enumerate(cmds, start=1):
    # Run command but don't wait for it to finish (Currently, it's waiting to finish)
    p = multiprocessing.Process(target=run, args=(cmd,))
    p.start()
    p.join()
    results[thread] = p
O.rka

1 Answer

You're almost there. The simplest way to work with multiprocessing is to use a multiprocessing.Pool object, as shown in the introduction of the multiprocessing documentation, and then map() or starmap() your function over the set of commands. The big difference between map() and starmap() is that map() assumes your function takes a single parameter (so you can pass it a flat iterable), while starmap() takes an iterable of argument tuples and unpacks each tuple into the function's parameters.
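As a quick throwaway illustration of that difference (not part of your script): a one-parameter function pairs with map() and a flat iterable, while a multi-parameter function needs starmap() and a sequence of argument tuples.

from multiprocessing import Pool

def square(x):            # one parameter -> map() with a flat iterable
    return x * x

def power(base, exp):     # several parameters -> starmap() with argument tuples
    return base ** exp

if __name__ == "__main__":
    with Pool(2) as p:
        print(p.map(square, [1, 2, 3]))            # [1, 4, 9]
        print(p.starmap(power, [(2, 3), (3, 2)]))  # [8, 9]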

For your example, this would work (the run() function is largely stubbed out here, although I altered the signature to take a command plus a list of parameters, since it's generally a bad idea to pass a single shell string to system calls):

from multiprocessing import Pool

N_JOBS = 4

def run(cmd, *args):
    return cmd + str(args)

cmds = [
    ('echo', 'hello', 1, 3, 4),
    ('ls', '-l', '-r'),
    ('sleep', 3),
    ('pwd', '-P'),
    ('whoami',),
]

results = []
with Pool(N_JOBS) as p:
    results = p.starmap(run, cmds)

for r in results:
    print(r)

It's not necessary to have the same number of workers as commands; the worker processes in the Pool are reused as needed to run the remaining tasks.
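For instance (a minimal sketch reusing the run() stub from above), a Pool of 2 workers will still get through 5 commands, each worker picking up the next task as soon as it frees up:

from multiprocessing import Pool

def run(cmd, *args):
    return cmd + str(args)

cmds = [('task', i) for i in range(5)]   # 5 tasks, only 2 workers

if __name__ == "__main__":
    with Pool(2) as p:                   # workers are reused across tasks
        print(p.starmap(run, cmds))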

A Richter
  • With `starmap` I got the following error: `# TypeError: run() takes from 1 to 3 positional arguments but 21 were given` and with `map` I got `MaybeEncodingError: Error sending result: '[{'process': , 'stdout': b'3\n', 'stderr': b'', 'returncode': 0}]'. Reason: 'TypeError("can't pickle _thread.lock objects",)'` – O.rka Dec 03 '19 at 19:05
  • You're using your original `run` function, which takes 1 required param. With `map`, you can't ever pass 2 params, so popen_kwds can't be passed, but it will work. `starmap` requires nested lists (e.g. `[['ls -l',{'shell':True}], ['pwd'],...]`). The return can only pass pickle-able objects, so Popen objects can't be the return. Just return the stdout, stdin, and returncode values; they'll pickle fine. – A Richter Dec 06 '19 at 22:01
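Putting that last comment into practice, a rough sketch of the adapted run() might look like this (untested; it returns only the picklable pieces, stdout/stderr/returncode, rather than the Popen object, and each command is wrapped in a single-element list so starmap() can unpack it into run()'s arguments):

import subprocess
from multiprocessing import Pool

N_JOBS = 4

def run(cmd, popen_kws=dict()):
    # Same idea as the original run(), but return only picklable values
    process_ = subprocess.Popen(cmd, shell=True,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                **popen_kws)
    stdout_, stderr_ = process_.communicate()
    return {"stdout": stdout_, "stderr": stderr_, "returncode": process_.returncode}

# Nested lists: each inner list is unpacked into run()'s arguments by starmap()
cmds = [["echo '{}' && sleep 1".format(i)] for i in range(1, N_JOBS + 1)]

if __name__ == "__main__":
    with Pool(N_JOBS) as p:
        results = p.starmap(run, cmds)   # all four finish in ~1 second
    for r in results:
        print(r)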