I'm working on a script that needs to process a lot of data. I realized that the parallel component of the script doesn't actually help for instances with lots of individual data points, so I'm going to create temporary files and run them in parallel instead. I'm running this through qsub,
so I will allocate a certain number of threads via -pe threaded $N_JOBS
(in this case 4 for this small example).
My ultimate goal is to start each process using one of the threads I've allocated and then wait for ALL of the jobs to finish before proceeding.
However, I've only ever used process = subprocess.Popen
and process.communicate()
to run shell jobs. I've had some trouble in the past using process.wait()
because of zombie processes.
How can I modify my run
function to start a job without waiting for it to finish, then start the next job, and then, once all of the jobs are running, wait for ALL of them to finish?
Please let me know if this is not clear and I can explain better. In the example below (maybe a terrible example?), I would want to use 4 separate threads (I'm not sure how to set this up b/c I've only ever done joblib.Parallel
for simple parallelization) where each thread runs the command echo '$THREAD' && sleep 1. So in the end it should take a little over 1 second instead of ~4 seconds.
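Before the full example, here's just a conceptual sketch of the behavior I'm after (everything in this sketch is made up on the spot, and I'm not sure whether collecting the pipes afterwards like this is safe or whether it can hang):

import subprocess, time

cmds = ["echo '{}' && sleep 1".format(i) for i in range(1, 5)]
start_time = time.time()
# Start every job first; Popen returns immediately, so nothing blocks here
processes = {i: subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
             for i, cmd in enumerate(cmds, start=1)}
# Only now wait for ALL of them; communicate() drains stdout/stderr and waits for exit
results = {i: p.communicate() for i, p in processes.items()}
print("Took {:.3f} seconds".format(time.time() - start_time))  # hoping for ~1 second instead of ~4

If that's roughly the right idea, my question is really how to fold that pattern back into my run function.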
I've found this post: Python threading multiple bash subprocesses? but I wasn't sure how I could adapt it for my situation with my run
function (my rough guess at that adaptation is below, after the example).
import sys, subprocess, time
# Number of jobs
N_JOBS=4
# Run command
def run(
cmd,
popen_kws=dict(),
):
# Run
f_stdout = subprocess.PIPE
f_stderr = subprocess.PIPE
# Execute the process
process_ = subprocess.Popen(cmd, shell=True, stdout=f_stdout, stderr=f_stderr, **popen_kws)
# Wait until process is complete and return stdout/stderr
    stdout_, stderr_ = process_.communicate() # Use .communicate instead of .wait to avoid a zombie/defunct process that hangs. Removed timeout b/c it's not available in Python 2
# Return code
returncode_ = process_.returncode
return {"process":process_, "stdout":stdout_, "stderr":stderr_, "returncode":returncode_}
# Commands
cmds = list(map(lambda x:"echo '{}' && sleep 1".format(x), range(1, N_JOBS+1)))
# ["echo '1'", "echo '2'", "echo '3'", "echo '4'"]
# Start time
start_time = time.time()
results = dict()
for thread, cmd in enumerate(cmds, start=1):
# Run command but don't wait for it to finish (Currently, it's waiting to finish)
results[thread] = run(cmd)
# Now wait until they are all finished
print("These jobs took {} seconds\n".format(time.time() - start_time))
print("Here's the results:", *results.items(), sep="\n")
print("\nContinue with script. .. ...")
# These jobs took 4.067937850952148 seconds
# Here's the results:
# (1, {'process': <subprocess.Popen object at 0x1320766d8>, 'stdout': b'1\n', 'stderr': b'', 'returncode': 0})
# (2, {'process': <subprocess.Popen object at 0x1320547b8>, 'stdout': b'2\n', 'stderr': b'', 'returncode': 0})
# (3, {'process': <subprocess.Popen object at 0x132076ba8>, 'stdout': b'3\n', 'stderr': b'', 'returncode': 0})
# (4, {'process': <subprocess.Popen object at 0x132076780>, 'stdout': b'4\n', 'stderr': b'', 'returncode': 0})
# Continue with script. .. ...
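My rough guess at adapting the threading idea from that post would be something like the following (the worker function and the shared results dict are just my own invention, and I'm not sure whether plain threads are the right tool here or how they interact with the -pe threaded allocation):

import subprocess, threading

cmds = ["echo '{}' && sleep 1".format(i) for i in range(1, 5)]
results = dict()

def worker(thread_id, cmd):
    # Each thread blocks on its own communicate(), so the main thread is free
    # to keep launching the remaining jobs
    process_ = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout_, stderr_ = process_.communicate()
    results[thread_id] = {"stdout": stdout_, "stderr": stderr_, "returncode": process_.returncode}

threads = [threading.Thread(target=worker, args=(i, cmd)) for i, cmd in enumerate(cmds, start=1)]
for t in threads:
    t.start()   # start them all without waiting
for t in threads:
    t.join()    # then wait for ALL of them before continuing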
I've tried following the documentation on multiprocessing
https://docs.python.org/3/library/multiprocessing.html but I found it really confusing to adapt it to my situation:
import multiprocessing
# Run command
def run(
cmd,
errors_ok=False,
popen_kws=dict(),
):
# Run
f_stdout = subprocess.PIPE
f_stderr = subprocess.PIPE
# Execute the process
process_ = subprocess.Popen(cmd, shell=True, stdout=f_stdout, stderr=f_stderr, **popen_kws)
return process_
# Commands
cmds = list(map(lambda x:"echo '{}' && sleep 0.5".format(x), range(1, N_JOBS+1)))
# ["echo '1'", "echo '2'", "echo '3'", "echo '4'"]
# Start time
start_time = time.time()
results = dict()
for thread, cmd in enumerate(cmds, start=1):
# Run command but don't wait for it to finish (Currently, it's waiting to finish)
p = multiprocessing.Process(target=run, args=(cmd,))
p.start()
    p.join()  # join() blocks until this process finishes, so the jobs still end up running one at a time
results[thread] = p
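From the docs, my best guess is that a Pool is closer to what I want, maybe something like this (the pool size matching N_JOBS is just my assumption about lining up with the -pe threaded allocation, and I had to change run to not return the Popen object because I don't think it can be pickled back from a worker):

import multiprocessing, subprocess

N_JOBS = 4

def run(cmd):
    # Only return the picklable pieces (stdout/stderr/returncode), not the Popen object
    process_ = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout_, stderr_ = process_.communicate()
    return {"stdout": stdout_, "stderr": stderr_, "returncode": process_.returncode}

cmds = ["echo '{}' && sleep 1".format(i) for i in range(1, N_JOBS + 1)]

if __name__ == "__main__":
    with multiprocessing.Pool(processes=N_JOBS) as pool:
        # map() blocks until every command has finished, which I think is the
        # "wait for ALL jobs" behavior I'm after
        results = dict(enumerate(pool.map(run, cmds), start=1))
    print(*results.items(), sep="\n")

But I'm not sure whether full worker processes are overkill here compared to just starting the Popen objects and waiting on them at the end, since the actual work happens in the shell commands anyway.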