postgresql sqlalchemy python-asyncio fastapi alembic

Async SQLAlchemy Engine Alembic migrations


Problem Statement

I am converting my SQLAlchemy PostgreSQL setup to async (asyncpg), and I need to run Alembic migrations with an async engine. The migration itself reports success, but the seeding step that follows fails.

What I have done so far

Error

2023-12-24 14:36:22 Starting entrypoint.sh
2023-12-24 14:36:29 Database is up and running
2023-12-24 14:36:29 Generating migrations
2023-12-24 14:36:38 Generating /app/alembic/versions/9a4735888d4b_initial_migration.py ...  done
2023-12-24 14:36:41 Running migrations
2023-12-24 14:36:45 Migration completed successfully.
2023-12-24 14:36:45 Seeding with test user
2023-12-24 14:36:49 An error occurred while seeding the expressions: AsyncConnection context has not been started and object has not been awaited.
2023-12-24 14:36:50 Inside start_server function
2023-12-24 14:36:50 Starting ngrok
Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml                                
2023-12-24 14:36:22 wait-for-it.sh: waiting 60 seconds for db:5432
2023-12-24 14:36:29 wait-for-it.sh: db:5432 is available after 7 seconds
2023-12-24 14:36:38 INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
2023-12-24 14:36:38 INFO  [alembic.runtime.migration] Will assume transactional DDL.
2023-12-24 14:36:45 INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
2023-12-24 14:36:45 INFO  [alembic.runtime.migration] Will assume transactional DDL.
2023-12-24 14:36:45 INFO  [alembic.runtime.migration] Running upgrade  -> 9a4735888d4b, Initial migration
2023-12-24 14:36:49 /usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py:775: RuntimeWarning: coroutine 'AsyncConnection.close' was never awaited
2023-12-24 14:36:49   conn.close()
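
For reference, the RuntimeWarning at the end of the log is Python's generic signal that an async method was called without await, so its body never ran. A minimal sketch of that mechanism follows; FakeAsyncConnection is a hypothetical stand-in used only for illustration, not SQLAlchemy's AsyncConnection.

```python
import asyncio


class FakeAsyncConnection:
    """Hypothetical stand-in for an async connection (not SQLAlchemy's class)."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def main():
    conn = FakeAsyncConnection()

    # Calling an async method without await only creates a coroutine object;
    # the method body has not run yet, so the connection is still open.
    pending = conn.close()
    closed_before_await = conn.closed

    # Awaiting the coroutine is what actually executes close().
    await pending
    return closed_before_await, conn.closed


result = asyncio.run(main())
print(result)
```

Because the warning is raised from sqlalchemy/orm/session.py, one plausible reading is that a synchronous Session was handed the async engine in the seeding step, though the seeding code is not shown here.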

alembic/env.py

from logging.config import fileConfig
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
from alembic import context
from database.database_config import Base, db_url
from services.utils import logger
import traceback

config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata

if db_url:
    config.set_main_option("sqlalchemy.url", db_url)

def do_run_migrations(connection):
    try:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()
    except Exception:
        # Log the full traceback, then re-raise so Alembic exits with an error
        logger.error(traceback.format_exc())
        raise

async def run_async_migrations():
    connectable = create_async_engine(db_url, poolclass=NullPool)

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()

def run_migrations_online():
    asyncio.run(run_async_migrations())

run_migrations_online()

The code below sets up the async engine:

database_config.py

import os
import traceback

# declarative_base lives in sqlalchemy.orm since 1.4; the ext.declarative path is deprecated
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

from services.utils import logger
from config.env_var import *

DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')

Base = declarative_base()


db_url = f'postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}'

try:
    engine = create_async_engine(db_url, echo=True)
except Exception as e:
    # Report failures at error level so they are not lost among info logs
    logger.error(f"Error creating database engine: {e}")
    logger.error(traceback.format_exc())
    raise

AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    db = AsyncSessionLocal()
    try:
        yield db
    except Exception as e:
        # db_url is deliberately not logged: it contains the database password
        logger.error(f"Database session error: {e}")
        logger.error(traceback.format_exc())
        raise
    finally:
        await db.close()

Finally, my init_db.py, which I run once in local dev mode:

import asyncio

from database.models import *
from database.enums import *
from database.database_config import Base, engine

async def create_tables():
    # Reuse the async engine from the database configuration
    async with engine.begin() as conn:
        # create_all is synchronous, so run it through run_sync
        await conn.run_sync(Base.metadata.create_all)

if __name__ == "__main__":
    asyncio.run(create_tables())

How do I properly create async migrations?


Solution

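  • Several misunderstandings were compounded in this problem.

    First, init_db.py was redundant, because the Alembic initial migration already creates the tables, so I removed it from my flow.

    Second, I imported all of my SQLAlchemy models and enum types into database_config.py. Alembic's autogenerate only sees tables that are registered on Base.metadata, and a model is registered only once its module has been imported. After that change I reran the script and the migration was successful.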
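
A minimal sketch of why the model imports matter, assuming SQLAlchemy 1.4 or later is installed. The User model and its columns are hypothetical and not taken from my project; the point is only that Base.metadata, which env.py passes to Alembic as target_metadata, stays empty until a model class has actually been defined or imported.

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# Before any model class exists, the metadata knows about no tables,
# so autogenerate would have nothing to compare against the database.
tables_before = list(Base.metadata.tables)


class User(Base):  # hypothetical model, for illustration only
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))


# Defining (or importing) the class registers its table on Base.metadata.
tables_after = list(Base.metadata.tables)

print(tables_before)
print(tables_after)
```

In the project above, the same effect comes from making sure database.models and database.enums are imported before Alembic reads Base.metadata, for example from database_config.py as I did, or at the top of alembic/env.py.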