I'm trying to make gpiozero
play nicely with asyncio
in my RPi 4 but ran into an issue that I can't solve. Essentially I'm trying to get the turn signal (clock wise, counter clock wise) af a rotary enoder as well es detect if the button of the RE has been pressed.
I'm using asyncio to gather coroutines and run them simultaneously (the other corountines so far working as expected). But when I try to run the button detection and rotary detection, only the first one in the list ever gets started.
Here's the excerpt of the code that's not working for me:
import asyncio
from gpiozero import RotaryEncoder, Button
button = Button(7)
encoder = RotaryEncoder(a=10, b=9, wrap=False, max_steps=0)
async def rotary_detection():
print("rotary_detection started")
last_rotary_value = 0
try:
while True:
current_rotary_value = encoder.steps
if last_rotary_value != current_rotary_value:
rotDiff = last_rotary_value - current_rotary_value
if rotDiff > 0:
print("turned CW")
else:
print("turned CCW")
last_rotary_value = current_rotary_value
asyncio.sleep(0.01)
except KeyboardInterrupt:
print("Program terminated")
except Exception as e:
print(e)
async def button_detection():
print("button_detection started")
try:
while True:
if button.is_pressed:
print("Button pressed")
except KeyboardInterrupt:
print("Program terminated")
except Exception as e:
print(e)
async def main():
await asyncio.gather(
rotary_detection(),
button_detection()
)
if __name__ == "__main__":
asyncio.run(main())
No matter what order in the asyncio.gather()
list, only the first one ever gets started.
How do I implement my use case using gpiozero and asyncio in a way that they play nicely with each other?
Neither of your async methods ever calls await
, which means they never yield control back to the scheduler so that another async task can run. You have effectively two synchronous (that is, non-async) methods.
Your rotary_detection
method would be correct if you remembered to await
on asyncio.sleep
:
async def rotary_detection():
print("rotary_detection started")
last_rotary_value = 0
try:
while True:
...
# Don't forget to await on async tasks
await asyncio.sleep(0.01)
except KeyboardInterrupt:
print("Program terminated")
except Exception as e:
print(e)
However, you will need to introduce an await
into your button_pressed
method. Possibly just adding another call to asyncio.sleep
would make things work, but you may want to consider other options:
gpiozero
when_held
and when_pressed
methods to implement your logic.