0

I have a stream of real time data using requests:

request = requests.get($URL,stream=True)
stream = request.iter_lines()

We can assume next(stream) returns something like

"alice"
"charlie"
"bob"
"charlie"
...

Every n seconds, I want to bin all the data. Then I want to use data as a parameter for some algorithm foo that takes nonzero time. Assume foo is separate and doesn't affect stream at all.

If foo took a trivial amount of time to execute, or if I didn't care about missing inputs from stream, this would be easy:

while True:
    time0 = time.time() + n
    data = {}
    for name in stream:
        data[name] = data.get(name,0) + 1
        if time.time() > time0:
            break
    foo(data) #could take a while

But I don't want to miss a single entry of stream. I also want to prevent errors if foo happens to take longer than n seconds. How do I achieve this? I assume I need to use concurrent.futures, but maybe it's possible using another method.

Edit: the temporal resolution of stream is very small, with dozens of outputs per second. This makes my problem distinct from similar questions that involve a coarser resolution, say 1 output per second.

ant11
  • 101
  • 3
  • 2
    If `foo` takes longer than `n` seconds, you're screwed. No matter how you process it, you will get farther and farther behind until Python runs out of memory. One possible solution is to have your `requests` stuff in a different thread, and have it feed output into a `queue`, which your processing loop reads from. Remember, however, that if `foo` is pure Python code, the PIL is going to interfere with the web requests. – Tim Roberts May 12 '21 at 00:45
  • Currently I'm not too worried about the case where `foo` takes longer than n seconds. How would I set up a queue to have these two things happen? – ant11 May 12 '21 at 02:28
  • 1
    You use `Queue.put` to add something to the queue, and `Queue.get` to remove something. Any object. It's guaranteed thread-safe. `Queue.get` will block if there's nothing there, so you get synchronization, too. – Tim Roberts May 12 '21 at 02:48
  • I have tried adapting two methods from this post: https://stackoverflow.com/questions/41648103 However they don't seem to work since my tasks depend on a collection/binning of outputs instead of a single one. I'm not sure what modifications to make. – ant11 May 12 '21 at 19:25
  • 1
    Well, if you have to have the results in order, then it's not clear how much parallelization you can really do. You could pull results from the queue and sort them into bins, and when you have a "complete" collection, then process it. Maybe. – Tim Roberts May 12 '21 at 19:30

0 Answers0