
I am having a problem where child processes are hanging in my Python application: only 4 of my 16 processes have finished, and all of these processes are adding to a multiprocessing queue. According to the Python docs (https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues):

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

I believe this may be my problem; however, I do a get() off the queue before I join. I am not sure what other alternatives I can take.

import time
from multiprocessing import Process, Queue

def RunInThread(dictionary):
    startedProcesses = list()
    resultList = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many processes

    for item in scriptList:
        if __name__ == '__main__':
            proc = Process(target=CreateScript, args=(item, output))
            startedProcesses.append(proc)
            proc.start()

    while not output.empty():
        resultList.append(output.get())

    #we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()
        print "finished"

#defines chunk of data each thread will process
def ThreadChunk(seq, num):
    avg = len(seq) / float(num)
    out = []
    last = 0.0

    while last < len(seq):
        out.append(seq[int(last):int(last + avg)])
        last += avg

    return out
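
For example, with a plain list (note that ThreadChunk slices seq, so the dictionary passed to RunInThread has to be a list or similar sequence; a real dict would raise a TypeError):

>>> ThreadChunk(range(8), 4)
[[0, 1], [2, 3], [4, 5], [6, 7]]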

def CreateScript(scriptsToGenerate, queue):
    start = time.clock()
    for script in scriptsToGenerate:
        ...
        queue.put([script['timeInterval'], script['script']])

    print time.clock() - start
    print "I have finished"
Johnathon64
  • Why is that `if __name__ == '__main__'` inside your function and not at the module level exactly? (such as `if __name__ == '__main__': my_dict = {..}; RunInThread(my_dict)`) – 301_Moved_Permanently Aug 13 '15 at 11:33
  • Should I not have it there? I have only just started using Python myself; however, it doesn't seem to be a problem, as my processes still start. – Johnathon64 Aug 13 '15 at 11:35
  • Even though it is harmless to have it there when you execute your script, you will start having unexpected behaviour as soon as you import it. See http://stackoverflow.com/questions/419163/what-does-if-name-main-do. – 301_Moved_Permanently Aug 13 '15 at 11:38
  • Anyway, back to the question. It seems that `ThreadChunk` is of your own writing. You might need to post that too. Same goes for `CreateScript`. – 301_Moved_Permanently Aug 13 '15 at 11:41

1 Answer


The issue with your code is that `while not output.empty()` is not reliable (see the documentation for `Queue.empty`). You might also run into the scenario where the interpreter reaches `while not output.empty()` before the processes you created have finished their initialization, in which case the queue really is still empty and the loop exits before any result has been produced.

Since you know exactly how many items will be put in the queue (namely len(dictionary), because each entry of each chunk produces one item), you can read exactly that many items from it:

def RunInThread(dictionary):
    startedProcesses = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many processes

    for item in scriptList:
        proc = Process(target=CreateScript, args=(item, output))
        startedProcesses.append(proc)
        proc.start()

    resultList = [output.get() for _ in xrange(len(dictionary))]

    #we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()

    print "finished"

If at some point you modify your script and no longer know how many items will be produced, you can use Queue.get with a reasonable timeout:

from Queue import Empty  # a multiprocessing Queue raises the stdlib Empty on timeout

def RunInThread(dictionary):
    startedProcesses = list()
    resultList = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many processes

    for item in scriptList:
        proc = Process(target=CreateScript, args=(item, output))
        startedProcesses.append(proc)
        proc.start()

    try:
        while True:
            resultList.append(output.get(True, 2)) # block with a 2-second timeout, just in case
    except Empty:
        pass # no more items produced

    #we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()

    print "finished"

You might need to adjust the timeout depending on how long the computation in your CreateScript actually takes.
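
The docs quoted in the question also note that a queue created using a manager does not have the flush-before-join problem, so with a manager you can join the children first and drain the queue afterwards. A minimal sketch of that alternative (the worker below is a stand-in, not your CreateScript):

from multiprocessing import Manager, Process

def worker(chunk, queue):
    for item in chunk:
        queue.put(item)

if __name__ == '__main__':
    manager = Manager()
    queue = manager.Queue()  # a proxied queue: no feeder thread in the children
    procs = [Process(target=worker, args=(chunk, queue)) for chunk in ([1, 2], [3, 4])]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # safe: a manager queue never blocks a child on exit
    results = []
    while not queue.empty():  # reliable here: every producer has already exited
        results.append(queue.get())
    print results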

301_Moved_Permanently
  • I have tried your solution with the list comprehension; could it be that there is a deadlock, as I am trying to create files with all these threads and some of those files may have already been created? – Johnathon64 Aug 13 '15 at 13:01
  • Your threads might possibly be hanging in your `CreateScript` process. But it's hard to tell, since you shortened it to `...`. – 301_Moved_Permanently Aug 13 '15 at 13:36