Problem Statement
I am converting my SQLAlchemy PostgreSQL drivers to async, and I need to perform Alembic migrations using an async engine. It's failing.
What I have done so far
Error
2023-12-24 14:36:22 Starting entrypoint.sh
2023-12-24 14:36:29 Database is up and running
2023-12-24 14:36:29 Generating migrations
2023-12-24 14:36:38 Generating /app/alembic/versions/9a4735888d4b_initial_migration.py ... done
2023-12-24 14:36:41 Running migrations
2023-12-24 14:36:45 Migration completed successfully.
2023-12-24 14:36:45 Seeding with test user
2023-12-24 14:36:49 An error occurred while seeding the expressions: AsyncConnection context has not been started and object has not been awaited.
2023-12-24 14:36:50 Inside start_server function
2023-12-24 14:36:50 Starting ngrok
Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml
2023-12-24 14:36:22 wait-for-it.sh: waiting 60 seconds for db:5432
2023-12-24 14:36:29 wait-for-it.sh: db:5432 is available after 7 seconds
2023-12-24 14:36:38 INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
2023-12-24 14:36:38 INFO [alembic.runtime.migration] Will assume transactional DDL.
2023-12-24 14:36:45 INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
2023-12-24 14:36:45 INFO [alembic.runtime.migration] Will assume transactional DDL.
2023-12-24 14:36:45 INFO [alembic.runtime.migration] Running upgrade -> 9a4735888d4b, Initial migration
2023-12-24 14:36:49 /usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py:775: RuntimeWarning: coroutine 'AsyncConnection.close' was never awaited
2023-12-24 14:36:49 conn.close()
alembic/env.py
from logging.config import fileConfig
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
from alembic import context
from database.database_config import Base, db_url
from services.utils import logger
import traceback

config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata

if db_url:
    config.set_main_option("sqlalchemy.url", db_url)


def do_run_migrations(connection):
    try:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )
        with context.begin_transaction():
            context.run_migrations()
    except Exception as e:
        logger.error(traceback.format_exc())
        raise


async def run_async_migrations():
    connectable = create_async_engine(db_url, poolclass=NullPool)
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


def run_migrations_online():
    asyncio.run(run_async_migrations())


run_migrations_online()
Code below is used to set up the async engine
database_config.py
import os
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from services.utils import logger
import traceback
from config.env_var import *

DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')

Base = declarative_base()
db_url = f'postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}'

try:
    engine = create_async_engine(db_url, echo=True)
except Exception as e:
    logger.info(f"Error creating database engine: {e}")
    logger.info(traceback.format_exc())
    raise

AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


async def get_db():
    db = AsyncSessionLocal()
    try:
        yield db
    except Exception as e:
        logger.info(f"Failed with db_url: {db_url}")
        logger.info(f"Database session error: {e}")
        logger.info(traceback.format_exc())
        raise
    finally:
        await db.close()
Then finally my init_db.py, which I run once, in a local dev mode:
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from database.models import *
from database.enums import *
from database.database_config import Base, engine, db_url


async def create_tables():
    # Use the async engine from your database configuration
    async_engine = create_async_engine(db_url)
    # Asynchronous table creation
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


if __name__ == "__main__":
    asyncio.run(create_tables())
How do I properly create async migrations?
There were several misunderstandings compounded in this problem.
Firstly, init_db.py is redundant, because Alembic takes care of the initial migration, so I removed it from my flow.
Then I imported all of my SQLAlchemy models and enum types in my database_config.py, reran the script, and the migration was successful.
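As a rough illustration of why the import matters: a declarative Base only learns about a table once the module defining that model class has actually been executed, and env.py hands Base.metadata to Alembic as target_metadata. The sketch below is not my real code. The User model and its columns are hypothetical stand-ins for whatever lives in database/models.py, and it needs no database connection.

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# No model class has been defined (or imported) yet, so the metadata
# that Alembic would receive as target_metadata contains no tables.
tables_before = sorted(Base.metadata.tables)


class User(Base):  # hypothetical model, standing in for database/models.py
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))


# Defining (or importing) the class registers its table on Base.metadata.
tables_after = sorted(Base.metadata.tables)

print(tables_before)
print(tables_after)
```

In a real project the class definition is replaced by an import of the models module somewhere that runs before env.py reads Base.metadata, which is what importing the models in database_config.py achieves.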