
I'm a bit of a Python newbie here. I'm working on code that requests JSON data from a web URL, keeps updating the request until data through a certain timeframe has been collected, saves it all to a file (it could be millions of lines, so I'm trying to keep it out of memory), and then, after a statistical analysis, compresses the data down to a single line of a CSV file. I've got that section of the code working, but the program has to loop through a list of several thousand names, each of which is substituted into the URL. If I run it as a single sequential loop, it takes longer than my timeframe and the program keeps falling behind.
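
Roughly, the per-name flow looks like this. This is only a simplified sketch with a placeholder URL, field names and file paths, not my real code; the actual request loop and statistics are more involved:

```
import csv
import json
import requests

def collect_and_summarize(name, start, end, raw_path, csv_path):
    """Stream paginated JSON to disk, then write one summary row for this name."""
    offset = start
    row_count = 0
    total = 0.0
    with open(raw_path, 'w') as raw:
        while offset < end:
            resp = requests.get('https://example.com/api',
                                params={'name': name, 'offset': offset, 'limit': 1000})
            ticks = resp.json().get('ticks', [])
            if not ticks:
                break
            for tick in ticks:
                raw.write(json.dumps(tick) + '\n')   # stream to disk, keep memory flat
                row_count += 1
                total += tick.get('price', 0.0)
            offset = ticks[-1]['t'] + 1              # advance past the last timestamp returned
    with open(csv_path, 'a', newline='') as out:     # one compressed line per name and timeframe
        csv.writer(out).writerow([name, start, end, row_count,
                                  total / row_count if row_count else 0.0])
```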

I've attempted to run this either as an asyncio loop using ThreadPoolExecutor or as a pool with dozens of workers. I can use substantially more threads than I have processor cores because the bulk of the time is spent waiting on URL responses, which frees threads up to make new requests.
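
The asyncio-plus-executor variant I tried is shaped roughly like this (again a simplified sketch, with a stub fetch_one standing in for the real request-and-save worker):

```
import asyncio
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_one(name):
    # I/O bound: the thread spends nearly all of its time waiting on the response
    resp = requests.get('https://example.com/api', params={'name': name})
    return name, resp.status_code

async def fan_out(names):
    loop = asyncio.get_running_loop()
    # Far more workers than cores is fine here because the work is waiting, not computing
    with ThreadPoolExecutor(max_workers=100) as executor:
        tasks = [loop.run_in_executor(executor, fetch_one, n) for n in names]
        return await asyncio.gather(*tasks)

results = asyncio.run(fan_out(['thousands', 'of', 'names']))
```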

That said, I can't get any form of a pool or loop to continue past a single iteration of the while loop. The code looks something like this:

from multiprocessing import Pool
import time

variables = ['thousands', 'of', 'variables']
interval = 15  # in minutes

class DoSomething:
    def dosomething(self, variable, date, initialtime, interval):
        callweburl(variable, date, initialtime, interval)
        runstatistics()
        saveCSV()


def worker(variable):
    try:
        ds = DoSomething().dosomething(variable, date, initialtime, interval)
        api.ds(variable)
    except Exception:
        return False

pool = Pool(100)

# code that gets date, initialtime and currenttime
while initialtime < currenttime:
    while initialtime < end_of_window:  # end_of_window is the next multiple of the interval
        if __name__ == '__main__':
            for variable in variables:
                pool.apply_async(worker, (variable,))
        initialtime = initialtime + interval
    # code that gets date and a new initialtime and currenttime
    time_to_pause = initialtime - currenttime + interval
    if time_to_pause > 0.0:
        time.sleep(time_to_pause)

The loops run fine when I replace the apply_async call with a direct, synchronous call to DoSomething().dosomething(variable, date, initialtime, interval). When I run them with either a pool or an asyncio loop, they become sporadic at best. Depending on where I place pool.close() and pool.join(), the program either runs a single iteration and exits, or the intervals the pool collects for are all over the board: sometimes it collects data for the same time interval twice, and other times it skips ahead by days at a time.

Is there a way to close out a loop or pool and reinitialize it for the next interval? I've also tried moving the pool or loop initialization above the while loops. Nothing seems to work quite right.
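
To be concrete about what I mean by closing a pool out and reinitializing it, the structure I have in mind is roughly the following; this is a simplified sketch with placeholder values and a stub worker, not my actual code:

```
import time
from multiprocessing.pool import ThreadPool

variables = ['AAPL', 'GE']            # stand-in for the full list of names
interval = 15 * 60                    # seconds
initialtime = time.time() - 2 * interval
currenttime = time.time()

def worker(variable, window_start):
    # placeholder for: call the web URL, run the statistics, append the CSV line
    return variable, window_start

while initialtime < currenttime:
    pool = ThreadPool(100)            # fresh pool for this interval
    results = [pool.apply_async(worker, (v, initialtime)) for v in variables]
    pool.close()
    pool.join()                       # wait for every name in this window to finish
    for r in results:
        r.get()                       # re-raises anything the worker swallowed
    initialtime += interval
    currenttime = time.time()
```

Is that the right shape, or should the pool live outside the while loops entirely?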

Thanks in advance for any help!

  • This seems to be a borderline [XY problem](http://xyproblem.info/) with a bit of confusion on your end regarding [`asyncio`](https://docs.python.org/3.8/library/asyncio.html) versus [`multithreading`](https://docs.python.org/3.8/library/multiprocessing.html) (by the tags). The question lacks some fundamental details to be able to adequately help. [Please ensure you are including reproducible code](https://stackoverflow.com/help/how-to-ask) so that we can run it, and debug it. – felipe Nov 19 '19 at 15:06
  • `def tradehistory(self): self.histfile="C:\\PythonStockCode\\StockData\\TEMP\\"+str(self.symbols)+"_temp.csv" offset=self.starttimestamp - 1 counter=self.starttimestamp - 1 while offset < self.starttimestamp + self.interval: if counter > self.starttimestamp + self.interval: break offset=int(counter)` – Robert Petrowsky Nov 20 '19 at 01:44
  • `self.base_url='https://api.polygon.io/v1/historic/trades/'+str(self.symbols)+'/'+str(self.date)+'?offset='+str(offset)+'&limit=1000&apiKey=' self.ht = requests.get(self.base_url,) self.historic_trade=json.loads(self.ht.text) self.historic_trade=self.historic_trade['ticks'] i=0` – Robert Petrowsky Nov 20 '19 at 01:44
  • `for i in range(len(self.historic_trade)): for x in range(len(self.historic_trade)): if x==len(self.historic_trade) - 1: last=self.historic_trade[i] counter=last['t']` – Robert Petrowsky Nov 20 '19 at 01:45
  • I cut a substantial amount of the code to make it fit. For obvious reasons, I'm hesitant to post my API secret key. `symbols = ['AAPL', 'GE']` The reason I didn't include the code that gets all of the times is that it would be another 50 or so lines of code and another secret key. For testing purposes, `date=2017-06-02` `starttimestamp=1496410200000` and `interval = 15 * 60000`. Apparently I can only edit a comment for 5 minutes, so I couldn't go back and reformat the code above. – Robert Petrowsky Nov 20 '19 at 01:49
  • You can edit the post above by just clicking the `edit` button. Instead of posting your whole code, perhaps replicating your problem in a simpler context might be helpful. Use stuff like `time.sleep(int)` to simulate long actions. – felipe Nov 20 '19 at 01:56
  • I would, however, recommend you looking into different tools for your task altogether. Instead of multithreading, I would recommend writing a purely asynchronous solution ([multithreading vs asynchronous execution](https://stackoverflow.com/a/34681101/1305461)). Utilizing [`asyncio`](https://docs.python.org/3/library/asyncio.html) to execute asynchronous code, [`aiohttp`](https://aiohttp.readthedocs.io/en/stable/) to send and receive requests from external sources, and [`aiofiles`](https://github.com/Tinche/aiofiles) for saving the processed files. – felipe Nov 20 '19 at 02:01
  • What you are really missing is `aiohttp` (linked above) to speed up the request side of your task. Scroll down to the bottom of a [previous answer](https://stackoverflow.com/a/58920865/1305461) I gave to see an example of `aiohttp` being used to pull 100+ websites from all over the world in less than 4 seconds (running from Miami, Florida; latency matters as some of the servers are in China, Japan, Australia, etc.). A minimal sketch of the idea appears after these comments. – felipe Nov 20 '19 at 02:05
  • Thanks! I'll look more into aiohttp. I don't really have a good minimal substitute for the whole code right now. The problem is that I get a list of trades that may or may not cover the total time range I'm looking for, so I have to loop each individual request until I get the total amount. Even deleting the JSON data immediately after it is saved, I end up with enormous memory usage, so I'm concerned there too. I'll see if I can find a way to make the aiohttp tools work. – Robert Petrowsky Nov 21 '19 at 02:14
  • Oh, I see. If memory consumption is an issue, I would recommend instead using [`pandas`](https://pandas.pydata.org/) to save your data -- it should be able to handle extremely large data sets without much issue. You can load and append `.json` files easily to the [`DataFrame`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) (scroll down for docs). Regarding the total time range, can you calculate, from a single independent request, the time range for the preceding requests? – felipe Nov 21 '19 at 13:50
  • I've worked on a problem almost like this one, and I calculated the timestamp difference of the first request (`end_timestamp - beginning_timestamp`) and essentially generated all the endpoints needed to reach my desired `end_time`. Here is the function, in case it might be of [help](https://github.com/Waultics/CryptoBook/blob/master/CryptoBook-py/CryptoBook/utils.py#L53-L183). You'll need to familiarize yourself a bit with `pandas`, and I should note `ex.fetch_ohlcv()` returns a `DataFrame`. It's very well documented otherwise, so you should be able to get some ideas from it. – felipe Nov 21 '19 at 13:52
  • Thanks for the added comments. I'm actually using pandas and numpy once I get the data. I think the big issue is the time it takes to get a response and the fact that I have to iterate through many symbols several times to get all of the data. I'm still looking into aiohttp to see where I can cut time out. – Robert Petrowsky Nov 22 '19 at 05:31
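
Following up on the `aiohttp` suggestion in the comments, a minimal sketch of the asyncio + aiohttp approach might look like the following. The URL, parameters and symbol list here are placeholders, not the real Polygon call:

```
import asyncio
import aiohttp

symbols = ['AAPL', 'GE']   # placeholder list

async def fetch(session, symbol):
    # One request per symbol; all of them are in flight at the same time.
    url = f'https://example.com/api/{symbol}'
    async with session.get(url) as resp:
        return symbol, await resp.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, s) for s in symbols]
        return await asyncio.gather(*tasks)

results = asyncio.run(main())
```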

0 Answers