I'm trying to get up to speed with Python and asyncio, and I'm having some difficulty setting things up so that the main part of a program does not block when an asyncio process is started. For example, consider this mock client, which uses threading to receive messages and insert them into a message queue that main can then process, alongside the other work main does:
# mock_client.py
import threading
import time
import random
import collections

messageQueue = collections.deque()

def main():
    # start the thread running
    receiveMessagesThread = threading.Thread(target=receiveMessages)
    receiveMessagesThread.start()

    # in an actual program this would be a much longer while True loop, and getting the updated
    # messages from the message queue would be only one of many activities
    while True:
        # get and process all the messages in the message queue that have been queued up
        # since the last time around
        while len(messageQueue) > 0:
            message = messageQueue.popleft()
            # an actual program would do something significant with these messages,
            # for this test app just print them out
            print('message = ' + str(message))
        # end while

        # an actual program would have many other activities to do here, use a random sleep to simulate this
        time.sleep(random.uniform(1.0, 2.0))
    # end while
# end main

def receiveMessages():
    while True:
        # there would be a time delay in between receiving messages in an actual program,
        # use a random sleep to simulate this
        time.sleep(random.uniform(0.1, 0.2))

        # an actual client would receive messages here and add them to the message queue,
        # to simulate this just make up a random number
        myRandInt = random.randint(1, 10)
        messageQueue.append(str(myRandInt))
    # end while
# end function

if __name__ == '__main__':
    main()
I'm trying to set up the equivalent with asyncio instead of threading, and so far I haven't been able to work out how to do it. Most of the asyncio examples use loop.run_until_complete and loop.run_forever, which I can't put at the beginning of main because that would block before the while True loop and prevent main from doing the other things it needs to do.
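To make the blocking behaviour concrete, here is a minimal sketch. The names short_task and demo_blocking_call are made up for illustration and are not part of the program above. It only shows that loop.run_until_complete does not return until the coroutine it was given has finished:

```python
import asyncio
import time


async def short_task():
    # Hypothetical stand-in for any coroutine; it just waits briefly.
    await asyncio.sleep(0.2)
    return "done"


def demo_blocking_call():
    # A private loop is used so the sketch does not depend on global loop state.
    loop = asyncio.new_event_loop()
    try:
        start = time.monotonic()
        # Control does not come back to the caller until short_task() is finished.
        result = loop.run_until_complete(short_task())
        elapsed = time.monotonic() - start
    finally:
        loop.close()
    return result, elapsed


if __name__ == "__main__":
    result, elapsed = demo_blocking_call()
    print(result, "after roughly", round(elapsed, 2), "seconds")
```

Any code placed after the run_until_complete call runs only once the coroutine is done, which is the problem described above for a while True loop in main.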
After reading the documentation and some other Stack Overflow posts, especially this one, I'm under the impression that calling run_in_executor is the answer, but I'm unclear on how to set that up. Can anybody suggest how to modify the above to use asyncio / run_in_executor instead of threading?
--- Edit ---
Based on this video I worked out this example, which is getting substantially closer to fulfilling my criteria:
# asyncio4.py
import asyncio
import random
import collections

messageQueue = collections.deque()

async def main():
    # in an actual program this would be a much longer while True loop, and getting the updated
    # messages from the message queue would be only one of many activities
    while True:
        # get and process all the messages in the message queue that have been queued up
        # since the last time around
        while len(messageQueue) > 0:
            message = messageQueue.popleft()
            # an actual program would do something significant with these messages,
            # for this test app just print them out
            print('message = ' + str(message))
        # end while

        # an actual program would have many other activities to do here, use a random sleep to simulate this
        await asyncio.sleep(random.uniform(1.0, 2.0))
    # end while
# end function

async def receiveMessages():
    while True:
        # there would be a time delay in between receiving messages in an actual program,
        # use a random sleep to simulate this
        await asyncio.sleep(random.uniform(0.1, 0.2))

        # an actual client would receive messages here and add them to the message queue,
        # to simulate this just make up a random number
        myRandInt = random.randint(1, 10)
        messageQueue.append(str(myRandInt))
    # end while
# end function

loop = asyncio.get_event_loop()
try:
    asyncio.ensure_future(main())
    asyncio.ensure_future(receiveMessages())
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.close()
# end try
It seems the only thing left is to replace the two ensure_future calls with one run_in_executor call, at which point main won't have to be declared with async. Can anybody suggest how to do this?
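One possible shape for this, offered as a rough sketch rather than a confirmed answer: keep the receiver as a coroutine on the event loop and hand a plain, non-async main function to loop.run_in_executor, which runs it in a worker thread. The names blocking_main, receive_messages and run_both are hypothetical, and the loops are bounded only so the sketch terminates. It relies on collections.deque supporting thread-safe append and popleft.

```python
import asyncio
import collections
import time

message_queue = collections.deque()


async def receive_messages(count):
    # Coroutine side: simulate receiving `count` messages on the event loop.
    for i in range(count):
        await asyncio.sleep(0.01)
        message_queue.append(str(i))


def blocking_main(expected, timeout=5.0):
    # Ordinary function: free to call time.sleep because it runs in a worker thread.
    received = []
    deadline = time.monotonic() + timeout
    while len(received) < expected and time.monotonic() < deadline:
        while message_queue:
            received.append(message_queue.popleft())
        time.sleep(0.02)  # stands in for main's other activities
    return received


async def run_both(count=5):
    loop = asyncio.get_running_loop()
    # None selects the loop's default thread pool executor.
    main_future = loop.run_in_executor(None, blocking_main, count)
    await receive_messages(count)
    return await main_future


if __name__ == "__main__":
    print(asyncio.run(run_both()))
```

With this layout only the small run_both wrapper is async; the main loop itself stays synchronous, at the cost of sharing the deque between two threads.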
--- Edit ---
I probably should have mentioned earlier that the receiveMessages function needs to use another library that requires receiveMessages to be async, but I would prefer that main not be declared async, as that may impose other limits I'm not aware of. Having to use await asyncio.sleep(random.uniform(1.0, 2.0)) instead of simply time.sleep(random.uniform(1.0, 2.0)) is one limit I'm aware of, but are there others?
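The reason for the sleep restriction can be seen in a small experiment. This is a sketch with hypothetical names (ticker, count_ticks): while a coroutine is inside time.sleep, the single-threaded event loop cannot run any other coroutine, whereas await asyncio.sleep hands control back to the loop. The same presumably applies to any other blocking call made directly inside an async function.

```python
import asyncio
import time


async def ticker(ticks, stop):
    # Background coroutine that records a timestamp about every 10 ms.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(use_blocking_sleep):
    ticks = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(ticks, stop))
    await asyncio.sleep(0)  # give the ticker a chance to start
    before = len(ticks)
    if use_blocking_sleep:
        time.sleep(0.2)  # blocks the whole event loop thread
    else:
        await asyncio.sleep(0.2)  # yields to the loop, so the ticker keeps running
    during = len(ticks) - before
    stop.set()
    await task
    return during


if __name__ == "__main__":
    print("ticks during time.sleep:   ", asyncio.run(count_ticks(True)))
    print("ticks during asyncio.sleep:", asyncio.run(count_ticks(False)))
```

In the blocking case the ticker records nothing during the sleep, which is what would happen to receiveMessages if an async main called time.sleep.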
Also, as user4815162342 pointed out, it seems concurrent.futures is often used in combination with run_in_executor, for example in this example, but I'm not sure how to adapt this to my case above.
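For reference, the combination usually looks something like the sketch below, where an explicit concurrent.futures.ThreadPoolExecutor is passed as the first argument to run_in_executor instead of None. The function names blocking_work and run_blocking_calls are made up for illustration, and this is not adapted to the message-queue case.

```python
import asyncio
import concurrent.futures
import time


def blocking_work(seconds):
    # Hypothetical blocking function, e.g. a synchronous library call.
    time.sleep(seconds)
    return seconds


async def run_blocking_calls():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        # Each call runs in a pool thread; the returned futures are awaitable.
        futures = [
            loop.run_in_executor(pool, blocking_work, delay)
            for delay in (0.05, 0.1)
        ]
        # gather returns the results in the order the futures were passed in.
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(run_blocking_calls()))
```

The explicit pool mainly gives control over the number of worker threads; passing None to run_in_executor uses the loop's default executor instead.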