pythonpython-asynciogpiogpiozero

asyncio only running first gpiozero coroutine


I'm trying to make gpiozero play nicely with asyncio in my RPi 4 but ran into an issue that I can't solve. Essentially I'm trying to get the turn signal (clock wise, counter clock wise) af a rotary enoder as well es detect if the button of the RE has been pressed. I'm using asyncio to gather coroutines and run them simultaneously (the other corountines so far working as expected). But when I try to run the button detection and rotary detection, only the first one in the list ever gets started.

Here's the excerpt of the code that's not working for me:

import asyncio
from gpiozero import RotaryEncoder, Button

button = Button(7)
encoder = RotaryEncoder(a=10, b=9, wrap=False, max_steps=0)

async def rotary_detection():
  print("rotary_detection started")
  last_rotary_value = 0
  try:
    while True:
      current_rotary_value = encoder.steps
      if last_rotary_value != current_rotary_value:
        rotDiff = last_rotary_value - current_rotary_value
        if rotDiff > 0:
          print("turned CW")
        else:
          print("turned CCW")
        last_rotary_value = current_rotary_value
      asyncio.sleep(0.01)
  except KeyboardInterrupt:
    print("Program terminated")
  except Exception as e:
    print(e)

async def button_detection():
  print("button_detection started")
  try:
    while True:
      if button.is_pressed:
        print("Button pressed")
  except KeyboardInterrupt:
    print("Program terminated")
  except Exception as e:
    print(e)

async def main():
  await asyncio.gather(
    rotary_detection(),
    button_detection()
  )

if __name__ == "__main__":
  asyncio.run(main())

No matter what order in the asyncio.gather() list, only the first one ever gets started.

How do I implement my use case using gpiozero and asyncio in a way that they play nicely with each other?


Solution

  • Neither of your async methods ever calls await, which means they never yield control back to the scheduler so that another async task can run. You have effectively two synchronous (that is, non-async) methods.

    Your rotary_detection method would be correct if you remembered to await on asyncio.sleep:

    async def rotary_detection():
      print("rotary_detection started")
      last_rotary_value = 0
      try:
        while True:
          ...
    
          # Don't forget to await on async tasks
          await asyncio.sleep(0.01)
      except KeyboardInterrupt:
        print("Program terminated")
      except Exception as e:
        print(e)
    

    However, you will need to introduce an await into your button_pressed method. Possibly just adding another call to asyncio.sleep would make things work, but you may want to consider other options:

    1. There are asyncio-supporting libaries for PGIO access, such as asyncpio.
    2. Ditch asyncio for a threaded model, and use the gpiozero when_held and when_pressed methods to implement your logic.