Our Django 4.1 website is running on a WSGI server. We are not ready to convert the entire website to ASGI, but we rewrote an I/O-intensive operation with asyncio and call it from a view function as follows:

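import asyncio

async def utility_func(urls):
    # download_from_url: placeholder for the coroutine that fetches one URL (not shown)
    await asyncio.gather(*(download_from_url(url) for url in urls))

class MyView(generic.edit.FormView):
    def form_valid(self, form):
        asyncio.run(utility_func(form.cleaned_data['urls']))
        return super().form_valid(form)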

As we understand it, this has nothing to do with Django's async support: the view function runs in sync mode, the server runs on WSGI, and there is no long-lived event loop. However, when the view function runs a little longer (e.g. > 5s), other parts of the website start to throw exceptions like

SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.

This happens to requests such as our health-check call

    path("is_alive/", lambda x: HttpResponse()),

They are all straight sync views and have never thrown the SynchronousOnlyOperation exceptions before.

Can anyone tell us what is happening here? The best guess we have is that when the asyncio.run() is running, the lambda function, which performs a completely unrelated task, is somehow executed in the async environment and causes the exception. If this is the case, how can we resolve the issue?
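
The guess above can be checked with a small standard-library sketch. It assumes that Django's sync-only guard boils down to asking whether an asyncio event loop is running in the current thread, which is the same question `asyncio.get_running_loop()` answers. `looks_async` and `handle_other_request` are hypothetical names used for illustration, not Django APIs.

```python
import asyncio


def looks_async():
    """Return True if an event loop is running in the current thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def handle_other_request():
    # Stand-in for an unrelated sync view; it never awaits anything itself.
    return looks_async()


async def utility_func():
    # Plain sync code executed while the loop runs is seen as "async context".
    return handle_other_request()


before = looks_async()
during = asyncio.run(utility_func())
after = looks_async()
print(before, during, after)  # False True False
```

The check is per thread, not per function. With gevent, several requests are served as greenlets inside one OS thread, so it is plausible that an unrelated sync view gets scheduled while `asyncio.run()` is still active in that thread and trips the same check.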

Note 1: We use gunicorn and gevent to start the server.

gunicorn --worker-class gevent config.wsgi --bind 0.0.0.0:8080 

Note 2: We could remove the async code from the view entirely by sending the time-consuming task to a Celery worker, but I am still wondering why we cannot use asyncio.run() in the view function.

Note 3: This Google Groups discussion suggests that WSGI is not compatible with asyncio.

POSSIBLE SOLUTION:

After reading more about Django's async support, we noticed that

async_to_sync() is essentially a more powerful version of the asyncio.run() function in Python’s standard library.

and decided to replace all calls to asyncio.run() with async_to_sync(). We have not seen any SynchronousOnlyOperation errors since then.

user2283347

1 Answer

The problem you describe is odd enough that I can't give a definitive answer, but I can suggest a pattern you could use here that might get things working:

Using asyncio the way you are doing is quite expensive: starting and tearing down an asyncio event loop on every request is not "free", unlike scheduling a coroutine on a loop that is already running.
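
A minimal sketch of that difference, using only the standard library. It does not measure cost. It only shows that every `asyncio.run()` call builds a new loop and closes it afterwards, while a long-lived loop is reused. `job` and `seen_loops` are illustrative names.

```python
import asyncio

seen_loops = []


async def job():
    # Record which loop object is executing this coroutine.
    seen_loops.append(asyncio.get_running_loop())


# Pattern 1: asyncio.run() builds a new loop per call and closes it afterwards.
asyncio.run(job())
asyncio.run(job())
fresh_each_time = seen_loops[0] is not seen_loops[1]
closed_after_run = all(loop.is_closed() for loop in seen_loops)

# Pattern 2: one long-lived loop reused for several jobs.
seen_loops.clear()
shared_loop = asyncio.new_event_loop()
try:
    shared_loop.run_until_complete(job())
    shared_loop.run_until_complete(job())
    reused = seen_loops[0] is seen_loops[1] is shared_loop
finally:
    shared_loop.close()

print(fresh_each_time, closed_after_run, reused)  # True True True
```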

A good pattern would be to have one side thread per worker running an asyncio loop, and to use loop.call_soon_threadsafe to dispatch your async tasks there. A data structure like a dict can be used to fetch return values, if any:

from threading import Thread
import asyncio
import logging
import time
import uuid

logger = logging.getLogger(__name__)

# Per-worker global state for the loop thread and the results
worker_async_thread = None
worker_async_loop = None
worker_async_tasks = set()  # strong references so pending tasks are not garbage-collected
# If results may be needed in another HTTP request, which might hit another worker,
# persist them in the database from the async thread, or use something like Redis.
async_results = {}


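async def utility_func(urls):
    # download_from_url: placeholder for the coroutine that fetches one URL (not shown)
    return await asyncio.gather(*(download_from_url(url) for url in urls))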

def _submit_async_in_thread(handler, awaitable):
    # Runs inside the loop thread. This has to be sync, and can't simply call "await".
    def _task_done(task):
        worker_async_tasks.discard(task)
        try:  # TBD: decide how failures should be reported to the caller
            result = task.result()
        except Exception as exc:
            logger.exception("async task %s failed", handler)
            result = exc  # store the exception so the waiting view does not spin forever
        async_results[handler] = result
    task = asyncio.create_task(awaitable)
    task.add_done_callback(_task_done)
    worker_async_tasks.add(task)

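def submit_async(awaitable):
    # The module-level names are rebound here, so they must be declared global.
    global worker_async_thread, worker_async_loop
    if worker_async_thread is None:
        worker_async_loop = asyncio.new_event_loop()
        # daemon=True so the loop thread does not keep the worker process alive
        worker_async_thread = Thread(target=worker_async_loop.run_forever, daemon=True)
        worker_async_thread.start()
    handler = uuid.uuid4()  # or any other unique id
    worker_async_loop.call_soon_threadsafe(_submit_async_in_thread, handler, awaitable)
    return handler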
    
    
class MyView(generic.edit.FormView):
    def form_valid(self, form):
        result_handler = submit_async(utility_func(form.cleaned_data['urls']))
        # Poll until the loop thread has stored the result for this handler.
        while result_handler not in async_results:
            time.sleep(0.01)
        result = async_results.pop(result_handler)
        ...
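
A more compact variant of the same idea, as a sketch only: the standard library's `asyncio.run_coroutine_threadsafe` submits a coroutine to a loop running in another thread and returns a `concurrent.futures.Future`, which replaces the results dict and the polling loop. `get_background_loop`, `run_in_background` and `fake_download` are hypothetical names, and `fake_download` stands in for the real download coroutine. How this behaves under gevent's monkey-patched threading module is an open question here.

```python
import asyncio
import threading

_loop = None
_loop_lock = threading.Lock()


def get_background_loop():
    """Start the per-process loop thread on first use and return the loop."""
    global _loop
    with _loop_lock:
        if _loop is None:
            _loop = asyncio.new_event_loop()
            threading.Thread(target=_loop.run_forever, daemon=True).start()
        return _loop


async def fake_download(url):
    # Stand-in for real network I/O.
    await asyncio.sleep(0.01)
    return f"body of {url}"


async def utility_func(urls):
    return await asyncio.gather(*(fake_download(u) for u in urls))


def run_in_background(coro, timeout=5):
    # The returned Future's result() blocks only the calling thread.
    future = asyncio.run_coroutine_threadsafe(coro, get_background_loop())
    return future.result(timeout=timeout)


pages = run_in_background(utility_func(["a", "b"]))
print(pages)  # ['body of a', 'body of b']
```

In a view, `run_in_background(utility_func(form.cleaned_data['urls']))` would take the place of the `submit_async` call and the `while` loop. Exceptions raised inside the coroutine are re-raised by `future.result()`, so no separate error channel is needed.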

jsbueno