18

Documentation for Django 3.1 says this about async views:

The main benefits are the ability to service hundreds of connections without using Python threads. This allows you to use slow streaming, long-polling, and other exciting response types.

I believe that "slow streaming" means we could implement an SSE view without monopolizing a thread per client, so I tried to sketch a simple view, like so:

import asyncio
import datetime

from django.http import StreamingHttpResponse


async def stream(request):

    async def event_stream():
        while True:
            yield 'data: The server time is: %s\n\n' % datetime.datetime.now()
            await asyncio.sleep(1)

    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')

(note: I adapted the code from this response)

Unfortunately, when this view is invoked, it raises the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/asgiref/sync.py", line 330, in thread_handler
    raise exc_info[1]
  File "/usr/local/lib/python3.7/site-packages/django/core/handlers/exception.py", line 38, in inner
    response = await get_response(request)
  File "/usr/local/lib/python3.7/site-packages/django/core/handlers/base.py", line 231, in _get_response_async
    response = await wrapped_callback(request, *callback_args, **callback_kwargs)
  File "./chat/views.py", line 144, in watch
    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 367, in __init__
    self.streaming_content = streaming_content
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 382, in streaming_content
    self._set_streaming_content(value)
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 386, in _set_streaming_content
    self._iterator = iter(value)
TypeError: 'async_generator' object is not iterable

To me, this shows that StreamingHttpResponse doesn't currently support async generators.
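
The failure can be reproduced without Django. Here is a minimal sketch, where `ticks` is a hypothetical stand-in for `event_stream`: calling `iter()` on an async generator raises the same `TypeError`, because async generators only implement the `__aiter__`/`__anext__` protocol and have no `__iter__`.

```python
import asyncio


async def ticks():
    """Hypothetical stand-in for event_stream(): yields two SSE frames."""
    for i in range(2):
        yield 'data: tick %d\n\n' % i
        await asyncio.sleep(0)


async def collect():
    # "async for" is the only iteration protocol an async generator supports.
    return [chunk async for chunk in ticks()]


gen = ticks()
try:
    iter(gen)  # the same call that fails in the traceback above
    error = None
except TypeError as exc:
    error = str(exc)

chunks = asyncio.run(collect())
```

So any fix has to consume the generator with `async for` (or `__anext__`) somewhere between the view and the server.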

I tried to modify StreamingHttpResponse to use async for but I wasn't able to do much.

Any idea how I could do that?

wowkin2
Benoit Blanchon
  • I'm 99% sure it's not supported as nothing in the response object is `awaitable`. – Tom Wojcik Aug 08 '20 at 15:21
  • @BenoitBlanchon so what is your actual goal? To have a page that generates its response chunk by chunk (for a big response), or to be able to send SSE (Server-Sent Events) asynchronously when some other event occurs? Or something else entirely? I ask because I see a fixed `StreamingHttpResponse` for your question, but you still haven't accepted that response. – wowkin2 Aug 20 '20 at 08:23
  • My goal is to send SSE, and I'll accept the first response that works. – Benoit Blanchon Aug 20 '20 at 08:28

5 Answers

6

Honestly, this is not supported natively by Django, but I have a solution for you using Daphne (which is also used by Django Channels).

I created my own StreamingHttpResponse subclass that retrieves the data stream from async methods and provides it to the synchronous part of Django.

import asyncio

# By design asyncio does not allow its event loop to be nested.
# Trying to do so will give the error "RuntimeError: This event loop is already running".
# This library solves that problem.
import nest_asyncio

from django.http.response import StreamingHttpResponse


class AsyncStreamingHttpResponse(StreamingHttpResponse):

    def __init__(self, streaming_content=(), *args, **kwargs):
        sync_streaming_content = self.get_sync_iterator(streaming_content)
        super().__init__(streaming_content=sync_streaming_content, *args, **kwargs)

    @staticmethod
    async def convert_async_iterable(stream):
        """Accepts async_generator and async_iterator"""
        # Collects every chunk before returning, so the stream must be finite.
        return iter([chunk async for chunk in stream])

    def get_sync_iterator(self, async_iterable):
        nest_asyncio.apply()

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        result = loop.run_until_complete(self.convert_async_iterable(async_iterable))
        return result

Also, you'll need to run your Django web server with Daphne to support Server-Sent Events (SSE) properly. It is officially supported by the Django Software Foundation and has a syntax similar to gunicorn's, but uses asgi.py instead of wsgi.py.

To use it, install it with: pip install daphne

Then change the command from: python manage.py runserver
to something like: daphne -b 0.0.0.0 -p 8000 sse_demo.asgi:application

I'm not sure whether it will work with gunicorn.

Let me know if you have any more questions.

wowkin2
  • sorry to ask, what does `SSE` stand for? – JPG Aug 17 '20 at 14:11
  • In code this is just a name (the idea came from the author's question) and can be changed. But I believe it stands for `Server Sent Events`. I will rename the class from `SSEResponse` to `AsyncStreamingHttpResponse` to make it clearer for others. – wowkin2 Aug 17 '20 at 14:13
  • Thanks, @wowkin2. I tried to use this class but got `ValueError: loop argument must agree with Future`. [Here is the source code of the view](https://github.com/bblanchon/django31-sse-demo/blob/f3a1faa79cb3d8eef51e4b0c3073b6d7eb42d4b1/clock/views.py); can you check? – Benoit Blanchon Aug 18 '20 at 17:36
  • @BenoitBlanchon forgot to mention, you need to run django with `daphne` to use async and Server Sent Events. Added more details to answer, but here is the command to check: `daphne -b 0.0.0.0 -p 8000 sse_demo.asgi:application` – wowkin2 Aug 19 '20 at 12:53
  • I just tried with daphne, and it doesn't seem to work. `curl -N 127.0.0.1:8000` doesn't show anything and the daphne process gets stuck and doesn't even respond to `Ctrl-C`. – Benoit Blanchon Aug 20 '20 at 08:31
  • @BenoitBlanchon here is my full code in this repository: https://github.com/wowkin2/django-test-streaming-response – wowkin2 Aug 20 '20 at 09:12
  • Thanks, @wowkin2, but it's still not working. I opened a few issues, please check. – Benoit Blanchon Aug 21 '20 at 07:55
  • @BenoitBlanchon I answered all of them. – wowkin2 Aug 21 '20 at 08:38
  • Please tell me if I'm wrong, but since the generator never ends, `loop.run_until_complete()` will block forever, right? – Benoit Blanchon Aug 22 '20 at 07:46
  • @BenoitBlanchon it will only prevent you from stopping the script with `Ctrl+C` (unless you check for that signal explicitly in the generator). It will not block anything else. Please describe better what you want to achieve using StreamingHttpResponse or SSE. Here I just helped you fix StreamingHttpResponse, not solve your problem (about which I have no idea; perhaps you don't even need SSE or SHR). – wowkin2 Aug 22 '20 at 08:17
  • I think I've been pretty clear, look at the top of this page: "I'm giving 200 reputation to the first person who finds the right way to do SSE with an async view in Django 3.1+". – Benoit Blanchon Aug 22 '20 at 08:27
  • @BenoitBlanchon sending webhooks is also SSE (server sent event), but has nothing to do with async :) That's why you need to explain your goal better: send what, to where, how often, how much data, what data type, etc. – wowkin2 Aug 22 '20 at 11:28
  • "Server-Sent Events is a standard describing how servers can initiate data transmission towards clients once an initial client connection has been established." ([Wikipedia](https://en.wikipedia.org/wiki/Server-sent_events)). I don't think you can include webhooks in it. – Benoit Blanchon Aug 22 '20 at 12:32
  • @BenoitBlanchon on that wiki page you can find the library [django-eventstream](https://github.com/fanout/django-eventstream) for that. It uses Django Channels (which was also recommended by @Adrien). – wowkin2 Aug 22 '20 at 14:56
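
The concern raised in the comments above (collecting every chunk before returning never finishes for an endless generator) can be illustrated with a lazy bridge that asks for one chunk at a time. This is only a sketch under stated assumptions: `clock` and `iter_lazily` are hypothetical names, and the bridge only works when called from a thread that has no running event loop. It also still occupies that thread for each client, so it shows the blocking issue rather than answering the question.

```python
import asyncio


async def clock(limit):
    """Hypothetical async source; pass limit=None for an endless stream."""
    n = 0
    while limit is None or n < limit:
        yield 'data: tick %d\n\n' % n
        n += 1
        await asyncio.sleep(0)


def iter_lazily(agen):
    """Yield chunks of an async generator one by one from synchronous code."""
    loop = asyncio.new_event_loop()  # private loop, used only for this stream
    iterator = agen.__aiter__()
    try:
        while True:
            try:
                # Run the loop just long enough to produce the next chunk.
                yield loop.run_until_complete(iterator.__anext__())
            except StopAsyncIteration:
                break
    finally:
        # Let the async generator clean up, then release the private loop.
        loop.run_until_complete(iterator.aclose())
        loop.close()


endless = iter_lazily(clock(None))
first_two = [next(endless), next(endless)]
endless.close()  # triggers the finally block above
```

Unlike the list comprehension in the answer, this returns the first chunks of an endless stream immediately, while a finite stream such as `clock(3)` can still be consumed with `list(iter_lazily(clock(3)))`.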
2

This is an old question, but it came up in a Google search while I was looking for a solution to the same issue. In the end I found this repo, https://github.com/valberg/django-sse, which uses async views in Django 4.2 to stream via SSE (specifically see here).

I understand this is a recent addition to Django so I hope it helps anyone else looking for an answer.
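
For reference, here is a minimal sketch of what the view from the question looks like on Django 4.2 or later, where `StreamingHttpResponse` accepts an async iterator when served under ASGI. It is not taken from the linked repository. The `interval` and `limit` parameters are hypothetical additions that make the generator easy to try in isolation, and the import is placed inside the view only so that the generator can be run without Django installed.

```python
import asyncio
import datetime


async def event_stream(interval=1.0, limit=None):
    """Yield SSE frames; `limit` is a hypothetical knob to stop early."""
    sent = 0
    while limit is None or sent < limit:
        yield 'data: The server time is: %s\n\n' % datetime.datetime.now()
        sent += 1
        await asyncio.sleep(interval)


async def stream(request):
    # Imported here only so event_stream() can be tried without Django.
    from django.http import StreamingHttpResponse

    # Assumes Django 4.2+ behind an ASGI server; older versions raise the
    # TypeError shown in the question.
    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')
```

The project still has to be served by an ASGI server such as Daphne or uvicorn for the stream to be consumed asynchronously.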

Jarym
  • Indeed, [the code](https://github.com/bblanchon/django-sse-demo) that was not working with Django 3.2, now magically works with 4.2. In his repo, valberg went a step further by implementing notification with PostgreSQL, which is excellent. Thanks a lot for sharing this! – Benoit Blanchon May 30 '23 at 07:41
0

Another way to do SSE is to use a dedicated library, django-eventstream:

Add the following to the HTML page that will consume the data:

<script src="{% static 'django_eventstream/eventsource.min.js' %}"></script>
<script src="{% static 'django_eventstream/reconnecting-eventsource.js' %}"></script>

var es = new ReconnectingEventSource('/events/');

es.addEventListener('message', function (e) {
    console.log(e.data);
}, false);

es.addEventListener('stream-reset', function (e) {
    // ... client fell behind, reinitialize ...
}, false);

On the backend, once Django is set up properly, you can call the following function from any view, task, signal handler, or method that needs to send a Server-Sent Event (SSE):

For example, to produce an event:

from django_eventstream import send_event

send_event('test', 'message', {'text': 'hello world'})
wowkin2
  • `django-eventstream` relies on Django Channels and doesn't leverage the new async views of Django 3.1. – Benoit Blanchon Aug 25 '20 at 08:15
  • @BenoitBlanchon agreed; anyhow, you can call that from any async view. Or you can use pure Django Channels, which should make use of it. – wowkin2 Aug 25 '20 at 08:19
0

I created a decorator named stream that can be applied to an async generator function to make it compatible with Django's StreamingHttpResponse. Here's an example:

import asyncio
import functools

from django.http import StreamingHttpResponse


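def stream(coroutine_function):
    @functools.wraps(coroutine_function)
    def wrapper(*args, **kwargs):
        # Calling the decorated function returns an async generator object.
        coroutine = coroutine_function(*args, **kwargs)
        try:
            while True:
                # Drive the async generator one item at a time from sync code.
                yield asyncio.run(coroutine.__anext__())
        except StopAsyncIteration:
            pass
    return wrapper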


@stream
async def chunks():
    for char in 'Hello, world!':
        yield char
        await asyncio.sleep(1)


async def index(request):
    return StreamingHttpResponse(chunks())

I also needed to add nest_asyncio and call apply() at the top of the settings.py file like:

import nest_asyncio
nest_asyncio.apply()

The nest_asyncio dependency makes it possible to call asyncio.run from the wrapper function created by the stream decorator, even when an event loop is already running.
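
The restriction that nest_asyncio works around can be seen with the standard library alone. In this small sketch (the function names `inner` and `outer` are hypothetical), calling asyncio.run from code that is already executing inside an event loop raises a RuntimeError. This is presumably the situation the wrapper ends up in when the streaming content is iterated under ASGI.

```python
import asyncio


async def inner():
    return 42


async def outer():
    coro = inner()
    try:
        # An event loop is already running here, so this call is rejected.
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)


message = asyncio.run(outer())
```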

Finally, Django's ASGI application can be served by uvicorn through gunicorn like so:

$ gunicorn -k uvicorn.workers.UvicornWorker www.asgi:application
Patrik Beck
GrantJ
-1

It seems you have to use something like Django Channels:

Channels augments Django to bring WebSocket, long-poll HTTP, task offloading and other async support to your code, using familiar Django design patterns and a flexible underlying framework that lets you not only customize behaviours but also write support for your own protocols and needs.

Adrien
  • Thanks for this response @Adrien, but this doesn't answer the question. I know about Django channels, but I want to know if we can avoid it now that Django 3.1 supports async views. – Benoit Blanchon Aug 20 '20 at 08:16