
I'm using a third-party API to retrieve 10-minute data over a large number of days for different tags. The current data pull can take several minutes, depending of course on the number of days and the number of tags. I'm therefore trying my hand at multithreading, which I understand can be useful for heavy I/O operations.

The API call goes as follows (I've replaced the actual API name):

import numpy as N 
import requests as r 
import json 
import pandas as pd
from datetime import datetime 
import concurrent.futures

  
class pyGeneric: 
  
    def __init__(self, serverName, apiKey, rootApiUrl='/Generic.Services/api'): 
        """ 
        Initialize a connection to server, and return a pyGeneric server object 
        """ 
        self.baseUrl = serverName + rootApiUrl 
        self.apiKey = apiKey 
        self.bearer = 'Bearer ' + apiKey 
        self.header = {'mediaType':'application/json','Authorization':self.bearer} 
  
    def getRawMeasurementsJson(self, tag, start, end):
        apiQuery = '/measurements/' + tag + '/from/' + start + '/to/' + end + '?format=json' 
        dataresponse = r.get(self.baseUrl+apiQuery, headers=self.header) 
        data = json.loads(dataresponse.text) 
        return data 
                                                               
                                
    def getAggregatesPandas(self, tags, start, end):
        """        
        Return tag(s) in a pandas dataFrame
        """
        df = pd.DataFrame()
        if type(tags) == str:
            tags = [tags]
        for tag in tags:
            tempJson =  self.getRawMeasurementsJson(tag, start, end)
            tempDf = pd.DataFrame(tempJson['timeSeriesList'][0]['timeSeries'])
            name = tempJson['timeSeriesList'][0]['measurementName']
            df['TimeUtc'] = [datetime.fromtimestamp(i/1000) for i in tempDf['t']]
            df['TimeUtc'] = df['TimeUtc'].dt.round('min')
            df[name] = tempDf['v']
        return df
    

gener = pyGeneric('https://api.generic.com', 'auth_keymlkj9789878686')

An example call to the API would be:

gener_df = gener.getAggregatesPandas('tag1.10m.SQL', '*-10d', '*')

This works OK for individual tags, but for a list it takes longer, which is why I've been trying the following:

tags = ['tag1.10m.SQL',
'tag2.10m.SQL',
'tag3.10m.SQL',
'tag4.10m.SQL',
'tag5.10m.SQL',
'tag6.10m.SQL',
'tag7.10m.SQL',
'tag8.10m.SQL',
'tag9.10m.SQL',
'tag10.10m.SQL']

startdate = "*-150d"
enddate = '*'

final_df = pd.DataFrame()

with concurrent.futures.ThreadPoolExecutor() as executor:
    args = ((i,startdate, enddate) for i in tags)
    executor.map(lambda p: gener.getAggregatesPandas(*p), args)

However, I'm unable to check whether gener.getAggregatesPandas is being executed properly. Ultimately I would like to collect the results in a DataFrame called final_df, but I'm unsure how to proceed. I've read in this post that calling append inside the context manager would lead to quadratic copying of the DataFrame, which would ultimately slow things down.

amphinomos

2 Answers


You can try the approach below. It lets you make many requests in parallel, provided the server can handle the load.

# thread_map is just a wrapper around concurrent.futures.ThreadPoolExecutor with a nice tqdm progress bar
from tqdm.contrib.concurrent import thread_map, process_map  # multi-threading and multi-processing, respectively

def chunk_list(lst, size):
    """
    Taken from Stack Overflow.
    Yield successive chunks of `size` items from lst.
    """
    for i in range(0, len(lst), size):
        yield lst[i:i + size]

import os

# huge_list and which_func_to_call are placeholders for your inputs and your request function
for idx, my_chunk in enumerate(chunk_list(huge_list, size=2**12)):
    responses = thread_map(which_func_to_call, my_chunk, max_workers=min(32, os.cpu_count() + 6))
    for response in responses:
        # which_func_to_call should wrap the request and return the parsed JSON response
        # do something with the response here
        # make sure to cache the chunk results as well
        ...

Edit 1 :

from functools import partial
startdate = "*-150d"
enddate = '*'
my_new_func = partial(which_func_to_call, startdate=startdate, enddate=enddate)

Now we can use my_new_func instead. Note that it accepts a single argument.
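
A minimal sketch of how such a partial could plug into a thread pool. Here `fetch_tag` is a hypothetical stand-in for `gener.getAggregatesPandas` that makes no network call. Note that the keyword names passed to `partial` must match the target function's own parameter names, which are `start` and `end` in the question's method.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def fetch_tag(tag, start, end):
    """Hypothetical stand-in for gener.getAggregatesPandas (no network call)."""
    return f"{tag} from {start} to {end}"


# Freeze the two constant arguments; only `tag` is left to supply.
fetch_one = partial(fetch_tag, start="*-150d", end="*")

tags = ["tag1.10m.SQL", "tag2.10m.SQL", "tag3.10m.SQL"]

with ThreadPoolExecutor(max_workers=4) as executor:
    # map() yields results in the same order as `tags`.
    results = list(executor.map(fetch_one, tags))
```

With the question's object this would presumably become `partial(gener.getAggregatesPandas, start=startdate, end=enddate)`, which also removes the need for the lambda and the argument tuples.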

Edit 2 :

For caching, I would recommend using the csv module and writing the responses you want to a CSV file rather than using pandas. Alternatively, you can dump the JSON responses as they are, depending on your needs. Sample code for a JSON/dict-like response looks like this:

import csv
import os

# OUTPUT_FILE_NAME, fieldnames, huge_list, CHUNK_SIZE, my_partial_wrapped_func and row are placeholders
with open(OUTPUT_FILE_NAME, "a+", newline="") as csvfile:
    # fieldnames = [your_headers_list]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    # The file is opened in append mode, so make sure to write the header only once (writer.writeheader())
    for idx, my_chunk in enumerate(chunk_list(huge_list, size=CHUNK_SIZE)):
        for response in thread_map(
            my_partial_wrapped_func, my_chunk, max_workers=min(32, os.cpu_count() + 6)
        ):
            # ... build `row` from the response: a dict with the fieldnames as keys ...
            writer.writerow(row)
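
One way to write the header only once is to check whether the file already has content before appending. The following is a sketch under that assumption; `append_rows` is a hypothetical helper, not part of the csv module.

```python
import csv
import os


def append_rows(path, fieldnames, rows):
    """Append dict rows to a CSV file, writing the header only if the file is new or empty."""
    needs_header = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerows(rows)
```

Calling it once per chunk, for example `append_rows("cache.csv", ["t", "v"], chunk_rows)`, keeps every chunk on disk even if a later request fails.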
Aditya
  • Thanks for your reply and for the tqdm library, which I didn't know about! I'm not entirely sure how to adapt your code to my question. I understand that chunk_list is a generator that will handle the input list. However, getAggregatesPandas actually takes three parameters and I don't see how I can pass them here. Finally, how should I go about caching the chunk results? – amphinomos Jun 24 '20 at 20:57
  • @amphinomos If your `start_date` and `enddate` are constants, then you can use `functools.partial` to create a pseudo-function and use that in place of `<which_func_to_call>`. I have updated the answer, take a look! – Aditya Jun 25 '20 at 02:38
  • Again I appreciate the effort in writing the code, I understand that it's geared more towards parallel processing rather than multi-threading. In all honesty, it's flying above my head and I haven't been able to make it work with the already existing code I have. – amphinomos Jun 27 '20 at 10:38

As I understand it, you need to check whether getAggregatesPandas executed properly.

You can do it as shown below.

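with concurrent.futures.ThreadPoolExecutor() as executor:
    args = ((i, startdate, enddate) for i in tags)
    # map() yields results in input order and re-raises any exception raised in a worker,
    # so consuming it tells you whether every call succeeded
    results = list(executor.map(lambda p: gener.getAggregatesPandas(*p), args))

# DataFrame.append returned a new frame instead of modifying final_df in place
# (and was removed in pandas 2.0), so combine the results once with concat
final_df = pd.concat(results, ignore_index=False)
# another approach: create futures with executor.submit(...) and loop over
# concurrent.futures.as_completed(futures), calling future.result() on each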
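
The as_completed approach expects Future objects created with executor.submit, not the iterator returned by executor.map. A minimal sketch of that variant follows, assuming `fetch_tag` as a hypothetical stand-in for `gener.getAggregatesPandas` that returns a small frame with a `TimeUtc` column. Results are collected in a list and merged once at the end, which avoids growing a DataFrame inside the loop.

```python
import concurrent.futures
from functools import reduce

import pandas as pd


def fetch_tag(tag, start, end):
    """Hypothetical stand-in for gener.getAggregatesPandas: returns a tiny frame."""
    times = pd.to_datetime(["2020-06-01 00:00", "2020-06-01 00:10"])
    return pd.DataFrame({"TimeUtc": times, tag: [1.0, 2.0]})


tags = ["tag1.10m.SQL", "tag2.10m.SQL", "tag3.10m.SQL"]
frames, failures = [], {}

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns one Future per call; remember which tag each belongs to.
    futures = {executor.submit(fetch_tag, tag, "*-150d", "*"): tag for tag in tags}
    for future in concurrent.futures.as_completed(futures):
        tag = futures[future]
        try:
            frames.append(future.result())  # re-raises any exception from the worker
        except Exception as exc:
            failures[tag] = exc  # record the failure instead of aborting the loop

# Combine once, after all downloads have finished (assumes at least one succeeded).
final_df = reduce(
    lambda left, right: pd.merge(left, right, on="TimeUtc", how="outer"), frames
)
```

Because as_completed yields futures in completion order, the column order of `final_df` can vary between runs. Inspecting `failures` afterwards shows which tags, if any, did not download.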

REF Video:-video

  • I was able to make this work using: final_df = pd.DataFrame(columns=['TimeUtc']) before the context manager then final_df = pd.merge(final_df, result_df, on = 'TimeUtc', how = 'outer') in the for loop. – amphinomos Jun 27 '20 at 10:41
  • @amphinomos This is precisely what you want to do, but it will lead to quadratic copying, as you are already aware. I think I misread your requirement; on re-reading your question, this answer does what you asked for. Rather than appending each result with pandas, collect the results in a list and call df.append only once. The answer I shared will help you make parallel requests to the API much faster and then write them to a CSV file rather than using pandas, because it's likely the response rate, not the merging of the data frames, that is slow. That's what my answer – Aditya Jun 27 '20 at 10:47