I have a method that fetches data from an API using IDs that are passed to it. Before each API call, the IDs are placed in a queue so that multiple IDs (up to 100) can be sent per call. API calls are carried out by a method called flush_queue, which must also be called at the end of my program to guarantee that every ID added to the queue gets used. I wrote an async function that takes an ID, creates a future whose result flush_queue will eventually set, and then returns the data obtained for that ID once it is available:

async def get_data(self, id):
    self.queued_ids.append(id)
    self.results[id] = asyncio.get_running_loop().create_future()

    if len(self.queued_ids) == 100:
        await self.flush_queue()

    return await self.results[id]

(flush_queue sets the result of the future stored in the results dict to the API data for that ID, so that the method above can return it.) This works well when the queue is flushed because there are enough IDs to make an API request. The problem is that I need to ensure that every ID has been added to the queue before I call flush_queue at the end of my program. In other words, every call to get_data must have started before I do that. However, I can't await the coroutine objects returned by those calls, because they won't finish until after flush_queue completes. Is there any way to ensure that a series of coroutines has started (but not necessarily finished) before doing something like calling flush_queue?

  • I read your essay twice, and it's still not clear what it is you’re looking for. First of all, why are you creating a future with the ids you already have in hand? Second, although your code doesn’t indicate so, my guess is that `flush_queue` is meant to use all the items in the queue, but you don’t know yet whether more items will be added. Is that right? – Abhijit Sarkar Nov 26 '20 at 00:21
  • @AbhijitSarkar I'm creating a future because when flush_queue is called and it obtains the results of the API call, it finds the future corresponding to each ID and calls set_result with the data obtained from the API so that it can be returned by get_data. flush_queue is to use all items in the queue, but when I call it at the end of the program to ensure that all currently queued items will be used before the program finishes, I do know that there are no more items to be added. – giantpredatorymollusk Nov 26 '20 at 00:29
  • Although creating a future for each id seems unnecessary, that aside, can you not increment a counter as soon as you enter `get_data` and wait for it to become zero? Basically, use a counting semaphore. – Abhijit Sarkar Nov 26 '20 at 00:32
  • @AbhijitSarkar I am curious as to how you would send back the data obtained from the API when it comes in for 100 ids at once. That said, the counting semaphore idea is perfect: I can simply suspend my main program until get_data has been entered as many times as it has been called. Thank you! – giantpredatorymollusk Nov 26 '20 at 00:50
  • _how you would send back the data obtained from the API_ Let's not conflate the issue. If you'd like, you may open a new question about that. I'll post my previous comment as an answer for you here. – Abhijit Sarkar Nov 26 '20 at 00:52

1 Answer

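As discussed in the comments, just use a countdown latch/counting semaphore. Count each call as it enters get_data, and wait until every call you made has been counted before calling flush_queue. With a countdown latch, that means starting the counter at the number of calls, decrementing it on entry to get_data, and waiting for it to become zero.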
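A minimal sketch of that idea for asyncio, built on `asyncio.Event`. The names `CountdownLatch`, `Fetcher`, and `main` are hypothetical, and `flush_queue` here only fakes the API response, so treat this as an illustration of the construct under those assumptions rather than the asker's actual code:

```python
import asyncio


class CountdownLatch:
    """Hypothetical helper: wait() returns once count_down() has run `count` times."""

    def __init__(self, count):
        self._count = count
        self._zero = asyncio.Event()
        if count <= 0:
            self._zero.set()  # nothing to wait for

    def count_down(self):
        self._count -= 1
        if self._count <= 0:
            self._zero.set()

    async def wait(self):
        await self._zero.wait()


class Fetcher:
    """Stand-in for the class in the question; the API call is faked."""

    def __init__(self, latch):
        self.latch = latch
        self.queued_ids = []
        self.results = {}

    async def get_data(self, id):
        self.queued_ids.append(id)
        self.results[id] = asyncio.get_running_loop().create_future()
        self.latch.count_down()  # this call has now queued its ID

        if len(self.queued_ids) == 100:
            await self.flush_queue()

        return await self.results[id]

    async def flush_queue(self):
        # Take the current batch and resolve each ID's future (assumed behaviour).
        ids, self.queued_ids = self.queued_ids, []
        for id in ids:
            self.results[id].set_result(f"data-{id}")  # fake API data


async def main(ids):
    latch = CountdownLatch(len(ids))
    fetcher = Fetcher(latch)
    # create_task schedules each coroutine so it can start without being awaited.
    tasks = [asyncio.create_task(fetcher.get_data(i)) for i in ids]
    await latch.wait()            # every get_data call has queued its ID
    await fetcher.flush_queue()   # final flush for the partial batch
    return await asyncio.gather(*tasks)
```

Running `asyncio.run(main(list(range(250))))` exercises two full batches plus a final partial one. The key design choice is that `count_down()` is called right after the ID is queued and before the first `await`, so the latch reaching zero implies that every ID is already in the queue or has been flushed.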

Abhijit Sarkar
  • Is there a countdown latch for asyncio? The threading module has a [barrier](https://docs.python.org/3/library/threading.html#barrier-objects) for that purpose, but I'm not aware of an equivalent asyncio primitive. – user4815162342 Nov 26 '20 at 10:10
  • @user4815162342 I'm not very familiar with Python asyncio; I was referring to the construct, not any actual class. A quick search brought up [this](https://stackoverflow.com/q/10236947/839733) as the top hit, and there are others. – Abhijit Sarkar Nov 26 '20 at 11:15
  • Note that that question and its answers refer to threading, which would block asyncio. (Also, Python already has `threading.Barrier` mentioned in the previous comment.) I understand that the principle is the same, but implementation details might differ significantly. The asyncio `Lock` implementation is radically different from `threading.Lock`, despite them being exactly the same on an abstract level. – user4815162342 Nov 26 '20 at 11:40
  • @user4815162342 If you care to write an implementation, I’ll be interested in seeing it. Perhaps the OP will change his acceptance for a more practical answer, if he hasn’t already implemented one himself. – Abhijit Sarkar Nov 26 '20 at 11:56
  • I don't need it now, but if I write one, I'll add it here for future reference. I was just wondering if you happened to know of one, because I haven't seen it anywhere and it crops up every now and then in questions like this one. – user4815162342 Nov 26 '20 at 12:06