I am using requests to subscribe to a high frequency data stream. During busy times, there is a significant latency between the timestamp of the messages being sent to me and the timestamp of when I am able to process them in my script. During quiet times, this latency is consistently around 500 ms. During busy times, it rises to over 50 seconds. Here are some more observations:
I looked at the resource usage of my machine during busy times and CPU load and memory hardly rise.
During busy times, the latency starts at under 1 s when the script is first launched, but it grows steadily as the script runs, eventually exceeding 50 s. Therefore, the latency is not inherent to the sender of the data but to some processing going on in my script.
I am therefore concluding that the problem lies in my processing of the data. Here is what I am doing to process it.
The function essentially sends dict objects back to a callback for further processing. Each dict is a JSON dict being sent by the streaming API.
import json
import requests
from threading import Thread

def receive_stream(callback):
    s = requests.Session()
    with s.get(..., stream=True) as resp:
        buffer = ''
        for line in resp.iter_lines():
            line = line.decode('utf-8')
            json_dict = None
            if line == '}':
                # closing brace: the buffered lines form one complete JSON object
                json_dict = buffer + line
                buffer = ''
            else:
                buffer = buffer + line
            if json_dict is not None:
                parsed_json = json.loads(json_dict)
                if parsed_json['type'] != 'hb':  # skip heartbeat messages
                    t = Thread(target=callback, args=(parsed_json,))
                    t.start()
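For context, the function is driven like this; the callback here is just a stand-in for illustration (my real callback does the zmq send and latency measurement described below):

def handle_message(parsed_json):
    # Stand-in for the real callback described in the note below.
    print(parsed_json.get('type'))

receive_stream(handle_message)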
Note: The callback function measures the latency as datetime.datetime.now() minus the timestamp in the JSON dict passed to it, averaged over every 50 messages or so.
If I measure the latency inside the function above AND remove the callback entirely, it makes little difference -- the same observations apply. Therefore, the downstream processing is not the issue (plus it is handed off to another thread, so it shouldn't be anyway).
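For reference, a minimal sketch of the latency measurement in the callback; the 'timestamp' field name and its naive ISO-8601 format are assumptions, my real messages may differ:

import datetime

_latencies = []

def measure_latency(parsed_json):
    # Assumes the message carries its send time as a naive ISO-8601 string
    # under 'timestamp'; the real field name/format may differ.
    sent = datetime.datetime.fromisoformat(parsed_json['timestamp'])
    _latencies.append((datetime.datetime.now() - sent).total_seconds())
    if len(_latencies) >= 50:
        print('mean latency over last 50 msgs: %.3f s' % (sum(_latencies) / len(_latencies)))
        _latencies.clear()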
My questions:
Is the way I am processing the incoming lines inherently inefficient, so that during busy times a big backlog of unprocessed lines builds up? Could it be json.loads() or line.decode()? (I have to call the latter.)
Is the way I am using threads the problem? I don't think the downstream processing is particularly costly; it just sends a message over zmq and measures latency, and removing the callback altogether makes little difference. Should I be using a queue, something like the sketch below?
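To be concrete about the queue idea, this is roughly what I have in mind: one long-lived worker thread fed from a queue.Queue, instead of spawning a Thread per message. This is only an untested sketch of the alternative I am asking about, with the same elided URL as above:

import json
import queue
import requests
from threading import Thread

def receive_stream_queued(callback, maxsize=10000):
    # One long-lived worker drains the queue, so per-message thread
    # creation disappears from the network loop.
    q = queue.Queue(maxsize=maxsize)

    def worker():
        while True:
            msg = q.get()
            if msg is None:  # sentinel value to stop the worker
                break
            callback(msg)
            q.task_done()

    Thread(target=worker, daemon=True).start()

    s = requests.Session()
    with s.get(..., stream=True) as resp:
        buffer = ''
        for line in resp.iter_lines():
            line = line.decode('utf-8')
            if line == '}':
                parsed_json = json.loads(buffer + line)
                buffer = ''
                if parsed_json['type'] != 'hb':
                    q.put(parsed_json)  # hand off instead of spawning a thread
            else:
                buffer = buffer + line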
Thanks