
I am trying to write a program that opens a lot of WebSocket connections to a server I've created:

class WebSocketClient():

    @asyncio.coroutine
    def run(self):
        print(self.client_id, 'Connecting')
        ws = yield from aiohttp.ws_connect(self.url)
        print(self.client_id, 'Connected')
        print(self.client_id, 'Sending the message')
        ws.send_str(self.make_new_message())

        while not ws.closed:
            msg = yield from ws.receive()

            if msg.tp == aiohttp.MsgType.text:
                print(self.client_id, 'Received the echo')
                yield from ws.close()
                break

        print(self.client_id, 'Closed')


@asyncio.coroutine
def make_clients():

    for client_id in range(args.clients):
        yield from WebSocketClient(client_id, WS_CHANNEL_URL.format(client_id=client_id)).run()


event_loop.run_until_complete(make_clients())

The problem is that all the clients do their jobs one after another:

0 Connecting
0 Connected
0 Sending the message
0 Received the echo
0 Closed
1 Connecting
1 Connected
1 Sending the message
1 Received the echo
1 Closed
...

I've tried to use asyncio.wait, but then all the clients start at once. I want the clients to be created gradually, with each one connecting to the server as soon as it is created while the creation of new clients continues.

What approach should I apply to accomplish this?

warvariuc

1 Answer


Using asyncio.wait is a good approach. You can combine it with asyncio.ensure_future and asyncio.sleep to create tasks gradually:

@asyncio.coroutine
def make_clients(nb_clients, delay):
    futures = []
    for client_id in range(nb_clients):
        url = WS_CHANNEL_URL.format(client_id=client_id)
        coro = WebSocketClient(client_id, url).run()
        futures.append(asyncio.ensure_future(coro))
        yield from asyncio.sleep(delay)
    yield from asyncio.wait(futures)
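
On current Python versions (`@asyncio.coroutine` was removed in Python 3.11), the same staggered start can be sketched with `async`/`await`. This is only a minimal sketch under stated assumptions, not the code from the question: `fake_client` is a hypothetical stand-in that sleeps instead of opening a WebSocket, and the `events` list exists only to make the interleaving visible.

```python
import asyncio


async def fake_client(client_id, events, work=0.2):
    # Hypothetical stand-in for WebSocketClient.run(): no network involved.
    events.append((client_id, 'Connecting'))
    await asyncio.sleep(work)  # simulates the connect + echo round trip
    events.append((client_id, 'Closed'))


async def make_clients(nb_clients, delay, events):
    tasks = []
    for client_id in range(nb_clients):
        # ensure_future schedules the coroutine as a Task right away,
        # so the client starts running while the loop keeps creating others.
        tasks.append(asyncio.ensure_future(fake_client(client_id, events)))
        await asyncio.sleep(delay)
    # Wait until every client has finished.
    await asyncio.wait(tasks)


events = []
asyncio.run(make_clients(3, 0.01, events))
for event in events:
    print(*event)
```

Because each simulated client takes longer than the delay between creations, all three clients report 'Connecting' before the first one reports 'Closed'.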

EDIT: I implemented a FutureSet class that should do what you want. This set can be filled with futures and removes them automatically when they're done. It is also possible to wait for all the futures to complete.

class FutureSet:

    def __init__(self, maxsize, *, loop=None):
        self._set = set()
        self._loop = loop
        self._maxsize = maxsize
        self._waiters = []

    @asyncio.coroutine
    def add(self, item):
        if not asyncio.iscoroutine(item) and \
           not isinstance(item, asyncio.Future):
            raise ValueError('Expecting a coroutine or a Future')
        if item in self._set:
            return
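        # A maxsize of 0 means "no limit"; without this check add() would wait forever.
        while self._maxsize > 0 and len(self._set) >= self._maxsize: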
            waiter = asyncio.Future(loop=self._loop)
            self._waiters.append(waiter)
            yield from waiter
        item = asyncio.ensure_future(item, loop=self._loop)
        self._set.add(item)
        item.add_done_callback(self._remove)

    def _remove(self, item):
        if not item.done():
            raise ValueError('Cannot remove a pending Future')
        self._set.remove(item)
        if self._waiters:
            waiter = self._waiters.pop(0)
            waiter.set_result(None)

    @asyncio.coroutine
    def wait(self):
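        # asyncio.wait raises ValueError on an empty set, so handle that case first.
        if not self._set:
            return set(), set()
        return (yield from asyncio.wait(self._set))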

Example:

@asyncio.coroutine
def make_clients(nb_clients, limit=0):
    futures = FutureSet(maxsize=limit)
    for client_id in range(nb_clients):
        url = WS_CHANNEL_URL.format(client_id=client_id)
        client = WebSocketClient(client_id, url)
        yield from futures.add(client.run())
    yield from futures.wait()
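
A similar cap on the number of in-flight clients can also be sketched with `asyncio.Semaphore`. This is a different technique from the `FutureSet` above, written in `async`/`await` syntax; `fake_client` and the `state` dictionary are hypothetical helpers that stand in for the real client and record how many clients run at once.

```python
import asyncio


async def fake_client(client_id, state):
    # Hypothetical stand-in for WebSocketClient.run(); tracks concurrency.
    state['running'] += 1
    state['peak'] = max(state['peak'], state['running'])
    await asyncio.sleep(0.01)  # simulates the connect + echo round trip
    state['running'] -= 1


async def make_clients(nb_clients, limit, state):
    semaphore = asyncio.Semaphore(limit)
    tasks = []
    for client_id in range(nb_clients):
        # Block here until fewer than `limit` clients are in flight.
        await semaphore.acquire()
        task = asyncio.ensure_future(fake_client(client_id, state))
        # Free the slot when the client finishes, whatever the outcome.
        task.add_done_callback(lambda _task: semaphore.release())
        tasks.append(task)
    if tasks:
        await asyncio.gather(*tasks)


state = {'running': 0, 'peak': 0}
asyncio.run(make_clients(10, 3, state))
print('peak concurrency:', state['peak'])
```

As with `FutureSet.add`, the creating coroutine is suspended while the limit is reached, so new clients are only created when a slot becomes free.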
Vincent
  • `asyncio.Queue` is a *final* class not intended for inheritance. Thus users should never derive own classes from `asyncio.Queue` even if it's technically possible. – Andrew Svetlov Oct 17 '15 at 07:13
  • @AndrewSvetlov I guess users might want to inherit from `asyncio.Queue` to create different kind of queues (such as `asyncio.PriorityQueue` or `asyncio.LifoQueue`) but in that case I've just been lazy :p I got rid of it anyway. – Vincent Oct 17 '15 at 11:57
  • No, user cannot (at least should not). `LifoQueue` and `PriorityQueue` are `asyncio` classes, not intended for inheritance. The only `asyncio` classes intended for inheritance are `Protocol` and family. The state was pronounced by Guido van Rossum several times when we designed the library. – Andrew Svetlov Oct 17 '15 at 13:49
  • @Vincent Your last version is working too. Thanks! Do you think adding `asyncio.sleep` somewhere in the `while len(self._set) >= self._maxsize` loop will reduce 100% CPU load? – warvariuc Oct 18 '15 at 13:33
  • @warvariuc A 100% CPU load means the clients manage to keep the event loop busy all the time. I'm not sure the `while` loop is the best place to add a `sleep` though. Maybe somewhere in `client.run()` instead, or decrease the `limit` value to have fewer clients running at the same time. – Vincent Oct 18 '15 at 23:09
  • @Vincent you are right. Now I understand how this works. `yield from asyncio.Future(loop=self._loop)` does not return until the future is marked as done in `FutureSet._remove`. I think using a single `self._waiter` instead of list would make it clearer. Otherwise in `FutureSet._remove` should be `if self._waiters` -> `while self._waiters` – warvariuc Oct 19 '15 at 06:38
  • @warvariuc Well, you could have several items waiting to be placed in the set. And there is no point in waking up all the waiting items when only one item is removed. This code is very similar to the [asyncio.Queue implementation](https://github.com/python/asyncio/blob/master/asyncio/queues.py#L29), so you might want to check it out. – Vincent Oct 19 '15 at 08:20
  • @Vincent you are right, though I've replaced `_waiters` list with a single waiter, and it worked too, because (I think) there cannot be several waiters in the list (at least in a single-threaded app). – warvariuc Oct 19 '15 at 08:29
  • @warvariuc In this example yes, because you only have one coroutine adding futures to the set. But you could imagine passing the future set to several coroutines like `make_clients`. Then several waiters might be needed. – Vincent Oct 19 '15 at 08:41
  • @Vincent I see. Thanks for the explanation (I'm kind of dumb in the recent days :). I've added `if len(self._set) < self._maxsize: while self._waiters: self._waiters.pop().set_result(None)` to clear all the waiters once the queue has room. – warvariuc Oct 19 '15 at 09:21
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/92705/discussion-between-vincent-and-warvariuc). – Vincent Oct 19 '15 at 09:30
  • I'd just like to add that `asyncio.wait` returns 2 sets of futures: done and pending. This wasn't very obvious to me from this example and took me by surprise (until I read the docs). – ChrisWue Aug 08 '17 at 21:58
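
The two-set return value mentioned in the last comment can be illustrated with a small sketch. The `work` coroutine and the delays are hypothetical; a `timeout` is passed to `asyncio.wait` only so that one task is still unfinished when it returns.

```python
import asyncio


async def work(delay):
    # Hypothetical task: sleeps, then returns its own delay.
    await asyncio.sleep(delay)
    return delay


async def main():
    fast = asyncio.ensure_future(work(0.01))
    slow = asyncio.ensure_future(work(5))
    # asyncio.wait returns a (done, pending) pair of sets. With a timeout,
    # tasks that have not finished in time end up in `pending`.
    done, pending = await asyncio.wait([fast, slow], timeout=0.5)
    results = sorted(task.result() for task in done)
    # asyncio.wait does not cancel pending tasks, so clean them up explicitly.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return results, len(pending)


results, nb_pending = asyncio.run(main())
print(results, nb_pending)
```

Without a `timeout` (and with the default `return_when`), `pending` is empty when `asyncio.wait` returns, which is the situation in the answer's code.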