Tags: python, mysql, task, python-asyncio, pycord

How can I fix "Task was destroyed but it is pending!"?


I have a problem. My bot has an event handler, on_message, that runs every time a user writes a chat message on my Discord server. The bot has a lot to do in this event, and I often get this kind of error:

Task was destroyed but it is pending!
task: <Task pending name='pycord: on_message' coro=<Client._run_event() done, defined at /Bots/gift-bot/discord/client.py:374> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f68a7bdfc10>()]>>

So I think that to fix this, I need to speed up my code. Sadly, I don't have a clue how to do that.

Edit: I added timing output, and this is what gets printed:

Task was destroyed but it is pending!
task: <Task pending name='pycord: on_message' coro=<Client._run_event() done, defined at /Bots/gift-bot/discord/client.py:374> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f01063f98e0>()]>>
2 if checks done - 7.867813110351562e-06
5 if checks done - 0.0061550140380859375
mysql checks done - 0.010785341262817383
task done - 0.13075661659240723
2 if checks done - 8.344650268554688e-06
5 if checks done - 0.011545896530151367
mysql checks done - 0.02138519287109375
task done - 0.11132025718688965
2 if checks done - 2.0503997802734375e-05
5 if checks done - 0.008122920989990234
mysql checks done - 0.012276411056518555
2 if checks done - 1.0728836059570312e-05
5 if checks done - 0.014346837997436523
mysql checks done - 0.040288448333740234
task done - 0.12520265579223633
2 if checks done - 1.0728836059570312e-05
5 if checks done - 0.0077972412109375
mysql checks done - 0.013320684432983398
task done - 0.1502058506011963
task done - 0.10663175582885742
2 if checks done - 9.775161743164062e-06
5 if checks done - 0.006486177444458008
mysql checks done - 0.011229515075683594
Task was destroyed but it is pending!
task: <Task pending name='pycord: on_message' coro=<Client._run_event() done, defined at /Bots/gift-bot/discord/client.py:374> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f010609a9d0>()]>>
2 if checks done - 6.67572021484375e-06
5 if checks done - 0.0049741268157958984
mysql checks done - 0.008575677871704102
task done - 0.10633635520935059

And this is the code with the timing output added:

    @commands.Cog.listener("on_message")
    async def on_message(self, message):
        start = time.time()

        # Check whether the message can be counted


        if message.author.bot:
            return

        if message.type != discord.MessageType.default:
            return
            
        print(f"2 if checks done - {time.time() - start}")

        if isinstance(message.channel, discord.channel.DMChannel):
            return await message.reply(f'Hey {message.author.name}!\nLeider bin ich der falsche Ansprechpartner, falls du Hilfe suchst.. <:pepe_hands:705896495601287320>\nBetrete den https://discord.gg/deutschland Bl4cklist-Discord und sende unserem Support-Bot <@671421220566204446> (`Bl4cklist🔥Support#7717`) eine Private-Nachricht, damit sich unser Support-Team um dein Problem so schnell es geht kümmern kann. <:pepe_love:759741232443949107>')

        # REMOVE ON APRIL 30
        prefix_now = await get_prefix(message)
        if message.content.startswith(str(prefix_now)):
            try:
                await message.reply("› <a:alarm:769215249261789185> - **UMSTIEG AUF SLASH-COMMANDS:** Ab **jetzt** laufen alle Befehle dieses Bots auf `/` - um Leistung zu sparen und die Erfahrung zu verbessern. Nutze `/help` um eine Befehlsliste zu sehen.")
            except discord.Forbidden:
                pass
            return

        if self.client.user in message.mentions:

                response = choice([
                "Mit mir kann man die coolsten Gewinnspiele starten! <a:gift:843914342835421185>",
                'Wird Zeit jemanden den Tag zu versüßen! <:smile:774755282618286101>',
                "Wer nicht auf diesem Server ist, hat die Kontrolle über sein Leben verloren! <a:lach_blue2:803693710490861608>",
                "Wann startet endlich ein neues Gewinnspiel? <:whut:848347703217487912>",
                "Ich bin der BESTE Gewinnspiel-Bot - Wer was anderes sagt, lügt! <:wyldekatze:842157727169773608>"
                ])

                try:
                    await message.reply(f"{response} (Mein Präfix: `/`)", mention_author=False)
                except (discord.Forbidden, discord.HTTPException, discord.NotFound):
                    pass
                return
                
        print(f"5 if checks done - {time.time() - start}")


        # Cooldown


        #self.member_cooldown_list = [i for i in self.member_cooldown_list if i[1] + self.cooldown_val > int(time.time())]
        #member_index = next((i for i, v in enumerate(self.member_cooldown_list) if v[0] == message.author.id), None)
        #if member_index is not None:
        #    if self.member_cooldown_list[member_index][1] + self.cooldown_val > int(time.time()):
        #        return

        #self.member_cooldown_list.append((message.author.id, int(time.time())))


        # Role check (bonus/ignore)


        count = 1
        mydb = await getConnection()
        mycursor = await mydb.cursor()
        await mycursor.execute("SELECT ignore_role_id, bonus_role_id FROM guild_role_settings WHERE guild_id = %s", (message.author.guild.id,))
        in_database = await mycursor.fetchone()
        if in_database:
            if in_database[0] is not None:
                role_list = in_database[0].split(" ")
                for roleid in role_list:
                    try:
                        int(roleid)
                    except ValueError:
                        continue

                    role = message.author.guild.get_role(int(roleid))
                    if role is None:
                        continue

                    if role in message.author.roles:
                        await mycursor.close()
                        mydb.close()
                        return

            if in_database[1] is not None:
                role_list = in_database[1].split(" ")
                for roleid in role_list:
                    try:
                        int(roleid)
                    except ValueError:
                        continue

                    role = message.author.guild.get_role(int(roleid))
                    if role is None:
                        continue

                    if role in message.author.roles:
                        count += 1


        # Channel check (bonus/ignore)


        await mycursor.execute("SELECT ignore_channel_id FROM guild_channel_settings WHERE guild_id = %s", (message.author.guild.id,))
        in_database1 = await mycursor.fetchone()
        if in_database1:
            if in_database1[0] is not None:
                channel_list = in_database1[0].split(" ")
                for channelid in channel_list:

                    try:
                        int(channelid)
                    except ValueError:
                        continue

                    if int(message.channel.id) == int(channelid):
                        await mycursor.close()
                        mydb.close()
                        return
                        
        print(f"mysql checks done - {time.time() - start}")


        # Write to the database

        await mycursor.execute("SELECT * FROM guild_message_count WHERE guild_id = %s AND user_id = %s",
                               (message.author.guild.id, message.author.id))
        in_database2 = await mycursor.fetchone()
        if in_database2:
            await mycursor.execute(
                "UPDATE guild_message_count SET user_id = %s, message_count = message_count + %s WHERE guild_id = %s AND user_id = %s",
                (message.author.id, count, message.author.guild.id, message.author.id))
        else:
            await mycursor.execute(
                "INSERT INTO guild_message_count (user_id, message_count, guild_id) VALUES (%s, %s, %s)",
                (message.author.id, count, message.author.guild.id))

        await mydb.commit()
        await mycursor.close()
        mydb.close()
        
        print(f"task done - {time.time() - start}")

If I try to start my bot with asyncio.run(client.start('token')), I get this error multiple times:

Ignoring exception in on_guild_channel_delete
Traceback (most recent call last):
  File "/Bots/gift-bot/discord/client.py", line 382, in _run_event
    await coro(*args, **kwargs)
  File "/Bots/gift-bot/cogs/misc_events.py", line 738, in on_guild_channel_delete
    await self.client.wait_until_ready()
  File "/Bots/gift-bot/discord/client.py", line 978, in wait_until_ready
    await self._ready.wait()
  File "/usr/local/lib/python3.9/asyncio/locks.py", line 226, in wait
    await fut
RuntimeError: Task <Task pending name='pycord: on_guild_channel_delete' coro=<Client._run_event() running at /Bots/gift-bot/discord/client.py:382>> got Future <Future pending> attached to a different loop

I'm using Python 3.9 on a Debian 10 vServer with pycord 2.0.0b5.


Solution

  • An await expression suspends the containing coroutine until the awaited awaitable completes, so that coroutine makes no progress in the meantime. await is still necessary, because it is the point where a coroutine yields control back to the event loop so that other coroutines can progress.
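To illustrate why yielding matters, here is a minimal sketch that is not taken from the bot. The names ticks_during, blocking_handler and yielding_handler are hypothetical. A background ticker can only advance while the handler is suspended in an await; a blocking call such as time.sleep stalls the whole event loop.

```python
import asyncio
import time


async def ticks_during(handler):
    """Count how often a background ticker runs while `handler` executes."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # give the ticker a chance to start
    await handler()
    seen = ticks
    # Cancel and await the ticker so no pending task is left behind.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen


async def blocking_handler():
    time.sleep(0.05)  # blocks the whole event loop; nothing else can run


async def yielding_handler():
    await asyncio.sleep(0.05)  # suspends only this coroutine


blocking_ticks = asyncio.run(ticks_during(blocking_handler))
yielding_ticks = asyncio.run(ticks_during(yielding_handler))
print(blocking_ticks, yielding_ticks)
```

The ticker makes no progress during the blocking handler and advances at least once during the yielding one.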

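    A long chain of sequential awaits is a problem because each step has to wait for the previous one to finish, which makes the handler slow.
    I've refactored the on_message coroutine method by splitting the independent database checks into sub-tasks.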

    async def _check_channel(self, message, pool):
        """Return False if the message's channel is on the guild's ignore list, else None."""
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(
                    "SELECT ignore_channel_id FROM guild_channel_settings WHERE guild_id = %s",
                    (message.author.guild.id,),
                )
                in_database = await cursor.fetchone()
    
        if in_database and in_database[0] is not None:
            channel_list = in_database[0].split(" ")
            for channelid in channel_list:
    
                try:
                    channel_id_int = int(channelid)
                except ValueError:
                    continue
    
                if int(message.channel.id) == channel_id_int:
                    return False
    
    
    async def _get_role_count(self, message, pool):
        """Return False if the author has an ignored role, else the number of bonus roles (or None)."""
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(
                    "SELECT ignore_role_id, bonus_role_id FROM guild_role_settings WHERE guild_id = %s",
                    (message.author.guild.id,),
                )
                in_database = await cursor.fetchone()
        if in_database:
            first_item, second_item, *_ = in_database
            if first_item is not None:
                role_list = first_item.split(" ")
                for roleid in role_list:
                    try:
                        roleid_int = int(roleid)
                    except ValueError:
                        continue
    
                    role = message.author.guild.get_role(roleid_int)
                    if role is None:
                        continue
                    if role in message.author.roles:
                        return False
    
            if second_item is not None:
                role_list = second_item.split(" ")
                count = 0
                for roleid in role_list:
                    try:
                        roleid_int = int(roleid)
                    except ValueError:
                        continue
    
                    role = message.author.guild.get_role(roleid_int)
                    if role is None:
                        continue
                    if role in message.author.roles:
                        count += 1
                return count
    
    
    @commands.Cog.listener("on_message")
    async def on_message(self, message):
        if message.author.bot:
            return
        if message.type != discord.MessageType.default:
            return
        if isinstance(message.channel, discord.channel.DMChannel):
            return
    
        # Cooldown
    
        self.member_cooldown_list = [
            i
            for i in self.member_cooldown_list
            if i[1] + self.cooldown_val > int(time.time())
        ]
        member_index = next(
            (
                i
                for i, v in enumerate(self.member_cooldown_list)
                if v[0] == message.author.id
            ),
            None,
        )
        if member_index is not None:
            if self.member_cooldown_list[member_index][1] + self.cooldown_val > int(
                time.time()
            ):
                return
    
        self.member_cooldown_list.append((message.author.id, int(time.time())))
    
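        # Creating a pool for every message is expensive; in a real bot,
        # create it once at startup and reuse it across events.
        loop = asyncio.get_running_loop()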
        db_pool = await aiomysql.create_pool(
            minsize=3,
            host="<host>",
            port=3306,
            user="<user>",
            password="<password>",
            db="<db_name>",
            autocommit=False,
            loop=loop,
        )
        count = 1
    
        # self is bound automatically; passing it again would shift the arguments.
        check_channel_task = asyncio.create_task(
            self._check_channel(message, db_pool)
        )
        role_count_task = asyncio.create_task(self._get_role_count(message, db_pool))
    
        # Wait for both checks. Compare with "is False": a bonus count of 0
        # equals False, so "False in (...)" would wrongly skip the message.
        role_count = await role_count_task
        check_channel = await check_channel_task
        if role_count is False or check_channel is False:
            db_pool.close()
            await db_pool.wait_closed()
            return
        if role_count:
            count += role_count
    
        # write to database
        # The upsert assumes a unique key on (guild_id, user_id). It inserts a
        # new row or adds count to the existing one, so no prior SELECT is needed.
        mydb = await db_pool.acquire()
        mycursor = await mydb.cursor()
        await mycursor.execute(
            "INSERT INTO guild_message_count (user_id, message_count, guild_id) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE message_count = message_count + %s",
            (message.author.id, count, message.author.guild.id, count),
        )
    
        await mydb.commit()
        await mycursor.close()
        db_pool.release(mydb)
        db_pool.close()
        await db_pool.wait_closed()
    

    I've created two private async methods from parts of the on_message method so that the database checks can progress concurrently. While on_message is suspended in an await, the refactored methods can progress independently of it. To make this happen, I create two tasks from the two new coroutines. asyncio.create_task schedules a coroutine to run on the event loop without awaiting it immediately. The tasks can start running as soon as on_message yields control back to the event loop at any await after they were created.
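The difference between awaiting coroutines one after another and scheduling them as tasks can be sketched as follows. fake_query is a hypothetical stand-in for a database call, not part of aiomysql, and the delays are arbitrary. The log records the order in which the calls start and finish.

```python
import asyncio


async def fake_query(name, delay, log):
    """Hypothetical stand-in for a database round trip."""
    log.append(f"{name} start")
    await asyncio.sleep(delay)
    log.append(f"{name} done")


async def sequential():
    log = []
    # Each await finishes before the next call even starts.
    await fake_query("roles", 0.01, log)
    await fake_query("channels", 0.03, log)
    return log


async def concurrent():
    log = []
    # Both coroutines are scheduled first, then awaited.
    roles_task = asyncio.create_task(fake_query("roles", 0.01, log))
    channels_task = asyncio.create_task(fake_query("channels", 0.03, log))
    await roles_task
    await channels_task
    return log


sequential_log = asyncio.run(sequential())
concurrent_log = asyncio.run(concurrent())
print(sequential_log)
print(concurrent_log)
```

In the concurrent version both calls start before either one finishes, so the total wait is roughly the longer of the two delays rather than their sum. Awaiting every task that was created also helps avoid leaving tasks pending when the handler returns.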

    I didn't run the code; this is a best effort. You may have to experiment with where the block that awaits the tasks is placed. Also start the bot with client.run to avoid the "got Future attached to a different loop" error.
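The "attached to a different loop" error can be reproduced without Discord. The sketch below assumes, based on the traceback in the question, that the client holds loop-bound objects (such as its _ready event) created before asyncio.run starts a new loop. The names first_loop and wait_on_foreign_future are hypothetical.

```python
import asyncio

# A future created on one event loop...
first_loop = asyncio.new_event_loop()
future = first_loop.create_future()


async def wait_on_foreign_future():
    try:
        await future  # ...awaited from a task running on another loop
    except RuntimeError as exc:
        return str(exc)
    return "no error"


# asyncio.run() always creates a fresh event loop for its coroutine.
error_message = asyncio.run(wait_on_foreign_future())
first_loop.close()
print(error_message)
```

If that assumption holds for the bot, client.run avoids the problem because it runs the client on the loop the client already holds instead of creating a new one.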