0

I have a simulation running inside a class "Simulation", and a class "DataRecorder" in charge of saving data on disk (after several manipulations). Here is a simplified mockup:

class DataRecorder(object):
    """
    Fill in an internal buffer, and flush it on disk when it reaches
    a given data amount.
    """
    _f = r'n:\99-tmp\test_async\toto.txt'
    _events_buffer = []
    _events_buffer_limit = 10    
    flushing_cpt = 0        
    def __init__(self):
        with open(self._f, 'w') as fh:
            fh.write('new sim')

    def save_event(self, ix, val):
        """ append data to internal buffer, and flush it when limit is reached
        """   
        if len(self._events_buffer)>self._events_buffer_limit:
            self._flush_events_buffer()
            self.flushing_cpt += 1
        self._events_buffer.append((ix, val))

    def _flush_events_buffer(self): 
        """ write bunch of data on disk """
        # here, in reality, deal with numpy arrays and HDF5 file 
        buf = [str(i) for i in self._events_buffer]
        _s = '\n'.join(buf)
        with open(self._f, 'a') as fh:
            fh.write(_s)
        self._events_buffer = []

    def stop_records(self):
        self._flush_events_buffer()


class Simulation(object):
    def __init__(self):      
        self.dr = DataRecorder()

    def run(self, nb=10000):
        """ long-term simulation (could be 10min calculations generating about 1Gb of data) """
        for ix in range(nb):
            sol = ix * 3.14
            self.dr.save_event(ix, sol)
        self.dr.stop_records()

if __name__ == '__main__':
    sim = Simulation()    
    sim.run()

Although this works quite well, disk IO is currently my bottleneck since DataRecorder stops simulation, time for it to dump data on the disk (HDF5 file) each time the buffer is full.

My aim is to transform DataRecorder into an asynchronous class, writing on disk in background and letting simulation continue to run while filling a data buffer.

I'm not (from far) a multiprocessing super-hero, and here is my first failing attempt using pool:

I got inspiration from Write data to disk in Python as a background process and also tried Queue from Solving embarassingly parallel problems using Python multiprocessing

class MPDataRecorder(object):
    _f = r'n:\99-tmp\test_async\toto_mp.txt'
    _events_buffer = []
    _events_buffer_limit = 10 
    flushing_cpt = 0    
    numprocs = mp.cpu_count() 

    def __init__(self):
        with open(self._f, 'w') as fh:
            fh.write('new sim')
        self.record = True    
        self.pool = mp.Pool() 
        self._watch_buffer()                                 

    def save_event(self, ix, val):
        """ append data to internal buffer, and flush it when limit is reached
        """   
        self._events_buffer.append((ix, val))

    def _flush_events_buffer(self): 
        """ write bunch of data on disk """
        # here, in reality, deal with numpy arrays and HDF5 file 
        buf = [str(i) for i in self._events_buffer]
        _s = '\n'.join(buf)
        with open(self._f, 'a') as fh:
            fh.write(_s)
        self._events_buffer = []

    def _watch_buffer(self):        
        # here, in reality, deal with numpy arrays and HDF5 file 
        while self.record:
            self.pool.apply_async(self._flush_events_buffer)



    def stop_records(self):
        self.record = False
        self.pool.close()
        self.pool.join()

This leads to following TraceBack, then Memory Error:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed         

Any chance to encapsulate such an asynchronous data writer feature in a generic class ?

Community
  • 1
  • 1
Nic
  • 3,365
  • 3
  • 20
  • 31
  • This may be useful: http://stackoverflow.com/questions/21375509/in-a-pickle-how-to-serialise-legacy-objects-for-submission-to-a-python-multipro?lq=1 – jonrsharpe Feb 16 '14 at 10:17
  • What did you try with `Queue` and how was it? – Janne Karila Feb 17 '14 at 14:17
  • Hi @Karila! I do not have anymore this first code, but I used this [post](http://stackoverflow.com/a/2364667/2069099) to write it. As soon as I have time, I'll try to retry in this direction. – Nic Feb 17 '14 at 15:31

1 Answers1

0

If your disk I/O is the bottleneck, no amount of clever buffering will solve the problem that you have to keep the entire output in memory. If the disk writer cannot keep up, how will it ever "catch up" with your simulation process?

However, if this is just a problem during some intensive "peaks", you might solve your problem with buffering. Before trying anything more fancy, I would recommend at least starting with a very simple solution: use two separate processes and pipe output between them. The simplest way in Python is to use the subprocess module. A more beautiful solution might be to use a framework around it, like Parallel python (but I cannot vouch for it since I never did anything more than toying with it).

Krumelur
  • 31,081
  • 7
  • 77
  • 119
  • Sure! My point is the following: pure calculations take, let say, 20 sec. to run, then, writing on disk is about 10sec. Total is thus 30sec. The key-point is that I write on disk all the intermediate data (that won't fit in memory), so, If this could be done in a parallel process, I could save thos 10sec overhead. – Nic Feb 16 '14 at 10:29