15

I wrote a script that uses a nursery and the asks module to loop through and call an API based upon the loop variables. I get responses but don't know how to return the data like you would with asyncio.

I also have a question about limiting API calls to 5 per second.

from datetime import datetime
import asks
import time
import trio

asks.init("trio")
s = asks.Session(connections=4)

async def main():
    start_time = time.time()

    api_key = 'API-KEY'
    org_id = 'ORG-ID'
    networkIds = ['id1','id2','idn']

    url = 'https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600'
    headers = {'X-Cisco-Meraki-API-Key': api_key, 'Content-Type': 'application/json'}

    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers)

    print("Total time:", time.time() - start_time)



async def fetch(url, headers):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)




if __name__ == "__main__":
    trio.run(main)

When I run nursery.start_soon(fetch...) , I am printing data within fetch, but how do I return the data? I didn't see anything similar to asyncio.gather(*tasks) function.

Also, I can limit the session to 1-4 connections, which helps keep me under the limit of 5 API calls per second, but is there a built-in way to ensure that no more than 5 API calls are made in any given second?

Mark E. Haase
jleatham
  • Thanks everyone. Take a look at my initial attempt on GitHub: https://github.com/jleatham/Meraki-findClient-async/blob/master/findClient.py It doesn't use queues, but it limits the calls to below 5 API calls per second and retries any 429 responses. The code is very specific/ugly but could be modified for general API handling. – jleatham Oct 11 '18 at 19:08
  • A gather function: https://github.com/python-trio/trio/issues/1089#issuecomment-521916733 – Tronic Jan 21 '20 at 08:55

5 Answers

7

Returning data: pass the networkID and a dict to the fetch tasks:

async def main():
    …
    results = {}
    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers, results, i)
    ## results are available here

async def fetch(url, headers, results, i):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)
    results[i] = response

Alternately, create a trio.Queue to which you put the results; your main task can then read the results from the queue.

API limit: create a trio.Queue(10) and start a task along these lines:

async def limiter(queue):
    while True:
        await trio.sleep(0.2)
        await queue.put(None)

Pass that queue to fetch as another argument, and call await queue.get() inside fetch before each API call.

Matthias Urlichs
  • I am trying to unpack your answer, not quite understanding how to integrate into my code. I'll take a look at the .Event / .queue / and .sleep functions in the documentation. Thanks! – jleatham Oct 05 '18 at 18:44
  • OK, I got the results back using this method, I can work with that. Still working on getting the queue integrated into my code. I googled and didn't see a single example of trio.queue being used. How do I pass the limiter(queue) to fetch? Thanks so far! – jleatham Oct 05 '18 at 21:30
  • Sorry, bad wording. You don't pass `limiter(queue)` to `fetch` – you simply pass the queue which you use for limiting to `fetch`, as another argument. – Matthias Urlichs Oct 06 '18 at 10:03
  • Love your limiter queue! That's a model I hadn't seen before. – Quentin Stafford-Fraser Jun 04 '20 at 07:46
  • Note that Trio deprecated Queue in favour of open_memory_channel: https://trio.readthedocs.io/en/stable/history.html?highlight=trio.Queue#id40 See this answer instead: https://stackoverflow.com/a/53650112/5428609 – laker93 Mar 03 '23 at 09:24
6

Technically, trio.Queue has been deprecated in trio 0.9. It has been replaced by trio.open_memory_channel.

Short example:

sender, receiver = trio.open_memory_channel(len(networkIds))
async with trio.open_nursery() as nursery:
    for i in networkIds:
        nursery.start_soon(fetch, sender, url.format(i), headers)

async for value in receiver:
    # Do your job here
    pass

And in your fetch function you should call await sender.send(value) somewhere.

Adrien Clerc
  • You also have to use the channels as context managers, with `async with sender` and `async with receiver`, before sending and receiving values. See here: https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels – laker93 Mar 03 '23 at 09:34
6

Based on these answers, you can define the following function:

async def gather(*tasks):

    async def collect(index, task, results):
        task_func, *task_args = task
        results[index] = await task_func(*task_args)

    results = {}
    async with trio.open_nursery() as nursery:
        for index, task in enumerate(tasks):
            nursery.start_soon(collect, index, task, results)
    return [results[i] for i in range(len(tasks))]

You can then use trio in the exact same way as asyncio by simply patching trio (adding the gather function):

import trio
trio.gather = gather

Here is a practical example:

async def child(x):
    print(f"Child sleeping {x}")
    await trio.sleep(x)
    return 2*x

async def parent():
    tasks = [(child, t) for t in range(3)]
    return await trio.gather(*tasks)

print("results:", trio.run(parent))
cglacet
  • Trio allows for task communication by using `open_memory_channel`: https://trio.readthedocs.io/en/stable/reference-core.html?highlight=trio.Queue#using-channels-to-pass-values-between-tasks. It'd be better to use this functionality rather than monkey patching in a workaround. The channels have the advantage of passing values generatively, whereas your `gather` implementation holds all values in memory before returning to the caller. – laker93 Mar 03 '23 at 09:39
  • @Luke Purnell That misses the point, the point is to gather the results into a single place *after* the concurrency is completed and exited, *outside* the nursery execution scope (with statement). By definition, memory channels are between tasks inside nursery scopes, so it's a completely different thing. – Dubslow Apr 17 '23 at 15:58
  • I still don't see the need for this gather function. If you need the behaviour you've described, then you could use a list created outside the nursery and append to it from within the memory channel receiver. – laker93 Apr 17 '23 at 16:05
4

When I run nursery.start_soon(fetch...) , I am printing data within fetch, but how do I return the data? I didn't see anything similar to asyncio.gather(*tasks) function.

You're asking two different questions, so I'll just answer this one. Matthias already answered your other question.

When you call start_soon(), you are asking Trio to run the task in the background, and then keep going. This is why Trio is able to run fetch() several times concurrently. But because Trio keeps going, there is no way to "return" the result the way a Python function normally would. Where would it even return to?

You can use a queue to let fetch() tasks send results to another task for additional processing.

To create a queue:

response_queue = trio.Queue(capacity=len(networkIds))

When you start your fetch tasks, pass the queue as an argument, and put a sentinel into the queue once they have all finished:

async with trio.open_nursery() as nursery:
    for i in networkIds:
        nursery.start_soon(fetch, url.format(i), headers, response_queue)
await response_queue.put(None)

After you download a URL, put the response into the queue:

async def fetch(url, headers, response_queue):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    # Add responses to queue
    await response_queue.put(response)
    print("Finished: ", url, len(response.content), response.status_code)

With the changes above, your fetch tasks will put responses into the queue. Now you need to read responses from the queue so you can process them. You might add a new function to do this:

async def process(response_queue):
    async for response in response_queue:
        if response is None:
            break
        # Do whatever processing you want here.

You should start this process function as a background task before you start any fetch tasks so that it will process responses as soon as they are received.

Read more in the Synchronizing and Communicating Between Tasks section of the Trio documentation.

laker93
Mark E. Haase
  • This is a cleaner explanation. Except that `trio.Queue` always needs a capacity. So instead of a marker, you can use a capacity bounded by the number of tasks (here `len(networkIds)`). – Adrien Clerc Dec 06 '18 at 10:45
  • Note that Trio deprecated `Queue` in favour of `open_memory_channel`: https://trio.readthedocs.io/en/stable/history.html?highlight=trio.Queue#id40 – laker93 Mar 03 '23 at 09:22
  • `where would it even return to?` seems to me that it should be returned to the nursery object after it exits its `with` block. its resources/tasks are cleaned up but the user could still call `nursery.results()` to get retvals from the now-closed tasks. – Dubslow Apr 17 '23 at 08:19
0

As @Adrien Clerc's answer states, trio.Queue has been deprecated: https://trio.readthedocs.io/en/stable/history.html?highlight=trio.Queue#id40

For task communication in Trio see: https://trio.readthedocs.io/en/latest/reference-core.html#using-channels-to-pass-values-between-tasks

Here's a full, minimal working example for your use case using open_memory_channel (the async URL GET request is replaced with a sleep):

import datetime
import trio


async def main():
    network_ids = ["id1", "id2", "idn"]
    url = "https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600"
    send_channel, receive_channel = trio.open_memory_channel(len(network_ids))
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_channel, url, network_ids)
        nursery.start_soon(consumer, receive_channel)


async def producer(send_channel, url, network_ids):
    async with send_channel:
        async with trio.open_nursery() as nursery:
            for i in network_ids:
                nursery.start_soon(fetch, send_channel, url.format(i))


async def consumer(receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            # Do your job here
            print(f"value received: {value} at time {datetime.datetime.utcnow()}")


async def fetch(send_channel, url):
    print(f"Start: {datetime.datetime.utcnow()}")
    await trio.sleep(1)
    response = f"response for {url}"
    await send_channel.send(response)
    print(f"Finished: {datetime.datetime.utcnow()}")


if __name__ == "__main__":
    trio.run(main)


This prints:

Start: 2023-03-03 10:21:24.883787
Start: 2023-03-03 10:21:24.883787
Start: 2023-03-03 10:21:24.883787
Finished: 2023-03-03 10:21:25.887040
Finished: 2023-03-03 10:21:25.887040
Finished: 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/id1/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/id2/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/idn/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
laker93