
I'm having the following problem in Python.

I need to do some calculations in parallel, and their results need to be written sequentially to a single file. So I created a function that receives a multiprocessing.Queue and a file handle, does the calculation, and prints the result to the file:

import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation   

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file

def work(queue, fh):
    while True:
        try:
            parameter = queue.get(block = False)
            result = doCalculation(parameter)
            print >>fh, result
        except:
            break


if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fh = open("foo", "w")
    workQueue = Queue()
    parList = # list of conditions for which I want to run doCalculation()
    for x in parList:
        workQueue.put(x)
    processes = [Process(target = work, args = (workQueue, fh)) for i in range(nthreads)]
    for p in processes:
       p.start()
    for p in processes:
       p.join()
    fh.close()

But the file ends up empty after the script runs. I tried changing the work() function to:

def work(queue, filename):
    while True:
        try:
            fh = open(filename, "a")
            parameter = queue.get(block = False)
            result = doCalculation(parameter)
            print >>fh, result
            fh.close()
        except:
            break

and pass the filename as a parameter. Then it works as I intended. When I try to do the same thing sequentially, without multiprocessing, it also works normally.

Why didn't it work in the first version? I can't see the problem.

Also: can I guarantee that two processes won't try to write to the file simultaneously?


EDIT:

Thanks. I got it now. This is the working version:

import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform

def doCalculation(par):
    t = uniform(0,2)
    sleep(t)
    return par * par  # just to simulate some calculation

def feed(queue, parlist):
    for par in parlist:
        queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "dealing with ", par, "" 
            res = doCalculation(par)
            queueOut.put((par,res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fname = "foo"
    workerQueue = Queue()
    writerQueue = Queue()
    parlist = [1,2,3,4,5,6,7,8,9,10]
    feedProc = Process(target = feed , args = (workerQueue, parlist))
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target = write, args = (writerQueue, fname))


    feedProc.start()
    for p in calcProc:
        p.start()
    writProc.start()

    feedProc.join()
    for p in calcProc:
        p.join()
    writProc.join()
Rafael S. Calsaverini
  • Please focus. One set of code **only**. Please remove obsolete or irrelevant code. Please avoid using "Edit". Just get the question to be perfectly clear, complete and consistent, please. – S.Lott Jun 29 '11 at 18:16

3 Answers


You really should use two queues and three separate kinds of processing.

  1. Put stuff into Queue #1.

  2. Get stuff out of Queue #1 and do calculations, putting stuff in Queue #2. You can have many of these, since they get from one queue and put into another queue safely.

  3. Get stuff out of Queue #2 and write it to a file. You must have exactly 1 of these and no more. It "owns" the file, guarantees atomic access, and absolutely assures that the file is written cleanly and consistently.
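
A minimal sketch of that layout (hypothetical names, not the asker's exact code), using a None sentinel so the calculators know when to stop, and telling the single writer how many results to expect instead of relying on a non-blocking get():

from multiprocessing import Process, Queue

def produce(inQ, items):
    for item in items:
        inQ.put(item)

def calculate(inQ, outQ):
    while True:
        item = inQ.get()                 # blocking get
        if item is None:                 # sentinel: no more work
            break
        outQ.put((item, item * item))

def write(outQ, fname, expected):
    fh = open(fname, "w")                # exactly one process owns the file
    for _ in range(expected):
        item, result = outQ.get()        # blocks until a result arrives
        print >>fh, item, result
    fh.close()

if __name__ == "__main__":
    items = range(10)
    inQ, outQ = Queue(), Queue()
    nworkers = 4                         # or multiprocessing.cpu_count()
    calcs = [Process(target = calculate, args = (inQ, outQ)) for _ in range(nworkers)]
    writ = Process(target = write, args = (outQ, "foo", len(items)))
    for c in calcs:
        c.start()
    writ.start()
    produce(inQ, items)
    for _ in range(nworkers):            # one sentinel per calculator
        inQ.put(None)
    for c in calcs:
        c.join()
    writ.join()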

S.Lott
  • +1 for worker and consumer queues. Remember to set a maxsize on the queue or your workers may eat your memory and starve the writer. – Bittrance Jun 29 '11 at 17:29
  • Oh nevermind about the multiple runs... I'm stupid enough not to notice that I launched the feedProc and writProc multiple times. ¬¬ I corrected the code. But I still got an empty file. – Rafael S. Calsaverini Jun 29 '11 at 18:12
  • Ok. I think I got it. I launched the process who would write in the file before I had any results... it works now. Thanks. – Rafael S. Calsaverini Jun 29 '11 at 18:21
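
Regarding Bittrance's comment about maxsize: a bounded queue makes the feeder block once the calculators fall behind, so unread work can't pile up in memory. A hypothetical bound on the asker's queue:

workerQueue = Queue(maxsize = 4 * nthreads)   # put() blocks while the queue is full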

If anyone is looking for a simple way to do the same thing, this may help. I don't think there are any disadvantages to doing it this way; if there are, please let me know.

import multiprocessing

def mp_worker(item):
    # Do something with the item; a placeholder count so the example runs
    count = len(item)
    return item, count

def mp_handler():
    cpus = multiprocessing.cpu_count()
    p = multiprocessing.Pool(cpus)
    # The next two lines populate listX, which is then processed in parallel.
    # Any other source works, as long as listX is passed on to the next step.
    with open('ExampleFile.txt') as f:
        listX = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, listX):
            # (item, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__=='__main__':
    mp_handler()

Source: Python: Writing to a single file with queue while using multiprocessing Pool
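
If the order of lines in results.txt doesn't matter, Pool.imap_unordered writes each result as soon as any worker finishes it, which can keep the pool busier; a chunksize also reduces inter-process overhead for large inputs. A sketch of the same handler with those changes (reusing mp_worker above):

def mp_handler_unordered():
    p = multiprocessing.Pool(multiprocessing.cpu_count())
    with open('ExampleFile.txt') as f:
        listX = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as out:
        # results arrive in completion order, not input order
        for item, count in p.imap_unordered(mp_worker, listX, chunksize = 100):
            out.write('%s: %d\n' % (item, count))
    p.close()
    p.join()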

Menezes Sousa

There is a mistake in the write worker's code: with block = False, the writer gives up as soon as the queue is momentarily empty, so it may never get any data. It should be as follows:

par, res = queue.get(block = True)

You can check this by adding the line

 print "QSize", queueOut.qsize()

after the queueOut.put((par, res)).

With block = False you would see the queue length growing without bound, whereas with block = True it stays at about 1.
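
If the writer can't be told in advance how many results to expect, another option is a blocking get with a timeout, catching Queue.Empty explicitly instead of using a bare except (a sketch, not the original code):

from Queue import Empty   # multiprocessing queues raise Queue.Empty on timeout

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = True, timeout = 5)   # wait up to 5 seconds
        except Empty:
            break          # assume the calculators have finished
        print >>fhandle, par, res
    fhandle.close()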