2

I wrote django channels code to send api data from two different sources asynchronously through websockets. The different sources takes few seconds to 1 minute to compute and send back the data. I managed to call them asynchronously using asyncio event loop. But the issue is that they are not sending the response back asynchronously. The code just waits for the all the data to arrive and sends everything at the same time.

Channels Code:

class SearchHotelConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.source1 = Source1()
        self.source2 = Source2()

    async def connect(self):
        await self.accept()

    def disconnect(self, close_code):
        pass

    async def _source1_handler(self, request, queue):
        source1_response = await self.source1.async_post(request)

        await queue.put(source1_response.data)

    async def _source2_handler(self, request, queue):
        source2_response = await self.source2.async_post(request)

        await queue.put(source2_response.data)

    async def _send_msg(self, queue):
        while True:
            message = await queue.get()
            if message is None:
                break
            print('got the message')
            await self.send(text_data=json.dumps({
                'message': message
            }, cls=DjangoJSONEncoder))

            queue.task_done()

    def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        request = HttpRequest()
        request.method = 'POST'
        request.session = self.scope["session"]
        request = Request(request)
        for key, val in message.items():
            request.data[key] = val

        queue = asyncio.Queue()
        sender = asyncio.ensure_future(self._send_msg(queue))

        await self._source1_handler(request, queue)
        await self._source2_handler(request, queue)

        await queue.join()

        sender.cancel()

How can I make the message sending part truly asynchronous?

memnonila
  • 359
  • 2
  • 12
  • why are you using an async Queue? why not `_send_msg` within the `_source2_handler` and `_source1_handler`? `await self._source1_handler(...)` you can call them get the future for both and then await those futures after you have called both of them. – Matthaus Woolard Dec 25 '18 at 17:37
  • @MatthausWoolard that's what I did at first. It also gave the same synchronous response... – memnonila Dec 26 '18 at 07:08
  • what do you mean by synchronous? so `s1 = self._source1_handler(...)` & `s2 = self._source2_handler(...)` then after calling both `await s1` `await s2` if that does not work then you may need to create a secondary nested async runloop: see https://github.com/hishnash/channelsmultiplexer/blob/master/channelsmultiplexer/demultiplexer.py#L67 but i dont think this should be needed. – Matthaus Woolard Dec 27 '18 at 07:10

1 Answers1

1

Your async_post code which is not shown is probably synchronous.

If you want async requests see this answer:

https://stackoverflow.com/a/22414756/10840818

To use requests (or any other blocking libraries) with asyncio, you can use BaseEventLoop.run_in_executor to run a function in another thread and yield from it to get the result.

Or an alternative to way to do an http get async

async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=HEADERS, params=params) as resp:
            data = await resp.json()
MarkReedZ
  • 1,421
  • 4
  • 10