python telegram middleware media aiogram

MediaGroup Middleware in Aiogram 3


How do I write a middleware that collects media groups in aiogram 3? Users in a group send a command to the bot. The bot replies with a message, and they must answer it with a media group, which the bot then forwards to everyone who uses the bot. Because Telegram delivers a media group to the bot as several separate messages, I need a middleware that collects them all before they are sent on to users.

I tried to use the code that already exists for aiogram 2, but I don't understand how to port it to version 3.


Solution

  • Simple Media middleware:

    import asyncio
    from typing import Any, Awaitable, Callable, Dict, List, Union

    from aiogram import BaseMiddleware
    from aiogram.types import Message, CallbackQuery


    class MediaMiddleware(BaseMiddleware):
        def __init__(self, latency: Union[int, float] = 0.01):
            # media_group_id -> messages collected so far for that group
            self.medias: Dict[str, List[Message]] = {}
            # seconds to wait for the remaining messages of a group
            self.latency = latency
            super().__init__()

        async def __call__(
            self,
            handler: Callable[[Union[Message, CallbackQuery], Dict[str, Any]], Awaitable[Any]],
            event: Union[Message, CallbackQuery],
            data: Dict[str, Any]
        ) -> Any:

            if isinstance(event, Message) and event.media_group_id:
                try:
                    # Not the first message of the group: store it and stop here
                    self.medias[event.media_group_id].append(event)
                    return
                except KeyError:
                    # First message of the group: open a buffer and wait for the rest
                    self.medias[event.media_group_id] = [event]
                    await asyncio.sleep(self.latency)

                    # Hand the whole group to the handler and drop the buffer
                    data["media_events"] = self.medias.pop(event.media_group_id)

            return await handler(event, data)
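
The buffering idea can be tried without Telegram or aiogram at all. The following is only a minimal sketch of the same logic in plain asyncio: `GroupCollector`, `feed`, and the `(group_id, payload)` tuples are hypothetical stand-ins for the middleware and for incoming messages, and the updates are processed concurrently on the assumption that the dispatcher does the same.

```python
import asyncio
from typing import Dict, List, Optional, Tuple

# (group_id, payload) stands in for an incoming Telegram message; hypothetical shape.
Update = Tuple[str, str]


class GroupCollector:
    """Buffers updates that share a group id, mirroring the middleware logic."""

    def __init__(self, latency: float = 0.05) -> None:
        self.buffers: Dict[str, List[str]] = {}
        self.latency = latency

    async def feed(self, update: Update) -> Optional[List[str]]:
        group_id, payload = update
        if group_id in self.buffers:
            # Later part of a known group: store it and produce nothing.
            self.buffers[group_id].append(payload)
            return None
        # First part: open a buffer, wait for the siblings, then hand over everything.
        self.buffers[group_id] = [payload]
        await asyncio.sleep(self.latency)
        return self.buffers.pop(group_id)


async def demo() -> List[Optional[List[str]]]:
    collector = GroupCollector()
    updates = [("album-1", "photo-a"), ("album-1", "photo-b"), ("album-1", "photo-c")]
    # Process the updates concurrently, one task per update.
    return await asyncio.gather(*(collector.feed(u) for u in updates))


results = asyncio.run(demo())
print(results)
```

Only the call for the first update returns the full list; the other two return `None`, which corresponds to the bare `return` in the middleware. If the latency is shorter than the gap between two parts of a group, the late part would open a new buffer and reach the handler on its own, so the value may need tuning.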
    

    Usage:

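    # register the middleware so that handlers receive media_events
    router.message.middleware(MediaMiddleware())

    @router.message(F.media_group_id != None)
    async def on_media_group_id(message: Message, media_events: List[Message] = []):
        # message - the first message of the group to arrive (the one that waited in the middleware)
        # media_events - all messages with the same media_group_id
        ...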
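
To pass the collected group on to other users, the messages in `media_events` have to be turned back into an album. Below is a rough sketch of the extraction step only. `album_items` and the `(kind, file_id, caption)` tuples are hypothetical helpers, not part of aiogram, and the `SimpleNamespace` objects merely imitate the few `Message` attributes the function reads. It assumes the largest photo size comes last in `message.photo`, which is how Telegram usually orders them.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List, Optional, Tuple

# (kind, file_id, caption): a hypothetical intermediate form, not an aiogram type.
AlbumItem = Tuple[str, str, Optional[str]]


def album_items(messages: Iterable[Any]) -> List[AlbumItem]:
    """Extract what is needed to re-send each collected message as part of an album."""
    items: List[AlbumItem] = []
    for msg in messages:
        caption = getattr(msg, "caption", None)
        if getattr(msg, "photo", None):
            # Assumption: the last entry of the photo list is the largest size.
            items.append(("photo", msg.photo[-1].file_id, caption))
        elif getattr(msg, "video", None):
            items.append(("video", msg.video.file_id, caption))
        elif getattr(msg, "document", None):
            items.append(("document", msg.document.file_id, caption))
    return items


# Stand-ins for aiogram Message objects, for demonstration only.
fake_group = [
    SimpleNamespace(
        photo=[SimpleNamespace(file_id="small"), SimpleNamespace(file_id="big")],
        video=None, document=None, caption="Holiday",
    ),
    SimpleNamespace(
        photo=None, video=SimpleNamespace(file_id="vid1"),
        document=None, caption=None,
    ),
]
items = album_items(fake_group)
print(items)
```

Inside the handler, each tuple could then be mapped to the matching aiogram input type, for example `InputMediaPhoto(media=file_id, caption=caption)`, and the resulting list passed to `bot.send_media_group(chat_id, media=...)` once per recipient. Where the list of recipients comes from depends on how the bot stores its users.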