I'm trying to use the hierarchical machine with three nesting levels called `main -> nested -> deeper`. I would expect the state machines to be executed one after the other, with the states then remapped back to the first machine. So I would expect the final state to be `done`, but it is `nested_deeper_working`, so obviously I'm missing something.
The workaround is to use `queued=False`; then it works as expected. The downside is that the call stack gets really long, and so does the traceback when an error occurs.
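To illustrate why `queued=False` leads to such deep tracebacks, here is a toy dispatcher. It is not the transitions implementation; `ToyDispatcher` and `run_chain` are hypothetical names made up for this sketch. It only models the general idea: without queuing, an event triggered inside a callback is processed immediately inside that callback, while with queuing it is appended to a queue and processed after the current callback has returned.

```python
from collections import deque


class ToyDispatcher:
    """Toy event dispatcher (hypothetical, NOT the transitions internals)."""

    def __init__(self, queued):
        self.queued = queued
        self._queue = deque()
        self._running = False
        self.depth = 0      # current nesting depth of handler calls
        self.max_depth = 0  # deepest nesting observed so far
        self.log = []       # order in which events were processed

    def trigger(self, name, handler):
        if not self.queued:
            # Immediate mode: run the handler right now, on top of the caller.
            self._run(name, handler)
            return
        self._queue.append((name, handler))
        if self._running:
            return  # an outer trigger() call is already draining the queue
        self._running = True
        try:
            while self._queue:
                self._run(*self._queue.popleft())
        finally:
            self._running = False

    def _run(self, name, handler):
        self.depth += 1
        self.max_depth = max(self.max_depth, self.depth)
        self.log.append(name)
        try:
            handler(self)
        finally:
            self.depth -= 1


def run_chain(queued, length=5):
    """Trigger `length` events, each one fired from the previous handler."""
    dispatcher = ToyDispatcher(queued)

    def make_handler(i):
        def handler(d):
            if i + 1 < length:
                d.trigger(f"step{i + 1}", make_handler(i + 1))
        return handler

    dispatcher.trigger("step0", make_handler(0))
    return dispatcher
```

With five chained steps, `run_chain(False).max_depth` is 5 while `run_chain(True).max_depth` is 1, and both variants process the events in the same order. In the immediate variant every additional callback adds frames to the stack, which is what makes the traceback grow.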
Sorry for the long example; I was not able to make it shorter. In real life I'm using the `MainMachine` as overall production control. It launches smaller machines to erase, flash, calibrate or test the device. These are represented by `NestedMachine`. Inside these machines are the smallest machines, used e.g. for a hard reset or a single test sequence. This is the `DeeperMachine` in this case.
pytransitions 0.8.10
python 3.7.3
The `GenericMachine` class is just an abstract class. Here I define the default states `initial` and `done` and also the basic configuration.

    from transitions.extensions import HierarchicalMachine


    class GenericMachine(HierarchicalMachine):
        def __init__(self, states, transitions, model=None):
            generic_states = [
                {"name": "initial", "on_enter": self.entry_initial},
                {"name": "done", "on_enter": self.entry_done},
            ]
            states += generic_states
            super().__init__(
                states=states,
                transitions=transitions,
                model=model,
                send_event=True,
                queued=True,
            )

        def entry_initial(self, event_data):
            raise NotImplementedError

        def entry_done(self, event_data):
            raise NotImplementedError

`MainMachine` is the highest machine in the hierarchy and it launches the `NestedMachine`. It is expected that after all nested machines are done, the `done` state is executed.

    class MainMachine(GenericMachine):
        def __init__(self):
            nested = NestedMachine()
            remap = {"done": "done"}
            states = [
                {"name": "nested", "children": nested, "remap": remap},
            ]
            transitions = [
                ["go", "initial", "nested"],
            ]
            super().__init__(states, transitions, model=self)

        def entry_done(self, event_data):
            print("job finished")

`NestedMachine` acts as the second level of nesting. It launches the `DeeperMachine` and remaps the `done` state.

    class NestedMachine(GenericMachine):
        def __init__(self):
            deeper = DeeperMachine()
            remap = {"done": "done"}
            states = [
                {"name": "deeper", "children": deeper, "remap": remap},
            ]
            transitions = [
                ["go", "initial", "deeper"],
            ]
            super().__init__(states, transitions)

        def entry_initial(self, event_data):
            event_data.model.go()

The third level of nesting is implemented by `DeeperMachine`. After the work is done, it triggers the `go` event to transition to the `done` state and jumps back through the `NestedMachine` to the `MainMachine`.

    class DeeperMachine(GenericMachine):
        def __init__(self):
            states = [
                {"name": "working", "on_enter": self.entry_working},
            ]
            transitions = [
                ["go", "initial", "working"],
                ["go", "working", "done"],
            ]
            super().__init__(states, transitions, model=self)

        def entry_initial(self, event_data):
            event_data.model.go()

        def entry_working(self, event_data):
            event_data.model.go()

The test instantiates `MainMachine` and triggers the first event. The nested machines are expected to be called, and once the job is done, the state is expected to be remapped through the `done` states back to the `MainMachine`.

    import logging as log


    def main():
        log.basicConfig(level=log.DEBUG)
        log.getLogger("transitions").setLevel(log.INFO)
        machine = MainMachine()
        machine.go()
        assert machine.state == "done"


    if __name__ == "__main__":
        main()

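The expectation behind the assertion can be sketched with a small toy model. This is not how transitions implements `remap`; `resolve` and `REMAPS` are hypothetical names made up for this illustration. The sketch assumes that a remapped child state is replaced by the named state one level up, and that nested state names are joined with `_`, which matches the `nested_deeper_working` name reported above.

```python
SEPARATOR = "_"  # separator seen in the reported state name "nested_deeper_working"

# Hypothetical table: parent state path -> {child state: state one level up}.
# It mirrors the two `remap = {"done": "done"}` dicts from the example.
REMAPS = {
    ("nested",): {"done": "done"},
    ("nested", "deeper"): {"done": "done"},
}


def resolve(path, remaps=REMAPS):
    """Collapse a nested state path by applying remaps from the inside out."""
    path = list(path)
    while len(path) > 1:
        parent, leaf = tuple(path[:-1]), path[-1]
        target = remaps.get(parent, {}).get(leaf)
        if target is None:
            break  # no remap for this leaf, so the nested name stays as it is
        # Drop the parent state and its leaf, continue one level up at `target`.
        path = path[:-2] + [target]
    return SEPARATOR.join(path)
```

Under these assumptions, `resolve(["nested", "deeper", "done"])` collapses to `"done"`, which is the state the test asserts, while `resolve(["nested", "deeper", "working"])` stays `"nested_deeper_working"`, the state the machine was actually left in.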
Confirmed as a bug: https://github.com/pytransitions/transitions/issues/554

Resolved in dev-0.9; both the example and my production code work well for me.