I am currently debugging an issue with Python's multiprocessing module.
I have the following child process class:

class Target(multiprocessing.Process):
    def __init__(self, work_queue, result_queue, suite_name, test_settings, html_log_dir, output_file, ip_address, fit_dir):
        multiprocessing.Process.__init__(self)

        # initialize other variables

    def run(self):
        print multiprocessing.current_process()
        suite_start_time = time.clock()
        while not self.kill_received:
            # get a task
            try:
                job = self.work_queue.get(True, 2)
            except Queue.Empty:
                self._log('Work queue empty, creating XML result file')
                self.create_xml_result_file(suite_start_time)
                break

            # the actual processing, run the test.

        fitnesse_common.log_output("\n(PID " + str(self.pid) + "): End of process")

    def create_xml_result_file(self, start_time):
        # generate result 

The parent process launches 12 targets and waits for all of them to join.

The issue is that the child processes run to the end of the run function (I see the "End of process" prints) but then do not terminate, which prevents the parent process from continuing.

EDIT - Not all the spawned processes hang, only a couple of them. Of the 12 spawned processes, usually only 2-4 of them hang after completing their run function.

I considered calling terminate at the end of the run function, but the Python documentation indicates that it is a bad idea.

I have looked at several different questions on Stack Overflow regarding Python multiprocessing, and most of them relate to issues with the parent process.

Any thoughts or help would be much appreciated.

UPDATE: Here is a script that readily reproduced the problem:

import multiprocessing, Queue
import subprocess
import time
import sys

class Target(multiprocessing.Process):
    def __init__(self, work_queue, results_queue, delay_length):
        # base class initialization
        multiprocessing.Process.__init__(self)

        # job management stuff
        self.work_queue = work_queue
        self.results_queue = results_queue
        self.delay_length = delay_length
        self.kill_received = False

    def run(self):
        while not self.kill_received:
            # get a task
            try:
                job = self.work_queue.get(True, 2)
            except Queue.Empty:
                self._log('Work queue empty, prepare to terminate')
                break

            time.sleep(self.delay_length)
            self._log("Sleeping done")

            results = self._run_an_application(job)

            self.results_queue.put(results)
            self._log("Have put results on queue " + str(job) + "-" + results)
        self._log("\n(PID " + str(self.pid) + "): End of process")

    def _log(self, text):
        print ('PID ' + str(self.pid) + ' => ' + text)
        sys.stdout.flush()

    def _run_an_application(self, app):
        try: 
            test_output = subprocess.check_output(app)
        except subprocess.CalledProcessError, e:
            self._log('### Process check_output threw exception CalledProcessError')
            test_output = e.output

        return test_output

if __name__ == "__main__":

    test_jobs = []
    started_targets = []

    # run
    # load up work queue
    for i in range(500):
        test_jobs.append('spewage')
    work_queue = multiprocessing.Queue()

    for job in test_jobs:
        work_queue.put(job)

    # create a queue to pass to targets to store the results
    result_queue = multiprocessing.Queue()

    # spawn targets
    for i in range(12):
        started_targets.append(Target(work_queue, result_queue, i))

    # start all targets
    for i in range(len(started_targets)):
        started_targets[i].start()
        print "starting process no %s with id: %s" % (i, started_targets[i].pid)

    print 'Waiting for all processes to join'

    # wait for all targets to finish
    for i in range(len(started_targets)):
        started_targets[i].join()

    print 'All processes have joined'

    # collect the results off the queue
    while not result_queue.empty():
        target_result = result_queue.get()
        print "Test job - " + target_result
    print ('All Tests completed')

Here is the source code (it is C++) of the "spewage" application.

#include <iostream>
#include <windows.h>
using namespace std;

int main()
{
    for (int i = 0; i < 500; i++)
    {
        cout << "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" << endl;
        Sleep(20); 
    }
    return 0;
}

Since it appears to be related to the amount pushed to stdout, the C++ program could easily be replaced by another script that prints a lot of stuff.
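For readers without a Windows toolchain, a stand-in along the following lines might serve. This is only a sketch mirroring the C++ loop above, written for Python 3; the function name `spew` and its parameters are hypothetical. If it were saved as, say, `spewage.py` (an assumed file name), the jobs on the work queue would presumably need to become a command list such as `[sys.executable, 'spewage.py']` rather than `'spewage'`.

```python
import sys
import time

# Same line the C++ program prints.
LINE = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"


def spew(line_count=500, delay=0.02, out=None):
    """Write line_count copies of LINE to out, pausing delay seconds after each."""
    if out is None:
        out = sys.stdout
    for _ in range(line_count):
        out.write(LINE + "\n")
        out.flush()        # std::endl also flushes after every line
        time.sleep(delay)  # counterpart of Sleep(20), which takes milliseconds


if __name__ == "__main__":
    spew()
```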

  • This definitely is not a solution to your current issue, but do you have to use child processes in your current design? Or could you use threads instead? – Kevin Aug 12 '15 at 14:46
  • This was code that I inherited, and I personally don't know why they chose to use processes. The processes run FitNesse test runners, and create a results file which we parse. I personally don't see any reason it couldn't be done in threads though (which is probably the way I would have done it instead of creating monster process trees). – Allen Vandiver Aug 12 '15 at 14:55
  • If that's the case, maybe it would be worth trying to change the base class multiprocessing.Process to threading.Thread. The API should match for the most part depending on how complex your process objects are. – Kevin Aug 12 '15 at 15:04
  • Just to be sure: Both of you are aware of CPython's global interpreter lock (GIL) and its consequences for multithreaded code? That's the most likely reason to use processes instead of threads. – BlackJack Aug 12 '15 at 15:58
  • Any chance some children are waiting to put something in the result queue and the parent process doesn't empty that because it waits for the children to end? Can you provide a _runnable_ example that demonstrates the problem, so readers are able to reproduce it‽ – BlackJack Aug 12 '15 at 16:00
  • After reading this: http://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python?rq=1 I am not sure multi-threading is a viable long term solution due to the potentially extreme performance hit it would cause. The answers to that question make it sound like multi-threading would still limit me to one core of the host computer, whereas currently we are using all 12 logical cores (6 real and 6 hyper threaded). – Allen Vandiver Aug 12 '15 at 16:04
  • Yeah, I will update the question when I get a runnable example ready – Allen Vandiver Aug 12 '15 at 20:04
  • To expand a bit on the comment by @BlackJack -- here is an example where that [occurred](http://stackoverflow.com/questions/31708646/process-join-and-queue-dont-work-with-large-numbers) – Patrick Maupin Aug 12 '15 at 23:29

1 Answer

I managed to figure out the issue. It seems to have been related to the amount of output from the subprocesses. At the end of the process's run() function, I needed to call self.results_queue.cancel_join_thread().
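A trimmed-down sketch of where that call goes, written for Python 3 (where the `Queue` module is named `queue`); the result string is a placeholder for the real subprocess output. Note that the Python documentation warns that `cancel_join_thread()` is likely to cause enqueued data to be lost, so this is only safe if dropping unflushed results is acceptable.

```python
import multiprocessing
import queue  # Python 3 name of the Python 2 "Queue" module


class Target(multiprocessing.Process):
    def __init__(self, work_queue, results_queue):
        multiprocessing.Process.__init__(self)
        self.work_queue = work_queue
        self.results_queue = results_queue

    def run(self):
        while True:
            try:
                job = self.work_queue.get(True, 2)
            except queue.Empty:
                break
            # Placeholder for the output of subprocess.check_output(job).
            self.results_queue.put("result for %s" % job)

        # Allow this process to exit without waiting for the queue's
        # feeder thread to flush; results still buffered may be lost.
        self.results_queue.cancel_join_thread()
```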

I am still curious as to why it works when there isn't a lot of stdout, but the processes hang when there is. According to the Python documentation, the way I was using the result_queue should have locked up consistently, even though it didn't.
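A likely explanation, going by the multiprocessing documentation: a process that has put items on a queue will not exit until its feeder thread has flushed every buffered item into the underlying pipe. Small results fit in the pipe's buffer, so the flush completes even though nobody is reading; large results fill it, and the child then blocks until the parent reads. The alternative the documentation recommends is to remove all items from the queue before joining. Below is a minimal Python 3 sketch of that ordering, assuming each job produces exactly one result; `worker`, `collect_results`, and `main` are hypothetical names, and the large string stands in for the subprocess output.

```python
import multiprocessing
import queue  # Python 3 name of the Python 2 "Queue" module


def worker(work_queue, result_queue):
    """Take jobs until the work queue stays empty for a second."""
    while True:
        try:
            job = work_queue.get(True, 1)
        except queue.Empty:
            break
        # Stand-in for a large subprocess.check_output() result.
        result_queue.put("%s:%s" % (job, "x" * 100000))


def collect_results(result_queue, expected_count, timeout=60):
    """Read exactly expected_count results; raises queue.Empty on timeout."""
    return [result_queue.get(True, timeout) for _ in range(expected_count)]


def main(job_count=20, worker_count=4):
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    for i in range(job_count):
        work_queue.put(i)

    workers = [
        multiprocessing.Process(target=worker, args=(work_queue, result_queue))
        for _ in range(worker_count)
    ]
    for w in workers:
        w.start()

    # Drain first: the children can only finish flushing their buffered
    # results while the parent is reading from the pipe.
    results = collect_results(result_queue, job_count)

    # The children have nothing left to flush, so join() can return.
    for w in workers:
        w.join()
    return results
```

As in the reproduction script, `main()` would be called from under an `if __name__ == "__main__":` guard. Counting expected results avoids relying on `result_queue.empty()`, which the documentation describes as unreliable.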