Tags: python, formatting, markdown, aiogram, google-generativeai

How to stream a Gemini (Google Generative AI) response in a Telegram bot message?


I'm developing a Telegram bot with aiogram that takes the user's message as a prompt and generates a response with google-generativeai's GenerativeModel. It works fine in normal mode, where I wait for the complete response, but I want to stream chunks with the stream=True parameter and append each new chunk to the Telegram message via message.edit_text(). The problem is that I keep hitting this error:

    aiogram.exceptions.TelegramBadRequest: Telegram server says - Bad Request: can't parse entities: Can't find end of the entity starting at byte offset 73

Here are my functions:

from aiogram import Bot, Dispatcher, Router, types
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from sqlalchemy.ext.asyncio import AsyncSession  # needed for AsyncSession(engine) below

from app.database import engine
from app.models import Language, TelegramUser
from app.services.ai import generate_response
from app.services.users import get_or_create_telegram_user, get_translated_message as _, refresh_last_interaction_date


bot = Bot(token=TOKEN,
          default=DefaultBotProperties())

dp = Dispatcher()

telegram_router = Router()

dp.include_router(telegram_router)

...

@dp.message()
async def handle_prompt(message: types.Message):
    async with AsyncSession(engine) as db:
        user, __ = await get_or_create_telegram_user(message, db)
        prompt = message.text

        temp_message_text = await _("timer_1", user.language_id, db)
        temp_message = await message.answer(temp_message_text)

        # Accumulate the complete response
        accumulated_text = ""

        try:
            async for chunk in generate_response(prompt):
                accumulated_text += chunk

                # Send the complete response with correct formatting
                await temp_message.edit_text(
                    accumulated_text,
                    disable_web_page_preview=True,
                    parse_mode=ParseMode.MARKDOWN
                )

        except Exception as e:
            print(f"Error in handle_prompt: {e}")
            await temp_message.edit_text("An error occurred during processing.")

        finally:
            await refresh_last_interaction_date(user.chat_id, db)

# ai.py

from typing import AsyncGenerator

import google.generativeai as genai

from app.constants import GEMINI_API_KEY
from app.utils.helpers import sync_to_async_iterable

# Set up the Gemini API configuration
genai.configure(api_key=GEMINI_API_KEY)

# Initialize the Gemini model
model = genai.GenerativeModel(model_name="gemini-1.5-flash")


async def generate_response(prompt: str, **kwargs) -> AsyncGenerator[str, None]:
    try:
        response = sync_to_async_iterable(model.generate_content(prompt, stream=True, **kwargs))
        async for content in response:
            yield content.text
    except Exception as e:
        print(f"Error in generate_response: {e}")
        yield "An error occurred during processing."  # Yield an error message
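
For reference, sync_to_async_iterable itself isn't shown above. A minimal sketch of what such a helper might look like, assuming its only job is to wrap the SDK's blocking iterator so the event loop isn't stalled (the name and signature mirror the import above, but the body is my assumption):

```python
import asyncio
from typing import AsyncIterator, Iterable, TypeVar

T = TypeVar("T")

async def sync_to_async_iterable(iterable: Iterable[T]) -> AsyncIterator[T]:
    """Drive a blocking iterator from async code by running each next()
    call in a worker thread (a sketch; the real helper may differ)."""
    iterator = iter(iterable)
    sentinel = object()
    while True:
        # next() blocks while the SDK waits on the network,
        # so keep it off the event loop thread
        item = await asyncio.to_thread(next, iterator, sentinel)
        if item is sentinel:
            break
        yield item
```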

The problem is that a chunk sometimes contains a Markdown opening symbol whose closing symbol hasn't arrived yet. When that happens, the error aiogram.exceptions.TelegramBadRequest: Telegram server says - Bad Request: can't parse entities: Can't find end of the entity starting at byte offset .. occurs. I've tried both ParseMode.MARKDOWN and ParseMode.MARKDOWN_V2, but the problem is the same. I've also tried removing the parse_mode parameter when this exception is raised, but it didn't seem to help.
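
The failure mode is easy to see without Telegram at all: a chunk boundary can fall inside a Markdown entity, leaving an odd number of markers in the accumulated text. A minimal illustration (the chunk strings are made up):

```python
# Simulated stream: the chunk boundary falls inside a bold entity
chunks = ["Here is *bo", "ld* text"]

partial = chunks[0]      # "Here is *bo" -- the '*' entity is still open
full = "".join(chunks)   # "Here is *bold* text" -- balanced again

# An odd count of '*' markers is exactly what Telegram refuses to parse
print(partial.count("*") % 2)  # 1 (unbalanced)
print(full.count("*") % 2)     # 0 (balanced)
```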

I've found out that generate_content() returns text formatted as Markdown, and that Telegram requires strictly balanced formatting entities.

I'm not very familiar with Markdown formatting or asynchronous programming, so any help would be appreciated.

Python version: 3.10.8
Packages used:

aiogram==3.15.0
google-generativeai==0.8.3

Solution

  • After looking for workarounds, I came up with a function that closes every open Markdown tag in the accumulated text, so Telegram never sees an unbalanced entity and no parsing error is raised.

    # helpers.py
    
    def ensure_valid_markdown(text: str) -> str:
        stack = []  # Stack to track opening tags
        result = []  # Accumulated characters
        i = 0  # Current character index
    
        # Markdown symbols: single-character and multi-character
        single_char_symbols = {'*', '_', '`', '~'}  # '_' included: an unclosed italic marker also breaks parsing
        multi_char_symbols = {'```'}
    
        while i < len(text):
            # Handle multi-character tags
            if text[i:i + 3] in multi_char_symbols:
                if stack and stack[-1] == '```':  # Close an open code block
                    stack.pop()
                else:  # Open a new code block
                    stack.append('```')
                result.append('```')
                i += 3
            elif text[i] in single_char_symbols:  # Handle single-character tags
                if stack and stack[-1] == text[i]:  # Close the tag
                    stack.pop()
                else:  # Open a new tag
                    stack.append(text[i])
                result.append(text[i])
                i += 1
            else:
                # Append normal characters
                result.append(text[i])
                i += 1
    
        # Close any unbalanced tags at the end of the stream
        while stack:
            result.append(stack.pop())  # the closing marker is identical to the opening one
    
        return ''.join(result)
    
    # main.py
    
    from app.utils.helpers import ensure_valid_markdown
    ...
    
    @dp.message()
    async def handle_prompt(message: types.Message):
        async with AsyncSession(engine) as db:
            user, __ = await get_or_create_telegram_user(message, db)
            prompt = message.text
    
            temp_message_text = await _("timer_1", user.language_id, db)
            temp_message = await message.answer(temp_message_text)
    
            # Accumulate the complete response
            accumulated_text = ""
            
            try:
                async for chunk in generate_response(prompt):
                    accumulated_text += chunk
    
                    valid_markdown_text = ensure_valid_markdown(accumulated_text)  # make sure every Markdown tag is closed

                    # Send the accumulated response with balanced formatting
                    await temp_message.edit_text(
                        valid_markdown_text,
                        disable_web_page_preview=True,
                        parse_mode=ParseMode.MARKDOWN
                    )
    
            except Exception as e:
                print(f"Error in handle_prompt: {e}")
                await temp_message.edit_text("An error occurred during processing.")
    
            finally:
                await refresh_last_interaction_date(user.chat_id, db)
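
One practical refinement: editing the message on every chunk can quickly run into Telegram's flood limits. A small gate like this (a hypothetical helper, not part of the code above) permits at most one edit per interval:

```python
import time

class EditThrottle:
    """Gate frequent message edits: ready() returns True at most
    once per `interval` seconds (hypothetical helper)."""

    def __init__(self, interval: float = 1.0):
        self.interval = interval
        self._last = float("-inf")  # so the first call is always allowed

    def ready(self) -> bool:
        now = time.monotonic()
        if now - self._last >= self.interval:
            self._last = now
            return True
        return False
```

In the handler, edit_text would then run only when throttle.ready() is true, with one final unconditional edit after the loop so the last chunks are never dropped.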