0

I have an async function that is being called (FuncA). This async function call another async function (FuncB). Now I'm within an executor, and I want to run FuncB. The executor is synchronous, so can't use wait.

What I've tried so far:

async def funcB():
   # do stuff
   pass

async def funcA():
   execute(funcB)

# This was called from an Async function (FuncA)
def execute(fn):
    
    # fn is FuncB
    result = fn(*self.args, **self.kwargs)
    
    # I can get the current running loop
    loop = asyncio.get_running_loop()
    
    # I can create a task
    task = asyncio.create_task(result)
    
    # I can't "run until complete" since FuncA is not going to complete so is going to get stuck
    loop.run_until_complete(task) # this doesn't work.

    # I can't access the task result
    task.done() == False
    task.result() # Raises InvalidStateError

    # I tried making a `regular` loop to wait until it's finished, but it obviously doesn't work since it's blocking
    while not task.done(): pass

    # Tried running in a Thread, but it won't wait
    # Can't use asyncio.run since there's another loop
    import threading
    x = threading.Thread(target=asyncio.create_task, args=(result,))
    x.run()

Running out of ideas. Is this even possible? I read somewhere (can't find the source) that I can't control with task executes first in the loop (I wanted funcB to execute immediately, while funcA will follow it's natural path (whatever that is).

lowercase00
  • 1,917
  • 3
  • 16
  • 36
  • asyncio is cooperative multitasking within a single thread. You are trying to start two functions, funcB and funcA, and then to have funcA wait until funcB is finished. But funcA is synchronous so it cannot yield control of the thread back to funcB, so where is funcB supposed to run? The only logical possibilities are to make funcA async (so it can yield to another task) or to use threads. You have found a solution, which is great, but I don't agree that this is simple. – Paul Cornelius Aug 17 '22 at 22:35
  • For an alternative way of handling the initialization of a secondary thread where async functions can be executed from sync code, see https://stackoverflow.com/questions/70231451/wrapping-python-async-for-synchronous-execution/70254610#70254610. – Paul Cornelius Aug 17 '22 at 22:37

1 Answers1

0

Made it work with a ThreadPoolExecutor:

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submits a method that deals with a new loop
        future = executor.submit(self._run_task_in_async_context, result)
        # Return the results normally.
        return_value = future.result()
        return return_value

Having the runner:

    def _run_task_in_async_context(self, funcB: t.Coroutine) -> t.Any:
        # Inside the thread executor
        import threading
        print(threading.current_thread())
        loop = asyncio.new_event_loop()
        result = loop.run_until_complete(funcB)
        print("Result is...", result)
        loop.close()
        # Closing the second loop.
        return result

This doesn't seem ideal. Depends on the ThreadPool, couldn't make it work with the ProcessPoolExecutor, and if far from obvious. But seems to work fine.

The reason why this works (I think) is because you can't have two loops in the same thread, but you can have two loops in different threads, so when there's an existing loop that you can't wait, but need to wait for a specific task, start a new thread, create a new loop over there, runs until complete, and then closes the loop (and exit the Thread).

Not sure how safe this is. Would be great if there was a simpler way to do that, seems overly complicated how it is now.

lowercase00
  • 1,917
  • 3
  • 16
  • 36