
I am trying to make a multi-threaded program out of my asyncio code, but I am stuck at the part where I add the threads. The program runs fine with asyncio as it is:

import asyncio
import time

async def main(var1, var2):
    tasks = []
    for z in var1:
        for x in range(5):
            tasks.append(get_ip(z, var2))
    return await asyncio.gather(*tasks)


loop = asyncio.get_event_loop()
start_time = time.time()
for x in list:
    result = loop.run_until_complete(main(x, list2))
    loop.run_until_complete(release_main(result))
loop.close()

I want to run the `for x in list:` loop in threads. I have 8 CPUs, so I would like to use 8 threads, for example with `with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:`.

I have been reading posts about this, but I either mess up the value of `result` or break something else so it doesn't work. Help/tips needed.

`await loop.run_in_executor()` doesn't work unless it is inside a function; is that really needed? When I wrap the code above in a function and call it, everything breaks.
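For reference, `run_in_executor` does need a plain (non-async) callable, which is why it only works once the blocking part lives in a function. A minimal sketch of that pattern (the function name and values here are illustrative, not from the question):

```python
import asyncio
import concurrent.futures

def blocking_work(x):
    # run_in_executor requires a plain callable, not a coroutine
    return x * 2

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        # each call returns an awaitable wrapping the thread's result
        futures = [loop.run_in_executor(executor, blocking_work, x)
                   for x in range(4)]
        return await asyncio.gather(*futures)

print(asyncio.run(main()))  # [0, 2, 4, 6]
```

Note that this offloads blocking functions to threads; it does not run a second event loop in the pool.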

  • Asyncio loops are single threaded. You cannot share a single loop between multiple threads, except for a very few threadsafe functions like `call_soon_threadsafe`. So you would need to manually spawn threads, manually spawn a separate loop in each thread, run whatever you need and finally synchronize results afterwards. But really, due to GIL this doesn't seem to be worth it anyway. Python is quite bad at parallel execution. – freakish Mar 10 '22 at 09:07
  • If I am able to get different asyncio loops in different threads, it should make my code x5 or x6 times faster, as right now it runs on 1 of 8 CPU cores; running through the loop with 7-8 cores should make a big difference, and that's what I want. Any idea how to code it? I'm a beginner and this kind of thing really confuses me – Tomi Begher Mar 10 '22 at 16:28
  • What makes you think it will run x5 or x6 times faster? What makes you think it will run faster at all? Again: Python has a big limitation called Global Interpreter Lock (GIL), which makes Python threads only usable for i/o. But asyncio already handles i/o efficiently. It is extremely unlikely that threads will give you any performance benefit compared to single-threaded asyncio. – freakish Mar 10 '22 at 17:04
  • I ran this same exact program using multiprocessing and it took 150 seconds (using 8 of 8 CPU cores). Running the same program with asyncio took 300 seconds (using 1 of 8 cores). I really think combining asyncio with multi-threading using all cores would be much faster, but I don't know how to implement it correctly. I tried `asyncio.run_coroutine_threadsafe()` but my program just doesn't run with that. – Tomi Begher Mar 10 '22 at 17:19
  • At the end of the day I am not sure if it would be faster or not, but I want to try it myself somehow :) – Tomi Begher Mar 10 '22 at 17:20
  • Something like this: https://stackoverflow.com/q/32059732/11393178 – Tomi Begher Mar 10 '22 at 17:48
  • Fair enough, I've answered your question. A bit complicated, but at least you can check if it actually affects performance. – freakish Mar 10 '22 at 18:23
  • I modified my Python program and ran a bash script launching it in threads. It wasn't 5 or 6 times faster, but definitely much faster; it takes around 110 seconds right now (running the Python program 5 times at the same time, different CPU cores at 100% though) – Tomi Begher Mar 14 '22 at 01:10
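As background for the `asyncio.run_coroutine_threadsafe()` attempt mentioned in the comments: that call only works against a loop that is already running in another thread. A minimal sketch of the pattern (the coroutine and values here are illustrative):

```python
import asyncio
import threading

async def work(x):
    await asyncio.sleep(0.1)
    return x + 1

# a loop running forever in a background thread
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever, daemon=True)
t.start()

# submit coroutines from the main thread; each call returns a
# concurrent.futures.Future whose .result() blocks until done
futs = [asyncio.run_coroutine_threadsafe(work(i), loop) for i in range(3)]
results = [f.result() for f in futs]
print(results)  # [1, 2, 3]

# stop the loop cleanly and wait for the thread to exit
loop.call_soon_threadsafe(loop.stop)
t.join()
```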

1 Answer


Asyncio loops are not thread safe and are supposed to be used from a single thread, except for a few thread-safe functions such as `run_coroutine_threadsafe`. So one thing you can do is spawn multiple threads and create a separate loop in each thread:

import asyncio
import multiprocessing
import threading
import queue

# Spawn threads and run new loop in each thread
loops = queue.Queue()
def thread_job():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loops.put(loop)
    loop.run_forever()

for _ in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=thread_job)
    t.start()

# Our main
async def main(x, ev):
    try:
        await asyncio.sleep(1)
        print(x, threading.current_thread().ident)
    finally:
        ev.set()  # events signal back to the main thread that we are done

# Our job, that only runs main; it executes inside the loop's own
# thread, so we can grab the running loop directly
def job(x, ev):
    loop = asyncio.get_running_loop()
    loop.create_task(main(x, ev))


# Main thread that synchronizes everything
lst = list(range(3))

events = []
for x in lst:
    ev = threading.Event()
    events.append(ev)

    # by putting the loop back to queue
    # we effectively shift the queue and
    # distribute the load equally
    loop = loops.get()
    loops.put(loop)

    loop.call_soon_threadsafe(job, x, ev)

for ev in events:
    ev.wait()

# Once all events are set, i.e. we are done, stop every loop.
# A blocking get() per spawned thread is safer than checking empty(),
# so we cannot miss a loop that has not been queued yet.
for _ in range(multiprocessing.cpu_count()):
    loop = loops.get()
    loop.call_soon_threadsafe(loop.stop)

I use a FIFO queue for loops here, in order to distribute load equally between threads.
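If you also need the return value of `main` (the `result` from the question), one variant of the same setup is to submit the coroutine with `run_coroutine_threadsafe` instead of `call_soon_threadsafe`; it returns a `concurrent.futures.Future` you can wait on, so the events become unnecessary. A sketch, with the question's `get_ip`/`release_main` calls replaced by a placeholder coroutine:

```python
import asyncio
import multiprocessing
import queue
import threading

loops = queue.Queue()

def thread_job():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loops.put(loop)
    loop.run_forever()

threads = []
for _ in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=thread_job)
    t.start()
    threads.append(t)

async def main(x):
    # stand-in for the question's gather() over get_ip() calls
    await asyncio.sleep(0.1)
    return x * 10

# rotate the FIFO queue of loops to spread the load, and keep the
# futures so we can collect return values afterwards
futures = []
for x in range(3):
    loop = loops.get()
    loops.put(loop)
    futures.append(asyncio.run_coroutine_threadsafe(main(x), loop))

results = [f.result() for f in futures]  # blocks until each is done
print(results)  # [0, 10, 20]

# stop every loop (blocking get, one per spawned thread) and join
for _ in range(multiprocessing.cpu_count()):
    lp = loops.get()
    lp.call_soon_threadsafe(lp.stop)
for t in threads:
    t.join()
```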

freakish
  • That looks perfect, yet I still can't understand most of it (due to lack of knowledge, not because of your code). If you don't mind, can you explain/rewrite how you would use this code with the 5 lines of code I wrote in the question? That way it would be much easier for me to understand the code. This looks like what I am looking for, but I'm sure I will break something when implementing it.. (gonna update in half an hour). – Tomi Begher Mar 10 '22 at 19:10
  • I added main(), to show more of what it looks like – Tomi Begher Mar 10 '22 at 19:16
  • @TomiBegher I don't know how to do that in 5 lines, sorry. With my code you have to replace my main with your main (and add an additional ev parameter). – freakish Mar 10 '22 at 19:17