I'm trying to set up an asynchronous state machine with the `transitions` library that can be controlled via MQTT using `aiomqtt`. I managed to get a minimal working example running, which works as long as there are no repetitive actions:
Script for the state machine:
import asyncio
import aiomqtt
from transitions.extensions import AsyncMachine
import sys
import os
import logging
# Emit DEBUG-level log records from every logger in this process.
logging.basicConfig(level=logging.DEBUG)

# On Windows, switch asyncio to the selector-based event loop before any loop
# is created.  NOTE(review): presumably required because aiomqtt relies on
# add_reader/add_writer, which the default Windows loop lacks -- confirm
# against the aiomqtt documentation.
_running_on_windows = sys.platform.lower() == "win32" or os.name.lower() == "nt"
if _running_on_windows:
    from asyncio import WindowsSelectorEventLoopPolicy, set_event_loop_policy

    set_event_loop_policy(WindowsSelectorEventLoopPolicy())
class MQTTStateMachine:
    """Model for an ``AsyncMachine`` whose triggers arrive as MQTT messages.

    Trigger names are read from ``MQTTstatemachine/controller/transition``;
    the current state is published to ``MQTTstatemachine/machine/state`` each
    time state A, B or stopped is entered.
    """

    # 'stopped' is declared as a final state.
    states = ['init', 'A', 'B', {'name': 'stopped', 'final': True}]

    def __init__(self, client):
        """Store the MQTT client and attach the state machine to this object.

        Args:
            client: connected MQTT client offering ``publish``, ``subscribe``
                and a ``messages`` async iterator.
        """
        self.client = client
        self.machine = AsyncMachine(
            model=self,
            states=MQTTStateMachine.states,
            initial='init',
        )
        # 'init' leaves the initial state for A; 'stop' ends from A or B.
        self.machine.add_transition(trigger='init', source='init', dest='A')
        self.machine.add_transition(
            trigger='stop', source=['A', 'B'], dest='stopped'
        )

    async def update_state(self):
        """Publish the current state name on the machine's state topic."""
        current = str(self.state)
        await self.client.publish("MQTTstatemachine/machine/state", current)

    async def receiveMQTT(self):
        """Subscribe to the transition topic and fire each received trigger.

        Every trigger is awaited before the next message is read, so a
        callback that does not return stalls this loop.
        """
        topic = "MQTTstatemachine/controller/transition"
        await self.client.subscribe(topic)
        async for message in self.client.messages:
            if not message.topic.matches(topic):
                continue
            # The payload is the name of the trigger to fire on this model.
            trigger_name = message.payload.decode()
            await self.trigger(trigger_name)

    async def on_enter_A(self):
        """Callback for entering A: publish the state, then print a notice."""
        await self.update_state()
        print("I'm now in state A.")

    async def on_enter_B(self):
        """Callback for entering B: publish the state, then print a notice."""
        await self.update_state()
        print("I'm now in state B.")

    async def on_enter_stopped(self):
        """Callback for entering stopped: publish the state, print a notice."""
        await self.update_state()
        print("I'm now in state stopped.")
async def main():
    """Connect to the broker, move the machine to state A, then listen.

    The ``init`` trigger performs the init -> A transition; afterwards the
    coroutine waits on the MQTT receive loop for as long as it runs.
    """
    async with aiomqtt.Client("test.mosquitto.org") as client:
        machine = MQTTStateMachine(client)
        await machine.init()
        # Run the receive loop as its own task and wait for it to finish.
        listener = asyncio.create_task(machine.receiveMQTT())
        await listener


if __name__ == "__main__":
    asyncio.run(main())
Controller script:
import asyncio
import aiomqtt
import sys
import os
# On Windows, switch asyncio to the selector-based event loop before any loop
# is created.  NOTE(review): presumably required because aiomqtt relies on
# add_reader/add_writer, which the default Windows loop lacks -- confirm
# against the aiomqtt documentation.
_running_on_windows = sys.platform.lower() == "win32" or os.name.lower() == "nt"
if _running_on_windows:
    from asyncio import WindowsSelectorEventLoopPolicy, set_event_loop_policy

    set_event_loop_policy(WindowsSelectorEventLoopPolicy())
async def publishTransitions(client):
    """Publish a fixed sequence of trigger names, five seconds apart.

    Sends ``to_B``, ``to_A``, ``to_B`` and ``stop`` to the controller topic,
    sleeping five seconds before each publish and once more after the last.

    Args:
        client: connected MQTT client offering an awaitable ``publish``.
    """
    topic = "MQTTstatemachine/controller/transition"
    for transition in ("to_B", "to_A", "to_B", "stop"):
        await asyncio.sleep(5)
        await client.publish(topic, transition)
        print(f"Transition: {transition}")
    # Trailing pause before returning; presumably leaves time for the last
    # state report to arrive before the caller shuts down.
    await asyncio.sleep(5)
async def receiveStates(client):
    """Subscribe to the machine's state topic and print every reported state.

    Args:
        client: connected MQTT client offering ``subscribe`` and a
            ``messages`` async iterator.
    """
    topic = "MQTTstatemachine/machine/state"
    await client.subscribe(topic)
    async for message in client.messages:
        if not message.topic.matches(topic):
            continue
        state = message.payload.decode()
        print(f"Statemachine now in state {state}")
async def main():
    """Run the publisher and the state listener until the first one finishes.

    Both coroutines share one MQTT client.  As soon as either task completes,
    every task still running is cancelled and awaited, and "Finished." is
    printed for each task that ends by cancellation.  If the task that
    completed did so with an exception, that exception is re-raised instead
    of being silently dropped.
    """
    async with aiomqtt.Client("test.mosquitto.org") as client:
        tasks = [
            asyncio.create_task(publishTransitions(client)),
            asyncio.create_task(receiveStates(client)),
        ]
        done, pending = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED
        )

        # Cancel whatever is still running; iterating (rather than popping a
        # single element) also copes with an empty or larger pending set.
        for task in pending:
            task.cancel()
        for task in pending:
            try:
                await task
            except asyncio.CancelledError:
                print("Finished.")

        # Retrieve results so a failure in the completed task surfaces here
        # rather than as a "Task exception was never retrieved" warning.
        for task in done:
            task.result()


if __name__ == "__main__":
    asyncio.run(main())
I tried doing some repetitive action by replacing on_enter_B:
async def on_enter_B(self):
    """Publish and print the state once per second while the model is in B.

    The loop runs inside the enter callback, so the callback only returns
    once ``is_B()`` becomes false.
    """
    while True:
        if not self.is_B():
            break
        await self.update_state()
        print("I'm now in state B.")
        await asyncio.sleep(1)
but then it gets stuck in state B and doesn't respond to state changes via MQTT anymore.
I tried implementing the repetitive task with a reflexive transition but that doesn't work either:
async def on_enter_B(self):
    """Publish and print state B, wait one second, then trigger ``to_B``.

    Re-entering B from within its own enter callback is the reflexive
    transition used to repeat the action.
    """
    await self.update_state()
    print("I'm now in state B.")
    pause_seconds = 1
    await asyncio.sleep(pause_seconds)
    # Reflexive transition: B -> B, which runs this callback again.
    await self.to_B()
As far as I can tell the problem is here:
async def receiveMQTT(self):
    """Subscribe to the transition topic and await each received trigger.

    The next message is only read after the awaited trigger call returns.
    """
    topic = "MQTTstatemachine/controller/transition"
    await self.client.subscribe(topic)
    async for message in self.client.messages:
        if not message.topic.matches(topic):
            continue
        trigger_name = message.payload.decode()
        await self.trigger(trigger_name)  # [1]
The `await self.trigger(...)` call at [1] does not return while any callback is still running, so the `async for` loop never reaches the next message and `receiveMQTT` blocks itself.
Solution: don't await `trigger`; wrap it in a task instead [2]. Keep a reference to every running task [3] so that it cannot be garbage-collected before it has finished (see the Python documentation for `asyncio.create_task` for details). When a new trigger arrives, `AsyncMachine` should cancel the running trigger. This invokes the task's done callback, which you can use to remove the reference from your task collection [4]. In my example `self.tasks` is a set (see [5]).
def __init__(self, client):
    """Store the MQTT client and create the set that tracks trigger tasks."""
    # Holds a reference to every in-flight trigger task.
    self.tasks = set()  # [5]
    self.client = client
    # ...
async def receiveMQTT(self):
    """Subscribe to the transition topic and fire each trigger as a task.

    Triggers are scheduled rather than awaited, so the loop proceeds to the
    next message immediately.  Each task is held in ``self.tasks`` until it
    is done.
    """
    topic = "MQTTstatemachine/controller/transition"
    await self.client.subscribe(topic)
    async for message in self.client.messages:
        if not message.topic.matches(topic):
            continue
        trigger_name = message.payload.decode()
        task = asyncio.create_task(self.trigger(trigger_name))  # [2]
        self.tasks.add(task)  # [3]
        task.add_done_callback(self.tasks.discard)  # [4]
This, however, may delay state transitions: by the time your MQTT client call returns, the actual transition might not be finished yet. So right after `client.publish("...", "to_B")` the state could still be `A`.
Alternatively, there might be a way to tell `aiomqtt` not to wait for `receiveMQTT` to return.