4

From this github link: https://github.com/pyxll/pyxll-examples/blob/master/bitmex/bitmex.py

When I run this code, I get the message saying the explicit passing of coroutine objects to asyncio.wait() is deprecated. I've pinpointed this to line 71: await asyncio.wait(tasks), but can't figure out how to resolve the issue.

Code below for reference:

from pyxll import xl_func, RTD, get_event_loop
import websockets
import asyncio
import json


class BitMex:
    """Class to manage subscriptions to instrument prices."""

    URI = "wss://www.bitmex.com/realtime"

    def __init__(self, loop=None):
        self.__websocket = None
        self.__running = False
        self.__running_task = None
        self.__subscriptions = {}
        self.__data = {}
        self.__lock = asyncio.Lock(loop=loop)

    async def __connect(self):
        # Connect to the websocket API and start the __run coroutine
        self.__running = True
        self.__websocket = await websockets.connect(self.URI)
        self.__connecting_task = None
        self.__running_task = asyncio.create_task(self.__run())

    async def __disconnect(self):
        # Close the websocket and wait for __run to complete
        self.__running = False
        await self.__websocket.close()
        self.__websocket = None
        await self.__running_task

    async def __run(self):
        # Read from the websocket until disconnected
        while self.__running:
            msg = await self.__websocket.recv()
            await self.__process_message(json.loads(msg))

    async def __process_message(self, msg):
        if msg.get("table", None) == "instrument":
            # Extract the data from the message, update our data dictionary and notify subscribers
            for data in msg.get("data", []):
                symbol = data["symbol"]
                timestamp = data["symbol"]

                # Update the latest values in our data dictionary and notify any subscribers
                tasks = []
                subscribers = self.__subscriptions.get(symbol, {})
                latest = self.__data.setdefault(symbol, {})
                for field, value in data.items():
                    latest[field] = (value, timestamp)

                    # Notify the subscribers with the updated field
                    for subscriber in subscribers.get(field, []):
                        tasks.append(subscriber(symbol, field, value, timestamp))

                # await all the tasks from the subscribers
                if tasks:
                    await asyncio.wait(tasks)

    async def subscribe(self, symbol, field, callback):
        """Subscribe to updates for a specific symbol and field.

        The callback will be called as 'await callback(symbol, field, value, timestamp)'
        whenever an update is received.
        """
        async with self.__lock:
            # Connect the websocket if necessary
            if self.__websocket is None:
                await self.__connect()

            # Send the subscribe message if we're not already subscribed
            if symbol not in self.__subscriptions:
                msg = {"op": "subscribe", "args": [f"instrument:{symbol}"]}
                await self.__websocket.send(json.dumps(msg))

            # Add the subscriber to the dict of subscriptions
            self.__subscriptions.setdefault(symbol, {}).setdefault(field, []).append(callback)

            # Call the callback with the latest data
            data = self.__data.get(symbol, {})
            if field in data:
                (value, timestamp) = data[field]
                await callback(symbol, field, value, timestamp)

    async def unsubscribe(self, symbol, field, callback):
        async with self.__lock:
            # Remove the subscriber from the list of subscriptions
            self.__subscriptions[symbol][field].remove(callback)
            if not self.__subscriptions[symbol][field]:
                del self.__subscriptions[symbol][field]

            # Unsubscribe if we no longer have any subscriptions for this instrument
            if not self.__subscriptions[symbol]:
                msg = {"op": "unsubscribe", "args": [f"instrument:{symbol}"]}
                await self.__websocket.send(json.dumps(msg))
                del self.__subscriptions[symbol]
                self.__data.pop(symbol, None)

            # Disconnect if we no longer have any subscriptions
            if not self.__subscriptions:
                async with self.__lock:
                    await self.__disconnect()


class BitMexRTD(RTD):
    """RTD class for subscribing to BitMEX prices using the
    BitMex class above.
    """

    # Use a single BitMex object for all RTD functions
    _bitmex = BitMex(get_event_loop())

    def __init__(self, symbol, field):
        super().__init__(value="Waiting...")
        self.__symbol = symbol
        self.__field = field

    async def connect(self):
        # Subscribe to BitMix updates when Excel connects to the RTD object
        await self._bitmex.subscribe(self.__symbol, self.__field, self.__update)

    async def disconnect(self):
        # Unsubscribe to BitMix updates when Excel disconnects from the RTD object
        await self._bitmex.unsubscribe(self.__symbol, self.__field, self.__update)

    async def __update(self, symbol, field, value, timestamp):
        # Update the value in Excel
        self.value = value


@xl_func("string symbol, string field: rtd", recalc_on_open=True)
def bitmex_rtd(symbol, field="lastPrice"):
    """Subscribe to BitMEX prices for a given symbol."""
    return BitMexRTD(symbol, field)


if __name__ == "__main__":

    async def main():
        # This is the callback that will be called whenever there's an update
        async def callback(symbol, field, value, timestamp):
            print((symbol, field, value, timestamp))

        bm = BitMex()

        await bm.subscribe("XBTUSD", "lastPrice", callback)

        await asyncio.sleep(60)

        await bm.unsubscribe("XBTUSD", "lastPrice", callback)

        print("DONE!")

    # Run the 'main' function in an asyncio event loop
    loop = asyncio.get_event_loop()
    loop.create_task(main())
    loop.run_forever()
mkrieger1
  • 19,194
  • 5
  • 54
  • 65
  • What did you find in the documentation of the `asyncio.wait` function about this? – mkrieger1 Mar 07 '23 at 20:19
  • That's just a warning. It won't affect the script execution. – Tim Roberts Mar 07 '23 at 20:26
  • It will be depreciated October this year though. – r_automated Mar 07 '23 at 20:57
  • I tried to change it based on these 2 links, but couldn't quite get it. https://stackoverflow.com/questions/69889075/asyncio-wait-confusion-when-passed-a-coroutine https://stackoverflow.com/questions/66029772/alternative-to-asyncio-wait – r_automated Mar 07 '23 at 20:58
  • Passing coroutines to `wait` was deprecated in Python 3.8. As of Python 3.11 (released this *past* October), passing coroutines to `wait` is forbidden. – chepner Mar 07 '23 at 22:28

2 Answers2

0

It is nothing more than a warning. There is no need to fix it.

If you still want to fix it, dont pass the tasks list to asyncio.wait(). The documentation for asyncio.wait states "Run Future and Task instances in the aws iterable concurrently and block until the condition specified by return_when.". The type returned by an async def ...(...)-like defined function is a coroutine(type(async_function)==types.coroutine#true). Since this is neither a Future nor a Task, the warning is outputted. To fix it, just leave asyncio.wait() and the tasks list inside of __process_message out entirely:

    async def __process_message(self, msg):
    if msg.get("table", None) == "instrument":
        # Extract the data from the message, update our data dictionary and notify subscribers
        for data in msg.get("data", []):
            symbol = data["symbol"]
            timestamp = data["symbol"]

            # Update the latest values in our data dictionary and notify any subscribers
            subscribers = self.__subscriptions.get(symbol, {})
            latest = self.__data.setdefault(symbol, {})
            for field, value in data.items():
                latest[field] = (value, timestamp)

                # Notify the subscribers with the updated field
                for subscriber in subscribers.get(field, []):
                    await subscriber(symbol, field, value, timestamp)
HelpfulHelper
  • 226
  • 2
  • 5
  • Thank you so much. This works and now I see how this aligns with the other suggestions I was reading about. Thanks again! – r_automated Mar 07 '23 at 21:23
0

For anyone coming here from Google this is a warning that you need to fix, because the full warning is:

DeprecationWarning: The explicit passing of coroutine objects to asyncio.wait() is deprecated since Python 3.8, and scheduled for removal in Python 3.11.

The asyncio.wait() documentation obviously says nothing about this or what you're supposed to do instead, but as far as I can figure out you replace asyncio.wait([a, b]) with asyncio.gather(a, b).

Timmmm
  • 88,195
  • 71
  • 364
  • 509
  • 1
    `wait` and `gather` have different purposes. A more suitable solution might be along the lines of `asyncio.wait([asyncio.create_task(a), asyncio.create_task(b)])` if `wait` is truly needed—although I myself am struggling right now with finding a way to turn an `Awaitable` (like the result of an `anext()` call) into a `Task`. – Alex Peters Jul 08 '23 at 12:15
  • Okay, worked it out: basically had to wrap the `anext` call inside something like `async def a_coroutine(awaitable): return await anext(awaitable)`, which itself is a coroutine and can be wrapped within `create_task`. – Alex Peters Jul 08 '23 at 12:32
  • `wait` and `gather` have the same purpose as far as I can tell. Read the docs of `gather`. It automatically makes tasks. – Timmmm Jul 08 '23 at 13:39
  • 1
    I promise you, I've read the docs more times than I'd like to admit. `gather` is your go-to if you need to wait until **all** tasks complete (or one fails). If you need to be able to intercept when **some** tasks complete but not others, you need `wait`. – Alex Peters Jul 08 '23 at 14:20
  • Ah right, yeah in this case I wanted all of them. – Timmmm Jul 08 '23 at 21:04
  • I use `asyncio.wait(my_tasks, return_when=asyncio.FIRST_COMPLETED`, such functionality isn't available with `asyncio.gather()`. – omegastripes Aug 02 '23 at 16:34