pytransitions

pytransitions, timeout, in-memory, persisting to database


Question regarding the pyTransitions package, which I am currently using in one of my projects.

When I first evaluated different packages, I also tested the timeout functionality, which I knew I would need somewhere down the road.

I then little by little chose to persist all my finite state machines (in fact just an id and state) to disk using sqlalchemy, and only reload them when required to trigger transitions - which works pretty nicely.

Unfortunately, now comes the need for timeout handling again, and even before trying to integrate this into my code, I am pretty sure that this cannot work for me. I guess I am stating the obvious: in order for timeouts to be handled properly on a (potentially larger) set of fsms, they have to be in memory as living objects and not just loaded from database?

Is this something that you have already met as a use case? Is there some way to also access this timeout counter in order to persist and reload it with appropriate fixing at any time, in order to allow the timeout mechanism to fall back on its feet, even if the object hadn't been alive in RAM during the actual timeout?

If there is no easy built-in alternative, I guess I will create a pool of resident objects in RAM, persist them periodically and reload them if ever my app went down? (my specific scenario uses sqlalchemy, but I guess the same could also apply to pickle)

Cheers, and thanks in advance for any ideas or suggestions. Joel


Solution

  • There is no built-in functionality to resume timeouts so far. However, there are means to convert machines from and to (dictionary) configurations. The extension is called MarkupMachine and is mentioned in the FAQ notebook.

    What we need is a Timeout state class that stores information about when it should be triggered and can be resumed based on this information. We also need MarkupMachine to handle this custom state information. MarkupMachine._convert_models converts models and their current states to dictionaries, and MarkupMachine._add_markup_model receives such a dictionary to instantiate models again. Thus, we need to extend both methods.

    I will cut some corners to keep the code short and focus on the concept. None of the following assumptions is mandatory, though. I will assume that a) you can deal with configurations in the sense that you can adjust them to be stored in and retrieved from your database. Furthermore, I will assume that b) your machine is your stateful model, c) you use the default model_attribute 'state', d) you do not use a nested/hierarchical machine and e) you do not pass vital custom information when you trigger an event. And lastly, that f) you don't mind that the resumed state is entered again, so potential on_enter callbacks will be triggered, and g) you do not need accuracy down to (fractions of) milliseconds. This sounds like a lot. But again, none of this is a deal breaker; dropping an assumption would just require more complex case handling.

    from transitions.extensions.markup import MarkupMachine
    from transitions.extensions.states import Timeout
    from transitions.core import EventData, Event
    import time
    from datetime import datetime
    
    
    class MarkupTimeout(Timeout):
    
        def __init__(self, *args, **kwargs):
            # Timeout expects a number but MarkupMachine passes values to states as strings right now
            kwargs['timeout'] = int(kwargs.get('timeout', 0))
            super(MarkupTimeout, self).__init__(*args, **kwargs)
            # we store trigger times in a dictionary with the model id as keys
            self.timeout_at = {}
    
        def resume(self, timeout_at, event_data):
            # since we want to give our MarkupMachine some time to instantiate we postulate that
            # the earliest possible trigger time is in a second.
            trigger_time = time.time() + 1
            timeout_at = trigger_time if timeout_at < trigger_time else timeout_at
            # we store the default timeout time ...
            tmp = self.timeout
            # ... and temporarily override it with the delta of the intended trigger time and the current time
            self.timeout = timeout_at - time.time()
            # ... enter the state and trigger the creation of the timer
            self.enter(event_data)
            # restore the timeout for any future enter event
            self.timeout = tmp
    
        def enter(self, event_data):
            # a timeout will only be initiated if the timeout value is greater than 0
            if self.timeout > 0:
                # calculate the time when the timeout will trigger (approximately) ...
                timeout_time = time.time() + self.timeout
                # and store it in the previously created dictionary
                self.timeout_at[id(event_data.model)] = timeout_time
                print(f"I should timeout at: {datetime.utcfromtimestamp(timeout_time)}")
            super(MarkupTimeout, self).enter(event_data)
    
        def exit(self, event_data):
            super(MarkupTimeout, self).exit(event_data)
            # remove the timeout time when the state is exited
            self.timeout_at[id(event_data.model)] = None
    
    
    class DBMachine(MarkupMachine):
    
        # DBMachine will use this class when states are created
        state_cls = MarkupTimeout
    
        # we customize our model definition and add 'timeout_at' to it
        # usually MarkupMachine would iterate over all models but since we assume the model is just
        # the machine itself, we can skip that part
        def _convert_models(self):
            state = self.get_state(self.state)
            timeout_at = state.timeout_at.get(id(self), None)
            model_def = {'state': state.name,
                         'name': 'DBMachine',
                         'class-name': 'self',
                         'timeout_at': str(timeout_at) if timeout_at is not None else ''}
            return [model_def]
    
        def _add_markup_model(self, markup):
            initial = markup.get('state', None)
            timeout_at = markup.get('timeout_at', '')
            self.add_model(self, initial)
            if timeout_at:
                state = self.get_state(self.state)
                # as mentioned above, every configuration value is a string right now;
                # the stored value is a unix timestamp in seconds
                deadline = float(timeout_at)
                # since we did not store event data, we need to create a temporary event with a minimal EventData object
                # that can be passed to state callbacks
                state.resume(deadline, EventData(state=state,
                                                 event=Event(name="resume", machine=self),
                                                 machine=self,
                                                 model=self,
                                                 args=[],
                                                 kwargs={}))
    
    
    # we pass a timeout only for 'pending'
    states = ['init', dict(name='pending', timeout=5, on_timeout='cancel'), 'done', 'cancelled']
    transitions = [
        dict(trigger='request', source='init', dest='pending'),
        dict(trigger='cancel', source='pending', dest='cancelled'),
        dict(trigger='result', source='pending', dest='done')
    ]
    
    m = DBMachine(states=states, transitions=transitions, initial='init')
    # transition to 'pending' and initiate timer
    m.request()
    assert m.is_pending()
    config = m.markup  # [1]
    # remove old machine
    del m
    # create new machine from configuration
    m2 = DBMachine(markup=config)
    assert m2.is_pending()
    time.sleep(10)
    assert m2.is_cancelled()
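
The clamping that resume performs can also be looked at in isolation, which makes it possible to check the arithmetic without waiting on real timers. The following is only a sketch: remaining_delay, now and min_delay are names made up for illustration and are not part of transitions.

```python
import time


def remaining_delay(timeout_at, now=None, min_delay=1.0):
    """Return the seconds to wait before a resumed timeout should fire.

    timeout_at is a unix timestamp in seconds. Deadlines that already passed,
    or that fall within min_delay seconds, are pushed to now + min_delay so
    the machine has time to finish instantiating.
    """
    now = time.time() if now is None else now
    earliest = now + min_delay
    return max(timeout_at, earliest) - now


# deadline is still 4 seconds away: the full remaining time is kept
print(remaining_delay(timeout_at=104.0, now=100.0))
# deadline passed while the machine was not in memory: fire after min_delay
print(remaining_delay(timeout_at=90.0, now=100.0))
```

Passing now explicitly keeps the function deterministic; in the MarkupTimeout class above, the same value ends up as the temporary self.timeout.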
    

    The configuration [1] would look like this:

    { 'after_state_change': [],
      'auto_transitions': True,
      'before_state_change': [],
      'finalize_event': [],
      'ignore_invalid_triggers': None,
      'initial': 'init',
      'models': [ { 'class-name': 'self',
                    'name': 'DBMachine',
                    'state': 'pending',
                    'timeout_at': '1617958918.6320097'}],
      'prepare_event': [],
      'queued': False,
      'send_event': False,
      'states': [ {'name': 'init'},
                  {'name': 'pending', 'on_timeout': ['cancel'], 'timeout': '5'},
                  {'name': 'done'},
                  {'name': 'cancelled'}],
      'transitions': [ {'dest': 'pending', 'source': 'init', 'trigger': 'request'},
                       { 'dest': 'cancelled',
                         'source': 'pending',
                         'trigger': 'cancel'},
                       {'dest': 'done', 'source': 'pending', 'trigger': 'result'}]}
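
Since the configuration shown above only contains dictionaries, lists, strings, booleans and None, one way to get it into a database column might be to serialize it as JSON. This is a minimal sketch using a trimmed-down copy of that configuration; whether every MarkupMachine feature produces JSON-friendly values is an assumption I have not checked, and the variable names payload and restored are just illustrative.

```python
import json

# trimmed-down copy of the configuration shown above
config = {
    'initial': 'init',
    'models': [{'class-name': 'self', 'name': 'DBMachine',
                'state': 'pending', 'timeout_at': '1617958918.6320097'}],
    'queued': False,
    'ignore_invalid_triggers': None,
    'states': [{'name': 'init'},
               {'name': 'pending', 'on_timeout': ['cancel'], 'timeout': '5'}],
}

# text that fits into a TEXT (or JSON) column
payload = json.dumps(config)
# the dictionary you would later hand to the machine as markup
restored = json.loads(payload)
print(restored == config)
```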
    
    

    I assume that this configuration could be reorganized to enable SQL queries to filter for imminent timeouts and instantiate machines only when necessary. timeout_at could also be stored as datetime strings rather than unix timestamps if that makes queries easier. You could also store just the models part and, rather than creating a DBMachine from a configuration, create it the 'common' way:

    # reuse the states and transitions and only create the model from configuration
    # 'model=None' prevents the machine from adding itself as a model too early
    m2 = DBMachine(model=None, states=states, transitions=transitions, initial='init')
    m2._add_markup_model(config['models'][0])
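
To illustrate the idea of filtering for imminent timeouts, here is a rough sketch that uses the standard library's sqlite3 instead of sqlalchemy to stay self-contained. The machines table, its columns, the due_machines helper and the horizon parameter are all hypothetical and would need to be adapted to your schema.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
# hypothetical table: one row per persisted machine
conn.execute(
    'CREATE TABLE machines (id INTEGER PRIMARY KEY, state TEXT, timeout_at REAL)')
conn.executemany(
    'INSERT INTO machines (id, state, timeout_at) VALUES (?, ?, ?)',
    [(1, 'pending', 105.0),   # timeout is due soon
     (2, 'pending', 500.0),   # timeout is due much later
     (3, 'done', None)])      # no timer running


def due_machines(conn, now, horizon=10.0):
    """Return (id, state, timeout_at) rows whose timeout fires within horizon seconds."""
    cur = conn.execute(
        'SELECT id, state, timeout_at FROM machines '
        'WHERE timeout_at IS NOT NULL AND timeout_at <= ? '
        'ORDER BY timeout_at',
        (now + horizon,))
    return cur.fetchall()


rows = due_machines(conn, now=100.0)
# each row can be turned back into a model definition for _add_markup_model
model_defs = [{'state': state, 'timeout_at': str(timeout_at)}
              for _, state, timeout_at in rows]
print(model_defs)
```

Only the machines returned by such a query would have to be instantiated and resumed; everything else can stay on disk until its deadline comes closer.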