python sqlalchemy wonderware

SQLAlchemy session management in a long-running process


Scenario:

The Python functions are part of a system that tracks Work-In-Progress on the factory floor. The purpose of the system is to track the produced widgets along the process, ensure that the widgets go through the process in the correct order, and check that certain conditions are met along the way. The widget production history and widget state are stored in a relational database; this is where SQLAlchemy plays its part.

For example, when a widget passes a scanner, the automation software triggers the following script (written in the application server's custom scripting language):

' widget_id and scanner_id provided by automation object
' ExecFunction() takes care of calling a CPython function
retval = ExecFunction("WidgetScanned", widget_id, scanner_id);
' if the python function raises an Exception, ErrorOccured will be true
' in this case, any errors should cause the production line to stop.
if (retval.ErrorOccured) then
    ProductionLine.Running = False;
    InformationBoard.DisplayText = "ERROR: " + retval.Exception.Message;
    InformationBoard.SoundAlarm = True;
end if;

The script calls the WidgetScanned python function:

# pywip/functions.py
from pywip.database import session
from pywip.model import Widget, WidgetHistoryItem
from pywip import validation, StatusMessage
from datetime import datetime

def WidgetScanned(widget_id, scanner_id):
    widget = session.query(Widget).get(widget_id)
    validation.validate_widget_passed_scanner(widget, scanner_id)  # raises exception on error

    widget.history.append(WidgetHistoryItem(timestamp=datetime.now(), action=u"SCANNED", scanner_id=scanner_id))
    widget.last_scanner = scanner_id
    widget.last_update = datetime.now()

    return StatusMessage("OK")

# ... there are a dozen similar functions

My question is: How do I best manage SQLAlchemy sessions in this scenario? The application server is a long-running process, typically running months between restarts. The application server is single-threaded.

Currently, I do it the following way:

I apply a decorator to the functions I make available to the application server:

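# pywip/iasfunctions.py
import functools

from pywip import functions
from pywip.database import session

def ias_session_handling(func):
    """Commit after a successful call; roll back and re-raise on any error."""
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def _ias_session_handling(*args, **kwargs):
        try:
            retval = func(*args, **kwargs)
            session.commit()
            return retval
        except:
            session.rollback()
            raise
    return _ias_session_handling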

# ... actually I populate this module with decorated versions of all the functions in pywip.functions dynamically
WidgetScanned = ias_session_handling(functions.WidgetScanned)
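
A minimal sketch of how that dynamic population might be done, not the actual pywip code. The helper export_decorated, the in-place functions module and the simplified decorator are all assumptions made so the example runs on its own; the real decorator would do the commit/rollback handling shown above.

```python
import functools
import inspect
import types

wrapped_calls = []  # lets the example show that the wrapper really ran


def ias_session_handling(func):
    """Stand-in for the real decorator: it only records the call."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        retval = func(*args, **kwargs)
        wrapped_calls.append(func.__name__)
        return retval
    return wrapper


def export_decorated(source_module, namespace, decorator):
    """Put a decorated copy of every public function of source_module into namespace."""
    for name, func in inspect.getmembers(source_module, inspect.isfunction):
        if not name.startswith("_"):
            namespace[name] = decorator(func)


# Hypothetical stand-in for pywip.functions, built in place so the sketch runs alone.
functions = types.ModuleType("functions")


def WidgetScanned(widget_id, scanner_id):
    return "OK"


def _private_helper():
    return None


functions.WidgetScanned = WidgetScanned
functions._private_helper = _private_helper

# In pywip/iasfunctions.py the namespace would be globals().
exported = {}
export_decorated(functions, exported, ias_session_handling)
```

Because inspect.isfunction only matches plain Python functions, imported classes and modules (datetime, validation and so on) are skipped automatically. Functions imported into the source module from elsewhere would still be picked up, so a stricter filter may be needed in practice.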

Question: Is the decorator above suitable for handling sessions in a long-running process? Should I call session.remove()?

The SQLAlchemy session object is a scoped session:

# pywip/database.py
from sqlalchemy.orm import scoped_session, sessionmaker

session = scoped_session(sessionmaker())

I want to keep session management out of the basic functions, for two reasons:

  1. There is another family of functions, sequence functions. The sequence functions call several of the basic functions. One sequence function should equal one database transaction.
  2. I need to be able to use the library from other environments:
     a) From a TurboGears web application. In that case, session management is done by TurboGears.
     b) From an IPython shell. In that case, commit/rollback will be explicit.
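
To illustrate reason 1, here is a rough sketch of a sequence function that calls two basic functions and is wrapped so that the whole sequence ends in a single commit or rollback. RecordingSession, one_transaction, widget_packed and scan_and_pack are hypothetical names invented for the example; the stand-in session only logs calls and does not talk to a database.

```python
import functools


class RecordingSession:
    """Hypothetical stand-in for the SQLAlchemy session; it only logs calls."""

    def __init__(self):
        self.log = []

    def add(self, item):
        self.log.append("add " + item)

    def commit(self):
        self.log.append("commit")

    def rollback(self):
        self.log.append("rollback")


session = RecordingSession()


def one_transaction(func):
    """Commit once after func returns, roll back if it raises."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            retval = func(*args, **kwargs)
            session.commit()
            return retval
        except:  # re-raised below, mirroring the decorator in the question
            session.rollback()
            raise
    return wrapper


# Basic functions: they change state but never commit or roll back.
def widget_scanned(widget_id, scanner_id):
    session.add("SCANNED")


def widget_packed(widget_id, box_id):
    if box_id is None:
        raise ValueError("widget has no box")
    session.add("PACKED")


# Sequence function: several basic functions, still no session handling.
def scan_and_pack(widget_id, scanner_id, box_id):
    widget_scanned(widget_id, scanner_id)
    widget_packed(widget_id, box_id)
    return "OK"


# Only the entry point handed to the application server is decorated.
ias_scan_and_pack = one_transaction(scan_and_pack)
```

Since neither the basic functions nor the sequence function touches commit or rollback, the same code can be reused unchanged where something else owns the transaction, such as TurboGears or an interactive shell.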

(I am truly sorry for the long question. But I felt I needed to explain the scenario. Perhaps not necessary?)


Solution

  • The decorator you describe is suitable for a long-running application, but you can run into trouble if you accidentally share objects between requests. To make such errors surface early instead of silently corrupting data, it is better to discard the session with session.remove() once each call has finished.

    try:
        try:
            retval = func(*args, **kwargs)
            session.commit()
            return retval
        except:
            session.rollback()
            raise
    finally:
        session.remove()
    

    Or, if you can use the with statement, let the session's transaction act as a context manager:

    try:
        with session.registry().transaction:
            return func(*args, **kwargs)
    finally:
        session.remove()
    

    By the way, you might want to use .with_lockmode('update') on the query (with_for_update() in SQLAlchemy 0.9 and later) so that your validation doesn't run on stale data.
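
    Put together as a full decorator, the first variant could look like the sketch below. FakeSession and FakeScopedSession are hypothetical, much-simplified imitations of a scoped session, used only so the example runs without a database and makes the call order visible; with the real pywip.database.session the decorator body would be the same.

```python
import functools


class FakeSession:
    """Hypothetical stand-in for a Session; it only logs what was called."""

    def __init__(self):
        self.log = []

    def commit(self):
        self.log.append("commit")

    def rollback(self):
        self.log.append("rollback")

    def close(self):
        self.log.append("close")


class FakeScopedSession:
    """Simplified imitation of scoped_session: lazy creation, remove() discards."""

    def __init__(self):
        self._current = None
        self.history = []  # every session ever created, for inspection

    def __call__(self):
        if self._current is None:
            self._current = FakeSession()
            self.history.append(self._current)
        return self._current

    def commit(self):
        self().commit()

    def rollback(self):
        self().rollback()

    def remove(self):
        # Close the current session, if any, and forget it.
        if self._current is not None:
            self._current.close()
            self._current = None


session = FakeScopedSession()


def ias_session_handling(func):
    """Commit or roll back, then always discard the session."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            try:
                retval = func(*args, **kwargs)
                session.commit()
                return retval
            except:  # re-raised below, as in the question's decorator
                session.rollback()
                raise
        finally:
            session.remove()
    return wrapper


@ias_session_handling
def ok():
    return "OK"


@ias_session_handling
def boom():
    raise RuntimeError("validation failed")
```

    Each call ends with remove(), so the next call starts with a fresh session and no objects loaded during an earlier call stay attached to it.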