0

New to asynchronous programming - I am trying to build a system where multiple jobs can be submitted and stored in async queue. We should be able to handle 2 jobs at the same time. This means that jobs submitted while processing is ongoing should wait their turn.

However, When I submit a job, for the second job I do not get an response from the api immediately, and have to wait before the first job has been executed completely. I want to be able to submit mutltiple jobs that are added to the queue. The jobs involve a lot of computations.

Lisa
  • 1
  • 2
  • Welcome to Stack Overflow. I suggest you read the articles on [how to ask a good question](https://stackoverflow.com/help/how-to-ask) and [how to create a MRE](https://stackoverflow.com/help/mcve), then use the _Edit_ function to modify your question accordingly. The practice of boiling the code down to the bare minimum needed to capture the essence of the problem not only motivates others to actually try and help you but more often than not gives you a deeper understanding to solve the problem yourself. Right now I can only guess that your "jobs" execute blocking code. – Daniil Fajnberg Jun 03 '23 at 16:57
  • Thank you @DaniilFajnberg. I will remove the import code and code related to the class, and keep only the 3 methods that I am confused about. My problem is that I want to send a post request via curl to submit a job, and I will get a job_id in response. Ideally, I should be able to submit jobs using post requests, and immediately get the job_id as reponse, meanwhile jobs are added to the queue. However, what's happening is that once I submit the first job, until that first job is done, no other jobs are being added to the queue. – Lisa Jun 03 '23 at 17:34
  • You are correct, render_area(render_job) is CPU intensive. And yes, all the jobs that are to be submitted will be intensive. So what can I do to resolve this? So that I will be able to add new jobs to the queue? – Lisa Jun 03 '23 at 17:44
  • @python_user I already implemented a working solution using threads, but wanted to learn about asyncio so that everything can be done in a single thread. – Lisa Jun 03 '23 at 17:57
  • 1
    @Lisa You misunderstood me (or did not read the article). Removing relevant imports and other _names_ used in the code you show is the **opposite** of what is required for a MRE. The point is there is highly application-specific code here that is completely irrelevant to the underlying issue and just makes it harder to grasp what the problem is. I understand it is a lot of work to reduce your code step by step to make it as small as possible while still retaining the same error/problem. But that is what makes a MRE and a good question. – Daniil Fajnberg Jun 03 '23 at 18:01
  • But it seems we hit upon the actual root of the issue here. You need to read up a bit about concurrency in Python in general. [This](https://stackoverflow.com/q/27435284/) is a good starting point. But there is a lot more. Without a doubt though, if you have two or more CPU intensive tasks that you want to work on concurrently, `asyncio` is absolutely the wrong tool (or rather insufficient). And you will very likely need to spawn additional processes. Doing two CPU-heavy tasks concurrently in a single thread honestly seems like a contradiction in terms anyway. – Daniil Fajnberg Jun 03 '23 at 18:06
  • Ah, sorry English is not my first language. It's more clear now, I got it. – Lisa Jun 03 '23 at 18:07

1 Answers1

1

You must distinguish CPU-intensive functions (computations, rendering, etc.), IO-bound functions (file reading, API requests with requests or urllib) and asynchronous functions (the ones that use async and await such as AIOHTTP with AsyncIO) as this will dictate how you'll design your concurrent program.

First, you should not perform blocking calls (CPU-intensive work or blocking IO function such as requests.get) inside an asynchronous functions as it will block the event loop of the asynchronous runtime, resulting in concurrency issues. The following example illustrate this problem:

import asyncio
import requests
from math import sqrt

# Asynchronous function
async def fetch(i: int) -> int:
    await asyncio.sleep(1.0)
    return i

# CPU-intensive function
def render(i: int) -> int:
    for _ in range(30_000_000):
        _ = sqrt(i) ** 2
    return i

# This function is flawed!
async def work(i: int) -> int:
   value = await fetch(i)
   return render(i)  # CPU-intensive work inside `async`!

# This function is also flawed!
async def work(i: int) -> int:
   value = await fetch(i)
   return requests.get(f"https://url{i}"). # Blocking IO work inside `async`!

If you want to perform CPU-intensive work inside an async function, you should offload it to a process pool (for instance the ProcessPoolExecutor from concurrent.futures):

import asyncio
from concurrent.futures import ProcessPoolExecutor

async def work(i: int, executor: ProcessPoolExecutor) -> int:
    i = await fetch(i)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, render, i) 

If you want to perform an IO-bound operation inside an async function you can simply offload the work to a separate thread:

import asyncio
import requests

async def work(i: int) -> int:
    i = await fetch(i)
    return await asyncio.to_thread(requests.get, f"https://someurl{i}/")

Given that all your functions are now correct and depending on your use case you can use different frameworks for limiting the concurrency:

  • AsyncIO queues for async functions,
  • ProcessPoolExecutor with max_workers = 2 for example for CPU-intensive functions,
  • or dedicated API's such as TCPConnector for AIOHTTP.

Here is how you would write a function that limits concurrent render tasks to two using AsyncIO and ProcessPoolExecutor:

import asyncio
from concurrent.futures import ProcessPoolExecutor

async def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        results = await asyncio.gather(
            *(work(i, executor) for i in range(20))
        )
        print(results)

Not that this will not limit the concurrency of the fetch operations but only the CPU-intensive tasks running in the process pool. To limit the number of concurrent fetch operations you could use AIOHTTP's TCPConnector, an AsyncIO queue or a tool such as the throttler Python package.

Louis Lac
  • 5,298
  • 1
  • 21
  • 36
  • Thank you for your explanation, it was very insightful. I am putting the render tasks in a queue, and at any point in time I want to allow only two tasks from the FIFO queue to run concurrently, So i was wondering if I can do that using gather. – Lisa Jun 04 '23 at 13:34