import asyncio
from threading import Thread
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types
API_TOKEN = ''
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
chat_ids = {}
@dp.message_handler()
async def echo(message: types.Message):
# old style:
# await bot.send_message(message.chat.id, message.text)
chat_ids[message.message_id] = message.from_user
text = f'{message.message_id} {message.from_user} {message.text}'
await message.reply(text, reply=False)
async def periodic(sleep_for, queue):
while True:
await asyncio.sleep(sleep_for)
now = datetime.utcnow()
print(f"{now}")
for id in chat_ids:
queue.put_nowait((id, f"{now}"))
# await bot.send_message(id, f"{now}", disable_notification=True)
def run_tick(queue):
newloop = asyncio.new_event_loop()
asyncio.set_event_loop(newloop)
asyncio.run(periodic(3, queue))
if __name__ == '__main__':
queue = asyncio.Queue()
Thread(target=run_tick, args=(queue,), daemon=True).start()
executor.start_polling(dp, skip_updates=True)
I want to send messages to registered users by bot.send_message when there is a event but failed for now. Here are what I tried.
Is there any simple way to do this?
Edit: 2020-1-3
Here's working example as per @user4815162342.
import asyncio
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types
API_TOKEN = ''
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
chat_ids = {}
@dp.message_handler()
async def echo(message: types.Message):
chat_ids[message.from_user.id] = message.from_user
text = f'{message.message_id} {message.from_user} {message.text}'
await message.reply(text, reply=False)
async def periodic(sleep_for):
while True:
await asyncio.sleep(sleep_for)
now = datetime.utcnow()
print(f"{now}")
for id in chat_ids:
await bot.send_message(id, f"{now}", disable_notification=True)
if __name__ == '__main__':
dp.loop.create_task(periodic(10))
executor.start_polling(dp)
The initial problem was that you tried to call asyncio code from a different thread. To fix the resulting error you have created a new event loop while keeping the additional thread. As the saying goes, now you have two problems.
The queue idea looks unfinished because there is no code that reads from the queue; and even if there were, it wouldn't work because asyncio queues are not designed to be shared between event loops or between threads. To untangle the mess, you need to find a way to run your periodic updates from within the event loop, i.e. re-examine this assumption:
but there is no way to add my own task into executor.
Looking at the source of Executor
, it appears to pick up the event loop from the dispatcher, which holds it in the publicly accessible loop
attribute. That means that you can create a task simply by invoking the create_task
method on that loop. For example:
if __name__ == '__main__':
dp.loop.create_task(periodic())
executor.start_polling(dp, skip_updates=True)
Now periodic
can be formulated as in your initial attempt:
async def periodic(sleep_for, queue):
while True:
await asyncio.sleep(sleep_for)
now = datetime.utcnow()
for id in chat_ids:
await bot.send_message(id, f"{now}",
disable_notification=True)
Please note that I haven't tested this because I do not use aiogram
. A potential issue that you might need to address is that your chat_ids
dict appears to contain message.message_id
as key, whereas bot.send_message
accepts a message.chat.id
.