
I am trying to understand how best to build a programme that does the following:

Consider multiple analyses. Each analysis requests data from multiple data sources (REST APIs). In each analysis, when all the data has been collected from the data sources, the data is checked against one or more conditions. If these conditions are met, another request is made to another data source.

The goal is to collect data for all analyses asynchronously, check the conditions for each analysis, make the request if the conditions are met, and then repeat. Thus, the following requirements:

  1. The data is checked for the conditions after all the data is collected in the specific analysis, not after data is collected in all analyses.
  2. If the conditions are met, the request is made first thing - not after the conditions are checked for all the analyses.
  3. The get data -> check for conditions -> maybe request something loop is scheduled to run every X minutes or hours.

I have made the following demo:

import asyncio
import random


async def get_data(list_of_data_calls):
    # fire off all data calls concurrently and wait until every one has finished
    tasks = [asyncio.ensure_future(custom_sleep(t)) for t in list_of_data_calls]
    return await asyncio.gather(*tasks)


async def custom_sleep(time):
    # stand-in for a real request: wait, then return a fake data point
    await asyncio.sleep(time)
    return random.randint(0, 100)


async def analysis1_wrapper():
    while True:
        print("Getting data for analysis 1")
        res = await get_data([5, 3])
        print("Data collected for analysis 1")
        for integer in res:
            if integer > 80:
                print("Condition analysis 1 met")
            else:
                print("Condition analysis 1 not met")
        await asyncio.sleep(10)


async def analysis2_wrapper():
    while True:
        print("Getting data for analysis 2")
        res = await get_data([5, 3])
        print("Data collected for analysis 2")
        for integer in res:
            if integer > 50:
                print("Condition analysis 2 met")
            else:
                print("Condition analysis 2 not met")
        await asyncio.sleep(10)


# run both analysis loops concurrently on the same event loop
loop = asyncio.get_event_loop()
tasks = analysis1_wrapper(), analysis2_wrapper()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

This produces the following output:

Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 not met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 not met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met

This seems to work as I want. However, due to my limited experience with asyncio and aiohttp, I am not sure whether this is a good way to do it. I want to be able to add steps to the pipeline in the future, e.g. applying further logic around the request that is made when the conditions are met. It should also scale to many analyses without losing too much speed.

mfvas

1 Answer


Yes, it is basically like that. A few things to think about:

  1. Concurrency limit.

Although you may have an unlimited number of concurrent tasks, as the concurrency grows, the response time of each task grows too, and at some point the throughput stops growing or even decreases. Because there is only one main thread to execute everything, callbacks have to queue up when there are too many of them, even if the network response arrived several milliseconds ago. To balance this, you usually need a Semaphore to cap the maximum concurrency for performance.
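For example, a minimal sketch of a capped get_data (the limit of 10 and the inner fetch coroutine are arbitrary choices for illustration):

import asyncio
import random


async def get_data(list_of_data_calls, limit=10):
    semaphore = asyncio.Semaphore(limit)

    async def fetch(time):
        async with semaphore:          # at most `limit` calls are in flight at once
            await asyncio.sleep(time)  # placeholder for the real I/O call
            return random.randint(0, 100)

    return await asyncio.gather(*(fetch(t) for t in list_of_data_calls))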

  2. CPU-intensive operations.

Your code doesn't show it, but the worry is that the condition checking might be CPU-intensive. In that case, you should defer the work to a thread pool (if the GIL is not an issue) or to subprocesses (if the GIL is an issue), for two reasons: 1. to keep a blocked main thread from harming concurrency, and 2. to make use of multiple CPUs more efficiently.
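A rough sketch of deferring the check with run_in_executor (check_conditions and handle here are made-up stand-ins for your real check and pipeline step):

import asyncio
from concurrent.futures import ProcessPoolExecutor


def check_conditions(data):
    # hypothetical stand-in for an expensive, pure-Python check
    return all(x > 80 for x in data)


pool = ProcessPoolExecutor()  # processes side-step the GIL; pass None below for the default thread pool


async def handle(data):
    loop = asyncio.get_event_loop()
    # run the blocking check off the main thread and await its result
    met = await loop.run_in_executor(pool, check_conditions, data)
    if met:
        print("conditions met, firing the follow-up request")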

  3. Task control.

Your current code sleeps for 10 seconds in a loop for every analysis. This makes it difficult to gracefully shut down the analysers, not to mention scale on the fly. An ideal model would be the producer-consumer pattern, where you produce tasks with some kind of control into a Queue, and a bunch of workers retrieve tasks from the queue and work on them concurrently.
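A bare-bones sketch of that shape (the analysis names, the 600-second interval and the 5 workers are all arbitrary placeholders):

import asyncio


async def produce(queue):
    # enqueue one task per analysis every X minutes
    while True:
        for analysis in ("analysis1", "analysis2"):
            await queue.put(analysis)
        await asyncio.sleep(600)


async def work(queue):
    while True:
        analysis = await queue.get()
        # get data -> check conditions -> maybe request, for this analysis
        print("running", analysis)
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    # keep references so the worker tasks are not garbage-collected
    workers = [asyncio.ensure_future(work(queue)) for _ in range(5)]
    await produce(queue)


asyncio.get_event_loop().run_until_complete(main())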

Fantix King
  • Great answer! Thanks. A few questions: Regarding 1) I understand that at some point the callbacks are queuing up. I figured the callbacks would just be run when it's "their turn". As I understand you, you are saying that too many callbacks can actually decrease the overall performance - not just create a queue of callbacks. Why is that? Regarding 2) What is the difference between sending the task to a thread pool or a subprocess? Regarding 3) The 10-second sleep was an example. But I will def. look into the producer-consumer pattern for this. What about using some kind of scheduler? – mfvas Jul 01 '18 at 08:45
  • Sure thing! 1) According to the event loop [source code](https://github.com/python/cpython/blob/1bf9cc509326bc42cd8cb1650eb9bf64550d817e/Lib/asyncio/base_events.py#L1662), it uses a [priority queue](https://en.wikipedia.org/wiki/Priority_queue) to maintain delayed callbacks, which has `O(log n)` time complexity for `heappop()`. For I/O polling, it uses the most scalable implementation on each platform, for example [epoll](https://stackoverflow.com/questions/6474602) on Linux, which also takes `O(log n)` to change the polling list. `O(log n)` is small, but it does take a bit of time. – Fantix King Jul 02 '18 at 01:52
  • Still on 1), it is not actually the `O(log n)` to worry about; it is the growing response time. On a fixed physical machine, the maximum throughput is fixed, so the maximum concurrency that doesn't harm response time can be calculated from the I/O time ratio in each analysis. For example, if getting the data takes 10 seconds and condition checking only 0.1 second, you can have a lot more concurrent tasks waiting for data while the CPU is busy checking the results of 1% of the tasks. Once you exceed that, stuff will queue up in the wrong place, making it hard to manage. – Fantix King Jul 02 '18 at 02:10
  • 2) Threads share the same [GIL](https://realpython.com/python-gil/#the-impact-on-multi-threaded-python-programs), while subprocesses don't. So if the tasks are pure Python written (no C-extension that could [manually release GIL](https://docs.python.org/3/c-api/init.html#releasing-the-gil-from-extension-code)), they'll probably use up to **one** CPU core with any number of threads, while with processes they may use up **all** CPU cores. – Fantix King Jul 02 '18 at 02:19
  • 3) Yeah custom scheduler is definitely fine. If tasks are centrally scheduled (instead of scheduled within each task runner), you'll need to deal with communication between coroutines. In this case, [Queue](https://docs.python.org/3/library/asyncio-queue.html) is probably an [elegant tool](https://doc.rust-lang.org/book/second-edition/ch16-02-message-passing.html). @mfvas – Fantix King Jul 02 '18 at 02:28
  • Thanks @fantix. Great answer, again. 1) Thanks. This makes sense. 2) Oh, so by subprocesses you just meant multiprocessing. I thought a subprocess was something new I did not know about. 3) Do you by any chance know where to find examples of building this kind of architecture? – mfvas Jul 02 '18 at 07:57
  • @mfvas 2) There is the [subprocess](https://docs.python.org/3/library/subprocess.html) module, and the [asyncio version](https://docs.python.org/3/library/asyncio-subprocess.html). They are processes just like in `multiprocessing`, only with a different API. 3) Here is a [basic example](http://asyncio.readthedocs.io/en/latest/producer_consumer.html), and I have an [incomplete real-life example here](https://github.com/fantix/aintq/blob/master/aintq/worker.py). – Fantix King Jul 02 '18 at 09:41