python sqlalchemy orm fastapi

SQLAlchemy Error: InvalidRequestError - Can't operate on closed transaction inside context manager


I'm encountering an issue while working with SQLAlchemy in a FastAPI project. I've set up a route that is supposed to add items to a database using a context manager and nested transactions. If a single item fails to be added (due to a constraint violation or any other reason), it should not be included in the commit. However, the remaining items, whether added before or after the failed one, should still be committed.

When using nested transactions, I would expect to be able to keep track of my failed and successful additions. However, I keep running into the following error:

sqlalchemy.exc.InvalidRequestError: Can't operate on a closed transaction inside a context manager.

I've provided the relevant code below:

router = APIRouter()

@lru_cache()
def get_session_maker() -> sessionmaker:
    # create a reusable factory for new Session instances
    engine = create_engine(SQLALCHEMY_DATABASE_URI, echo=True)
    return sessionmaker(engine)

def get_session() -> Generator[Session, None, None]:
    cached_sessionmaker = get_session_maker()
    with cached_sessionmaker.begin() as session:
        yield session

@router.post("/items")
def add_items(
    session: Session = Depends(get_database.get_session),
) -> Dict[str, Any]:

    request_inputs = [
        RequestInput(name="chair", used_for="sitting"),
        RequestInput(name="table", used_for="dining"),
        RequestInput(name="tv", used_for="watching"),
    ]

    uploaded_items = 0
    failed_items = 0
    for request_input in request_inputs:
        try:
            with session.begin_nested():
                item= Item(
                    **request_input.dict()
                )
                session.add(item)
                session.refresh(item)

                uploaded_items += 1

        except IntegrityError as e:
            # Handle any integrity constraint violations here
            session.rollback()
            failed_items += 1
        except Exception as e:
            # Handle other exceptions
            session.rollback()
            failed_items += 1

        session.commit()

    return {
        "uploaded": uploaded_items,
        "failed": failed_items,
    }

It is obviously caused by my session being closed prematurely. However, I cannot figure out where I am closing the transaction too early while trying to add all non-failed items to my db. Can someone please help me understand why I'm encountering this error and how to fix it?

Thank you in advance for your assistance.

I tried to use session.begin_nested() to keep track of the status of my transaction, but it seems to get closed somewhere. If I don't use begin_nested(), only the items before the failed instance are committed; all items after it are excluded.


Solution

    Summary

    The error is caused by the rollbacks inside the except blocks. They roll back the outer transaction, rendering the outer context manager unusable.

    The inner transaction created by begin_nested() will roll back automatically if an exception occurs, so there is no need to roll back the outer transaction.
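As a rough illustration of the mechanism that begin_nested() builds on, here is a minimal sketch of SAVEPOINT semantics using the standard-library sqlite3 module instead of SQLAlchemy. The table, the savepoint name item_sp and the sample names are assumptions made up for this example. Only the failing item is undone; the outer transaction stays open and is committed once at the end.

```python
import sqlite3

# isolation_level=None turns off the module's implicit transaction handling,
# so the BEGIN / SAVEPOINT / COMMIT statements below run exactly as written.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE items (name TEXT UNIQUE)")

names = ["chair", "table", "chair", "tv"]  # the second "chair" violates UNIQUE
uploaded = failed = 0

conn.execute("BEGIN")  # outer transaction
for name in names:
    conn.execute("SAVEPOINT item_sp")  # inner, "nested" transaction
    try:
        conn.execute("INSERT INTO items (name) VALUES (?)", (name,))
    except sqlite3.IntegrityError:
        # Undo this item only; the outer transaction is left untouched.
        conn.execute("ROLLBACK TO item_sp")
        failed += 1
    else:
        uploaded += 1
    finally:
        conn.execute("RELEASE item_sp")
conn.execute("COMMIT")  # single commit of everything that survived

saved = [row[0] for row in conn.execute("SELECT name FROM items ORDER BY name")]
print(uploaded, failed, saved)
```

The item after the failed one ("tv") is still stored, which is the behaviour the question asks for.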


    Detail

    What is happening is that the line

    session.refresh(item)
    

    raises an exception

    InvalidRequestError: Instance '<Item at 0x7faa69941070>' is not persistent within this Session

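    because only persistent instances, that is, instances that have already been flushed to the database, can be refreshed. An instance that has only been passed to session.add() is still pending.

    This exception is trapped by the second except clause, which calls the session's rollback() method. The rollback closes the outer transaction, triggering the reported error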
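The difference between a pending and a persistent instance can be checked with sa.inspect(). The following is a small sketch under assumed names (a cut-down Item model and an in-memory SQLite engine, neither taken from the question); it suggests that calling session.flush() before session.refresh() is one way to make the refresh legal if the refreshed values are really needed.

```python
import sqlalchemy as sa
from sqlalchemy import orm

Base = orm.declarative_base()


class Item(Base):
    # Hypothetical, cut-down model for this illustration only
    __tablename__ = "items"

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, unique=True)


engine = sa.create_engine("sqlite://")
Base.metadata.create_all(engine)

with orm.Session(engine) as session:
    item = Item(name="chair")
    session.add(item)
    state = sa.inspect(item)

    # After add() the object is pending: no INSERT has been emitted yet.
    pending_before_flush = state.pending

    try:
        session.refresh(item)
        refresh_error = None
    except sa.exc.InvalidRequestError as err:
        refresh_error = str(err)

    # flush() emits the INSERT, which makes the object persistent.
    session.flush()
    persistent_after_flush = state.persistent

    session.refresh(item)  # allowed now that the row exists
    item_id = item.id
    session.commit()
```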

    InvalidRequestError: Can't operate on a closed transaction inside a context manager.

    when the session's commit() method is called.
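The failure can be reproduced in isolation. This is a minimal sketch, assuming an in-memory SQLite engine and no models at all: rolling back the session inside the begin() block closes the transaction that the context manager owns, and the next operation that needs a transaction fails.

```python
import sqlalchemy as sa
from sqlalchemy import orm

engine = sa.create_engine("sqlite://")
Session = orm.sessionmaker(engine)

error_message = None
with Session.begin() as session:
    session.execute(sa.text("SELECT 1"))

    # Closes the transaction managed by the surrounding context manager.
    session.rollback()

    try:
        # Needs a transaction, but a new one cannot be started while the
        # context manager is still in progress.
        session.commit()
    except sa.exc.InvalidRequestError as err:
        error_message = str(err)

print(error_message)
```

The exact wording of the message may differ slightly between SQLAlchemy versions.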

    In fact, the explicit commits and rollbacks are unnecessary, because the context managers for the outer transaction (sessionmaker.begin()) and the inner one (session.begin_nested()) commit on a clean exit and roll back when an exception escapes the block. The code (leaving out the FastAPI-specific parts) can be reduced to this:

    import sqlalchemy as sa
    from sqlalchemy import orm
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
    
    class Base(DeclarativeBase):
        pass
    
    class Item(Base):
        __tablename__ = "items"
    
        id: Mapped[int] = mapped_column(primary_key=True)
        name: Mapped[str] = mapped_column(unique=True)
        used_for: Mapped[str]
    
    engine = sa.create_engine("sqlite://", echo=True)
    Base.metadata.create_all(engine)
    Session = orm.sessionmaker(engine)
    
    with Session.begin() as session:
        request_inputs = [
            dict(name="chair", used_for="sitting"),
            dict(name="table", used_for="dining"),
            dict(name="chair", used_for="sitting"), # Force an IntegrityError
        ]
    
        uploaded_items = failed_items = 0
        for request_input in request_inputs:
            try:
                with session.begin_nested():
                    item = Item(**request_input)
                    session.add(item)
                # The pending INSERT is flushed when the block exits, so
                # count the item only after the block has completed.
                uploaded_items += 1
    
            # If an exception is raised inside the begin_nested() block
            # the inner transaction will be rolled back and the exception
            # will be re-raised. Trap it inside the outer session to prevent
            # the outer session from being rolled back.
            except sa.exc.IntegrityError as e:
                # Handle any integrity constraint violations here
                failed_items += 1
            except Exception as e:
                # Handle other exceptions
                failed_items += 1
    
        result = {
            "uploaded": uploaded_items,
            "failed": failed_items,
        }
        print(f"{result = }")