I am trying to understand how best to build a program that does the following:
Consider multiple analyses. Each analysis requests data from multiple data sources (REST APIs). In each analysis, once all the data has been collected from its data sources, the data is checked against one or more conditions. If these conditions are met, another request is made to a further data source.
The goal is to collect the data for all analyses asynchronously, check the conditions for each analysis, make the follow-up request if the conditions are met, and then repeat. Hence the following requirements:
- The conditions are checked as soon as all the data for that specific analysis has been collected, not after the data has been collected for all analyses.
- If the conditions are met, the follow-up request is made immediately, not after the conditions have been checked for all the analyses.
- The get data -> check conditions -> maybe request loop is scheduled to run every X minutes or hours.
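To make the second requirement concrete, the per-analysis pass I have in mind looks roughly like the sketch below; fetch_source and follow_up_request are hypothetical stand-ins for the real REST calls:

import asyncio
import random

async def fetch_source(delay):
    # Hypothetical stand-in for one REST call to a data source.
    await asyncio.sleep(delay)
    return random.randint(0, 100)

async def follow_up_request(data):
    # Hypothetical stand-in for the extra request made when the conditions hold.
    await asyncio.sleep(1)

async def one_analysis_pass(sources, condition):
    # Collect all the data for this one analysis, check its conditions, and, if they
    # hold, fire the follow-up request straight away, without waiting for the other analyses.
    data = await asyncio.gather(*(fetch_source(s) for s in sources))
    if condition(data):
        await follow_up_request(data)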
I have made the following demo:
import asyncio
import random

async def get_data(list_of_data_calls):
    # Launch every data call for one analysis concurrently and wait for all of them.
    tasks = []
    for call in list_of_data_calls:
        tasks.append(asyncio.ensure_future(custom_sleep(call)))
    return await asyncio.gather(*tasks)

async def custom_sleep(time):
    # Stand-in for a REST call: sleep for the given time and return a random "payload".
    await asyncio.sleep(time)
    return random.randint(0, 100)

async def analysis1_wrapper():
    while True:
        print("Getting data for analysis 1")
        res = await get_data([5, 3])
        print("Data collected for analysis 1")
        for integer in res:
            if integer > 80:
                print("Condition analysis 1 met")
            else:
                print("Condition analysis 1 not met")
        await asyncio.sleep(10)

async def analysis2_wrapper():
    while True:
        print("Getting data for analysis 2")
        res = await get_data([5, 3])
        print("Data collected for analysis 2")
        for integer in res:
            if integer > 50:
                print("Condition analysis 2 met")
            else:
                print("Condition analysis 2 not met")
        await asyncio.sleep(10)

loop = asyncio.get_event_loop()
tasks = analysis1_wrapper(), analysis2_wrapper()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
This produces the following output:
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 not met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 not met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
This seems to do what I want. However, given my limited experience with asyncio and aiohttp, I am not sure whether this is a good way to structure it. I want to be able to add steps to the pipeline in the future, e.g. acting on the result of the follow-up request that is made when the conditions are met. It should also scale to many analyses without losing too much speed.
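For example, one direction I have been considering is a single parameterised wrapper, where each analysis is just a set of data calls, a condition function and an action coroutine, so further steps can be appended inside the wrapper and new analyses are simply new entries in a list. This is only a sketch using the same simulated calls as the demo above, not real aiohttp requests:

import asyncio
import random

async def simulated_call(delay):
    # Stand-in for a real REST call; aiohttp requests would go here.
    await asyncio.sleep(delay)
    return random.randint(0, 100)

async def follow_up(data):
    # Hypothetical follow-up request made when the conditions are met.
    await asyncio.sleep(1)
    print("Follow-up request done")

async def run_analysis(name, data_calls, condition, action, interval):
    while True:
        print("Getting data for", name)
        data = await asyncio.gather(*(simulated_call(d) for d in data_calls))
        print("Data collected for", name)
        if condition(data):
            print("Condition", name, "met")
            await action(data)
            # Further pipeline steps (e.g. acting on the response) would be chained here.
        else:
            print("Condition", name, "not met")
        await asyncio.sleep(interval)

analyses = [
    run_analysis("analysis 1", [5, 3], lambda d: any(x > 80 for x in d), follow_up, 10),
    run_analysis("analysis 2", [5, 3], lambda d: any(x > 50 for x in d), follow_up, 10),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*analyses))
loop.close()

Would something along these lines be the idiomatic way to do it, or is there a better pattern for this kind of pipeline?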