What Happens
I trigger /session_a and /session_b almost simultaneously (e.g. with Postman or curl). /session_b usually succeeds. /session_a fails on db.commit() with sqlite3.OperationalError: database is locked.
I found that the cause is ordering: session A reaches its commit before session B has committed, but session B already holds the write lock (taken by its flush), so A inevitably fails with a database is locked error.
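The locking behavior described above can be reproduced without FastAPI or SQLAlchemy. The following is a minimal sketch using only the standard-library sqlite3 module; the function name reproduce_lock, the connection names holder and waiter, and the 0.2 second timeout are assumptions chosen for illustration, not part of the original project. One connection opens a write transaction and keeps it open, and a second connection then tries to write with a short busy timeout.

```python
import os
import sqlite3
import tempfile


def reproduce_lock(timeout: float = 0.2) -> str:
    """Return "ok" if the second writer succeeds, else the error message."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None means we issue BEGIN/COMMIT ourselves.
        holder = sqlite3.connect(path, isolation_level=None)
        waiter = sqlite3.connect(path, timeout=timeout, isolation_level=None)
        try:
            holder.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
            # Plays the role of session B: the INSERT takes the write lock
            # and the transaction is left uncommitted.
            holder.execute("BEGIN")
            holder.execute("INSERT INTO items (name) VALUES ('session_b')")
            try:
                # Plays the role of session A: it retries for `timeout`
                # seconds and then gives up.
                waiter.execute("INSERT INTO items (name) VALUES ('session_a')")
                return "ok"
            except sqlite3.OperationalError as exc:
                return str(exc)
        finally:
            waiter.close()
            holder.close()


if __name__ == "__main__":
    print(reproduce_lock())
```

In the FastAPI version the second writer cannot simply wait it out, because the wait happens inside the event loop and prevents the lock holder from ever reaching its own commit.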
Reproducible Code
import asyncio
import sqlite3
import threading
import time
import uuid

from loguru import logger
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

DATABASE_URL = "sqlite:///./test_locked.db"
engine = create_engine(
    DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)
Base = declarative_base()
app = FastAPI()


class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)


Base.metadata.create_all(bind=engine)


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
@app.post("/session_a")
async def session_a(db: Session = Depends(get_db)):
    # Session A starts a transaction
    logger.info("A start")
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_a{uuid_str}")
    db.add(item)
    await asyncio.sleep(0.5)  # Simulate a long transaction that holds the lock
    logger.info(f"A commit {uuid_str}")
    db.commit()
    return {"status": "A committed"}


@app.post("/session_b")
async def session_b(db: Session = Depends(get_db)):
    logger.info("B start")
    # Session B acquires the lock as early as possible
    await asyncio.sleep(0.1)
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_b{uuid_str}")
    db.add(item)
    db.flush()
    logger.info(f"B flush {uuid_str}")
    await asyncio.sleep(1)
    db.commit()
    logger.info(f"B commit {uuid_str}")
    return {"status": "B committed"}
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Request code
import requests
import threading
import time


def test_session_a():
    res = requests.post("http://127.0.0.1:8000/session_a", timeout=10)
    print(f"session_a: {res.status_code}")


def test_session_b():
    res = requests.post("http://127.0.0.1:8000/session_b", timeout=10)
    print(f"session_b: {res.status_code}")


th1 = threading.Thread(target=test_session_a)
th2 = threading.Thread(target=test_session_b)
th1.start()
time.sleep(0.1)
th2.start()
th1.join()
th2.join()
I think it's illogical that db.commit() is synchronously blocking. Using StaticPool seems to solve the issue, but it's unsafe. At the same time, I'm afraid to set autoflush=False in the project, as it might cause some endpoints to crash. What should I do?
If you convert all database operations to run asynchronously (using aiosqlite), the issue will be mitigated. When a FastAPI route is declared as asynchronous (async def), all of its operations run in the main event loop. In contrast, synchronous routes (def) are delegated to a threadpool, so they do not block the loop. For more details on this front, there's a good section in the FastAPI documentation.
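To make the event-loop point concrete, here is a small sketch that uses only asyncio and time. The names heartbeat and run_with are hypothetical, and time.sleep(0.3) merely stands in for a blocking call such as a synchronous db.commit(). It counts how many times a background task gets to run while the "commit" is in progress, first when the call is made directly inside the coroutine and then when it is offloaded with asyncio.to_thread (Python 3.9+).

```python
import asyncio
import time


async def heartbeat(ticks: list, interval: float = 0.05, count: int = 4) -> None:
    # Stands in for any other request that needs the event loop.
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.perf_counter())


async def run_with(blocking_in_loop: bool) -> int:
    """Return how many heartbeat ticks land while the fake commit runs."""
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    start = time.perf_counter()
    if blocking_in_loop:
        time.sleep(0.3)  # blocks the whole event loop
    else:
        await asyncio.to_thread(time.sleep, 0.3)  # loop stays free
    end = time.perf_counter()
    await task
    return sum(1 for t in ticks if start < t < end)


if __name__ == "__main__":
    print("ticks while blocking in loop:", asyncio.run(run_with(True)))
    print("ticks while offloaded:", asyncio.run(run_with(False)))
```

When the blocking call runs inside the loop, no tick can land during it, which is the same reason session B cannot make progress while session A's synchronous commit is waiting.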
In the original logic, the database operations (db.commit, db.flush) were all synchronous, meaning the event loop is blocked until they complete. While session A's commit waits for the write lock, session B cannot resume to commit and release it, so A waits until SQLite's busy timeout expires and then fails. When converted to asynchronous calls, they hand control back to the event loop while waiting. That doesn't fully solve SQLite's inherent single-writer limitation, but it does address the blocking that was contributing to the behavior, and gets the application operational. Here's a fully operational update of the FastAPI server logic:
import asyncio
import uuid
import logging

from fastapi import FastAPI, Depends
from sqlalchemy import Column, Integer, String, text
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.ext.declarative import declarative_base

logger = logging.getLogger("uvicorn.error")
DATABASE_URL = "sqlite+aiosqlite:///./test_locked.db"
engine = create_async_engine(
    DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = async_sessionmaker(autocommit=False, autoflush=True, bind=engine)
Base = declarative_base()
app = FastAPI()


class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)


async def init_models():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)


async def get_db() -> AsyncSession:
    async with SessionLocal() as session:
        yield session
@app.post("/session_a")
async def session_a(db: AsyncSession = Depends(get_db)):
    # Session A starts a transaction
    logger.info("A start")
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_a{uuid_str}")
    db.add(item)
    await asyncio.sleep(0.5)  # Simulate a long transaction that holds the lock
    logger.info(f"A commit {uuid_str}")
    await db.commit()
    return {"status": "A committed"}


@app.post("/session_b")
async def session_b(db: AsyncSession = Depends(get_db)):
    logger.info("B start")
    # Session B acquires the lock as early as possible
    await asyncio.sleep(0.1)
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_b{uuid_str}")
    db.add(item)
    await db.flush()
    logger.info(f"B flush {uuid_str}")
    await asyncio.sleep(1)
    await db.commit()
    logger.info(f"B commit {uuid_str}")
    return {"status": "B committed"}
if __name__ == "__main__":
    import uvicorn
    asyncio.run(init_models())
    uvicorn.run(app, host="0.0.0.0", port=8000)
The output when running locally is, as expected:
session_b: 200, response: {"status":"B committed"}
session_a: 200, {"status":"A committed"}
To the point made in the above response, you can also switch the journal mode to write-ahead logging (WAL) as follows, which is probably better suited to this case (faster, more concurrency, etc., per the docs):
from sqlalchemy import text


async def set_journal_mode():
    async with engine.connect() as conn:
        result = await conn.execute(text("PRAGMA journal_mode=WAL;"))
        print("Journal mode:", result.scalar())

# then run asyncio.run(set_journal_mode()) in the __main__ block
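As a quick way to check what the pragma does, the sketch below uses the standard-library sqlite3 module rather than the async engine; the helper names enable_wal, current_mode and demo are assumptions made for this example. It sets WAL on one connection and then opens a fresh connection to confirm that the setting is stored in the database file, so it only needs to be applied once per database rather than on every connection.

```python
import os
import sqlite3
import tempfile


def enable_wal(path: str) -> str:
    """Switch the database at `path` to WAL and return the reported mode."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute("PRAGMA journal_mode=WAL;").fetchone()[0]
    finally:
        conn.close()


def current_mode(path: str) -> str:
    """Read the journal mode through a brand-new connection."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute("PRAGMA journal_mode;").fetchone()[0]
    finally:
        conn.close()


def demo() -> tuple:
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "wal_demo.db")
        before = current_mode(path)   # default rollback-journal mode
        reported = enable_wal(path)
        after = current_mode(path)    # persisted in the file
        return before, reported, after


if __name__ == "__main__":
    print(demo())
```

Note that WAL lets readers proceed while a write is in progress, but it still allows only one writer at a time, so two overlapping write transactions can still contend for the lock.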