9

This seems like a simple issue but I cant get my head around it.

I have a simulation which runs in a double for loop and writes the results to an HDF file. A simple version of this program is shown below:

import tables as pt

a = range(10)
b = range(5)

def Simulation():
    hdf = pt.openFile('simulation.h5',mode='w')
    for ii in a:
        print(ii)
        hdf.createGroup('/','A%s'%ii)
        for i in b:
            hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
        hdf.close()
    return
Simulation()

This code does exactly what I want but since the process can take quite a while to run I tried to use the multiprocessing module and use the following code:

import multiprocessing
import tables as pt

a = range(10)
b = range(5)

def Simulation(ii):
    hdf = pt.openFile('simulation.h5',mode='w')
    print(ii)
        hdf.createGroup('/','A%s'%ii)
        for i in b:
            hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
        hdf.close()
    return

if __name__ == '__main__':
    jobs = []
    for ii in a:
        p = multiprocessing.Process(target=Simulation, args=(ii,))
        jobs.append(p)       
        p.start()

This however only prints the last simulation to the HDF file, somehow it overrites all the other groups.

Guillaume Jacquenot
  • 11,217
  • 6
  • 43
  • 49
user2143958
  • 187
  • 1
  • 2
  • 6

1 Answers1

17

Each time you open a file in write (w) mode, a new file is created -- so the contents of the file is lost if it already exists. Only the last file handle can successfully write to the file. Even if you changed that to append mode, you should not try to write to the same file from multiple processes -- the output will get garbled if two processes try to write at the same time.

Instead, have all the worker processes put output in a queue, and have a single dedicated process (either a subprocess or the main process) handle the output from the queue and write to the file:


import multiprocessing as mp
import tables as pt


num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
sentinel = None


def Simulation(inqueue, output):
    for ii in iter(inqueue.get, sentinel):
        output.put(('createGroup', ('/', 'A%s' % ii)))
        for i in range(num_arrays):
            output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))


def handle_output(output):
    hdf = pt.openFile('simulation.h5', mode='w')
    while True:
        args = output.get()
        if args:
            method, args = args
            getattr(hdf, method)(*args)
        else:
            break
    hdf.close()

if __name__ == '__main__':
    output = mp.Queue()
    inqueue = mp.Queue()
    jobs = []
    proc = mp.Process(target=handle_output, args=(output, ))
    proc.start()
    for i in range(num_processes):
        p = mp.Process(target=Simulation, args=(inqueue, output))
        jobs.append(p)
        p.start()
    for i in range(num_simulations):
        inqueue.put(i)
    for i in range(num_processes):
        # Send the sentinal to tell Simulation to end
        inqueue.put(sentinel)
    for p in jobs:
        p.join()
    output.put(None)
    proc.join()

For comparison, here is a version which uses mp.Pool:

import multiprocessing as mp
import tables as pt


num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000


def Simulation(ii):
    result = []
    result.append(('createGroup', ('/', 'A%s' % ii)))
    for i in range(num_arrays):
        result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
    return result


def handle_output(result):
    hdf = pt.openFile('simulation.h5', mode='a')
    for args in result:
        method, args = args
        getattr(hdf, method)(*args)
    hdf.close()


if __name__ == '__main__':
    # clear the file
    hdf = pt.openFile('simulation.h5', mode='w')
    hdf.close()
    pool = mp.Pool(num_processes)
    for i in range(num_simulations):
        pool.apply_async(Simulation, (i, ), callback=handle_output)
    pool.close()
    pool.join()

It looks simpler doesn't it? However there is one signficant difference. The original code used output.put to send args to handle_output which was running in its own subprocess. handle_output would take args from the output queue and handle them immediately. With the Pool code above, Simulation accumulates a whole bunch of args in result and result is not sent to handle_output until after Simulation returns.

If Simulation takes a long time, there will be a long waiting period while nothing is being written to simulation.h5.

unutbu
  • 842,883
  • 184
  • 1,785
  • 1,677
  • As addition to this question I have used the code above with success but am now expandign this simulation, The for loop defined by a = range(1000) and also the for loop defined by b = range (100). This howerver results in an extensive use of my memory. I have 8 CPU with 16 Gb RAM but when I run the file (even without the real simulations) my RAM usage goes to 100% which results my system to stall. – user2143958 Apr 11 '13 at 07:17
  • I think we need to separate the number of subprocesses from the number of tasks. It sounds like you want 1000 tasks, but probably not 1000 subprocesses. I'll edit the post to suggest a way you could do that. – unutbu Apr 11 '13 at 08:35
  • Yes you are right , in the previous example for large iterations a equally large amount of subprocesses were created clogging all the memory. The file you edited works perfect! But just for clarification, I was also experimenting with the Pool() function and this function seems to work quite good as well although it gets harder when more than one variable needs to be passed. What is the main reason to choose the Process() function over the Pool() function? – user2143958 Apr 11 '13 at 09:25
  • 2
    Pool is a higher-level setup, while Process gives you bare-bones control. Pool has features like a [timeout parameter](http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult.get) which can be used to terminate tasks which are taking too long. The `apply_async` method also has a [callback parameter](http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.apply_async) which can be used to run code in the calling process whenever a task ends. You could use that instead of making `handle_output` a subprocess. – unutbu Apr 11 '13 at 10:10
  • 1
    If you need these features or just like the syntax better, use a `Pool`. I'm kind of ambivalent. If the Pool has features I need, I use a Pool. But sometimes I like just using Processes and Queues. – unutbu Apr 11 '13 at 10:10
  • how to pass multiple arguments to handle_output? – Eliethesaiyan Jun 19 '16 at 09:26
  • @Eliethesaiyan: The callback, `handle_output` is called by `pool.apply_async` (or, more accurately, the `_handle_results` thread) always with one and only one argument. To pass more information to `handle_output`, return a *tuple of values* from `Simulation`. The tuple (return value) will be passed to `handle_result`. You can then unpack the tuple inside `handle_result`. – unutbu Jun 19 '16 at 10:58
  • @unutbu thanks,for some reason i cant get the call back function to work ,i would appreciate if you could take a look http://stackoverflow.com/questions/37907350/apply-async-callback-function-not-being-called – Eliethesaiyan Jun 19 '16 at 12:46
  • upvoted! this looks like a nice setup if you are doing custom multiprocessing/threading, any ideas for celery? – PirateApp Apr 30 '18 at 03:43
  • 1
    @PirateApp: I'm not very familiar with celery. From what I understand, it is used for [farming out asynchronous tasks across a *distributed cluster*](https://stackoverflow.com/a/16234656/190597) and has [many features](http://docs.celeryproject.org/en/master/getting-started/introduction.html#features) not implemented above. – unutbu Apr 30 '18 at 07:17