pythonsqlalchemypython-asynciolangchainagent

How can I create a tool for langgaph's agent to save data in db?


I created an agent using Langgraph in Python, and I developed a tool for them to save a todo in a database. But that tool doesn't work and raises an error!

I use SQLAlchemy to connect to the DB.

This is my DB code:

from datetime import datetime

from sqlalchemy import DateTime
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

engine = create_async_engine(
    settings.DATABASE_URL,
    pool_size=20,
    max_overflow=40,
    pool_timeout=30,
    pool_pre_ping=True,
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
    autocommit=False,
)

and this is my agent:

async def run_agent(thread_id:int, human_message:str) -> MessagesState:
    """
    talk with your agent or execute agent

    Args:
        thread_id (int): thread id
        human_message (str): message of human

    Returns:
        MessagesState: return MessagesState
    """

    config = {"configurable": {"thread_id": thread_id}}

    async with AsyncSqliteSaver.from_conn_string(settings.AI_MEMORY_DB) as checkpointer:

        agent = create_agent(
            model=chat_model,
            tools=TOOLS,
            system_prompt=settings.SYSTEM_PROMPT,
            checkpointer=checkpointer
        )

        res = await agent.ainvoke({"messages": HumanMessage(human_message)}, config=config)

        return res

and this is my tools:

@tool
async def create_todo(date:str, time:str, title:str) -> str:
    """
    Use this tool IMMEDIATELY and WITHOUT EXCEPTION whenever the user wants to
    create a new todo, reminder, appointment, plan, or schedule anything for the future.

    This tool stores the user's schedule in a database so that it can be better recalled later and reports can be generated on the tasks.

    You MUST call this tool when you see phrases like:
    - Remind me to ...
    - Set a reminder for ...
    - Create a todo for ...
    - I have a meeting/appointment/plan on ...
    - Remember to call someone tomorrow
    - Schedule something
    - Don't let me forget to ...
    - Put in my calendar ...

    Even if the user speaks in Persian (Farsi) or gives the date in Persian calendar (Shamsi), 
    you MUST convert it to Gregorian (YYYY-MM-DD) yourself and call this tool.

    Examples that MUST trigger this tool:
    - "Remind me to go to the doctor next Monday at 3 PM"

    - "Set a reminder for December 25, 2025 at 14:30 for team meeting"
    
    Args:
        date (str): Date in EXACT YYYY-MM-DD format (example: 2025-11-26, 2025-12-25)
        time (str): Time in 24-hour HH:MM format with leading zeros (example: 09:30, 17:00, 08:15)
        title (str): Full title or description of the task/reminder exactly as the user wants it saved

    Returns:
        str: Success or error message
    """
    try:
        async with AsyncSessionLocal() as session:
            async with session.begin():
                _datetime = f"{date} {time}"
                datetime_to_do_it = await sync_to_async(datetime.strptime)(_datetime, "%Y-%m-%d %H:%M")
                todo = Todo(title=title, datetime_to_do_it=datetime_to_do_it)
                session.add(todo)

        
        return "todo sucessfuly saved"
    except Exception as ex:
        return f"something wrong! {ex}"


 
TOOLS = [
    create_todo,
]

But when I want my agent to save my todo, that doesn't work and raises an error:

Exception terminating connection <AdaptedConnection <asyncpg.connection.Connection object at 0x7f1f5017dc70>>
Traceback (most recent call last):
  File "/home/user/Documents/python_projects/Diana/venv/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 372, in _close_connection
    self._dialect.do_terminate(connection)
  File "/home/user/Documents/python_projects/Diana/venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 1127, in do_terminate
    dbapi_connection.terminate()
  File "/home/user/Documents/python_projects/Diana/venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py", line 402, in terminate
    self.await_(asyncio.shield(self._terminate_graceful_close()))  # type: ignore[attr-defined] # noqa: E501
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/Documents/python_projects/Diana/venv/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/Documents/python_projects/Diana/venv/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
            ^^^^^^^^^^^^
  File "/home/user/Documents/python_projects/Diana/venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 912, in _terminate_graceful_close
    await self._connection.close(timeout=2)
  File "/home/user/Documents/python_projects/Diana/venv/lib/python3.12/site-packages/asyncpg/connection.py", line 1504, in close
    await self._protocol.close(timeout)
  File "asyncpg/protocol/protocol.pyx", line 627, in close
  File "asyncpg/protocol/protocol.pyx", line 660, in asyncpg.protocol.protocol.BaseProtocol._request_cancel
  File "/home/user/Documents/python_projects/Diana/venv/lib/python3.12/site-packages/asyncpg/connection.py", line 1673, in _cancel_current_command
    self._cancellations.add(self._loop.create_task(self._cancel(waiter)))
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/base_events.py", line 456, in create_task
    self._check_closed()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
^C^Csys:1: RuntimeWarning: coroutine 'Connection._cancel' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I know this error raises when my agent wants to use it because I tested it in IPython.


Solution

  • I solved it, and I'm not sure if this is best practice or not. I just use while to keep the tool alive until the process finishes.

    
    @tool
    async def create_todo(date:str, time:str, title:str) -> str:
        """
        Use this tool IMMEDIATELY and WITHOUT EXCEPTION whenever the user wants to
        create a new todo, reminder, appointment, plan, or schedule anything for the future.
    
        This tool stores the user's schedule in a database so that it can be better recalled later and reports can be generated on the tasks.
    
        You MUST call this tool when you see phrases like:
        - Remind me to ...
        - Set a reminder for ...
        - Create a todo for ...
        - I have a meeting/appointment/plan on ...
        - Remember to call someone tomorrow
        - Schedule something
        - Don't let me forget to ...
        - Put in my calendar ...
    
        Even if the user speaks in Persian (Farsi) or gives the date in Persian calendar (Shamsi), 
        you MUST convert it to Gregorian (YYYY-MM-DD) yourself and call this tool.
    
        Examples that MUST trigger this tool:
        - "Remind me to go to the doctor next Monday at 3 PM"
    
        - "Set a reminder for December 25, 2025 at 14:30 for team meeting"
        
        Args:
            date (str): Date in EXACT YYYY-MM-DD format (example: 2025-11-26, 2025-12-25)
            time (str): Time in 24-hour HH:MM format with leading zeros (example: 09:30, 17:00, 08:15)
            title (str): Full title or description of the task/reminder exactly as the user wants it saved
    
        Returns:
            str: Success or error message
        """
        while True:
            try:
                async with AsyncSessionLocal() as session:
                    async with session.begin():
                        _datetime = f"{date} {time}"
                        datetime_to_do_it = await sync_to_async(datetime.strptime)(_datetime, "%Y-%m-%d %H:%M")
                        todo = Todo(title=title, datetime_to_do_it=datetime_to_do_it)
                        session.add(todo)
    
                
                return "todo sucessfuly saved"
            except Exception as ex:
                return f"something wrong! {ex}"