
asyncio: Wait for event from other thread


I'm designing an application in Python which should access a machine to perform some (lengthy) tasks. The asyncio module seems to be a good choice for everything that is network-related, but now I need to access the serial port for one specific component. I've implemented kind of an abstraction layer for the actual serial port stuff, but can't figure out how to sensibly integrate this with asyncio.

The setup is as follows: I have a thread running a loop that regularly talks to the machine and decodes the responses. Using a method enqueue_query(), I can put a query string into a queue; the other thread then sends it to the machine, which triggers a response. By passing in a threading.Event (or anything with a set() method), the caller can perform a blocking wait for the response. This can look something like this:

f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
f.wait()
print(ch.get_query_responses())

My goal is now to put those lines into a coroutine and have asyncio handle this waiting, so that the application can do something else in the meantime. How could I do this? It would probably work by wrapping the f.wait() into an Executor, but this seems to be a bit stupid, as this would create a new thread only to wait for another thread to do something.


Solution

  • By passing in a threading.Event (or anything with a set() method), the caller can perform a blocking wait for the response.

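    Given the above behavior of your query function, all you need is a thread-safe version of asyncio.Event. It only takes a few lines of code:

    import asyncio
    class Event_ts(asyncio.Event):
        def __init__(self):
            super().__init__()
            # Create inside a running loop; the private _loop attribute is
            # not reliably set before the first wait() on Python 3.10+.
            self._ts_loop = asyncio.get_running_loop()
        #TODO: clear() method (currently only safe from the loop's thread)
        def set(self):
            # Hand the real set() over to the event loop's thread.
            self._ts_loop.call_soon_threadsafe(super().set)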
    

    A test for functionality:

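    import threading
    import time

    def threaded(event):
        # Worker thread: signal the event once per second.
        while True:
            event.set()
            time.sleep(1)

    async def main():
        e = Event_ts()
        # daemon=True so the worker does not keep the process alive on exit
        threading.Thread(target=threaded, args=(e,), daemon=True).start()
        while True:
            await e.wait()
            e.clear()
            print('whatever')

    # asyncio.run() replaces the deprecated ensure_future()/get_event_loop() pair
    asyncio.run(main())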
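To tie this back to the snippet in the question, here is a rough sketch of how the enqueue_query() call might look inside a coroutine. I don't know your actual serial layer, so FakeChannel below is a purely hypothetical stand-in for `ch` that just echoes the query from a worker thread; ThreadSafeEvent and query() are likewise names made up for this example. It also assumes the event is created while the loop is already running.

```python
import asyncio
import queue
import threading


class ThreadSafeEvent(asyncio.Event):
    """asyncio.Event whose set() may be called from another thread."""

    def __init__(self):
        super().__init__()
        # Assumption: constructed inside a coroutine, so a loop is running.
        self._owner_loop = asyncio.get_running_loop()

    def set(self):
        # Run the real set() on the event loop's own thread.
        self._owner_loop.call_soon_threadsafe(super().set)


class FakeChannel:
    """Hypothetical stand-in for the asker's `ch` object."""

    def __init__(self):
        self._queries = queue.Queue()
        self._responses = queue.Queue()
        threading.Thread(target=self._worker, daemon=True).start()

    def enqueue_query(self, query, event):
        self._queries.put((query, event))

    def get_query_responses(self):
        responses = []
        while not self._responses.empty():
            responses.append(self._responses.get())
        return responses

    def _worker(self):
        # Pretend to talk to the machine: store a reply, then signal.
        while True:
            query, event = self._queries.get()
            self._responses.put(f"reply to {query!r}")
            event.set()


async def query(ch, text):
    done = ThreadSafeEvent()
    ch.enqueue_query(text, done)
    await done.wait()  # other coroutines keep running while we wait
    return ch.get_query_responses()


async def main():
    ch = FakeChannel()
    return await query(ch, '2 getnlimit')


result = asyncio.run(main())
print(result)
```

Since enqueue_query() only needs something with a set() method, the worker thread does not have to know it is signalling an asyncio event rather than a threading.Event, and no extra thread is spent just waiting.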