I have a simulation running inside a class "Simulation", and a class "DataRecorder" in charge of saving data to disk (after several manipulations). Here is a simplified mockup:
class DataRecorder(object):
    """
    Fill an internal buffer, and flush it to disk when it reaches
    a given data amount.
    """
    _f = r'n:\99-tmp\test_async\toto.txt'
    _events_buffer = []
    _events_buffer_limit = 10
    flushing_cpt = 0

    def __init__(self):
        with open(self._f, 'w') as fh:
            fh.write('new sim')

    def save_event(self, ix, val):
        """ append data to internal buffer, and flush it when limit is reached
        """
        if len(self._events_buffer) > self._events_buffer_limit:
            self._flush_events_buffer()
            self.flushing_cpt += 1
        self._events_buffer.append((ix, val))

    def _flush_events_buffer(self):
        """ write a bunch of data to disk """
        # here, in reality, deal with numpy arrays and HDF5 file
        buf = [str(i) for i in self._events_buffer]
        _s = '\n'.join(buf)
        with open(self._f, 'a') as fh:
            fh.write(_s)
        self._events_buffer = []

    def stop_records(self):
        self._flush_events_buffer()


class Simulation(object):
    def __init__(self):
        self.dr = DataRecorder()

    def run(self, nb=10000):
        """ long-running simulation (could be 10 min of computation
        generating about 1 GB of data) """
        for ix in range(nb):
            sol = ix * 3.14
            self.dr.save_event(ix, sol)
        self.dr.stop_records()


if __name__ == '__main__':
    sim = Simulation()
    sim.run()
Although this works quite well, disk IO is currently my bottleneck: DataRecorder pauses the simulation while it dumps the data to disk (an HDF5 file) each time the buffer is full.
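For reference, a quick way to confirm that the flushes dominate is to time them against the whole run. This is only a rough sketch; the timing scaffolding (the monkey-patched _timed_flush and the _t_flush accumulator) is mine, not part of the code above:

import time

# accumulate time spent inside the flush calls
_t_flush = [0.0]
_orig_flush = DataRecorder._flush_events_buffer

def _timed_flush(self):
    t0 = time.time()
    _orig_flush(self)
    _t_flush[0] += time.time() - t0

DataRecorder._flush_events_buffer = _timed_flush

t0 = time.time()
Simulation().run()
print('total: %.2fs, spent flushing: %.2fs' % (time.time() - t0, _t_flush[0]))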
My aim is to transform DataRecorder into an asynchronous class that writes to disk in the background, letting the simulation keep running while the data buffer fills.
I'm far from being a multiprocessing super-hero; I took inspiration from Write data to disk in Python as a background process, and also tried a Queue, following Solving embarassingly parallel problems using Python multiprocessing. Here is my first failing attempt, using Pool:
import multiprocessing as mp

class MPDataRecorder(object):
    _f = r'n:\99-tmp\test_async\toto_mp.txt'
    _events_buffer = []
    _events_buffer_limit = 10
    flushing_cpt = 0
    numprocs = mp.cpu_count()

    def __init__(self):
        with open(self._f, 'w') as fh:
            fh.write('new sim')
        self.record = True
        self.pool = mp.Pool()
        self._watch_buffer()

    def save_event(self, ix, val):
        """ append data to internal buffer (flushing is meant to
        happen in the background) """
        self._events_buffer.append((ix, val))

    def _flush_events_buffer(self):
        """ write a bunch of data to disk """
        # here, in reality, deal with numpy arrays and HDF5 file
        buf = [str(i) for i in self._events_buffer]
        _s = '\n'.join(buf)
        with open(self._f, 'a') as fh:
            fh.write(_s)
        self._events_buffer = []

    def _watch_buffer(self):
        # here, in reality, deal with numpy arrays and HDF5 file
        while self.record:
            self.pool.apply_async(self._flush_events_buffer)

    def stop_records(self):
        self.record = False
        self.pool.close()
        self.pool.join()
This leads to the following traceback, then a MemoryError:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
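If it helps locate the problem: as far as I understand, Pool has to pickle the callable it sends to its worker processes, and under Python 2 a bound method is not picklable. This minimal sketch (my own repro, not taken from the code above) raises the same error:

import pickle

class Foo(object):
    def bar(self):
        pass

# Under Python 2 this raises:
#   PicklingError: Can't pickle <type 'instancemethod'>: ...
# (Python 3 can pickle bound methods, so it succeeds there)
pickle.dumps(Foo().bar)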
Is there any way to encapsulate such an asynchronous data-writer feature in a generic class?
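For reference, here is the kind of generic interface I am hoping for, sketched with a background thread and a queue instead of a process pool. All names are hypothetical, and this is only a sketch of the idea, not something I have validated against the real HDF5 case:

import threading
try:
    import Queue as queue  # Python 2
except ImportError:
    import queue           # Python 3

class AsyncDataRecorder(object):
    """Sketch: same interface as DataRecorder, but flushing happens
    in a dedicated writer thread fed by a queue."""

    def __init__(self, path):
        self._path = path
        self._queue = queue.Queue()
        self._writer = threading.Thread(target=self._writer_loop)
        self._writer.daemon = True
        self._writer.start()

    def save_event(self, ix, val):
        # cheap: just hand the data over to the writer thread
        self._queue.put((ix, val))

    def _writer_loop(self):
        buf = []
        while True:
            item = self._queue.get()
            if item is None:      # sentinel: flush what is left and exit
                self._flush(buf)
                return
            buf.append(item)
            if len(buf) >= 10:    # cf. _events_buffer_limit
                self._flush(buf)
                buf = []

    def _flush(self, buf):
        # in reality: numpy arrays and HDF5 file
        with open(self._path, 'a') as fh:
            fh.write('\n'.join(str(i) for i in buf))

    def stop_records(self):
        self._queue.put(None)     # ask the writer thread to finish
        self._writer.join()

A thread might even be enough here, since the work is IO-bound, but I don't know whether this approach holds up when the writer is an HDF5 file instead of a plain text file.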