python · sqlite · sqlalchemy · concurrency · fastapi

How to make concurrent writes in SQLite with FastAPI + SQLAlchemy without "database is locked" error?


What Happens

I trigger /session_a and /session_b almost simultaneously (e.g. with Postman or curl).

/session_b usually succeeds.

/session_a fails on db.commit() with

sqlite3.OperationalError: database is locked.

I found that the issue is caused by session A reaching commit() before session B commits, while session B is already holding the write lock (acquired at its flush()) — which means A will inevitably fail with a database is locked error.

Reproducible Code

import asyncio
import sqlite3
import threading
import time
import uuid
from loguru import logger
 
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
 
DATABASE_URL = "sqlite:///./test_locked.db"
 
engine = create_engine(
    DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)
Base = declarative_base()
 
app = FastAPI()
 
class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
 
Base.metadata.create_all(bind=engine)
 
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
 
@app.post("/session_a")
async def session_a(db: Session = Depends(get_db)):
    # Session A starts a transaction
    logger.info("A start")
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_a{uuid_str}")
    db.add(item)
    await asyncio.sleep(0.5)  # simulate a long transaction holding the lock
    logger.info(f"A commit {uuid_str}")
    db.commit()
    return {"status": "A committed"}
 
@app.post("/session_b")
async def session_b(db: Session = Depends(get_db)):
    logger.info("B start")
    # Session B acquires the lock as soon as possible
    await asyncio.sleep(0.1)
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_b{uuid_str}")
    db.add(item)
    db.flush()
    logger.info(f"B flush {uuid_str}")
    await asyncio.sleep(1)
    db.commit()
    logger.info(f"B commit {uuid_str}")
    return {"status": "B committed"}
 
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Request code

import requests
import threading
import time

def test_session_a():
    res = requests.post("http://127.0.0.1:8000/session_a", timeout=10)
    print(f"session_a: {res.status_code}")

def test_session_b():
    res = requests.post("http://127.0.0.1:8000/session_b", timeout=10)
    print(f"session_b: {res.status_code}")


th1 = threading.Thread(target=test_session_a)
th2 = threading.Thread(target=test_session_b)
 
th1.start()
time.sleep(0.1)
th2.start()
 
th1.join()
th2.join()

It seems wrong to me that db.commit() blocks synchronously. Using StaticPool seems to solve the issue, but it's unsafe. At the same time, I'm reluctant to set autoflush=False project-wide, as it might break some endpoints. What should I do?


Solution

  • If you convert all database operations to asynchronous ones (using aiosqlite), the issue will be mitigated. When a FastAPI route is declared asynchronous (async def), all of its operations run on the main event loop. In contrast, when it is synchronous (def), it is delegated to a threadpool and executed there. For more details on this front, there's a good section in the FastAPI documentation.

    In the original code, the database operations (db.commit, db.flush) were synchronous, meaning the event loop was blocked until they completed. When converted to asynchronous calls, they hand control back to the event loop while waiting. That doesn't fully solve SQLite's inherent single-writer limitation, but it does address the blocking that was contributing to the behavior and gets the application working. Here's a fully operational update of the FastAPI server logic:

    import asyncio
    import uuid
    import logging
    
    from fastapi import FastAPI, Depends
    from sqlalchemy import Column, Integer, String, text
    from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
    from sqlalchemy.ext.declarative import declarative_base
    
    logger = logging.getLogger("uvicorn.error")
    DATABASE_URL = "sqlite+aiosqlite:///./test_locked.db"
    
    engine = create_async_engine(
        DATABASE_URL, connect_args={"check_same_thread": False}
    )
    
    SessionLocal = async_sessionmaker(autocommit=False, autoflush=True, bind=engine)
    Base = declarative_base()
    
    app = FastAPI()
    
    
    class Item(Base):
        __tablename__ = "items"
        id = Column(Integer, primary_key=True, index=True)
        name = Column(String, index=True)
    async def init_models():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)
    
    async def get_db() -> AsyncSession:
        async with SessionLocal() as session:
            yield session
    
    
    @app.post("/session_a")
    async def session_a(db: AsyncSession = Depends(get_db)):
        # Session A starts a transaction
        logger.info("A start")
        uuid_str = str(uuid.uuid4())
        item = Item(name=f"session_a{uuid_str}")
        db.add(item)
        await asyncio.sleep(0.5)  # simulate a long transaction holding the lock
        logger.info(f"A commit {uuid_str}")
        await db.commit()
        return {"status": "A committed"}
    
    
    @app.post("/session_b")
    async def session_b(db: AsyncSession = Depends(get_db)):
        logger.info("B start")
        # Session B acquires the lock as soon as possible
        await asyncio.sleep(0.1)
        uuid_str = str(uuid.uuid4())
        item = Item(name=f"session_b{uuid_str}")
        db.add(item)
        await db.flush()
        logger.info(f"B flush {uuid_str}")
        await asyncio.sleep(1)
        await db.commit()
        logger.info(f"B commit {uuid_str}")
        return {"status": "B committed"}
    
    
    if __name__ == "__main__":
        import uvicorn
        asyncio.run(init_models())
    
        uvicorn.run(app, host="0.0.0.0", port=8000)
    

    The output when running locally is, as expected:

    session_b: 200, response: {"status":"B committed"}
    session_a: 200, response: {"status":"A committed"}

    On top of that, you can also switch the journal mode to write-ahead logging (WAL), which is probably better suited to this case (faster, more concurrency, etc., per the SQLite docs):

    from sqlalchemy import text

    async def set_journal_mode():
        async with engine.connect() as conn:
            result = await conn.execute(text("PRAGMA journal_mode=WAL;"))
            print("Journal mode:", result.scalar())

    # then run asyncio.run(set_journal_mode()) in the __main__ block
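Alternatively, instead of a one-off call at startup, you can set the PRAGMAs on every new connection with a SQLAlchemy connect event, and add a busy_timeout so SQLite waits for the lock instead of erroring out at once. A sketch with a synchronous engine (the file path and 5-second timeout are arbitrary choices; for the async engine above, attach the listener to engine.sync_engine instead):

```python
import os
import tempfile

from sqlalchemy import create_engine, event, text

# Throwaway database file (path is arbitrary)
db_path = os.path.join(tempfile.mkdtemp(), "wal_demo.db")
engine = create_engine(
    f"sqlite:///{db_path}", connect_args={"check_same_thread": False}
)

@event.listens_for(engine, "connect")
def set_sqlite_pragmas(dbapi_conn, connection_record):
    # Runs once for every new DBAPI connection in the pool
    cursor = dbapi_conn.cursor()
    # WAL lets readers proceed while one writer holds the lock
    cursor.execute("PRAGMA journal_mode=WAL;")
    # Wait up to 5 seconds for a lock instead of failing immediately
    cursor.execute("PRAGMA busy_timeout=5000;")
    cursor.close()

with engine.connect() as conn:
    print("Journal mode:", conn.execute(text("PRAGMA journal_mode;")).scalar())
    print("Busy timeout:", conn.execute(text("PRAGMA busy_timeout;")).scalar())
```

Note that WAL mode is persistent (it is stored in the database file), while busy_timeout is per-connection, which is why the event-listener approach is useful: every pooled connection gets the timeout applied.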