I have a stream of real-time data using requests:
request = requests.get($URL,stream=True)
stream = request.iter_lines()
We can assume next(stream) returns something like
"alice"
"charlie"
"bob"
"charlie"
...
Every n seconds, I want to bin all the data. Then I want to use data as a parameter for some algorithm foo that takes nonzero time. Assume foo is separate and doesn't affect stream at all.
If foo took a trivial amount of time to execute, or if I didn't care about missing inputs from stream, this would be easy:
while True:
    time0 = time.time() + n
    data = {}
    for name in stream:
        data[name] = data.get(name, 0) + 1
        if time.time() > time0:
            break
    foo(data)  # could take a while
But I don't want to miss a single entry of stream. I also want to prevent errors if foo happens to take longer than n seconds. How do I achieve this? I assume I need to use concurrent.futures, but maybe it's possible using another method.
Edit: the temporal resolution of stream is very fine, with dozens of outputs per second. This makes my problem distinct from similar questions that involve a coarser resolution, say 1 output per second.
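To make the requirement concrete, here is a minimal sketch of one possible shape for this, using threading and queue.Queue rather than concurrent.futures. A background thread does nothing but read stream into an unbounded queue, so entries keep accumulating while foo runs, even if foo overruns n seconds. The names read_stream, drain, run and the rounds parameter are hypothetical, introduced only for illustration.

```python
import queue
import threading
import time
from collections import Counter


def read_stream(stream, buffer):
    """Producer: push every item from the stream into a thread-safe queue."""
    for name in stream:
        buffer.put(name)


def drain(buffer):
    """Bin everything currently waiting in the queue, without blocking."""
    counts = Counter()
    while True:
        try:
            counts[buffer.get_nowait()] += 1
        except queue.Empty:
            return counts


def run(stream, foo, n, rounds=None):
    """Roughly every n seconds, bin what has arrived and hand it to foo.

    rounds is a hypothetical knob for stopping after a fixed number of
    bins; None loops forever, like the `while True` in the question.
    """
    buffer = queue.Queue()  # unbounded, so the reader never blocks or drops
    reader = threading.Thread(
        target=read_stream, args=(stream, buffer), daemon=True
    )
    reader.start()

    deadline = time.monotonic() + n
    done = 0
    while rounds is None or done < rounds:
        # If foo overran the previous deadline, this sleeps for 0 seconds.
        time.sleep(max(0.0, deadline - time.monotonic()))
        deadline += n
        foo(dict(drain(buffer)))  # may be slow; items keep queuing meanwhile
        done += 1
```

Under these assumptions it would be called as run(stream, foo, n), with stream being the iter_lines() iterator from above (whose items are bytes unless decode_unicode=True is passed). One caveat of this sketch: when foo overruns, the next bin covers more than n seconds of data. If exact n-second windows matter, the reader thread would need to timestamp each entry as it arrives.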