This is my first question here, so please bear with me if it is not detailed enough or is missing information.
I have a coroutine task that handles real-time tick data (including disconnection and reconnection events) coming into the program, and I am collecting that data. I want to save the data when the market data disconnection event is raised, but the saving code is fairly heavy and the time it takes grows with the amount of data stored. The save is currently blocking the reconnection.
I am using a dictionary that maps each ric to a queue.Queue to cache the data, where ric is the ric code of the data (there are 400~1000 of them), and the objects are put into the queue with put_nowait:
    self.DataGrid[ric]
    DataGrid[ric].put_nowait(df)
I save the data by essentially popping every item from the queue into a DataFrame and then writing it out with DataFrame.to_csv():
    def SaveDataHandler(self):
        if len(self.DataGrid) > 0:
            self.logger.info('Beginning to save data')
            for ric in self.DataGrid.keys():
                if self.DataGrid[ric].qsize() > 0:
                    self.logger.info('Saving data for ric=' + ric)
                    self.write_file(ric)
            self.logger.info('Save data completed')
        else:
            return

    def queue_to_frame(self, ric):
        df = pd.DataFrame()
        while self.DataGrid[ric].qsize() > 0:
            df = df.append(self.DataGrid[ric].get_nowait())
        return df

    def write_file(self, ric):
        df = self.queue_to_frame(ric)
        self.store.collection(ric).item(today_date_str).write_data(df, saveIndex=False)
----------------------------------------------------------------------------------------------
    def write_data(self, df, saveIndex=False):
        if len(df) > 0:
            num = len(self.get_files()) + 1
            df.to_csv(self.path + str(num), sep="\t", quoting=csv.QUOTE_NONE, index=False)
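For reference, the queue-draining step in queue_to_frame could be sketched roughly as below. This is only an illustration under assumptions: drain_queue is a hypothetical helper, not part of the code above, and it collects the items in a plain list instead of calling df.append once per item (DataFrame.append copies the frame on every call and was removed in pandas 2.0). If the queued items are DataFrames, the list could then be combined with a single pd.concat(items) call; that step is left out so the sketch needs only the standard library.

```python
import queue


def drain_queue(q):
    """Remove and return everything currently in q, without blocking.

    Hypothetical helper: it stops on queue.Empty instead of polling
    qsize(), because qsize() is only an approximate count.
    """
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


if __name__ == "__main__":
    q = queue.Queue()
    for i in range(3):
        # Stand-in rows; the original code queues DataFrames instead.
        q.put_nowait({"ric": "RIC.A", "seq": i})
    rows = drain_queue(q)
    print(len(rows), q.empty())  # prints: 3 True
```

Note that pd.concat raises a ValueError when given an empty list, so the list would need to be checked before concatenating.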
I have tried using _thread.start_new_thread(SaveDataHandler) when the market data connection disconnects, but the save still blocks the reconnection check, which delays reconnecting to the market data. That isn't ideal.
So, back to my question: is there a way to start a new thread so that the save doesn't block the coroutine task that checks for reconnections? (Or am I simply doing it wrong, and is there a way to just append data to a database or the filesystem? I tried pystore, but its append function is flawed and can't be used.)
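To make the question concrete, here is a minimal sketch of the behaviour I am after, assuming the program runs on asyncio and the rows for each ric have already been pulled out of the queues into plain lists. All names here (save_rows, reconnect_check, on_disconnect, the .tsv paths) are hypothetical stand-ins, not my real code. The blocking save is handed to the event loop's default thread pool with loop.run_in_executor, and each file is opened in append mode so new rows are added to what is already on disk.

```python
import asyncio
import csv
import os
import tempfile
import time


def save_rows(path, rows):
    """Blocking save, meant to run in a worker thread (hypothetical)."""
    time.sleep(0.2)  # stand-in for a slow write
    # Mode "a" appends to the file instead of overwriting it.
    with open(path, "a", newline="") as f:
        csv.writer(f, delimiter="\t").writerows(rows)
    return len(rows)


async def reconnect_check(ticks, stop):
    """Stand-in for the reconnection coroutine; records each pass."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def on_disconnect(snapshots, out_dir):
    """Start one save per ric in the default executor and await them all."""
    loop = asyncio.get_running_loop()
    jobs = []
    for ric, rows in snapshots.items():
        if rows:
            path = os.path.join(out_dir, ric + ".tsv")
            jobs.append(loop.run_in_executor(None, save_rows, path, rows))
    return await asyncio.gather(*jobs)


async def main():
    snapshots = {
        "RIC.A": [("RIC.A", i, 100.0 + i) for i in range(3)],
        "RIC.B": [],  # nothing cached, so no save is started
    }
    ticks, stop = [], asyncio.Event()
    checker = asyncio.create_task(reconnect_check(ticks, stop))
    with tempfile.TemporaryDirectory() as out_dir:
        saved = await on_disconnect(snapshots, out_dir)
    stop.set()
    await checker
    # ticks shows how often the check ran while the save was in progress.
    return saved, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch the slow part is a sleep plus file I/O, which releases the GIL, so the reconnection check keeps running. If the real save is dominated by CPU-bound pandas work, a thread may still starve the event loop, and a process pool might be needed instead; I have not verified which case applies to my code.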