5

I have a problem. So I have a task that runs every time when a user writes a chat message on my discord server - it's called on_message. So my bot has many things to do in this event, and I often get this kind of error:

Task was destroyed but it is pending!
task: <Task pending name='pycord: on_message' coro=<Client._run_event() done, defined at /Bots/gift-bot/discord/client.py:374> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f68a7bdfc10>()]>>

So I think if I want to fix this, I need to speedup my code. But sadly, I don't have any clue how i can do it to fix this error.

Edit: I integrated timings and this is what I get printed:

Task was destroyed but it is pending!
task: <Task pending name='pycord: on_message' coro=<Client._run_event() done, defined at /Bots/gift-bot/discord/client.py:374> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f01063f98e0>()]>>
2 if checks done - 7.867813110351562e-06
5 if checks done - 0.0061550140380859375
mysql checks done - 0.010785341262817383
task done - 0.13075661659240723
2 if checks done - 8.344650268554688e-06
5 if checks done - 0.011545896530151367
mysql checks done - 0.02138519287109375
task done - 0.11132025718688965
2 if checks done - 2.0503997802734375e-05
5 if checks done - 0.008122920989990234
mysql checks done - 0.012276411056518555
2 if checks done - 1.0728836059570312e-05
5 if checks done - 0.014346837997436523
mysql checks done - 0.040288448333740234
task done - 0.12520265579223633
2 if checks done - 1.0728836059570312e-05
5 if checks done - 0.0077972412109375
mysql checks done - 0.013320684432983398
task done - 0.1502058506011963
task done - 0.10663175582885742
2 if checks done - 9.775161743164062e-06
5 if checks done - 0.006486177444458008
mysql checks done - 0.011229515075683594
Task was destroyed but it is pending!
task: <Task pending name='pycord: on_message' coro=<Client._run_event() done, defined at /Bots/gift-bot/discord/client.py:374> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f010609a9d0>()]>>
2 if checks done - 6.67572021484375e-06
5 if checks done - 0.0049741268157958984
mysql checks done - 0.008575677871704102
task done - 0.10633635520935059

And this is the code for the integrated timings:

    @commands.Cog.listener("on_message")
    async def on_message(self, message):
        start = time.time()

        # Check ob Nachricht gezählt werden kann


        if message.author.bot:
            return

        if message.type != discord.MessageType.default:
            return
            
        print(f"2 if checks done - {time.time() - start}")

        if isinstance(message.channel, discord.channel.DMChannel):
            return await message.reply(f'Hey {message.author.name}!\nLeider bin ich der falsche Ansprechpartner, falls du Hilfe suchst.. <:pepe_hands:705896495601287320>\nBetrete den https://discord.gg/deutschland Bl4cklist-Discord und sende unserem Support-Bot <@671421220566204446> (`Bl4cklistSupport#7717`) eine Private-Nachricht, damit sich unser Support-Team um dein Problem so schnell es geht kümmern kann. <:pepe_love:759741232443949107>')

        # ENTFERNEN AM 30. APRIL
        prefix_now = await get_prefix(message)
        if message.content.startswith(str(prefix_now)):
            try:
                await message.reply("› <a:alarm:769215249261789185> - **UMSTIEG AUF SLASH-COMMANDS:** Ab **jetzt** laufen alle Befehle dieses Bots auf `/` - um Leistung zu sparen und die Erfahrung zu verbessern. Nutze `/help` um eine Befehlsliste zu sehen.")
            except discord.Forbidden:
                pass
            return

        if self.client.user in message.mentions:

                response = choice([
                "Mit mir kann man die coolsten Gewinnspiele starten! <a:gift:843914342835421185>",
                'Wird Zeit jemanden den Tag zu versüßen! <:smile:774755282618286101>',
                "Wer nicht auf diesem Server ist, hat die Kontrolle über sein Leben verloren! <a:lach_blue2:803693710490861608>",
                "Wann startet endlich ein neues Gewinnspiel? <:whut:848347703217487912>",
                "Ich bin der BESTE Gewinnspiel-Bot - Wer was anderes sagt, lügt! <:wyldekatze:842157727169773608>"
                ])

                try:
                    await message.reply(f"{response} (Mein Präfix: `/`)", mention_author=False)
                except (discord.Forbidden, discord.HTTPException, discord.NotFound):
                    pass
                return
                
        print(f"5 if checks done - {time.time() - start}")


        # Cooldown


        #self.member_cooldown_list = [i for i in self.member_cooldown_list if i[1] + self.cooldown_val > int(time.time())]
        #member_index = next((i for i, v in enumerate(self.member_cooldown_list) if v[0] == message.author.id), None)
        #if member_index is not None:
        #    if self.member_cooldown_list[member_index][1] + self.cooldown_val > int(time.time()):
        #        return

        #self.member_cooldown_list.append((message.author.id, int(time.time())))


        # Rollen-Check (Bonus/Ignore)


        count = 1
        mydb = await getConnection()
        mycursor = await mydb.cursor()
        await mycursor.execute("SELECT ignore_role_id, bonus_role_id FROM guild_role_settings WHERE guild_id = %s", (message.author.guild.id,))
        in_database = await mycursor.fetchone()
        if in_database:
            if in_database[0] is not None:
                role_list = in_database[0].split(" ")
                for roleid in role_list:
                    try:
                        int(roleid)
                    except ValueError:
                        continue

                    role = message.author.guild.get_role(int(roleid))
                    if role is None:
                        continue

                    if role in message.author.roles:
                        await mycursor.close()
                        mydb.close()
                        return

            if in_database[1] is not None:
                role_list = in_database[1].split(" ")
                for roleid in role_list:
                    try:
                        int(roleid)
                    except ValueError:
                        continue

                    role = message.author.guild.get_role(int(roleid))
                    if role is None:
                        continue

                    if role in message.author.roles:
                        count += 1


        # Kanal-Check (Bonus/Ignore)


        await mycursor.execute("SELECT ignore_channel_id FROM guild_channel_settings WHERE guild_id = %s", (message.author.guild.id,))
        in_database1 = await mycursor.fetchone()
        if in_database1:
            if in_database1[0] is not None:
                channel_list = in_database1[0].split(" ")
                for channelid in channel_list:

                    try:
                        int(channelid)
                    except ValueError:
                        continue

                    if int(message.channel.id) == int(channelid):
                        await mycursor.close()
                        mydb.close()
                        return
                        
        print(f"mysql checks done - {time.time() - start}")


        # In Datenbank eintragen

        await mycursor.execute("SELECT * FROM guild_message_count WHERE guild_id = %s AND user_id = %s",
                               (message.author.guild.id, message.author.id))
        in_database2 = await mycursor.fetchone()
        if in_database2:
            await mycursor.execute(
                "UPDATE guild_message_count SET user_id = %s, message_count = message_count + %s WHERE guild_id = %s AND user_id = %s",
                (message.author.id, count, message.author.guild.id, message.author.id))
        else:
            await mycursor.execute(
                "INSERT INTO guild_message_count (user_id, message_count, guild_id) VALUES (%s, %s, %s)",
                (message.author.id, count, message.author.guild.id))

        await mydb.commit()
        await mycursor.close()
        mydb.close()
        
        print(f"task done - {time.time() - start}")

If I try to start my bot with asyncio.run(client.start('token')) I'm getting this error multiple times:

Ignoring exception in on_guild_channel_delete
Traceback (most recent call last):
  File "/Bots/gift-bot/discord/client.py", line 382, in _run_event
    await coro(*args, **kwargs)
  File "/Bots/gift-bot/cogs/misc_events.py", line 738, in on_guild_channel_delete
    await self.client.wait_until_ready()
  File "/Bots/gift-bot/discord/client.py", line 978, in wait_until_ready
    await self._ready.wait()
  File "/usr/local/lib/python3.9/asyncio/locks.py", line 226, in wait
    await fut
RuntimeError: Task <Task pending name='pycord: on_guild_channel_delete' coro=<Client._run_event() running at /Bots/gift-bot/discord/client.py:382>> got Future <Future pending> attached to a different loop

I'm using Python3.9 on a Debian 10 vServer with pycord2.0.0b5.

Razzer
  • 492
  • 1
  • 4
  • 29
  • 2
    First meassure how much time is taking every part of your code. The more primitive profiling is just add print() lines with timestamps around suspicious lines, by example the queries you do on the database. Then, when you know what is taking time, you can see how to improve it. A query to the database could be improved, or you could need to add some index, etc. – Gonzalo Odiard Mar 20 '22 at 00:38
  • I will do that, but I think the MySQL part needs the most time. Before I worked with MySQL in this event, I never had such an error. – Razzer Mar 20 '22 at 00:40
  • maybe you should better organize data in database - so you wouldn't have to check `None` and use `split(" ")` but you could send `message.channel.id` directly to database and get simply result. It would need to keep every value in separated row instead of keeping as values in one row separated by space. – furas Mar 20 '22 at 04:48
  • Instead of using a select, and then doing an insert or update, you can try to use an UPSERT *INSERT ... ON DUPLICATE KEY*. Is much faster than what you are doing – nacho Mar 20 '22 at 10:29
  • I'm not familiar with the language and package being used. Are you getting a new MySQL connection unnecessarily? – Rick James Mar 20 '22 at 15:58
  • Is `guild_id` an integer in both tables? Is it the `PRIMARY KEY`? If so, why are `guild_role_settings` and `guild_channel_settings` separate tables? – Rick James Mar 20 '22 at 16:04
  • I updated my code. guild_id is always an integer. – Razzer Mar 21 '22 at 12:07
  • 2
    I think your problem is that the `asyncio` event loop is being terminated early before your tasks complete and resulting in the error. Have you tried using `discord.ext.tasks` https://discordpy.readthedocs.io/en/stable/ext/tasks/index.html# ? – Anwar Husain Mar 28 '22 at 17:35
  • for what should i use tasks? Its an event. – Razzer Mar 28 '22 at 19:25
  • What is your python code doing? There's the `message.author.guild.get_role()` method. But it's very possible you can do it completely with SQL instead of iterating through the Python objects. – Cole Mar 30 '22 at 01:05
  • Some ideas: Use only one DB connection. Try to express as much as possible in SQL - all logic shown here is possible to express in SQL. Parallelize all SQL execution: Put each in different `async` function, then launch each e.g. `chans = asyncio.create_task(get_channel_ids())`, and only check for the return conditions after all tasks were launched like this, e.g. `if message.channel.id in (await chans): return` (see [docs](https://docs.python.org/3/library/asyncio-task.html#awaitables)) – Nearoo Mar 30 '22 at 12:05
  • I got this error when querying an empty table. – vanya Jun 21 '23 at 12:33
  • @vanya check your code. For me it was the problem that my code was too slow and after improving it and create some functions where the code was moved, all worked. – Razzer Aug 13 '23 at 21:03

4 Answers4

5

The await expression blocks the containing coroutine until the awaited awaitable returns. This hinders the progress of the coroutine. But await is necessary in a coroutine to yield control back to the event loop so that other coroutines can progress.

Too many awaits can be problematic, it just makes progress slow.
I've refactored on_message coroutine method by breaking it into sub tasks.

async def _check_channel(self, message, pool):
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(
                "SELECT ignore_channel_id FROM guild_channel_settings WHERE guild_id = %s",
                (message.author.guild.id,),
            )
            in_database = await cursor.fetchone()

    if in_database and in_database[0] is not None:
        channel_list = in_database[0].split(" ")
        for channelid in channel_list:

            try:
                channel_id_int = int(channelid)
            except ValueError:
                continue

            if int(message.channel.id) == channel_id_int:
                return False


async def _get_role_count(self, message, pool):
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(
                "SELECT ignore_role_id, bonus_role_id FROM guild_role_settings WHERE guild_id = %s",
                (message.author.guild.id,),
            )
            in_database = await cursor.fetchone()
    if in_database:
        first_item, second_item, *_ = in_database
        if first_item is not None:
            role_list = first_item.split(" ")
            for roleid in role_list:
                try:
                    roleid_int = int(roleid)
                except ValueError:
                    continue

                role = message.author.guild.get_role(roleid_int)
                if role is None:
                    continue
                if role in message.author.roles:
                    return False

        if second_item is not None:
            role_list = second_item.split(" ")
            count = 0
            for roleid in role_list:
                try:
                    roleid_int = int(roleid)
                except ValueError:
                    continue

                role = message.author.guild.get_role(roleid_int)
                if role is None:
                    continue
                if role in message.author.roles:
                    count += 1
            return count


@commands.Cog.listener("on_message")
async def on_message(self, message):
    if message.author.bot:
        return
    if message.type != discord.MessageType.default:
        return
    if isinstance(message.channel, discord.channel.DMChannel):
        return

    # Cooldown

    self.member_cooldown_list = [
        i
        for i in self.member_cooldown_list
        if i[1] + self.cooldown_val > int(time.time())
    ]
    member_index = next(
        (
            i
            for i, v in enumerate(self.member_cooldown_list)
            if v[0] == message.author.id
        ),
        None,
    )
    if member_index is not None:
        if self.member_cooldown_list[member_index][1] + self.cooldown_val > int(
            time.time()
        ):
            return

    self.member_cooldown_list.append((message.author.id, int(time.time())))

    loop = asyncio.get_running_loop()
    db_pool = await aiomysql.create_pool(
        minsize=3,
        host="<host>",
        port=3306,
        user="<user>",
        password="<password>",
        db="<db_name>",
        autocommit=False,
        loop=loop,
    )
    count = 1

    check_channel_task = asyncio.create_task(
        self._check_channel(self, message, db_pool)
    )
    role_count_task = asyncio.create_task(self._get_role_count(self, message, db_pool))

    # write to database

    mydb = await db_pool.acquire()
    mycursor = await mydb.cursor()
    await mycursor.execute(
        "SELECT * FROM guild_message_count WHERE guild_id = %s AND user_id = %s",
        (message.author.guild.id, message.author.id),
    )
    in_database = await mycursor.fetchone()

    role_count = await role_count_task
    check_channel = await check_channel_task
    if False in (role_count, check_channel):
        await mycursor.close()
        db_pool.release(mydb)
        db_pool.close()
        await db_pool.wait_closed()
        return
    if role_count:
        count += role_count
    if in_database:
        await mycursor.execute(
            "INSERT INTO guild_message_count (user_id, message_count, guild_id) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE message_count = message_count + 1",
            (message.author.id, count, message.author.guild.id),
        )

    await mydb.commit()
    await mycursor.close()
    db_pool.release(mydb)
    db_pool.close()
    await db_pool.wait_closed()

I've created two private async methods with code from part of the on_message method to make progress concurrent. While on_message is blocked in an await, the refactored methods may progress independent of on_message method. To make this happen I create two tasks out of the two new coroutines. asyncio.create_tasks schedules tasks to be run negating the need for an await. These tasks may run as soon as on_message yields control back to event loop on any await following the tasks creation.

I didn't run the code. This is best effort. You have to try experimenting by moving the block which awaits the tasks around. And also run it with client.run to avoid got Future attached to a different loop error.

Nizam Mohamed
  • 8,751
  • 24
  • 32
  • Thank you for your work! But sadly, i'm getting this stacktrace error: https://www.toptal.com/developers/hastebin/uwizugonij.sql - I'm using aiomysql. I think pools would be working in that case, but I have no clue how to use them. – Razzer Apr 01 '22 at 17:32
  • Yes, you're right. Updated with pool. – Nizam Mohamed Apr 01 '22 at 22:57
3

This error could be because of improper shutdown of the event loop.

This code demonstrates improper shutdown.

Here as soon as the loop.run_until_complete method returns when task t1 is done, the loop is closed but the task t2 is yet to be completed. This raises the error.

import asyncio


async def delay(n):
    print(f"sleeping for {n} second(s)")
    await asyncio.sleep(n)
    print(f"done sleeping for {n} second(s)")


loop = asyncio.get_event_loop()
t1 = loop.create_task(delay(1))
t2 = loop.create_task(delay(2))
loop.run_until_complete(t1)
loop.close()

Output;

sleeping for 1 second(s)
sleeping for 2 second(s)
done sleeping for 1 second(s)
Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<delay() running at aio.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fa794c5b970>()]>

The proper shutdown.

Here the second loop.run_until_complete waits for group of pending tasks before loop closure. This behavior ensures proper closure.

import asyncio


async def delay(n):
    print(f"sleeping for {n} second(s)")
    await asyncio.sleep(n)
    print(f"done sleeping for {n} second(s)")


loop = asyncio.get_event_loop()
t1 = loop.create_task(delay(1))
t2 = loop.create_task(delay(2))
loop.run_until_complete(t1)

pending = asyncio.all_tasks(loop=loop)
group = asyncio.gather(*pending)
loop.run_until_complete(group)

loop.close()

The easy way is with asyncio.run which handles proper shutdown of the loop.

import asyncio


async def delay(n):
    print(f"sleeping for {n} second(s)")
    await asyncio.sleep(n)
    print(f"done sleeping for {n} second(s)")


async def main():
    t1 = asyncio.create_task(delay(1))
    t2 = asyncio.create_task(delay(2))
    await t2


asyncio.run(main())

For the problem with pycord, try this solution for now where client is instance of discord.Client i.e. your class. Instead of calling client.run we create a coro and pass it to asyncio.run.

import asyncio

asyncio.run(client.start('token'))
Nizam Mohamed
  • 8,751
  • 24
  • 32
3

I met a bug with the same error, then I found this https://github.com/python/cpython/pull/29163: task would be garbage collected when there is no strong reference, maybe it is useful to you.

ppd0705
  • 51
  • 2
-2

IODKU lets you eliminate the separate SELECT:

await mycursor.execute("SELECT * FROM guild_message_count WHERE guild_id = %s AND user_id = %s", (message.author.guild.id, message.author.id))
        in_database2 = await mycursor.fetchone()
        if in_database2:
            await mycursor.execute("UPDATE guild_message_count SET user_id = %s, message_count = message_count + %s WHERE guild_id = %s AND user_id = %s", (message.author.id, count, message.author.guild.id, message.author.id))
        else:
            await mycursor.execute("INSERT INTO guild_message_count (user_id, message_count, guild_id) VALUES (%s, %s, %s)", (message.author.id, count, message.author.guild.id))

-->

INSERT INTO guild_message_count
        (user_id, message_count, guild_id)
        VALUES 
        (%s, %s, %s)
    ON DUPLICATE KEY UPDATE
        message_count = message_count + 1

Are you using autocommit = ON? Transactions? (The choice there may impact the performance; please provide details so we can advise.)

Rick James
  • 135,179
  • 13
  • 127
  • 222