python python-asyncio telegram aiogram

How to integrate a custom task into the aiogram executor?


import asyncio
from threading import Thread
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types

API_TOKEN = ''

bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)

chat_ids = {}

@dp.message_handler()
async def echo(message: types.Message):
    # old style:
    # await bot.send_message(message.chat.id, message.text)

    chat_ids[message.message_id] = message.from_user
    text = f'{message.message_id} {message.from_user} {message.text}'
    await message.reply(text, reply=False)


async def periodic(sleep_for, queue):
    while True:
        await asyncio.sleep(sleep_for)
        now = datetime.utcnow()
        print(f"{now}")
        for id in chat_ids:
            queue.put_nowait((id, f"{now}"))
            # await bot.send_message(id, f"{now}", disable_notification=True)


def run_tick(queue):
    newloop = asyncio.new_event_loop()
    asyncio.set_event_loop(newloop)
    asyncio.run(periodic(3, queue))


if __name__ == '__main__':
    queue = asyncio.Queue()
    Thread(target=run_tick, args=(queue,), daemon=True).start()
    executor.start_polling(dp, skip_updates=True)

I want to send messages to registered users with bot.send_message whenever an event occurs, but so far I have failed. Here is what I tried.

  1. bot.send_message crashes because it is called from another thread. (The error is "Timeout context manager should be used inside a task".)
  2. So, I tried to work around this by using a queue, but there is no way to add my own task into executor.

Is there any simple way to do this?


Edit: 2020-1-3

Here's a working example, as per @user4815162342's answer.

import asyncio
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types

API_TOKEN = ''

bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)

chat_ids = {}

@dp.message_handler()
async def echo(message: types.Message):
    chat_ids[message.from_user.id] = message.from_user
    text = f'{message.message_id} {message.from_user} {message.text}'
    await message.reply(text, reply=False)

async def periodic(sleep_for):
    while True:
        await asyncio.sleep(sleep_for)
        now = datetime.utcnow()
        print(f"{now}")
        for id in chat_ids:
            await bot.send_message(id, f"{now}", disable_notification=True)

if __name__ == '__main__':
    dp.loop.create_task(periodic(10))
    executor.start_polling(dp)


Solution

  • The initial problem was that you tried to call asyncio code from a different thread. To fix the resulting error you have created a new event loop while keeping the additional thread. As the saying goes, now you have two problems.

    The queue idea looks unfinished because there is no code that reads from the queue; and even if there were, it wouldn't work because asyncio queues are not designed to be shared between event loops or between threads. To untangle the mess, you need to find a way to run your periodic updates from within the event loop, i.e. re-examine this assumption:

    but there is no way to add my own task into executor.

    Looking at the source of Executor, it appears to pick up the event loop from the dispatcher, which holds it in the publicly accessible loop attribute. That means that you can create a task simply by invoking the create_task method on that loop. For example:

    if __name__ == '__main__':
        # Schedule periodic() on the dispatcher's own event loop; it starts
        # running once start_polling() starts that loop.
        dp.loop.create_task(periodic(10))
        executor.start_polling(dp, skip_updates=True)


    Now periodic can be formulated as in your initial attempt, minus the queue:

    async def periodic(sleep_for):
        while True:
            await asyncio.sleep(sleep_for)
            now = datetime.utcnow()
            for id in chat_ids:
                await bot.send_message(id, f"{now}",
                                       disable_notification=True)
    

    Please note that I haven't tested this because I do not use aiogram. A potential issue that you might need to address is that your chat_ids dict appears to use message.message_id as its key, whereas bot.send_message expects a chat ID such as message.chat.id.
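
If the events really do have to originate in another thread, one option is to keep a single event loop and hand coroutines over to it with asyncio.run_coroutine_threadsafe, rather than sharing an asyncio.Queue across threads. The following is only a minimal sketch in plain asyncio, not aiogram code: send_message is a hypothetical stand-in for bot.send_message, and the worker function and the chat IDs are made up for illustration.

```python
import asyncio
import threading

sent = []  # records what the stand-in "bot" was asked to send


async def send_message(chat_id, text):
    # Hypothetical stand-in for bot.send_message; it runs on the main loop.
    await asyncio.sleep(0)
    sent.append((chat_id, text))


def worker(loop, chat_ids):
    # Runs in a plain thread. Instead of creating a second event loop,
    # it submits each coroutine to the loop that owns the bot.
    for chat_id in chat_ids:
        future = asyncio.run_coroutine_threadsafe(
            send_message(chat_id, "tick"), loop)
        # Block this thread (not the loop) until the send has finished.
        future.result(timeout=5)


async def main():
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=worker, args=(loop, [1, 2, 3]))
    thread.start()
    # Wait for the thread in the default executor so the loop stays free
    # to run the coroutines the worker submits.
    await loop.run_in_executor(None, thread.join)


asyncio.run(main())
```

run_coroutine_threadsafe returns a concurrent.futures.Future, so the worker thread can wait on the result without touching the loop directly. In an aiogram program the loop passed to the worker would presumably be the one the executor runs on, but I have not tested that combination.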