2

I have added django.channels to a django project in order to support long running processes that notify users of progress via websockets.

Everything appears to work fine except for the fact that the implementation of the long running process doesn't seem to respond asynchronously.

For testing I have created an AsyncConsumer that recognizes two types of messages 'run' and 'isBusy'.

The 'run' message handler sets a 'busy flag' sends back a 'process is running' message, waits asynchronously for 20 seconds resets the 'busy flag' and then sends back a 'process complete message'

The 'isBusy' message returns a message with the status of the busy flag.

My expectation is that if I send a run message I will receive immediately a 'process is running' message back and after 20 seconds I will receive a 'process complete' message. This works as expected.

I also expect that if I send a 'isBusy' message I will receive immediately a response with the state of the flag.

The observed behaviour is as follows:

  • a message 'run' is sent (from the client)
  • a message 'running please wait' is immediately received
  • a message 'isBusy' is sent (from the client)
  • the message reaches the web socket listener on the server side
  • nothing happens until the run handler finishes
  • a 'finished running' message is received on the client
  • followed immediately by a 'process isBusy:False' message

Here is the implementation of the Channel listener:

class BackgroundConsoleConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.busy = False

    async def run(self, message):
        print("run got message", message)
        self.busy = True
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"consoleResponse",
                    "text":"running please wait"
                })
        await asyncio.sleep(20)
        self.busy = False
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"consoleResponse",
                    "text": "finished running"
                })

    async def isBusy(self,message):
        print('isBusy got message', message)
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"consoleResponse",
                    "text":  "process isBusy:{0}".format(self.busy)
                })

The channel is set up in the routing file as follows:

application = ProtocolTypeRouter({
    "websocket": AuthMiddlewareStack(
        URLRouter([
            url("^console/$", ConsoleConsumer),
        ])

    ),
    "channel": ChannelNameRouter({
        "background-console":BackgroundConsoleConsumer,
    }),
})

I run the channel with one worker (via ./manage.py runworker ).

The experiment was done with the django test server (via runserver).

Any ideas as to why the channel consumer does not appear to work asynchronously would be appreciated.

raduw
  • 1,008
  • 1
  • 9
  • 19
  • Can you show us the definition of `run`? The [documentation](https://channels.readthedocs.io/en/latest/topics/channel_layers.html) doesn't seem to mention it, so I assume it is defined in your code and not inherited. – user4815162342 Feb 20 '18 at 11:48
  • Hello, I am mentioning two functions `run` and `isBusy`, they are the functions shown in the code above `BackgroundConsoleConsumer`. I also mention running the channel with one worker... that is I start from a console a process for the channel with: `./manage.py runworker background-console` where `background-console` is the name associated with the channel ( the second script in the description above) – raduw Feb 20 '18 at 11:51
  • The messages (run and isBusy) come to the `BackgroundConsoleConsumer` via a `AsyncJsonWebsocketConsumer` which listens for strings from connected clients and then sends messages to the `background-console` channel. So upon receiving a socket message I just do the following: await self.channel_layer.send('background-console', { 'type': 'run', 'data': { 'some-data': 1} }) – raduw Feb 20 '18 at 11:58
  • 1
    Sorry, I meant the _call sites_ of `run`. The problem could be that `run` is being awaited instead of being started in the background, which causes `isBusy` to wait until it finishes. Maybe at some point you should be using `loop.create_task` instead of `await`. This is just a guess, since I'm not familiar with the channels architecture. – user4815162342 Feb 20 '18 at 12:21
  • `run` and `isBusy` are called by the AsyncConsumer. In it's __call__ implementation it gets the expected name of the handler function (`run` and `isBusy` ) from the message `type` and then calls dispatch via await_many_dispatch see: [github AsyncConsumer code](https://github.com/django/channels/blob/master/channels/consumer.py) ... AsyncConsumer is probably called from worker.py (but I didn't dig that deep yet) – raduw Feb 20 '18 at 12:30
  • After digging a bit in the source code for channels here's what happens and how one should go about to run long running processes: 1. All messages sent to a channel are placed in a Queue so a new message will not be processed by a channel (presuming only one runner) before the previous one is finished (regardless if the previous has temporarily released control). One must finish the processing of the current message in order to receive another one. To fix the example above all the 'long running' code from the run method should be place in a separate corutine function (say longRunning) and ... – raduw Feb 20 '18 at 16:54
  • ... from the run method one needs to schedule the longRunning corutine. So in the example above the `run` function would need to call `asyncio.ensure_future(self.longRunning())` in order to schedule the `longRunning` corutine without blocking the channel. – raduw Feb 20 '18 at 16:56
  • 1
    Nice work! Please write that up as an answer, it might be quite useful to others. Note that you might want to use `loop.create_task` (or the new `asyncio.create_task`) in preference to `asyncio.ensure_future` (as [explained by Guido](https://github.com/python/asyncio/issues/477#issuecomment-268709555)). – user4815162342 Feb 20 '18 at 17:03

1 Answers1

5

After a bit of digging around here is the problem and one solution to it.

A channel adds messages sent to it to a asyncio.Queue and processes them sequentially.

It is not enough to release the coroutine control (via a asyncio.sleep() or something similar), one must finish processing the message handler before a new message is received by the consumer.

Here is the fix to the previous example that behaves as expected (i.e. responds to the isBusy messages while processing the run long running task)

Thank you @user4815162342 for your suggestions.

class BackgroundConsoleConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.busy = False

    async def run(self, message):
        loop = asyncio.get_event_loop()
        loop.create_task(self.longRunning())

    async def longRunning(self):
        self.busy = True
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"the.type",
                    "text": json.dumps({'message': "running please wait", 'author': 'background console process'})
                })
        print('before sleeping')
        await asyncio.sleep(20)
        print('after sleeping')
        self.busy = False
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"the.type",
                    "text": json.dumps({'message': "finished running", 'author': 'background console process'})
                })

    async def isBusy(self,message):
        print('isBusy got message', message)
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"the.type",
                    "text":  json.dumps({'message': "process isBusy:{0}".format(self.busy),
                                         'author': 'background console process'})
                })
raduw
  • 1,008
  • 1
  • 9
  • 19
  • However `ensure_future()` - or starting with Python 3.7 `asyncio.create_task` [] – sebhaase Jul 11 '18 at 17:41
  • However `ensure_future()` should be preferred over loop.create_task - or starting with Python 3.7 `asyncio.create_task` [https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task] - - ref. https://stackoverflow.com/questions/36342899/asyncio-ensure-future-vs-baseeventloop-create-task-vs-simple-coroutine – sebhaase Jul 11 '18 at 17:49