0

I'm trying to get up to speed with Python and asyncio and I'm having some difficulty with setting things up so the main part of a program is not blocking when an asyncio process is started. For example, consider this mock client that uses threading to receive messages and insert them into a message queue that main can then process, along with main doing other stuff:

# mock_client.py

import threading
import time
import random
import collections

messageQueue = collections.deque()

def main():

    # start the thread running
    receiveMessagesThread = threading.Thread(target=receiveMessages)
    receiveMessagesThread.start()

    # in an actual program this would be a much longer while True loop, and getting the updated
    # messages from the message queue would be only one of many activities
    while True:
        # get and process all the messages in the message queue that have been queued up
        # since the last time around
        while len(messageQueue)  > 0:
            message = messageQueue.popleft()
            # an actual program would do something significant with these messages,
            # for this test app just print them out
            print('message = ' + str(message))
        # end while

        # an actual program would have many other activities to do here, use a random sleep to simulate this
        time.sleep(random.uniform(1.0, 2.0))
    # end while
# end main

def receiveMessages():
    while True:
        # there would be a time delay inbetween receiving messages in an actual program,
        # use a random sleep to simulate this
        time.sleep(random.uniform(0.1, 0.2))

        # an actual client would recive messages here and add them to the message queue,
        # to simulate this just make up a random number
        myRandInt = random.randint(1, 10)
        messageQueue.append(str(myRandInt))
    # end while
# end function

if __name__ == '__main__':
    main()

I'm trying to set up the equivalent with asyncio instead of threading and so far I haven't been able to work out how to do it. Most of the asyncio examples use loop.run_until_complete and loop.run_forever which I can't put at the beginning of main because that would block before the while True loop and prevent main from doing the other things it needs to do.

After having read the documentation and some other Stack Overflow posts, especially this one, I'm under the impression that calling run_in_executor is the answer but I'm unclear on how to set that up. Can anybody suggest how to modify the above to use asyncio / run_in_executor instead of threading?

--- Edit ---

Based on this video I worked out this example, which is getting substantially closer to fulfilling my criteria:

# asyncio4.py

import asyncio
import random
import collections

messageQueue = collections.deque()

async def main():
    # in an actual program this would be a much longer while True loop, and getting the updated
    # messages from the message queue would be only one of many activities
    while True:
        # get and process all the messages in the message queue that have been queued up
        # since the last time around
        while len(messageQueue) > 0:
            message = messageQueue.popleft()
            # an actual program would do something significant with these messages,
            # for this test app just print them out
            print('message = ' + str(message))
        # end while

        # an actual program would have many other activities to do here, use a random sleep to simulate this
        await asyncio.sleep(random.uniform(1.0, 2.0))
    # end while
# end function

async def receiveMessages():
    while True:
        # there would be a time delay inbetween receiving messages in an actual program,
        # use a random sleep to simulate this
        await asyncio.sleep(random.uniform(0.1, 0.2))

        # an actual client would recive messages here and add them to the message queue,
        # to simulate this just make up a random number
        myRandInt = random.randint(1, 10)
        messageQueue.append(str(myRandInt))
    # end while
# end function

loop = asyncio.get_event_loop()

try:
    asyncio.ensure_future(main())
    asyncio.ensure_future(receiveMessages())
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.close()
# end try

It seems the only thing left is replacing the two ensure_future calls with one run_in_executor call, at which time main won't have to be decorated with async. Can anybody suggest how to do this?

--- Edit ---

I probably should have mentioned earlier that in the receiveMessages function I need to use another library that requires receiveMessages to be async, but that I would prefer main not be decorated async as that may impose other limits I'm not aware of. Having to use await asyncio.sleep(random.uniform(1.0, 2.0)) instead of simply time.sleep(random.uniform(1.0, 2.0)) is one limit I'm aware of, but are there others?

Also as user4815162342 pointed out it seems concurrent.futures is often used in combination with run_in_executor, for example in this example, but I'm not sure how to adapt this to my case above.

cdahms
  • 3,402
  • 10
  • 49
  • 75
  • `run_in_executor` cannot replace `ensure_future`, its purpose is to adapt legacy blocking code to an asyncio code base. If all you want is to run things in the background using a thread pool, you don't need `async` and `run_in_executor` to begin with - just import `concurrent.futures` and submit functions to the thread pool directly. – user4815162342 Mar 14 '20 at 10:33
  • Why do you want to avoid having main be an async def function? There is no problem with doing that. If the program is doing what you want to do, you're done. This is a typical structure for a program with an event loop - you initialize some stuff, create some tasks, and then start the loop. – Paul Cornelius Mar 15 '20 at 00:42
  • @Paul Cornelius, does making main an async function impose any limits or cause any potential problems? For example, can I use time.sleep() in an async function? It's my understanding that I have to use await asyncio.sleep() instead, but are there any other more significant limits? In the actual program that I plan on using this in, main() and the other functions that main() calls are in the 1,000s of lines. How can I know what if any effect making main() async will have on everything else? Unfortunately this does not seem to be documented anywhere, leaving trial an error as the only option. – cdahms Mar 15 '20 at 15:39
  • @user4815162342 when you say "just import concurrent.futures and submit functions to the thread pool directly" that may be exactly what I'm looking for. Many of the examples I've found that use run_in_executor also use concurrent.futures, for example this: https://pymotw.com/3/asyncio/executors.html. The problem is I'm not sure how to adapt this for my example above. If you could explain this further or provide a working example that would be great. – cdahms Mar 15 '20 at 15:49
  • Yes, examples involving `run_in_executor` do use `concurrent.futures`, but those examples are written from the perspective of an asyncio program. The whole *point* of `run_in_executor` is to adapt futures-code to asyncio. But futures are a full-featured abstraction on its own which even predates asyncio, and if they fulfill your needs, you should just use them. Just look at the documentation for `concurrent.futures` and you'll find [examples](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) of doing things in the background. – user4815162342 Mar 15 '20 at 16:11
  • main() is, of course, just some function and has no special properties. If it's async def and you run it as an event loop task, it will run exactly as before with no changes. This will also confer no benefits. There is no magic here: the only reason to use async is if you have an application that will benefit from concurrency. Typically that will mean you have something to wait for, and something useful to do in the meantime. BTW you can definitely use time.sleep() in any function, but if you use it in an async def function it will block ALL the tasks until it times out. – Paul Cornelius Mar 15 '20 at 18:15

0 Answers0