
I am using the Python multiprocessing library to process information within a set of processes. These processes in turn spawn further processes that subdivide the work to be done. There is a single Manager.Queue that accumulates the results from all of the processes that consume the data.

In the main thread of the Python script I tried to use join() to block until we can reasonably determine that all the sub-processes have completed, and then write the output to a single file. However, the system terminates and the file is closed before all of the data has been written to it.

The following code is a simplified extract of the implementation described above:

for queue in inQueues:
    queue.join()

for p in processes:
    p.join() 
print "At the end output has: " + str(out_queue.qsize()) + " records" 

with open("results.csv", "w") as out_file:
    out_file.write("Algorithm,result\n")
    while not out_queue.empty():
        res = out_queue.get()
        out_file.write(res['algorithm'] + ","+res['result']+"\n")
        out_queue.task_done()
        time.sleep(0.05)
    out_queue.join()
    out_file.close()

out_queue.qsize() reports in excess of 500 records available, yet only 100 are written to the file. At this point I am also not 100% certain whether 500 is the total number of records generated by the system, or just the number reported at that moment.

How do I ensure that all the results are written to the results.csv file?

kyleED
  • [qsize()](http://bugs.python.org/issue17985): "Return the approximate size of the queue. Because of multithreading/multiprocessing semantics, this number is not reliable." – Kijewski Oct 14 '15 at 14:35
  • I am aware that the size of the queue as indicated by the qsize method can change, however the section of code is the only part of the entire program that removes from the queue, so it is not expected that the number of records printed will be less than the size of the queue (which is what currently occurs). – kyleED Oct 14 '15 at 17:32
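Given the caveat quoted above, a drain loop that does not depend on empty() or qsize() at all is less fragile: keep calling get() with a timeout and treat a sustained Queue.Empty as the end of the data. A minimal sketch, assuming (as in the question) that all producer processes have already been join()ed and that out_queue is the Manager queue (Python 2 syntax, to match the question's code):

import Queue  # home of the Queue.Empty exception on Python 2

with open("results.csv", "w") as out_file:
    out_file.write("Algorithm,result\n")
    written = 0
    while True:
        try:
            # wait up to 2 seconds for another record instead of trusting empty()
            res = out_queue.get(True, 2)
        except Queue.Empty:
            break  # nothing arrived within the timeout: assume the queue is drained
        out_file.write(res['algorithm'] + "," + res['result'] + "\n")
        written += 1

print "Wrote %d records to results.csv" % written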

1 Answer


Do not wait for all processes to finish before you consume the data. Instead, process the data as it arrives and keep track of which processes are still running:

import Queue  # provides the Queue.Empty exception used below (Python 2)

processes = []

"""start processes and append them to processes"""

while True:
    try:
        # get an item
        item = queue.get(True, 0.5)
    except Queue.Empty:
        # no item received in half a second
        if not processes:
            # there are no more processes and nothing left to process
            break
        else:
            proc_num = 0
            while proc_num < len(processes):
                process = processes[proc_num]
                exit_code = process.exitcode  # multiprocessing's equivalent of poll()
                if exit_code is None:
                    # process is still running, proceed to next
                    proc_num += 1
                elif exit_code == 0:
                    # process ended gracefully, remove it from list
                    processes.pop(proc_num)
                else:
                    # process ended with an error, what now?
                    raise Exception('Her last words were: "%r"' % exit_code)
    else:
        # got an item
        """process item"""

Do not test whether processes is empty outside of the Queue.Empty case, or you can race against items that are still in flight from a process that has just exited.
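For reference, a self-contained sketch of how the loop above can be wired up end to end, assuming a hypothetical worker() function that puts {'algorithm': ..., 'result': ...} dicts onto a plain multiprocessing.Queue; exit codes are ignored here for brevity (Python 2 syntax, to match the rest of the code):

import multiprocessing
import Queue  # provides the Queue.Empty exception on Python 2

def worker(name, out_queue):
    # hypothetical stand-in for the real per-algorithm work
    for i in range(10):
        out_queue.put({'algorithm': name, 'result': str(i)})

if __name__ == "__main__":
    out_queue = multiprocessing.Queue()
    processes = []
    for name in ("alg_a", "alg_b", "alg_c"):
        p = multiprocessing.Process(target=worker, args=(name, out_queue))
        p.start()
        processes.append(p)

    with open("results.csv", "w") as out_file:
        out_file.write("Algorithm,result\n")
        while True:
            try:
                item = out_queue.get(True, 0.5)
            except Queue.Empty:
                if not processes:
                    # the queue stayed empty for half a second and every
                    # worker had already been reaped: we are done
                    break
                # drop workers that have exited (exitcode is None while alive)
                processes = [p for p in processes if p.exitcode is None]
            else:
                out_file.write(item['algorithm'] + "," + item['result'] + "\n")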

But you would probably be happier with a higher-level abstraction:

import multiprocessing

pool = multiprocessing.Pool(8)
items = pool.map_async(producer_function, producer_arguments)
for item in items.get():  # map_async returns an AsyncResult; .get() waits for all results
    """process item"""
Kijewski