I'm writing a Python program to interact with a device over a CAN bus. I'm using the python-can module successfully for this purpose, together with asyncio to react to asynchronous events. I have written a "CanBusManager" class that is used by the "CanBusSequencer" class. "CanBusManager" takes care of generating, sending, and receiving messages, and "CanBusSequencer" drives the sequence of messages to be sent.
At some point in the sequence I want to wait until a specific message is received to "unlock" the remaining messages to be sent in the sequence. Overview in code:
main.py
import asyncio
from canBusSequencer import CanBusSequencer

async def main():
    event = asyncio.Event()
    sequencer = CanBusSequencer(event)
    task = asyncio.create_task(sequencer.doSequence())
    await task

asyncio.run(main(), debug=True)
canBusSequencer.py
from canBusManager import CanBusManager

class CanBusSequencer:
    def __init__(self, event):
        self.event = event
        self.canManager = CanBusManager(event)

    async def doSequence(self):
        for index, row in self.df_sequence.iterrows():
            if ...:
                self.canManager.sendMsg(...)
            else:
                self.canManager.sendMsg(...)
                # Wait for the unlocking message before continuing the sequence
                await self.event.wait()
                self.event.clear()
canBusManager.py
import can

class CanBusManager():
    def __init__(self, event):
        self.event = event
        self.startListening()

    # EDIT
    def startListening(self):
        # can.Notifier expects an iterable of listeners or callables
        self.msgNotifier = can.Notifier(self.canBus, [self.receivedMsgCallback])
    # EDIT

    def receivedMsgCallback(self, msg):
        if msg == ...:
            self.event.set()
For now, my program hangs at await self.event.wait(), even though the relevant message is received and self.event.set() is executed. Running the program with debug=True reveals a
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
that I don't really understand. It seems to be related to the asyncio event loop somehow not being properly defined or managed. I'm coming from the C++ world and I'm currently writing my first large program in Python. Any guidance would be really appreciated :)
Your question doesn't explain how you arrange for receivedMsgCallback to be invoked.
If it is invoked by a classic "async" API that uses threads behind the scenes, then it is called from a thread other than the one running the event loop. According to the documentation, asyncio primitives are not thread-safe, so calling event.set() from another thread doesn't properly synchronize with the running event loop, which is why your program doesn't wake up when it should.
If you want to do anything asyncio-related, such as invoking Event.set, from outside the event loop thread, you need to use call_soon_threadsafe or an equivalent. For example:
def receivedMsgCallback(self, msg):
    if msg == ...:
        self.loop.call_soon_threadsafe(self.event.set)
The event loop object should be made available to the CanBusManager object, perhaps by passing it to its constructor and assigning it to self.loop.
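To make this concrete without any CAN hardware, here is a minimal sketch of the pattern. FakeCanManager, start_listening, and the "ack" message are hypothetical stand-ins invented for illustration, and a threading.Timer plays the role of whatever background thread invokes your callback. It is only meant to show the loop being passed to the constructor and the callback handing event.set over to that loop:

```python
import asyncio
import threading


class FakeCanManager:
    """Hypothetical stand-in for CanBusManager; no CAN hardware involved."""

    def __init__(self, event, loop):
        self.event = event
        self.loop = loop  # the loop that runs the sequencer coroutine

    def start_listening(self):
        # Assumption: a timer thread simulates the thread that delivers messages.
        timer = threading.Timer(0.05, self.received_msg_callback, args=("ack",))
        timer.start()
        return timer

    def received_msg_callback(self, msg):
        # Runs in the worker thread, so hand the set() call over to the loop
        # instead of calling self.event.set() directly.
        if msg == "ack":
            self.loop.call_soon_threadsafe(self.event.set)


async def run_sequence():
    event = asyncio.Event()
    manager = FakeCanManager(event, asyncio.get_running_loop())
    timer = manager.start_listening()
    # Raises a timeout error if the event is never set.
    await asyncio.wait_for(event.wait(), timeout=2)
    event.clear()
    timer.join()
    return "unlocked"


result = asyncio.run(run_sequence())
print(result)
```

Fetching the loop with asyncio.get_running_loop() inside the coroutine is one way to make sure the manager gets the loop that is actually running the sequence.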
On a side note, if you are creating a task only to await it immediately, you don't need a task in the first place. In other words, you can replace task = asyncio.create_task(sequencer.doSequence()); await task with the simpler await sequencer.doSequence().
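A small sketch of that equivalence, using a hypothetical do_sequence coroutine as a placeholder for the real sequence. Both variants return the same result; the task version only pays off when other work should run concurrently before the await:

```python
import asyncio


async def do_sequence():
    # Hypothetical placeholder for the real message sequence.
    await asyncio.sleep(0)
    return "done"


async def main_with_task():
    # Creates a task and awaits it immediately.
    task = asyncio.create_task(do_sequence())
    return await task


async def main_direct():
    # Awaits the coroutine directly, with no task in between.
    return await do_sequence()


via_task = asyncio.run(main_with_task())
direct = asyncio.run(main_direct())
print(via_task, direct)
```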