I'm trying to run Python code in parallel using asyncio. The idea is to run multiple jobs at the same time.

Here is my code:

import asyncio
import threading

async def print_thread():
    for n in range(5):
        print("Number: {}".format(threading.get_ident()))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(print_thread())
    finally:
        loop.close()

The output is:

Number: 4599266752
Number: 4599266752
Number: 4599266752
Number: 4599266752
Number: 4599266752

As far as I understand, the code ran on a single thread. Is there a way to parallelize it?

PS

If I change the code to:

async def print_thread():
    print("Number: {}".format(threading.get_ident()))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        for n in range(5):
            loop.run_until_complete(print_thread())
    finally:
        loop.close()

I get the same result.

Finkelson

2 Answers

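Your for loop is inside a single coroutine, and a coroutine runs on the thread of the event loop that drives it, so every iteration prints the same thread id. Even if you move the loop outside the async function and schedule several tasks, they all still run on that one thread: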

import asyncio
import threading


async def print_thread():
    print("Thread: {}".format(threading.get_ident()))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = []
    for i in range(10):
        tasks.append(asyncio.ensure_future(print_thread()))
    loop.run_until_complete(asyncio.gather(*tasks))

This still prints the same id every time:

Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
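
Sharing one thread does not mean the tasks run strictly one after another: asyncio switches between coroutines whenever one of them awaits. A minimal sketch of that, assuming Python 3.7+ for asyncio.run, with a hypothetical worker coroutine in which asyncio.sleep stands in for an I/O wait:

```python
import asyncio
import threading
import time


async def worker(delay):
    # asyncio.sleep stands in for an I/O wait; awaiting it hands control
    # back to the event loop so other coroutines can run meanwhile.
    await asyncio.sleep(delay)
    return threading.get_ident()


async def main():
    start = time.perf_counter()
    # The five waits overlap, so the total is well below 5 * 0.2 seconds.
    thread_ids = await asyncio.gather(*(worker(0.2) for _ in range(5)))
    elapsed = time.perf_counter() - start
    return thread_ids, elapsed


if __name__ == '__main__':
    thread_ids, elapsed = asyncio.run(main())
    print("Distinct threads: {}".format(len(set(thread_ids))))
    print("Elapsed: {:.2f}s".format(elapsed))
```

This kind of overlap helps with I/O-bound jobs only; a coroutine that never awaits blocks every other task on the loop.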

To run on several threads, use a ThreadPoolExecutor. It expects a regular function rather than a coroutine, so you have to remove async from the definition:

import asyncio
import threading
import concurrent.futures


def print_thread():
    print("Thread: {}".format(threading.get_ident()))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        for i in range(10):
            loop.run_in_executor(pool, print_thread)

Which outputs:

Thread: 140446369556224
Thread: 140446361163520
Thread: 140446369556224
Thread: 140446361163520
Thread: 140446369556224
Thread: 140446352508672
Thread: 140446361163520
Thread: 140446344115968
Thread: 140446369556224
Thread: 140446335723264

As you can see, there are fewer threads than calls. That is normal: the pool reuses its worker threads. If you have large batches, you can change the thread count with the max_workers parameter of the ThreadPoolExecutor constructor.
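
To see the effect of max_workers, here is a small sketch. The helper name distinct_threads and the short sleep are assumptions added for illustration; the sleep only keeps each worker busy long enough that the pool has a reason to start more threads.

```python
import concurrent.futures
import threading
import time


def get_thread_id():
    # Brief pause so a single worker cannot drain the whole queue alone.
    time.sleep(0.05)
    return threading.get_ident()


def distinct_threads(max_workers, calls=10):
    # Submit `calls` jobs and return the set of thread ids that ran them.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(get_thread_id) for _ in range(calls)]
        return {f.result() for f in futures}


if __name__ == '__main__':
    for workers in (1, 3):
        count = len(distinct_threads(workers))
        print("max_workers={}: {} distinct thread(s)".format(workers, count))
```

The pool never uses more than max_workers threads, though it may use fewer when jobs finish quickly.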

If you still want to use a coroutine, there is a solution here: https://stackoverflow.com/a/46075571/7414475
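
One possible way to keep the coroutine, not necessarily the approach taken in the linked answer, is to give each worker thread its own event loop. A rough sketch, assuming Python 3.7+ for asyncio.run; run_coroutine_in_thread is a hypothetical wrapper name:

```python
import asyncio
import concurrent.futures
import threading


async def get_thread():
    # Yield to the event loop once, then report the thread we run on.
    await asyncio.sleep(0)
    return threading.get_ident()


def run_coroutine_in_thread():
    # asyncio.run creates a fresh event loop in the calling worker thread,
    # runs the coroutine to completion, and closes the loop again.
    return asyncio.run(get_thread())


def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(run_coroutine_in_thread) for _ in range(10)]
        return [f.result() for f in futures]


if __name__ == '__main__':
    for thread_id in main():
        print("Thread: {}".format(thread_id))
```

Each call pays for creating and closing an event loop, so this is only worth it when the coroutine cannot easily be rewritten as a plain function.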

Elektordi

Another answer with result collection, as requested in comments:

import asyncio
import threading
import concurrent.futures


def get_thread():
    return "Thread: {}".format(threading.get_ident())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        tasks = []
        for i in range(10):
            tasks.append(loop.run_in_executor(pool, get_thread))
        print(loop.run_until_complete(asyncio.gather(*tasks)))

Output:

['Thread: 139740266125056', 'Thread: 139740266125056', 'Thread: 139740266125056', 'Thread: 139740183525120', 'Thread: 139740266125056', 'Thread: 139740175132416', 'Thread: 139740183525120', 'Thread: 139740166739712', 'Thread: 139740266125056', 'Thread: 139740158347008']
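
Recent Python releases discourage calling asyncio.get_event_loop() when no loop is running. The same result collection can be sketched with asyncio.run and asyncio.get_running_loop, assuming Python 3.7+:

```python
import asyncio
import concurrent.futures
import threading


def get_thread():
    return "Thread: {}".format(threading.get_ident())


async def main():
    # Inside a coroutine the running loop is always available.
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        tasks = [loop.run_in_executor(pool, get_thread) for _ in range(10)]
        # gather returns the results in the order the tasks were submitted.
        return await asyncio.gather(*tasks)


if __name__ == '__main__':
    print(asyncio.run(main()))
```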
Elektordi