
There is a function that blocks the event loop (e.g. it makes an API request). I need to make a continuous stream of requests that run in parallel, not sequentially, so that each new request starts before the previous one has finished.

So I found a solved question with the loop.run_in_executor() solution and used it at the beginning:

import asyncio
import requests

#blocking_request_func() defined somewhere

async def main():
    loop = asyncio.get_event_loop()
    future1 = loop.run_in_executor(None, blocking_request_func, 'param')
    future2 = loop.run_in_executor(None, blocking_request_func, 'param')
    response1 = await future1
    response2 = await future2
    print(response1)
    print(response2)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

This works well and the requests run in parallel, but there is a problem for my task: in this example we build the whole group of tasks/futures up front and only then run the group all at once. But I need something like this:

1. Send request_1 without awaiting its completion.
(AFTER step 1 has started, but NOT at the same moment step 1 starts:)
2. Send request_2 without awaiting its completion.
(AFTER step 2 has started, but NOT at the same moment step 2 starts:)
3. Send request_3 without awaiting its completion.
(Request 1 (or any other) returns its response)
(AFTER step 3 has started, but NOT at the same moment step 3 starts:)
4. Send request_4 without awaiting its completion.
(Request 2 (or any other) returns its response)

and so on...
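The pattern in the steps above can be sketched roughly like this (a sketch only: `blocking_request_func` stands in for the real blocking call, and the delays are illustrative). Each call is submitted to the executor and given a callback, and the loop never awaits the individual future before submitting the next one:

```python
import asyncio
import time

results = []

def blocking_request_func(param):
    # stand-in for the real blocking API call
    time.sleep(0.2)
    return param

def on_done(fut):
    # runs in the event loop when the future completes
    results.append(fut.result())
    print("got response:", fut.result())

async def main():
    loop = asyncio.get_running_loop()
    futs = []
    for i in range(5):
        # submit immediately; do NOT await here, so the next
        # submission starts before this one finishes
        fut = loop.run_in_executor(None, blocking_request_func, i)
        fut.add_done_callback(on_done)
        futs.append(fut)
        await asyncio.sleep(0.05)  # pacing between submissions
    await asyncio.gather(*futs)  # only to keep main() alive until all are done

asyncio.run(main())
```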

I tried using asyncio.TaskGroup():

async def request_func():
    global result  # list of request results, defined somewhere in the global scope
    loop = asyncio.get_event_loop()
    result.append(await loop.run_in_executor(None, blocking_request_func, 'param'))
    await asyncio.sleep(0)  # adding or removing this line gives the same result

async def main():
    async with asyncio.TaskGroup() as tg:
       for i in range(0, 10):
           tg.create_task(request_func())

All these attempts gave the same result: first we define the group of tasks/futures, and only then does the whole group run concurrently. But is there a way to run all these requests concurrently while still submitting them "in a stream"?

I tried to make a visualization in case my explanation is not clear enough.

What I have for now: (image)

What I need: (image)

================ Update with the answer ===================

The closest answer, though with some limitations:

import asyncio
import concurrent.futures
import random
import time

def blockme(n):
    x = random.random() * 2.0
    time.sleep(x)
    return n, x

def cb(fut):
    print("Result", fut.result())

async def main():

    # You need to control the number of threads
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    loop = asyncio.get_event_loop()
    futs = []
    n = 0

    # You need to control requests per second
    delay = 0.5

    while await asyncio.sleep(delay, result=True):
        fut = loop.run_in_executor(pool, blockme, n)
        fut.add_done_callback(cb)
        futs.append(fut)
        n += 1

        # You need to control the number of pending futures, e.g. like this:
        if len(futs) > 40:
            done, pending = await asyncio.wait(futs,
                                               timeout=5,
                                               return_when=asyncio.FIRST_COMPLETED)
            futs = list(pending)

asyncio.run(main())
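For completeness, here is an alternative sketch of the same back-pressure idea (the names and limits are illustrative, and `blockme` again simulates the blocking call): instead of checking `len(futs)` and calling `asyncio.wait`, an `asyncio.Semaphore` can bound how many calls are in flight, which was one of the options mentioned in the comments:

```python
import asyncio
import concurrent.futures
import random
import time

def blockme(n):
    # simulated blocking call
    time.sleep(random.random() * 0.2)
    return n

async def run_one(loop, pool, sem, n, results):
    try:
        results.append(await loop.run_in_executor(pool, blockme, n))
    finally:
        sem.release()  # frees a slot for the submission loop

async def main():
    loop = asyncio.get_running_loop()
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    sem = asyncio.Semaphore(8)  # at most 8 calls in flight
    results, tasks = [], []
    for n in range(20):  # could be `while True:` for an endless stream
        await sem.acquire()  # back-pressure: wait if 8 calls are already pending
        tasks.append(asyncio.create_task(run_one(loop, pool, sem, n, results)))
        await asyncio.sleep(0.01)  # requests-per-second pacing
    await asyncio.gather(*tasks)
    pool.shutdown()
    return results

results = asyncio.run(main())
print(len(results), "calls completed")
```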

2 Answers


This is taken directly from the Python documentation. The snippet from the asyncio library docs shows how to run blocking code concurrently using asyncio. It uses the `to_thread` method to create the task.

You can find more here: https://docs.python.org/3/library/asyncio-task.html#running-in-threads

import asyncio
import time

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())
Scarface
  • Thank you for the answer. However, `asyncio.to_thread()` uses `loop.run_in_executor()` internally, so this code does approximately the same thing as the code I wrote in my question and doesn't solve my problem: first all the blocking functions are collected, and only then do they run concurrently. That's not what I want. – Alexey Trukhanov Jan 20 '23 at 21:08

I think this might be what you want. You don't have to await each request: the run_in_executor function returns a Future, and instead of awaiting it you can attach a callback function:

import asyncio
import random
import time

def blockme(n):
    x = random.random() * 2.0
    time.sleep(x)
    return n, x

def cb(fut):
    print("Result", fut.result())
    
async def main():
    loop = asyncio.get_event_loop()
    futs = []
    for n in range(20):
        fut = loop.run_in_executor(None, blockme, n)
        fut.add_done_callback(cb)
        futs.append(fut)
    await asyncio.gather(*futs)
    # await asyncio.sleep(10)

asyncio.run(main())

All the requests are started at the beginning, but they don't all execute in parallel because the number of threads is limited by the thread pool. You can adjust the number of threads if you want.
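To adjust the thread count, one way (a sketch; `blockme` here is a stand-in for the blocking call) is to pass your own `ThreadPoolExecutor` to `run_in_executor` instead of `None`:

```python
import asyncio
import concurrent.futures
import time

def blockme(n):
    # simulated blocking call
    time.sleep(0.1)
    return n

async def main():
    loop = asyncio.get_running_loop()
    # only 2 calls run at any moment; the rest queue inside the executor
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futs = [loop.run_in_executor(pool, blockme, n) for n in range(6)]
        return await asyncio.gather(*futs)

results = asyncio.run(main())
print(results)
```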

Here I simulated a blocking call with time.sleep. I needed a way to prevent main() from ending before all the callbacks occurred, so I used gather for that purpose. You can also wait for some length of time, but gather is cleaner.

Apologies if I don't understand what you want. But I think you want to avoid using await for each call, and I tried to show one way you can do that.

Paul Cornelius
  • Thank you, but that is not what I want. I don't need to avoid `await`; I need to run the blocking functions one by one without having to wait for the result of each run. – Alexey Trukhanov Jan 21 '23 at 23:15
  • I just don't understand what you're trying to do. There never is a "necessity" to wait for a blocking function. Once you start a function running in another thread it will run to completion all by itself. In your code you have await expressions, which will suspend the main program until the function completes. You say that's not what you want, so why can't you simply remove those lines of code? Then your functions will just keep running until they finish. – Paul Cornelius Jan 22 '23 at 07:13
  • I'm very sorry, my English and programming skills aren't good enough to explain my problem clearly. Thank you again for trying to solve it and to understand me. I'll try to explain once more. Judging by your comment, you understood my problem well enough, but I can't understand your solution. You said that once I start the function in another thread it will run by itself. That's OK. Then I need to avoid `await`. But that's the main problem for me: how can I avoid it and run the function in another thread without calling `await` at all?.. – Alexey Trukhanov Jan 22 '23 at 20:15
  • ...In your example code you first collect the futures and then start them with `gather`. If we don't use `gather` (or something like `TaskGroup`, or just `create_task`), how can we run the function? For my problem I don't even need the future; I just need to call one blocking function in an infinite loop, and none of these running functions should wait for the previous ones to complete. Right now I get one of two results: 1. each function runs only after the previous one has completed, or 2. I have to collect the futures into a group and then run them all at the same time (like in your example). – Alexey Trukhanov Jan 22 '23 at 20:25
  • OK, I've tried your example several times and it's becoming a little clearer to me. We don't need `await` to run the functions - OK, that's clear. But I still can't make an infinite loop like `while True:`, because at the beginning we still need to wait for all these functions to start. Yes? – Alexey Trukhanov Jan 22 '23 at 20:43
  • Maybe I need another module to do this? `threading`, for example? – Alexey Trukhanov Jan 22 '23 at 20:54
  • The default executor in asyncio already uses threads (it's a ThreadPoolExecutor). It has all the functionality you need, I think. You don't need to wait for the functions to start, the executor starts them for you while your main program continues in parallel. There is no reason you can't put an infinite loop in your main program because the executor will run all the functions for you, in other threads. The functions run by themselves until they finish. If you have more functions than threads, the executor will start them as the previous ones finish. – Paul Cornelius Jan 23 '23 at 02:24
  • I used `gather` in my little example program for only one reason: I didn't want the main program to exit before the executor was done running all the functions. I just needed a way to pause the main script so you could see everything. If that's not the case for you, you don't need to do that. – Paul Cornelius Jan 23 '23 at 02:28
  • Hello! It's me again. I've carefully read all your explanations and run some tests. If I understand your explanations correctly, I reached this conclusion: if I take the example code from your answer and change the bounded loop `for n in range(20)` to an infinite loop, e.g. `while True`, the code should work fine and I should start seeing output lines from the completed functions. But it doesn't work. If I increase the `range()`, e.g. to `range(10000)`, I observe a tiny delay before output starts. As I increase the `range()` further, the delay grows too, and with `range(1_000_000)` it's about 25 seconds... – Alexey Trukhanov Jan 23 '23 at 11:15
  • ...and when I change the bounded loop to an infinite `while True` loop, the code shows nothing; it's just doing something. The only way to get the results of the functions is to use a bounded loop, wait through the delay, and then call `gather` or `TaskGroup` or something similar. So if this code with an infinite loop works on your computer as you say, then the problem is in my setup (e.g. Python version, or maybe OS settings, or maybe an IDE bug). Do you have any idea why this happens? – Alexey Trukhanov Jan 23 '23 at 11:21
  • And one more thing that may help you help me :). I also tried the `threading` module. It works and does exactly what I want, and what you said about running blocking functions in threads: run them one by one in an infinite loop without waiting for each function to complete. I kept your `blockme` example and called `threading.Thread(target=blockme, args=(n,), daemon=None).start()` in an infinite loop. It works as expected! However, I'd ask you to spend a little more time explaining my problem with `asyncio`; it's important to me. Thank you! – Alexey Trukhanov Jan 23 '23 at 11:35
  • `run_in_executor` doesn't start the execution of blockme immediately. It is just a request for the executor to run this function at the earliest opportunity. The executor has to keep track of these requests, and it feeds them to the threads as soon as an earlier one finishes. If you put this in an infinite loop with no time delays and no way of regulating how many requests you have made, you build up a huge list of requests. Sooner or later the computer will run out of resources. When you said "infinite loop" I did not realize that this is what you intended to do. – Paul Cornelius Jan 24 '23 at 06:30
  • This is why you get better results if you periodically wait, which allows the executor time to catch up. This is normal. I see no reason to think that there is something wrong with your environment. You must limit the rate at which you feed items to the executor. There are ways to do that precisely: a Semaphore, running each item through a Queue of fixed size, or just a simple counter that triggers a wait when it hits a certain value. – Paul Cornelius Jan 24 '23 at 06:36
  • I've just updated my question with the answer. I took your example and extended it with some code and comments. Please check it and tell me what you think of it. Did I understand your advice correctly? – Alexey Trukhanov Jan 24 '23 at 11:54
  • Yes, that looks good. I think you need to add a keyword argument to your `asyncio.wait` call: `return_when=FIRST_COMPLETED`. Otherwise your routine will wait right here until ALL the calls finish. If one of them takes a long time then nothing will be happening until that one call is done. Your code is exactly the basic idea that I was trying to describe. – Paul Cornelius Jan 25 '23 at 20:11
  • Thanks! I've added `return_when...`. Thank you for all your answers and the time you spent on me! – Alexey Trukhanov Jan 25 '23 at 20:26
  • You're welcome. I'm glad you got it working. – Paul Cornelius Jan 25 '23 at 20:42