
This is my naive approach to calling external commands in worker processes and appending all of their output to a single file. Here is some example code.

from concurrent.futures import ProcessPoolExecutor
from functools import partial
import multiprocessing
import subprocess

def worker_process_write_output(fh, lock, mylist):
    output = subprocess.run("dir /b", shell=True, stdout=subprocess.PIPE, universal_newlines=True).stdout
    with lock:  # Need a lock to prevent multiple processes from writing to the file simultaneously
        fh.write(mylist)
        fh.writelines(output)    

if __name__ == '__main__':
    with open("outfile.txt", "a") as fh: # I am opening file in main process to avoid the overhead of opening & closing the file multiple times in each worker process
        mylist = [1, 2, 3, 4]
        with ProcessPoolExecutor() as executor:
            lock = multiprocessing.Manager().Lock()
            executor.map(partial(worker_process_write_output, fh, lock), mylist)

This code hangs when run. What are the mistakes and how do I correct them? Some of my guesses:

1. A file handle can't be passed to a worker process, so the file has to be opened and closed inside the worker. I am not sure of the reason.
2. subprocess.run can't be used in a worker process; I need to use os.popen("dir /b").read() or something else instead.
3. I am not sure whether the lock is necessary, and if it is, whether this is the right kind of lock.


1 Answer


File handles can be passed between processes in some cases, so I'm not sure exactly why your code deadlocks on the shared handle. Having said that, I'm assuming you're doing a lot of work in each run() call, so the overhead of opening and closing the file in each worker shouldn't be terribly significant. If there isn't a lot of work being done, multiprocessing is probably not the best choice to begin with anyway, since it involves serious overhead.

Additionally, fh.write(mylist) raises TypeError: write() argument must be str, not int, because map passes the elements of mylist to the worker one at a time, so we need to convert with fh.write(str(mylist)).

Here's the workaround:

import multiprocessing
import subprocess
from concurrent.futures import ProcessPoolExecutor
from functools import partial

def worker_process_write_output(lock, mylist):
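    # Run the shell command and capture its stdout as text ('dir /b' is Windows-specific)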
    output = subprocess.run("dir /b", shell=True, stdout=subprocess.PIPE,
                            universal_newlines=True).stdout

    with lock:
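        # Hold the shared lock so output from different workers doesn't interleave,
        # and open the file in the worker instead of passing a handle from the parent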
        with open("outfile.txt", "a") as fh:
            fh.write(str(mylist))
            fh.writelines(output)


if __name__ == '__main__':
    mylist = [1, 2, 3, 4]

    with ProcessPoolExecutor() as executor:
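        # Manager().Lock() returns a picklable proxy, so it can be passed to the pool workers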
        lock = multiprocessing.Manager().Lock()
        executor.map(partial(worker_process_write_output, lock), mylist)
  • Thanks. On a side note, I had used `import subprocess`, just missed typing it in the question :(. How did you get the errors you mentioned in the bullet points? Did you use some kind of verbose output flag or something? Mine just hung. – ontherocks Jun 19 '19 at 16:00
  • My processes also got stuck, but when I killed it, it gave me the serialization error. It looks like you can share file handles between processes--I wish I could give a better explanation of why it's not working in this case, so this is more of a workaround than a direct solution. Still, the writing IO and subprocess creation/calling is likely going to dwarf the opening/closing time, so I don't think it's much of a compromise. If `writelines` is a huge bottleneck, you might get one worker to handle all I/O and let the work processes handle `run()` using a work queue. – ggorlen Jun 19 '19 at 16:06
  • I updated the answer--I'd be curious to hear a better explanation for the deadlock as well, so feel free to unmark the answer as accepted if you're not satisfied and maybe someone will show up to shed some light on the situation. I'll keep looking at it when I have time and see what the trouble might be. [This answer](https://stackoverflow.com/a/11196615/6243352) offers some interesting perspectives in the meantime. – ggorlen Jun 19 '19 at 16:19
  • Delegating writelines to another worker process is a good idea. I will try in the next round of optimization. – ontherocks Jun 19 '19 at 16:45
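
For reference, a minimal sketch of the single-writer / work-queue idea mentioned in the comments above, assuming the queue comes from multiprocessing.Manager() so it can be shared with the pool workers; the names worker, writer and SENTINEL are illustrative and not from the original code:

import multiprocessing
import subprocess
from concurrent.futures import ProcessPoolExecutor
from functools import partial

SENTINEL = None  # tells the writer that no more output is coming

def worker(queue, item):
    # Workers only run the command and push the output onto the queue
    output = subprocess.run("dir /b", shell=True, stdout=subprocess.PIPE,
                            universal_newlines=True).stdout
    queue.put(str(item) + output)

def writer(queue):
    # A single process owns the file, so no lock is needed
    with open("outfile.txt", "a") as fh:
        for chunk in iter(queue.get, SENTINEL):
            fh.write(chunk)

if __name__ == '__main__':
    mylist = [1, 2, 3, 4]
    queue = multiprocessing.Manager().Queue()

    writer_proc = multiprocessing.Process(target=writer, args=(queue,))
    writer_proc.start()

    with ProcessPoolExecutor() as executor:
        # Consume the iterator so exceptions raised in workers aren't silently dropped
        list(executor.map(partial(worker, queue), mylist))

    queue.put(SENTINEL)  # signal the writer to finish
    writer_proc.join()

Because only the writer process touches outfile.txt, no lock is needed, and the command-running workers never block on file I/O.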