My project has a sensor with a Python driver that collects readings and produces a numpy array; the array then needs some statistical processing that returns one or more processed arrays. Both the raw data and the processed data are saved to files. To increase data throughput, we want to process one set of data while the sensor driver concurrently collects the next set.
The driver consists of two classes: SensorInterface, which acts as the main interface to the sensor, and DataCaptureThread, whose main method runs in a thread that constantly reads the socket for data. In the high-level module, a call to a SensorInterface method triggers the DataCaptureThread to record data into the array and save it to a file after a preset duration has elapsed.
My contribution has been to develop the data processor, which consists of a single class, DataHandler, that should receive the numpy array after it is generated and run its processing routines. Once the array has been generated, the driver no longer needs it.
A very abbreviated version of the code looks like this (the classes are actually contained in different modules):
import threading
import numpy as np

class DataCaptureThread():
    def __init__(self, duration, parameters):
        self.duration = duration
        self.parameters = parameters
        self.capture_flag = False
        self.outgoing_data_open_flag = True
        self.thread = threading.Thread(target=self.read_data, args=(duration,))

    def read_data(self, duration):
        if self.capture_flag:
            while True:
                # Pretend this function gets the entire dataset from the socket and saves the file
                self.data_array = ReadSocket()
                # Wait for the other process to indicate it is ready to receive the array
                while not self.outgoing_data_open_flag:
                    continue

class SensorInterface():
    def __init__(self, parameters):
        self.parameters = parameters

    def data_stream_start(self):
        # Instantiate the capture thread; its duration is set later by collect_data
        self.capture_stream = DataCaptureThread(0.0, self.parameters)

    def collect_data(self, duration):
        self.capture_stream.duration = duration
        self.capture_stream.capture_flag = True

class DataHandler():
    def __init__(self, parameters):
        self.parameters = parameters
        # Flag to indicate that the sensor process is not ready to transfer data
        self.incoming_data_ready_flag = False
        # Flag to indicate that this process is busy with data and cannot accept more
        self.data_full = False
        self.data_array = np.array([])

    def run_processing(self):
        while True:
            # Wait until data is ready
            while not self.incoming_data_ready_flag:
                continue
            # Set flag to indicate the process is busy with data
            self.data_full = True
            # Pretend this function encapsulates all data processing and saving
            DataProcessing(self.data_array)
            # Reset flag
            self.data_full = False

if __name__ == '__main__':
    import multiprocessing as mp

    # Create objects
    sensor = SensorInterface(params1)
    data_processor = DataHandler(params2)
    duration = 3.0

    # Create the process for data analysis
    # collect_proc = mp.Process(target=sensor.collect_data, args=(duration,))
    data_proc = mp.Process(target=data_processor.run_processing, args=())

    # Start and join the process
    data_proc.start()
    data_proc.join()
This is almost pseudocode, and I may have neglected to initialize variables or made slight mistakes.
The intention behind the flags outgoing_data_open_flag, data_full, and incoming_data_ready_flag is to block the transfer of the array from one process to the other until the receiver is ready.
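To make the intended handshake concrete, this is roughly the synchronization I am trying to express, sketched here with multiprocessing.Event objects (the event names and the two placeholder functions are my own invention for illustration, not part of the existing driver):

import multiprocessing as mp

def driver_side(receiver_ready, data_ready):
    # ... capture an array here ...
    receiver_ready.wait()   # block until the processor can accept the array
    # ... hand the array over (this is the part I don't know how to do) ...
    data_ready.set()        # signal that an array is available

def processor_side(receiver_ready, data_ready):
    receiver_ready.set()    # announce readiness to receive
    data_ready.wait()       # block until an array is available
    # ... run DataProcessing on the received array ...

if __name__ == '__main__':
    # The events would have to be created in the parent and passed to both processes
    receiver_ready = mp.Event()
    data_ready = mp.Event()
    proc = mp.Process(target=processor_side, args=(receiver_ready, data_ready))
    proc.start()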
My question is: how do I communicate the flags and the array between the two processes? Python's multiprocessing module has been opaque to me, and I have been unable to figure out a good way of sharing these data, since both process targets are class methods and the flags/data are instance attributes.
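One idea I have considered (though I am not sure it fits the driver's structure) is to pass a multiprocessing.Queue into both sides and let it do the blocking instead of the flags; the worker function and variable names below are placeholders of my own, not the real driver API:

import multiprocessing as mp

def processing_worker(queue):
    # Placeholder consumer: queue.get() blocks until the driver puts an array,
    # which would replace the busy-wait on incoming_data_ready_flag
    while True:
        data_array = queue.get()
        DataProcessing(data_array)   # pretend processing/saving function

if __name__ == '__main__':
    queue = mp.Queue()
    data_proc = mp.Process(target=processing_worker, args=(queue,))
    data_proc.start()
    # Somewhere in the driver process, after an array has been captured:
    # queue.put(data_array)          # the numpy array is pickled and sent over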
Note that I would like to avoid excessive changes to the structure of the driver classes; they are very complex, and modifying them is time-consuming. I have more direct control over the data processing class and the main function.