
This is my first question on Stack Overflow, so please bear with me if it is not detailed enough or is missing information.

I have a coroutine task that handles real-time tick data coming into the program (including disconnection and reconnection events), and I collect this data. I want to save the data when the market data disconnection event is raised, but the saving code is fairly large and its running time grows with the amount of data stored. This save currently blocks the reconnection.

I am using a dictionary that maps each ric to a queue.Queue to cache the data, where ric is one of roughly 400 to 1000 RIC codes, and objects are put into each queue with put_nowait:

    self.DataGrid[ric]                 # the queue.Queue for one RIC
    self.DataGrid[ric].put_nowait(df)  # cache one tick DataFrame without blocking
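For reference, a minimal self-contained version of this cache might look like the sketch below. It assumes each tick arrives as a one-row DataFrame; `DataGrid` is a plain module-level dict here instead of an attribute, `cache_tick` is a hypothetical helper name, and the RIC strings are only illustrative.

```python
import queue
from collections import defaultdict

import pandas as pd

# One FIFO queue per RIC; defaultdict creates the queue the first time a RIC is seen.
DataGrid = defaultdict(queue.Queue)


def cache_tick(ric, df):
    """Hypothetical helper: store one tick DataFrame for `ric` without blocking."""
    # Queue() is unbounded by default, so put_nowait never raises queue.Full here.
    DataGrid[ric].put_nowait(df)


cache_tick("EUR=", pd.DataFrame({"bid": [1.10], "ask": [1.11]}))
cache_tick("EUR=", pd.DataFrame({"bid": [1.12], "ask": [1.13]}))
cache_tick("JPY=", pd.DataFrame({"bid": [104.1], "ask": [104.2]}))
print({ric: q.qsize() for ric, q in DataGrid.items()})  # {'EUR=': 2, 'JPY=': 1}
```

queue.Queue is thread-safe, which matters once a worker thread drains the same queues that the event loop is filling.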

I save the data by popping every item from each queue into a DataFrame and then writing it out with DataFrame.to_csv():

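    import pandas as pd

    def SaveDataHandler(self):
        if not self.DataGrid:
            return  # nothing cached yet
        self.logger.info('Beginning to save data')
        # Iterate over a snapshot of the items so the loop stays safe
        # if another thread adds a RIC meanwhile.
        for ric, q in list(self.DataGrid.items()):
            if q.qsize() > 0:
                self.logger.info('Saving data for ric=%s', ric)
                self.write_file(ric)
        self.logger.info('Save data completed')

    def queue_to_frame(self, ric):
        # Collect the queued frames in a list and concatenate once at the end:
        # DataFrame.append copied the whole frame on every call (quadratic time)
        # and was removed in pandas 2.0.
        frames = []
        while self.DataGrid[ric].qsize() > 0:
            frames.append(self.DataGrid[ric].get_nowait())
        return pd.concat(frames) if frames else pd.DataFrame()

    def write_file(self, ric):
        df = self.queue_to_frame(ric)
        self.store.collection(ric).item(today_date_str).write_data(df, saveIndex=False)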

----------------------------------------------------------------------------------------------

    import csv

    def write_data(self, df, saveIndex=False):
        # Each call writes a new numbered file (1, 2, 3, ...) rather than appending.
        if len(df) > 0:
            num = len(self.get_files()) + 1
            df.to_csv(self.path + str(num), sep="\t", quoting=csv.QUOTE_NONE, index=saveIndex)
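To test the save path in isolation, it can be reproduced outside the classes. This is a sketch under assumptions: queues hold one DataFrame per tick, `drain_queue` and `save_all` are hypothetical names, files go under a temporary directory instead of the author's store, and file numbering mimics `write_data` by counting existing files.

```python
import csv
import os
import queue
import tempfile

import pandas as pd


def drain_queue(q):
    """Pop everything currently in `q` and combine it into one DataFrame."""
    frames = []
    while True:
        try:
            # get_nowait + queue.Empty avoids relying on qsize(), which is
            # only approximate when another thread is also using the queue.
            frames.append(q.get_nowait())
        except queue.Empty:
            break
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


def save_all(grid, root):
    """Write each RIC's pending ticks to root/<ric>/<n>, n = existing file count + 1."""
    for ric, q in list(grid.items()):
        df = drain_queue(q)
        if len(df) == 0:
            continue  # nothing pending for this RIC
        folder = os.path.join(root, ric)
        os.makedirs(folder, exist_ok=True)
        num = len(os.listdir(folder)) + 1
        df.to_csv(os.path.join(folder, str(num)), sep="\t",
                  quoting=csv.QUOTE_NONE, index=False)


grid = {"EUR=": queue.Queue()}
grid["EUR="].put_nowait(pd.DataFrame({"bid": [1.10]}))
grid["EUR="].put_nowait(pd.DataFrame({"bid": [1.12]}))
root = tempfile.mkdtemp()
save_all(grid, root)
print(os.listdir(os.path.join(root, "EUR=")))  # ['1']
```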

I have tried using _thread.start_new_thread(SaveDataHandler) when the market data connection disconnects, but the save still blocks the reconnection check, which delays reconnecting to market data.

So, back to my question: is there a way to start a new thread so that saving does not block the coroutine task that checks for reconnections? (Or am I simply doing this wrong, and is there a way to append data directly to a database or the filesystem? I tried pystore, but its append function is flawed and I can't use it.)
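On the side question of appending rather than rewriting: pandas itself can append to an existing CSV through `to_csv(mode="a")`, writing the header only when the file is new, so each save costs time proportional to the new ticks only. A minimal sketch, assuming one tab-separated file per RIC; `append_ticks` and the file layout are hypothetical.

```python
import os
import tempfile

import pandas as pd


def append_ticks(path, df):
    """Append `df` to the TSV at `path`, writing the header only on the first write."""
    write_header = not os.path.exists(path)
    df.to_csv(path, sep="\t", index=False, mode="a", header=write_header)


path = os.path.join(tempfile.mkdtemp(), "EUR=.tsv")
append_ticks(path, pd.DataFrame({"bid": [1.10], "ask": [1.11]}))
append_ticks(path, pd.DataFrame({"bid": [1.12], "ask": [1.13]}))
print(len(pd.read_csv(path, sep="\t")))  # 2
```

Because the header is written once, every appended frame must keep the same column order.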

  • Maybe this helps? https://stackoverflow.com/questions/29296064/python-asyncio-how-to-create-and-cancel-tasks-from-another-thread – Russ J Dec 09 '20 at 03:36
  • I am not sure yet, because the objects handling the tick data and the connections are instantiated within the asyncio event loop. It seems I need to move the save handler out of the market data class and then pass the dictionary to it. Giving it a try. – K' Chau Dec 09 '20 at 04:01
  • In the end I upgraded to Python 3.9.1, which adds a built-in, asyncio.to_thread(), that runs a blocking function in a separate thread without blocking the coroutines on the event loop. [https://docs.python.org/3/library/asyncio-task.html#running-in-threads](https://docs.python.org/3/library/asyncio-task.html#running-in-threads) – K' Chau Dec 14 '20 at 03:46
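The asyncio.to_thread approach from the last comment could look roughly like this. It is a minimal sketch, not the author's code: `Feed`, `on_disconnect`, `save_snapshot` and the fake `reconnect` coroutine are hypothetical stand-ins, and it assumes Python 3.9+. The dict of queues is swapped for a fresh one on the event-loop thread before the worker starts, so ticks arriving after the reconnect go into new queues, and the save is scheduled as a task instead of being awaited inline, so reconnection proceeds immediately.

```python
import asyncio
import queue
import time
from collections import defaultdict

import pandas as pd


def save_snapshot(snapshot):
    """Blocking save: runs in a worker thread, never on the event loop."""
    time.sleep(0.2)  # stand-in for the slow pd.concat / to_csv work
    return {ric: q.qsize() for ric, q in snapshot.items()}


class Feed:
    def __init__(self):
        self.DataGrid = defaultdict(queue.Queue)
        self.save_tasks = set()

    def on_disconnect(self):
        # Swap in a fresh cache on the loop thread; the old one now belongs to the saver.
        snapshot, self.DataGrid = self.DataGrid, defaultdict(queue.Queue)
        task = asyncio.create_task(asyncio.to_thread(save_snapshot, snapshot))
        # Keep a reference so the task is not garbage-collected before it finishes.
        self.save_tasks.add(task)
        task.add_done_callback(self.save_tasks.discard)
        return task

    async def reconnect(self):
        await asyncio.sleep(0.01)  # stand-in for the real reconnection logic
        return time.monotonic()


async def main():
    feed = Feed()
    feed.DataGrid["EUR="].put_nowait(pd.DataFrame({"bid": [1.10]}))
    start = time.monotonic()
    save_task = feed.on_disconnect()
    reconnected_at = await feed.reconnect()  # does not wait for the save
    saved = await save_task
    return reconnected_at - start, saved


elapsed, saved = asyncio.run(main())
print(saved)  # {'EUR=': 1}
```

On Python 3.7 or 3.8, `asyncio.get_running_loop().run_in_executor(None, save_snapshot, snapshot)` does the same job. Since parts of pandas hold the GIL, a very large save may still slow the event loop somewhat even from a worker thread.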
