I have a simple example problem that I am struggling with in Python. I am using the multiprocessing module to run an example in which the function Thread_Test() generates an array of random numbers drawn uniformly from the interval 0 to 1, with Sample_Size data points in the array. Once I get this example working, I plan to start multiple copies of the process to speed up execution, and then put a much more complex set of calculations in Thread_Test(). The example works fine as long as I keep Sample_Size below 9,000. The execution time grows as I increase Sample_Size from 10 to 8,000, but even at 8,000 the code takes only 0.01 seconds. However, as soon as I increase Sample_Size to 9,000, the code runs forever and never finishes. What is causing this?

from multiprocessing import Process, Queue
import queue
import random
import timeit
import numpy as np

def Thread_Test(Sample_Size):
    q.put(np.random.uniform(0,1,Sample_Size))
    return

if __name__ == "__main__":
    Sample_Size = 9000
    q = Queue()
    start = timeit.default_timer()
    p = Process(target=Thread_Test,args=(Sample_Size,))
    p.start()
    p.join()

    result = np.array([])
    while True:
        if not q.empty():
            result = np.append(result, q.get())
        else:
            break
    print(result)

    stop = timeit.default_timer()
    print ('{}{:4.2f}{}'.format("Computer Time: ", stop-start, " seconds"))
Jon
  • If you supply a timeout to the `join` function, it will return. This doesn't address the underlying issue though. – autodidacticon Mar 10 '17 at 06:35
  • Possible duplicate of [python multiprocessing - process hangs on join for large queue](http://stackoverflow.com/questions/21641887/python-multiprocessing-process-hangs-on-join-for-large-queue) – autodidacticon Mar 10 '17 at 06:37

1 Answer

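The hang occurs because the child process (the producer) puts data on the Queue while the main process (the consumer) is not yet reading it. The main process blocks in p.join(), waiting for the child to exit. The child cannot exit, because the data it put on the queue has not been fully flushed to the underlying pipe: the pipe is full and nobody is reading from it to make room. Small arrays fit into the pipe's buffer, so the child can flush them and exit, which is why the problem only appears above a certain size. The exact threshold depends on the operating system's pipe buffer. On Linux it is typically 64 KiB, which is consistent with 8,000 float64 values (about 64 KB) working and 9,000 (about 72 KB) hanging.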

As the multiprocessing documentation says:

"Bear in mind that a process that has put items in a queue will wait before terminating until 
all the buffered items are fed by the “feeder” thread to the underlying pipe."

In short, you need to drain the queue with q.get() before calling p.join().
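The blocking behaviour can be observed without hanging the interpreter by giving join() a timeout, as the comment on the question suggests. The following is only a minimal sketch: the names produce and join_then_get are hypothetical helpers introduced for illustration, and the sample size of one million is an assumption chosen to exceed any common pipe buffer.

```python
import multiprocessing as mp
import numpy as np


def produce(q, sample_size):
    # Child: put one large array on the queue, then try to exit.
    q.put(np.random.uniform(0, 1, sample_size))


def join_then_get(sample_size, timeout=1.0):
    """Return (alive_after_timed_join, number_of_samples_received)."""
    q = mp.Queue()
    p = mp.Process(target=produce, args=(q, sample_size))
    p.start()
    p.join(timeout)             # wrong order, but bounded by the timeout
    still_alive = p.is_alive()  # True while the child cannot flush its data
    data = q.get()              # reading the queue lets the child finish
    p.join()                    # now returns once the child has exited
    return still_alive, len(data)


if __name__ == "__main__":
    print(join_then_get(1_000_000))
```

For a payload this large, still_alive should come back True: the timed join() gives up while the child is still waiting, and the child only exits after q.get() has read the data.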

If you are worried that the main process might stop reading before the child process has produced anything, you can change the loop slightly, as below:

while True:
    # check subprocess running before break
    running = p.is_alive()
    if not q.empty():
        result = np.append(result,q.get())
    else:
        if not running:
            break

The complete program looks like this:

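from multiprocessing import Process, Queue
import timeit
import numpy as np

def Thread_Test(q, Sample_Size):
    # The queue is passed in explicitly instead of relying on a global.
    q.put(np.random.uniform(0,1,Sample_Size))


if __name__ == "__main__":
    Sample_Size = 9000
    q = Queue()
    start = timeit.default_timer()
    p = Process(target=Thread_Test,args=(q, Sample_Size,))
    p.daemon = True
    p.start()

    result = np.array([])
    while True:
        # Check whether the child is running before testing the queue.
        running = p.is_alive()
        if not q.empty():
            result = np.append(result,q.get())
        else:
            if not running:
                break
    # Join only after the queue has been drained.
    p.join()
    stop = timeit.default_timer()
    print('{}{:4.2f}{}'.format("Computer Time: ", stop-start, " seconds"))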
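Since the question mentions running several copies of the process, the same "get before join" rule can be sketched for multiple workers. This is an illustration under assumptions rather than a definitive design: worker and run_workers are hypothetical names, each worker is assumed to send exactly one message, and np.random.default_rng() is swapped in so that forked children do not all inherit the same random state.

```python
import multiprocessing as mp
import numpy as np


def worker(q, worker_id, sample_size):
    # A fresh generator per process avoids identical streams after fork.
    rng = np.random.default_rng()
    # Each child sends exactly one (id, array) message.
    q.put((worker_id, rng.uniform(0, 1, sample_size)))


def run_workers(n_workers, sample_size):
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(q, i, sample_size))
             for i in range(n_workers)]
    for p in procs:
        p.start()
    # One blocking get() per worker: no busy loop, no reliance on q.empty().
    results = dict(q.get() for _ in procs)
    # Join only after every message has been received.
    for p in procs:
        p.join()
    # Reassemble the arrays in worker order.
    return np.concatenate([results[i] for i in range(n_workers)])


if __name__ == "__main__":
    print(run_workers(4, 9000).shape)
```

Because the number of messages is known in advance, a blocking q.get() per worker replaces the polling loop and does not spin the CPU while waiting.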
linpingta